xref: /haiku/src/apps/cortex/NodeManager/NodeGroup.cpp (revision 1acbe440b8dd798953bec31d18ee589aa3f71b73)
1 // NodeGroup.cpp
2 
3 #include "NodeGroup.h"
4 //#include "NodeGroup_transport_thread.h"
5 
6 #include "NodeManager.h"
7 #include "NodeRef.h"
8 
9 #include <MediaRoster.h>
10 #include <OS.h>
11 #include <TimeSource.h>
12 
13 #include <algorithm>
14 #include <functional>
15 
16 #include "array_delete.h"
17 #include "BasicThread.h"
18 #include "node_manager_impl.h"
19 #include "functional_tools.h"
20 
21 using namespace std;
22 
23 __USE_CORTEX_NAMESPACE
24 #define D_METHOD(x) //PRINT (x)
25 #define D_ROSTER(x) //PRINT (x)
26 #define D_LOCK(x) //PRINT (x)
27 
28 
29 
30 // -------------------------------------------------------- //
31 // *** ctor/dtor
32 // -------------------------------------------------------- //
33 
34 // free the group, including all nodes within it
35 // (this call will result in the eventual deletion of the object.)
36 // returns B_OK on success; B_NOT_ALLOWED if release() has
37 // already been called; other error codes if the Media Roster
38 // call fails.
39 // * THE MANAGER MUST BE LOCKED
40 
41 status_t NodeGroup::release() {
42 
43 	D_METHOD((
44 		"NodeGroup::release()\n"));
45 
46 	if(isReleased())
47 		return B_NOT_ALLOWED;
48 
49 	// clean up
50 	lock();
51 
52 	// halt all nodes
53 	_stop();
54 
55 	// remove & release all nodes
56 	// +++++ causes triply-nested lock: eww!
57 	while(m_nodes.size()) {
58 		NodeRef* last = m_nodes.back();
59 		removeNode(m_nodes.size()-1);
60 		last->release();
61 	}
62 
63 	unlock();
64 
65 // [e.moon 7nov99]
66 // removing the released group is now NodeManager's responsibility
67 //
68 	// remove from NodeManager
69 	if(!m_manager->lock()) {
70 		ASSERT(!"* m_manager->lock() failed.\n");
71 	}
72 	m_manager->_removeGroup(this);
73 	m_manager->unlock();
74 
75 	// hand off to IObservable
76 	return _inherited::release();
77 }
78 
79 // call release() rather than deleting NodeGroup objects
80 NodeGroup::~NodeGroup() {
81 
82 	Autolock _l(this);
83 	D_METHOD((
84 		"~NodeGroup()\n"));
85 
86 	ASSERT(!m_nodes.size());
87 
88 	if(m_timeSourceObj) {
89 		m_timeSourceObj->Release();
90 		m_timeSourceObj = 0;
91 	}
92 }
93 
94 
95 // -------------------------------------------------------- //
96 // *** accessors
97 // -------------------------------------------------------- //
98 
99 // [e.moon 13oct99] moved to header
100 //inline uint32 NodeGroup::id() const { return m_id; }
101 
102 // -------------------------------------------------------- //
103 // *** operations
104 // -------------------------------------------------------- //
105 
106 // name access
107 const char* NodeGroup::name() const {
108 	Autolock _l(this);
109 	return m_name.String();
110 }
111 
112 status_t NodeGroup::setName(const char* name) {
113 	Autolock _l(this);
114 	m_name = name;
115 	return B_OK;
116 }
117 
118 // content access
119 uint32 NodeGroup::countNodes() const {
120 	Autolock _l(this);
121 	return m_nodes.size();
122 }
123 
124 NodeRef* NodeGroup::nodeAt(
125 	uint32											index) const {
126 	Autolock _l(this);
127 	return (index < m_nodes.size()) ?
128 		m_nodes[index] :
129 		0;
130 }
131 
132 // add/remove nodes:
133 // - you may only add a node with no current group.
134 // - nodes added during playback will be started;
135 //   nodes removed during playback will be stopped (unless
136 //   the NO_START_STOP transport restriction flag is set
137 //   for a given node.)
138 
139 status_t NodeGroup::addNode(
140 	NodeRef*										node) {
141 
142 	D_METHOD((
143 		"NodeGroup::addNode()\n"));
144 
145 	// lock the manager first; if the node has no current group,
146 	// this locks the node.
147 	m_manager->lock();
148 
149 	Autolock _l(this);
150 
151 	// precondition: GROUP_LOCKED not set
152 	if(m_flags & GROUP_LOCKED)
153 		return B_NOT_ALLOWED;
154 
155 	// precondition: no current group
156 	if(node->m_group) {
157 		// [e.moon 28sep99] whoops, forgot one
158 		PRINT((
159 			"!!! node already in group '%s'\n", node->m_group->name()));
160 
161 		m_manager->unlock();
162 		return B_NOT_ALLOWED;
163 	}
164 
165 	// add it
166 	m_nodes.push_back(node);
167 	node->_setGroup(this);
168 
169 	// release the manager
170 	m_manager->unlock();
171 
172 	// first node? the transport is now ready to start
173 	if(m_nodes.size() == 1) {
174 		_changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
175 	}
176 //
177 //	if(m_syncNode == media_node::null) {
178 //		// assign as sync node
179 //		setSyncNode(node->node());
180 //	}
181 //
182 	// initialize the new node
183 	status_t err = node->_initTransportState();
184 	if(err < B_OK)
185 		return err;
186 
187 	// set time source
188 	node->_setTimeSource(m_timeSource.node);
189 
190 	// set run mode
191 	node->_setRunMode(m_runMode);
192 
193 	// add to cycle set if need be
194 	// +++++ should I call _cycleAddRef() instead?
195 	if(node->m_cycle)
196 		_refCycleChanged(node);
197 
198 	if(m_transportState == TRANSPORT_RUNNING) {
199 		// +++++ start if necessary!
200 	}
201 	// +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
202 
203 	// send notification
204 	if(!LockLooper()) {
205 		ASSERT(!"LockLooper() failed.");
206 	}
207 	BMessage m(M_NODE_ADDED);
208 	m.AddInt32("groupID", id());
209 	m.AddInt32("nodeID", node->id());
210 	notify(&m);
211 	UnlockLooper();
212 
213 	// success
214 	return B_OK;
215 }
216 
217 
218 status_t NodeGroup::removeNode(
219 	NodeRef*										node) {
220 
221 	D_METHOD((
222 		"NodeGroup::removeNode()\n"));
223 
224 	// lock the manager first; once the node is ungrouped,
225 	// the manager lock applies to it
226 	m_manager->lock();
227 
228 	Autolock _l(this);
229 
230 	// precondition: this must be the node's group
231 	if(node->m_group != this) {
232 		// [e.moon 28sep99] whoops, forgot one
233 		PRINT((
234 			"!!! node not in group '%s'\n", node->m_group->name()));
235 
236 		m_manager->unlock();
237 		return B_NOT_ALLOWED;
238 	}
239 
240 	// remove from the cycle set
241 	if(node->m_cycle)
242 		_cycleRemoveRef(node);
243 
244 	// remove it
245 	ASSERT(m_nodes.size());
246 	remove(
247 		m_nodes.begin(),
248 		m_nodes.end(),
249 		node);
250 
251 	// should have removed one and only one entry
252 	m_nodes.resize(m_nodes.size()-1);
253 
254 //	// 6aug99: the timesource is now the sync node...
255 //	// is this the sync node? reassign if so
256 //
257 //	if(node->node() == m_syncNode) {
258 //
259 //		// look for another sync-capable node
260 //		bool found = false;
261 //		for(int n = 0; !found && n < m_nodes.size(); ++n)
262 //			if(setSyncNode(m_nodes[n]->node()) == B_OK)
263 //				found = true;
264 //
265 //		// no luck? admit defeat:
266 //		if(!found) {
267 //			PRINT((
268 //				"* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
269 //
270 //			// +++++ stop & set to invalid state?
271 //
272 //			setSyncNode(media_node::null);
273 //		}
274 //	}
275 
276 	// stop the node if necessary
277 	status_t err = node->_stop();
278 	if(err < B_OK) {
279 		PRINT((
280 			"*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
281 			"    %s\n",
282 			node->name(),
283 			strerror(err)));
284 	}
285 
286 	// clear the node's group pointer
287 	node->_setGroup(0);
288 
289 	// release the manager lock; the node is now ungrouped and
290 	// unlocked
291 	m_manager->unlock();
292 
293 	// was that the last node? stop/disable the transport if so
294 	if(!m_nodes.size()) {
295 
296 		// +++++ kill sync thread(s)
297 
298 		_changeState(TRANSPORT_INVALID);
299 	}
300 
301 	// send notification
302 	if(!LockLooper()) {
303 		ASSERT(!"LockLooper() failed.");
304 	}
305 	BMessage m(M_NODE_REMOVED);
306 	m.AddInt32("groupID", id());
307 	m.AddInt32("nodeID", node->id());
308 	notify(&m);
309 	UnlockLooper();
310 
311 	// success
312 	return B_OK;
313 }
314 
315 status_t NodeGroup::removeNode(
316 	uint32											index) {
317 
318 	D_METHOD((
319 		"NodeGroup::removeNode(by index)\n"));
320 
321 	// +++++ icky nested lock
322 	Autolock _l(this);
323 
324 	ASSERT(m_nodes.size() > index);
325 	return removeNode(m_nodes[index]);
326 }
327 
328 uint32 NodeGroup::groupFlags() const {
329 	Autolock _l(this);
330 	return m_flags;
331 }
332 
333 status_t NodeGroup::setGroupFlags(
334 	uint32											flags) {
335 	Autolock _l(this);
336 	m_flags = flags;
337 	return B_OK;
338 }
339 
340 
341 // returns true if one or more nodes in the group have cycling
342 // enabled, and the start- and end-positions are valid
343 bool NodeGroup::canCycle() const {
344 	Autolock _l(this);
345 
346 	return
347 		m_cycleNodes.size() > 0 &&
348 		m_endPosition - m_startPosition > s_minCyclePeriod;
349 }
350 
351 // -------------------------------------------------------- //
352 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
353 // -------------------------------------------------------- //
354 
355 // Fetch the current transport state
356 
357 NodeGroup::transport_state_t NodeGroup::transportState() const {
358 	Autolock _l(this);
359 	return m_transportState;
360 }
361 
362 // Set the starting media time:
363 //   This is the point at which playback will begin in any media
364 //   files/documents being played by the nodes in this group.
365 //   When cycle mode is enabled, this is the point to which each
366 //   node will be seek'd at the end of each cycle (loop).
367 //
368 //   The starting time can't be changed in the B_OFFLINE run mode
369 //   (this call will return an error.)
370 
371 status_t NodeGroup::setStartPosition(
372 	bigtime_t										start) {
373 	Autolock _l(this);
374 
375 	D_METHOD((
376 		"NodeGroup::setStartPosition(%Ld)\n", start));
377 
378 	if(
379 		m_transportState == TRANSPORT_RUNNING ||
380 		m_transportState == TRANSPORT_ROLLING ||
381 		m_transportState == TRANSPORT_STARTING) {
382 
383 		if(m_runMode == BMediaNode::B_OFFLINE)
384 			return B_NOT_ALLOWED;
385 
386 		ASSERT(m_timeSourceObj);
387 
388 		if(_cycleValid()) {
389 			if(m_timeSourceObj->Now() >= m_cycleDeadline) {
390 				// too late to change start position; defer
391 //				PRINT((" - deferred\n"));
392 				m_newStartPosition = start;
393 				m_newStart = true;
394 				return B_OK;
395 			}
396 
397 			// not at deadline yet; fall through to set start position
398 		}
399 	}
400 
401 	m_startPosition = start;
402 
403 	// +++++ notify [e.moon 11oct99]
404 
405 	return B_OK;
406 }
407 
408 // Fetch the starting position:
409 
410 // +++++ if a previously-set start position was deferred, it won't be
411 //       returned yet
412 
413 bigtime_t NodeGroup::startPosition() const {
414 	Autolock _l(this);
415 
416 	return m_startPosition;
417 }
418 
419 // Set the ending media time:
420 //   This is the point at which playback will end relative to
//   media documents being played by the nodes in this group;
422 //   in cycle mode, this specifies the loop point.  If the
423 //   ending time is less than or equal to the starting time,
424 //   the transport will continue until stopped manually.
425 //   If the end position is changed while the transport is playing,
426 //   it must take effect retroactively (if it's before the current
427 //   position and looping is enabled, all nodes must 'warp' to
428 //   the proper post-loop position.)
429 //
430 //   The ending time can't be changed if run mode is B_OFFLINE and
431 //   the transport is running (this call will return an error.)
432 
433 status_t NodeGroup::setEndPosition(
434 	bigtime_t										end) {
435 	Autolock _l(this);
436 
437 	D_METHOD((
438 		"NodeGroup::setEndPosition(%Ld)\n", end));
439 
440 	if(
441 		m_transportState == TRANSPORT_RUNNING ||
442 		m_transportState == TRANSPORT_ROLLING ||
443 		m_transportState == TRANSPORT_STARTING) {
444 
445 		if(m_runMode == BMediaNode::B_OFFLINE)
446 			return B_NOT_ALLOWED;
447 
448 		ASSERT(m_timeSourceObj);
449 
450 		bigtime_t endDelta = end - m_endPosition;
451 
452 		if(_cycleValid()) {
453 			if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
454 				// too late to change end position; defer
455 //				PRINT((" - deferred\n"));
456 				m_newEndPosition = end;
457 				m_newEnd = true;
458 				return B_OK;
459 			}
460 			else {
461 				// set new end position
462 				m_endPosition = end;
463 
464 				// inform thread
465 				ASSERT(m_cyclePort);
466 				write_port(
467 					m_cyclePort,
468 					_CYCLE_END_CHANGED,
469 					0,
470 					0);
471 //
472 //				// restart nodes' cycle threads with new end position
473 //				_cycleInit(m_cycleStart);
474 //				for(node_set::iterator it = m_cycleNodes.begin();
475 //					it != m_cycleNodes.end(); ++it) {
476 //					(*it)->_scheduleCycle(m_cycleBoundary);
477 //				}
478 //				return B_OK;
479 			}
480 		}
481 	}
482 
483 	m_endPosition = end;
484 
485 	// +++++ notify [e.moon 11oct99]
486 
487 	return B_OK;
488 }
489 
490 
491 // Fetch the end position:
492 //   Note that if the end position is less than or equal to the start
493 //   position, it's ignored.
494 
495 // +++++ if a previously-set end position was deferred, it won't be
496 //       returned yet
497 
498 bigtime_t NodeGroup::endPosition() const {
499 	Autolock _l(this);
500 	return m_endPosition;
501 }
502 
503 // -------------------------------------------------------- //
504 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
505 // -------------------------------------------------------- //
506 
507 // Preroll the group:
508 //   Seeks, then prerolls, each node in the group (honoring the
509 //   NO_SEEK and NO_PREROLL flags.)  This ensures that the group
510 //   can start as quickly as possible.
511 //
512 //   Does not return until all nodes in the group have been
513 //   prepared.
514 
515 status_t NodeGroup::preroll() {
516 	D_METHOD((
517 		"NodeGroup::preroll()\n"));
518 
519 	Autolock _l(this);
520 	return _preroll();
521 }
522 
523 // Start all nodes in the group:
524 //   Nodes with the NO_START_STOP flag aren't molested.
525 
526 status_t NodeGroup::start() {
527 	D_METHOD((
528 		"NodeGroup::start()\n"));
529 
530 	Autolock _l(this);
531 	return _start();
532 }
533 
534 // Stop all nodes in the group:
535 //   Nodes with the NO_START_STOP flag aren't molested.
536 
537 status_t NodeGroup::stop() {
538 	D_METHOD((
539 		"NodeGroup::stop()\n"));
540 
541 	Autolock _l(this);
542 	return _stop();
543 }
544 
545 // Roll all nodes in the group:
546 //   Queues a start and stop atomically (via BMediaRoster::RollNode()).
547 //   Returns B_NOT_ALLOWED if endPosition <= startPosition;
548 
549 status_t NodeGroup::roll() {
550 	D_METHOD((
551 		"NodeGroup::roll()\n"));
552 
553 	Autolock _l(this);
554 	return _roll();
555 }
556 
557 // -------------------------------------------------------- //
558 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
559 // -------------------------------------------------------- //
560 
561 // time source control:
562 //   getTimeSource():
563 //   returns B_ERROR if no time source has been set; otherwise,
564 //   returns the node ID of the current time source for all
565 //   nodes in the group.
566 //
567 //   setTimeSource():
568 //   Calls SetTimeSourceFor() on every node in the group.
569 //   The group must be stopped; B_NOT_ALLOWED will be returned
570 //   if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
571 
572 status_t NodeGroup::getTimeSource(
573 	media_node*									outTimeSource) const {
574 	Autolock _l(this);
575 
576 	if(m_timeSource != media_node::null) {
577 		*outTimeSource = m_timeSource;
578 		return B_OK;
579 	}
580 	return B_ERROR;
581 }
582 
583 status_t NodeGroup::setTimeSource(
584 	const media_node&						timeSource) {
585 
586 	Autolock _l(this);
587 
588 	if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
589 		return B_NOT_ALLOWED;
590 
591 	if(m_timeSourceObj)
592 		m_timeSourceObj->Release();
593 
594 	m_timeSource = timeSource;
595 
596 	// cache a BTimeSource*
597 	m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
598 	ASSERT(m_timeSourceObj);
599 
600 	// apply new time source to all nodes
601 	for_each(
602 		m_nodes.begin(),
603 		m_nodes.end(),
604 		bind2nd(
605 			mem_fun(&NodeRef::_setTimeSource),
606 			m_timeSource.node
607 		)
608 	);
609 
610 //	// try to set as sync node
611 //	err = setSyncNode(timeSource);
612 //	if(err < B_OK) {
613 //		PRINT((
614 //			"* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
615 //			strerror(err)));
616 //	}
617 
618 	// notify
619 	if(!LockLooper()) {
620 		ASSERT(!"LockLooper() failed.");
621 	}
622 	BMessage m(M_TIME_SOURCE_CHANGED);
623 	m.AddInt32("groupID", id());
624 	m.AddInt32("timeSourceID", timeSource.node);
625 	notify(&m);
626 	UnlockLooper();
627 
628 	return B_OK;
629 }
630 
631 // run mode access:
632 //   Sets the default run mode for the group.  This will be
633 //   applied to every node with a wildcard (0) run mode.
634 //
635 //   Special case: if the run mode is B_OFFLINE, it will be
636 //   applied to all nodes in the group.
637 
638 BMediaNode::run_mode NodeGroup::runMode() const {
639 	Autolock _l(this);
640 	return m_runMode;
641 }
642 
643 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
644 	Autolock _l(this);
645 
646 	m_runMode = mode;
647 
648 	// apply to all nodes
649 	for_each(
650 		m_nodes.begin(),
651 		m_nodes.end(),
652 		bind2nd(
653 			mem_fun(&NodeRef::_setRunModeAuto),
654 			m_runMode
655 		)
656 //		bound_method(
657 //			*this,
658 //			&NodeGroup::setRunModeFor)
659 	);
660 
661 
662 	return B_OK;
663 }
664 
665 // -------------------------------------------------------- //
666 // *** BHandler
667 // -------------------------------------------------------- //
668 
669 void NodeGroup::MessageReceived(
670 	BMessage*										message) {
671 
672 //	PRINT((
673 //		"NodeGroup::MessageReceived():\n"));
674 //	message->PrintToStream();
675 	status_t err;
676 
677 	switch(message->what) {
678 		case M_SET_TIME_SOURCE:
679 			{
680 				media_node timeSource;
681 				void* data;
682 				ssize_t dataSize;
683 				err = message->FindData(
684 					"timeSourceNode",
685 					B_RAW_TYPE,
686 					(const void**)&data,
687 					&dataSize);
688 				if(err < B_OK) {
689 					PRINT((
690 						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
691 						"  no timeSourceNode!\n"));
692 					break;
693 				}
694 				timeSource = *(media_node*)data;
695 
696 				setTimeSource(timeSource);
697 			}
698 			break;
699 
700 		case M_SET_RUN_MODE:
701 			{
702 				uint32 runMode;
703 				err = message->FindInt32("runMode", (int32*)&runMode);
704 				if(err < B_OK) {
705 					PRINT((
706 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
707 						"  no runMode!\n"));
708 					break;
709 				}
710 
711 				if(runMode < BMediaNode::B_OFFLINE ||
712 					runMode > BMediaNode::B_RECORDING) {
713 					PRINT((
714 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
715 						"  invalid run mode (%ld)\n", runMode));
716 					break;
717 				}
718 
719 				setRunMode((BMediaNode::run_mode)runMode);
720 			}
721 			break;
722 
723 		case M_SET_START_POSITION:
724 			{
725 				bigtime_t position;
726 				err = message->FindInt64("position", (int64*)&position);
727 				if(err < B_OK) {
728 					PRINT((
729 						"* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
730 						"  no position!\n"));
731 					break;
732 				}
733 				setStartPosition(position);
734 			}
735 			break;
736 
737 		case M_SET_END_POSITION:
738 			{
739 				bigtime_t position;
740 				err = message->FindInt64("position", (int64*)&position);
741 				if(err < B_OK) {
742 					PRINT((
743 						"* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
744 						"  no position!\n"));
745 					break;
746 				}
747 				setEndPosition(position);
748 			}
749 			break;
750 
751 		case M_PREROLL:
752 			preroll();
753 			break;
754 
755 		case M_START:
756 			start();
757 			break;
758 
759 		case M_STOP:
760 			stop();
761 			break;
762 
763 		case M_ROLL:
764 			roll();
765 			break;
766 
767 		default:
768 			_inherited::MessageReceived(message);
769 			break;
770 	}
771 }
772 
773 
774 // -------------------------------------------------------- //
775 // *** IPersistent
776 // -------------------------------------------------------- //
777 
778 // !
779 #if CORTEX_XML
780 // !
781 
782 // +++++
783 
784 // Default constructor
785 NodeGroup::NodeGroup() :
786 	m_manager(0) {} // +++++ finish initialization
787 
788 
789 // !
790 #endif /*CORTEX_XML*/
791 // !
792 
793 // -------------------------------------------------------- //
794 // *** IObservable:		[19aug99]
795 // -------------------------------------------------------- //
796 
797 void NodeGroup::observerAdded(
798 	const BMessenger&				observer) {
799 
800 	BMessage m(M_OBSERVER_ADDED);
801 	m.AddInt32("groupID", id());
802 	m.AddMessenger("target", BMessenger(this));
803 	observer.SendMessage(&m);
804 }
805 
806 void NodeGroup::observerRemoved(
807 	const BMessenger&				observer) {
808 
809 	BMessage m(M_OBSERVER_REMOVED);
810 	m.AddInt32("groupID", id());
811 	m.AddMessenger("target", BMessenger(this));
812 	observer.SendMessage(&m);
813 }
814 
815 void NodeGroup::notifyRelease() {
816 
817 	BMessage m(M_RELEASED);
818 	m.AddInt32("groupID", id());
819 	m.AddMessenger("target", BMessenger(this));
820 	notify(&m);
821 }
822 
823 void NodeGroup::releaseComplete() {
824 	// +++++
825 }
826 
827 // -------------------------------------------------------- //
828 // *** ILockable: pass lock requests to m_lock
829 // -------------------------------------------------------- //
830 
831 bool NodeGroup::lock(
832 	lock_t type,
833 	bigtime_t timeout) {
834 
835 	D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
836 
837 	ASSERT(type == WRITE);
838 	status_t err = m_lock.LockWithTimeout(timeout);
839 
840 	D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
841 
842 	return err == B_OK;
843 }
844 
845 bool NodeGroup::unlock(
846 	lock_t type) {
847 
848 	D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
849 
850 	ASSERT(type == WRITE);
851 	m_lock.Unlock();
852 
853 	D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
854 
855 	return true;
856 }
857 
858 bool NodeGroup::isLocked(
859 	lock_t type) const {
860 
861 	ASSERT(type == WRITE);
862 	return m_lock.IsLocked();
863 }
864 
865 // -------------------------------------------------------- //
866 // *** ctor (accessible to NodeManager)
867 // -------------------------------------------------------- //
868 
869 NodeGroup::NodeGroup(
870 	const char*									name,
871 	NodeManager*								manager,
872 	BMediaNode::run_mode				runMode) :
873 
874 	ObservableHandler(name),
875 	m_lock("NodeGroup::m_lock"),
876 	m_manager(manager),
877 	m_id(NextID()),
878 	m_name(name),
879 	m_flags(0),
880 	m_transportState(TRANSPORT_INVALID),
881 	m_runMode(runMode),
882 	m_timeSourceObj(0),
883 	m_released(false),
884 	m_cycleThread(0),
885 	m_cyclePort(0),
886 	m_startPosition(0LL),
887 	m_endPosition(0LL),
888 	m_newStart(false),
889 	m_newEnd(false) {
890 
891 	ASSERT(m_manager);
892 
893 	if(!m_manager->Lock()) {
894 		ASSERT(!"m_manager->Lock() failed");
895 	}
896 	m_manager->AddHandler(this);
897 	m_manager->Unlock();
898 
899 	// set default time source
900 	media_node ts;
901 	D_ROSTER(("# roster->GetTimeSource()\n"));
902 	status_t err = m_manager->roster->GetTimeSource(&ts);
903 	if(err < B_OK) {
904 		PRINT((
905 			"*** NodeGroup(): roster->GetTimeSource() failed:\n"
906 			"    %s\n", strerror(err)));
907 	}
908 	setTimeSource(ts);
909 }
910 
911 // -------------------------------------------------------- //
912 // *** internal operations
913 // -------------------------------------------------------- //
914 
915 uint32 NodeGroup::s_nextID = 1;
916 uint32 NodeGroup::NextID() {
917 	return atomic_add((int32*)&s_nextID, 1);
918 }
919 
920 // -------------------------------------------------------- //
921 // *** ref->group communication (LOCK REQUIRED)
922 // -------------------------------------------------------- //
923 
924 // when a NodeRef's cycle state (ie. looping or not looping)
925 // changes, it must pass that information on via this method
926 
927 void NodeGroup::_refCycleChanged(
928 	NodeRef*										ref) {
929 	assert_locked(this);
930 	D_METHOD((
931 		"NodeGroup::_refCycleChanged('%s')\n",
932 		ref->name()));
933 
934 	if(ref->m_cycle) {
935 		_cycleAddRef(ref);
936 	}	else {
937 		_cycleRemoveRef(ref);
938 	}
939 
940 	// +++++ if running & cycle valid, the node should be properly
941 	//       seek'd and start'd
942 }
943 
944 
945 // when a cycling node's latency changes, call this method.
946 
947 void NodeGroup::_refLatencyChanged(
948 	NodeRef*										ref) {
949 	assert_locked(this);
950 	D_METHOD((
951 		"NodeGroup::_refLatencyChanged('%s')\n",
952 		ref->name()));
953 
954 	if(!_cycleValid())
955 		return;
956 
957 	// remove & replace ref (positions it properly)
958 	_cycleRemoveRef(ref);
959 	_cycleAddRef(ref);
960 
961 	// slap my thread up
962 	ASSERT(m_cyclePort);
963 	write_port(
964 		m_cyclePort,
965 		_CYCLE_LATENCY_CHANGED,
966 		0,
967 		0);
968 
969 	// +++++ zat it?
970 }
971 
972 // when a NodeRef receives notification that it has been stopped,
973 // but is labeled as still running, it must call this method.
974 // [e.moon 11oct99: roll/B_OFFLINE support]
975 
976 void NodeGroup::_refStopped(
977 	NodeRef*										ref) {
978 	assert_locked(this);
979 	D_METHOD((
980 		"NodeGroup::_refStopped('%s')\n",
981 		ref->name()));
982 
983 	// roll/B_OFFLINE support [e.moon 11oct99]
984 	// (check to see if any other nodes in the group are still running;
985 	//  mark group stopped if not.)
986 	if(m_transportState == TRANSPORT_ROLLING) {
987 		bool nodesRunning = false;
988 		for(node_set::iterator it = m_nodes.begin();
989 			it != m_nodes.end(); ++it) {
990 			if((*it)->isRunning()) {
991 				nodesRunning = true;
992 				break;
993 			}
994 		}
995 		if(!nodesRunning)
996 			// the group has stopped; update transport state
997 			_changeState(TRANSPORT_STOPPED);
998 
999 	}
1000 
1001 }
1002 
1003 
1004 // -------------------------------------------------------- //
1005 // *** transport helpers (LOCK REQUIRED)
1006 // -------------------------------------------------------- //
1007 
1008 
1009 // Preroll all nodes in the group; this is the implementation
1010 // of preroll().
1011 // *** this method should not be called from the transport thread
1012 // (since preroll operations can block for a relatively long time.)
1013 
1014 status_t NodeGroup::_preroll() {
1015 	assert_locked(this);
1016 
1017 	D_METHOD((
1018 		"NodeGroup::_preroll()\n"));
1019 
1020 	if(
1021 		m_transportState == TRANSPORT_RUNNING ||
1022 		m_transportState == TRANSPORT_ROLLING)
1023 		// too late
1024 		return B_NOT_ALLOWED;
1025 
1026 	// * preroll all nodes to the start position
1027 
1028 	// +++++ currently, if an error is encountered it's ignored.
1029 	//       should the whole operation fail if one node couldn't
1030 	//       be prerolled?
1031 	//
1032 	//       My gut response is 'no', since the preroll step is
1033 	//       optional, but the caller should have some inkling that
1034 	//       one of its nodes didn't behave.
1035 
1036 // [e.moon 13oct99] making PPC compiler happy
1037 //	for_each(
1038 //		m_nodes.begin(),
1039 //		m_nodes.end(),
1040 //		bind2nd(
1041 //			mem_fun(&NodeRef::_preroll),
1042 //			m_startPosition
1043 //		)
1044 //	);
1045 	for(node_set::iterator it = m_nodes.begin();
1046 		it != m_nodes.end(); ++it) {
1047 		(*it)->_preroll(m_startPosition);
1048 	}
1049 
1050 //    replaces
1051 //		bind2nd(
1052 //			bound_method(*this, &NodeGroup::prerollNode),
1053 //			m_startPosition
1054 //		)
1055 
1056 	return B_OK;
1057 }
1058 
1059 
1060 //// functor: calculates latency of each node it's handed, caching
1061 //// the largest one found; includes initial latency if nodes report it.
1062 //
1063 //class NodeGroup::calcLatencyFn { public:
1064 //	bigtime_t& maxLatency;
1065 //
1066 //	calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1067 //
1068 //	void operator()(NodeRef* r) {
1069 //		ASSERT(r);
1070 //
1071 ////		PRINT((
1072 ////			"# calcLatencyFn(): '%s'\n",
1073 ////			r->name()));
1074 //
1075 //		if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1076 //			// node can't incur latency
1077 ////			PRINT((
1078 ////				"-   not a producer\n"));
1079 //			return;
1080 //		}
1081 //
1082 //		bigtime_t latency;
1083 //		status_t err =
1084 //			BMediaRoster::Roster()->GetLatencyFor(
1085 //				r->node(),
1086 //				&latency);
1087 //		if(err < B_OK) {
1088 //			PRINT((
1089 //				"* calcLatencyFn: GetLatencyFor() failed: %s\n",
1090 //				strerror(err)));
1091 //			return;
1092 //		}
1093 ////		PRINT(("-   %Ld\n", latency));
1094 //
1095 //		bigtime_t add;
1096 //		err = BMediaRoster::Roster()->GetInitialLatencyFor(
1097 //			r->node(),
1098 //			&add);
1099 ////		PRINT(("-   %Ld\n", add));
1100 //		if(err < B_OK) {
1101 //			PRINT((
1102 //				"* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1103 //				strerror(err)));
1104 //		}
1105 //		else
1106 //			latency += add;
1107 //
1108 //		if(latency > maxLatency)
1109 //			maxLatency = latency;
1110 //
1111 ////		PRINT((
1112 ////			"-   max latency: %Ld\n",
1113 ////			maxLatency));
1114 //	}
1115 //};
1116 
1117 // Start all nodes in the group; this is the implementation of
1118 // start().  Fails if the run mode is B_OFFLINE; use _roll() instead
1119 // in that case.
1120 //
1121 // (this may be called from the transport thread or from
1122 //  an API-implementation method.)
1123 
1124 status_t NodeGroup::_start() {
1125 	assert_locked(this);
1126 
1127 	D_METHOD((
1128 		"NodeGroup::_start()\n"));
1129 	status_t err;
1130 
1131 	if(m_transportState != TRANSPORT_STOPPED)
1132 		return B_NOT_ALLOWED;
1133 
1134 	if(m_runMode == BMediaNode::B_OFFLINE)
1135 		return B_NOT_ALLOWED;
1136 
1137 	ASSERT(m_nodes.size());
1138 
1139 	_changeState(TRANSPORT_STARTING);
1140 
1141 	// * Find the highest latency in the group
1142 
1143 	bigtime_t offset = 0LL;
1144 	calcLatencyFn _f(offset);
1145 	for_each(
1146 		m_nodes.begin(),
1147 		m_nodes.end(),
1148 		_f);
1149 
1150 	offset += s_rosterLatency;
1151 	PRINT((
1152 		"- offset: %Ld\n", offset));
1153 
1154 	// * Seek all nodes (in case one or more failed to preroll)
1155 
1156 	for(node_set::iterator it = m_nodes.begin();
1157 		it != m_nodes.end(); ++it) {
1158 		err = (*it)->_seekStopped(m_startPosition);
1159 		if(err < B_OK) {
1160 			PRINT((
1161 				"! NodeGroup('%s')::_start():\n"
1162 				"  ref('%s')->_seekStopped(%Ld) failed:\n"
1163 				"  %s\n",
1164 				name(), (*it)->name(), m_startPosition,
1165 				strerror(err)));
1166 
1167 			// +++++ continue?
1168 		}
1169 	}
1170 
1171 	// * Start all nodes, allowing for the max latency found
1172 
1173 	ASSERT(m_timeSourceObj);
1174 	bigtime_t when = m_timeSourceObj->Now() + offset;
1175 
1176 	// 10aug99: initialize cycle (loop) settings
1177 	if(_cycleValid()) {
1178 		_initCycleThread();
1179 		_cycleInit(when);
1180 	}
1181 
1182 	// start the nodes
1183 	for(node_set::iterator it = m_nodes.begin();
1184 		it != m_nodes.end(); ++it) {
1185 		err = (*it)->_start(when);
1186 		if(err < B_OK) {
1187 			PRINT((
1188 				"! NodeGroup('%s')::_start():\n"
1189 				"  ref('%s')->_start(%Ld) failed:\n"
1190 				"  %s\n",
1191 				name(), (*it)->name(), when,
1192 				strerror(err)));
1193 
1194 			// +++++ continue?
1195 		}
1196 	}
1197 
1198 	// notify observers
1199 	_changeState(TRANSPORT_RUNNING);
1200 	return B_OK;
1201 }
1202 
1203 // Stop all nodes in the group; this is the implementation of
1204 // stop().
1205 //
1206 // (this may be called from the transport thread or from
1207 //  an API-implementation method.)
1208 
1209 status_t NodeGroup::_stop() {
1210 
1211 	D_METHOD((
1212 		"NodeGroup::_stop()\n"));
1213 
1214 	assert_locked(this);
1215 
1216 	if(
1217 		m_transportState != TRANSPORT_RUNNING &&
1218 		m_transportState != TRANSPORT_ROLLING)
1219 		return B_NOT_ALLOWED;
1220 
1221 	_changeState(TRANSPORT_STOPPING);
1222 
1223 	// * stop the cycle thread if need be
1224 	_destroyCycleThread();
1225 
1226 	// * stop all nodes
1227 	//   +++++ error reports would be nice
1228 
1229 	for_each(
1230 		m_nodes.begin(),
1231 		m_nodes.end(),
1232 		mem_fun(&NodeRef::_stop)
1233 	);
1234 
1235 	// update transport state
1236 	_changeState(TRANSPORT_STOPPED);
1237 
1238 	return B_OK;
1239 }
1240 
1241 // Roll all nodes in the group; this is the implementation of
1242 // roll().
1243 //
1244 // (this may be called from the transport thread or from
1245 //  an API-implementation method.)
1246 
1247 status_t NodeGroup::_roll() {
1248 
1249 	D_METHOD((
1250 		"NodeGroup::_roll()\n"));
1251 	assert_locked(this);
1252 	status_t err;
1253 
1254 	if(m_transportState != TRANSPORT_STOPPED)
1255 		return B_NOT_ALLOWED;
1256 
1257 	bigtime_t period = m_endPosition - m_startPosition;
1258 	if(period <= 0LL)
1259 		return B_NOT_ALLOWED;
1260 
1261 	_changeState(TRANSPORT_STARTING);
1262 
1263 	bigtime_t tpStart = 0LL;
1264 	bigtime_t tpStop = period;
1265 
1266 	if(m_runMode != BMediaNode::B_OFFLINE) {
1267 
1268 		// * Find the highest latency in the group
1269 		bigtime_t offset = 0LL;
1270 		calcLatencyFn _f(offset);
1271 		for_each(
1272 			m_nodes.begin(),
1273 			m_nodes.end(),
1274 			_f);
1275 
1276 		offset += s_rosterLatency;
1277 		PRINT((
1278 			"- offset: %Ld\n", offset));
1279 
1280 		ASSERT(m_timeSourceObj);
1281 		tpStart = m_timeSourceObj->Now() + offset;
1282 		tpStop += tpStart;
1283 	}
1284 
1285 	// * Roll all nodes; watch for errors
1286 	bool allFailed = true;
1287 	err = B_OK;
1288 	for(
1289 		node_set::iterator it = m_nodes.begin();
1290 		it != m_nodes.end(); ++it) {
1291 
1292 		status_t e = (*it)->_roll(
1293 			tpStart,
1294 			tpStop,
1295 			m_startPosition);
1296 		if(e < B_OK)
1297 			err = e;
1298 		else
1299 			allFailed = false;
1300 	}
1301 
1302 	if(!allFailed)
1303 		// notify observers
1304 		_changeState(TRANSPORT_ROLLING);
1305 
1306 	return err;
1307 }
1308 
1309 
1310 // State transition; notify listeners
1311 // +++++ [18aug99] DANGER: should notification happen in the middle
1312 //                         of such an operation?
1313 inline void NodeGroup::_changeState(
1314 	transport_state_t			to) {
1315 
1316 	assert_locked(this);
1317 
1318 	m_transportState = to;
1319 
1320 	if(!LockLooper()) {
1321 		ASSERT(!"LockLooper() failed.");
1322 	}
1323 	BMessage m(M_TRANSPORT_STATE_CHANGED);
1324 	m.AddInt32("groupID", id());
1325 	m.AddInt32("transportState", m_transportState);
1326 	notify(&m);
1327 	UnlockLooper();
1328 }
1329 
1330 // Enforce a state transition, and notify listeners
1331 inline void NodeGroup::_changeState(
1332 	transport_state_t			from,
1333 	transport_state_t			to) {
1334 
1335 	assert_locked(this);
1336 	ASSERT(m_transportState == from);
1337 
1338 	_changeState(to);
1339 }
1340 
1341 
1342 // -------------------------------------------------------- //
1343 // *** cycle thread & helpers (LOCK REQUIRED)
1344 // -------------------------------------------------------- //
1345 
1346 // *** cycle port definitions
1347 
1348 const int32					_portLength			= 32;
1349 const char* const		_portName				= "NodeGroup::m_cyclePort";
1350 const size_t				_portMsgMaxSize	= 256;
1351 
1352 
1353 // set up the cycle thread (including its kernel port)
1354 status_t NodeGroup::_initCycleThread() {
1355 	assert_locked(this);
1356 	status_t err;
1357 	D_METHOD((
1358 		"NodeGroup::_initCycleThread()\n"));
1359 
1360 	if(m_cycleThread) {
1361 		// thread is still alive
1362 		err = _destroyCycleThread();
1363 		if(err < B_OK)
1364 			return err;
1365 	}
1366 
1367 	// create
1368 	m_cycleThreadDone = false;
1369 	m_cycleThread = spawn_thread(
1370 		&_CycleThread,
1371 		"NodeGroup[cycleThread]",
1372 		B_NORMAL_PRIORITY,
1373 		(void*)this);
1374 	if(m_cycleThread < B_OK) {
1375 		PRINT((
1376 			"* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1377 			"  %s\n",
1378 			strerror(m_cycleThread)));
1379 		return m_cycleThread;
1380 	}
1381 
1382 	// launch
1383 	return resume_thread(m_cycleThread);
1384 }
1385 
1386 // shut down the cycle thread/port
1387 status_t NodeGroup::_destroyCycleThread() {
1388 	assert_locked(this);
1389 	status_t err;
1390 	D_METHOD((
1391 		"NodeGroup::_destroyCycleThread()\n"));
1392 
1393 	if(!m_cycleThread)
1394 		return B_OK;
1395 
1396 	if(!m_cycleThreadDone) {
1397 		// kill the thread
1398 		ASSERT(m_cyclePort);
1399 		err = write_port_etc(
1400 			m_cyclePort,
1401 			_CYCLE_STOP,
1402 			0,
1403 			0,
1404 			B_TIMEOUT,
1405 			10000LL);
1406 
1407 		if(err < B_OK) {
1408 			// bad thread.  die, thread, die.
1409 			PRINT((
1410 				"* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1411 			delete_port(m_cyclePort);
1412 			m_cyclePort = 0;
1413 			kill_thread(m_cycleThread);
1414 			m_cycleThread = 0;
1415 			return B_OK;
1416 		}
1417 
1418 		// the thread got the message; wait for it to quit
1419 		unlock();
1420 		while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
1421 			PRINT((
1422 				"! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1423 		}
1424 		lock();
1425 	}
1426 
1427 	// it's up to the thread to close its port
1428 	ASSERT(!m_cyclePort);
1429 
1430 	m_cycleThread = 0;
1431 
1432 	return B_OK;
1433 }
1434 
1435 
1436 // 1) do the current positions specify a valid cycle region?
1437 // 2) are any nodes in the group cycle-enabled?
1438 
1439 bool NodeGroup::_cycleValid() {
1440 	assert_locked(this);
1441 	return
1442 		(m_transportState == TRANSPORT_RUNNING ||
1443 		 m_transportState == TRANSPORT_STARTING) &&
1444 		 canCycle();
1445 }
1446 
1447 // initialize the cycle members (call when starting)
1448 
1449 void NodeGroup::_cycleInit(
1450 	bigtime_t										startTime) {
1451 	assert_locked(this);
1452 	ASSERT(m_cycleNodes.size() > 0);
1453 	D_METHOD((
1454 		"NodeGroup::_cycleInit(%Ld)\n",
1455 		startTime));
1456 
1457 	// +++++ rescan latencies?
1458 
1459 	// figure new boundary & deadline from region length
1460 	bigtime_t cyclePeriod = m_endPosition - m_startPosition;
1461 
1462 	if(cyclePeriod <= 0) {
1463 		// cycle region is no longer valid
1464 		m_cycleBoundary = 0LL;
1465 		m_cycleDeadline = 0LL;
1466 
1467 //		no no no -- deadlocks when the thread calls this method
1468 //		// stop the thread
1469 //		_destroyCycleThread();
1470 		return;
1471 	}
1472 
1473 	m_cycleStart = startTime;
1474 	m_cycleBoundary = startTime + cyclePeriod;
1475 	m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
1476 }
1477 
1478 
1479 // add a ref to the cycle set (in proper order, based on latency)
1480 void NodeGroup::_cycleAddRef(
1481 	NodeRef*										ref) {
1482 	assert_locked(this);
1483 
1484 	// make sure it's not already there
1485 	ASSERT(find(
1486 		m_cycleNodes.begin(),
1487 		m_cycleNodes.end(),
1488 		ref) == m_cycleNodes.end());
1489 
1490 	// [re]calc latency if 0
1491 	if(!ref->m_latency)
1492 		ref->_updateLatency();
1493 
1494 	node_set::iterator it;
1495 	for(it = m_cycleNodes.begin();
1496 		it != m_cycleNodes.end(); ++it) {
1497 		if(ref->m_latency > (*it)->m_latency) {
1498 			m_cycleNodes.insert(it, ref);
1499 			break;
1500 		}
1501 	}
1502 
1503 	// not inserted? new ref belongs at the end
1504 	if(it == m_cycleNodes.end())
1505 		m_cycleNodes.insert(it, ref);
1506 }
1507 
1508 // remove a ref from the cycle set
1509 void NodeGroup::_cycleRemoveRef(
1510 	NodeRef*										ref) {
1511 	assert_locked(this);
1512 
1513 	node_set::iterator it = find(
1514 		m_cycleNodes.begin(),
1515 		m_cycleNodes.end(),
1516 		ref);
1517 	ASSERT(it != m_cycleNodes.end());
1518 	m_cycleNodes.erase(it);
1519 }
1520 
1521 bigtime_t NodeGroup::_cycleBoundary() const {
1522 	Autolock _l(this);
1523 	return m_cycleBoundary;
1524 }
1525 
1526 // cycle thread impl.
1527 /*static*/
1528 status_t NodeGroup::_CycleThread(void* user) {
1529 	((NodeGroup*)user)->_cycleThread();
1530 	return B_OK;
1531 }
1532 
1533 void NodeGroup::_cycleThread() {
1534 
1535 	status_t err;
1536 	int32 code;
1537 	int32 errorCount = 0;
1538 
1539 	// +++++ liability -- if the thread has to be killed, this buffer
1540 	//       won't be reclaimed
1541 	char* msgBuffer = new char[_portMsgMaxSize];
1542 	array_delete<char> _d(msgBuffer);
1543 
1544 	// create port
1545 	ASSERT(!m_cyclePort);
1546 	m_cyclePort = create_port(
1547 		_portLength,
1548 		_portName);
1549 	ASSERT(m_cyclePort >= B_OK);
1550 
1551 	// the message-handling loop
1552 	bool done = false;
1553 	while(!done) {
1554 
1555 		// *** wait until it's time to queue the next cycle, or until
1556 		// *** a message arrives
1557 
1558 		lock();				// **** BEGIN LOCKED SECTION ****
1559 		if(!_cycleValid()) {
1560 			unlock();
1561 			break;
1562 		}
1563 
1564 		ASSERT(m_cycleNodes.size() > 0);
1565 		ASSERT(m_timeSourceObj);
1566 
1567 		bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
1568 		bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
1569 			m_cycleBoundary, maxLatency + s_rosterLatency);
1570 		bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
1571 
1572 		if(timeout <= 0) {
1573 			// +++++ whoops, I'm late.
1574 			// +++++ adjust to compensate !!!
1575 			PRINT((
1576 				"*** NodeGroup::_cycleThread(): LATE\n"
1577 				"    by %Ld\n", -timeout));
1578 		}
1579 
1580 		// +++++ if timeout is very short, spin until the target time arrives
1581 
1582 		unlock();			// **** END LOCKED SECTION ****
1583 
1584 		// block until message arrives or it's time to wake up
1585 		err = read_port_etc(
1586 			m_cyclePort,
1587 			&code,
1588 			msgBuffer,
1589 			_portMsgMaxSize,
1590 			B_TIMEOUT,
1591 			timeout);
1592 
1593 		if(err == B_TIMED_OUT) {
1594 			// the time has come to seek my nodes
1595 			_handleCycleService();
1596 			continue;
1597 		}
1598 		else if(err < B_OK) {
1599 			// any other error is bad news
1600 			PRINT((
1601 				"* NodeGroup::_cycleThread(): read_port error:\n"
1602 				"  %s\n"
1603 				"  ABORTING\n\n", strerror(err)));
1604 			if(++errorCount > 10) {
1605 				PRINT((
1606 					"*** Too many errors; aborting.\n"));
1607 				break;
1608 			}
1609 			continue;
1610 		}
1611 
1612 		errorCount = 0;
1613 
1614 		// process the message
1615 		switch(code) {
1616 			case _CYCLE_STOP:
1617 				// bail
1618 				done = true;
1619 				break;
1620 
1621 			case _CYCLE_END_CHANGED:
1622 			case _CYCLE_LATENCY_CHANGED:
1623 				// fall through to next loop; for now, these messages
1624 				// serve only to slap me out of my stupor and reassess
1625 				// the timing situation...
1626 				break;
1627 
1628 			default:
1629 				PRINT((
1630 					"* NodeGroup::_cycleThread(): unknown message code '%ld'\n", code));
1631 				break;
1632 		}
1633 	} // while(!done)
1634 
1635 
1636 	// delete port
1637 	delete_port(m_cyclePort);
1638 	m_cyclePort = 0;
1639 
1640 	// done
1641 	m_cycleThreadDone = true;
1642 }
1643 
1644 // cycle service: seek all nodes & initiate next cycle
1645 void NodeGroup::_handleCycleService() {
1646 	Autolock _l(this);
1647 //	D_METHOD((
1648 //		"NodeGroup::_handleCycleService()\n"));
1649 	status_t err;
1650 
1651 	if(!_cycleValid()) {
1652 //		PRINT((
1653 //			"- _handleCycleService(): cycle not valid; quitting.\n"));
1654 		return;
1655 	}
1656 
1657 	// seek
1658 	for(node_set::iterator it = m_cycleNodes.begin();
1659 		it != m_cycleNodes.end(); ++it) {
1660 		err = (*it)->_seek(
1661 			m_startPosition,
1662 			m_cycleBoundary);
1663 		if(err < B_OK) {
1664 			PRINT((
1665 				"- _handleCycleService(): node('%s')::_seek() failed:\n"
1666 				"  %s\n",
1667 				(*it)->name(), strerror(err)));
1668 		}
1669 	}
1670 
1671 	// update cycle settings
1672 	if(m_newStart) {
1673 		m_newStart = false;
1674 		m_startPosition = m_newStartPosition;
1675 	}
1676 	if(m_newEnd) {
1677 		m_newEnd = false;
1678 		m_endPosition = m_newEndPosition;
1679 	}
1680 
1681 	// prepare next cycle
1682 	_cycleInit(m_cycleBoundary);
1683 }
1684 
1685 // END -- NodeGroup.cpp --
1686