xref: /haiku/src/apps/cortex/NodeManager/NodeGroup.cpp (revision d3d8b26997fac34a84981e6d2b649521de2cc45a)
1 // NodeGroup.cpp
2 
3 #include "NodeGroup.h"
4 //#include "NodeGroup_transport_thread.h"
5 
6 #include "NodeManager.h"
7 #include "NodeRef.h"
8 
9 #include <MediaRoster.h>
10 #include <OS.h>
11 #include <TimeSource.h>
12 
13 #include <algorithm>
14 #include <functional>
15 
16 #include "array_delete.h"
17 #include "BasicThread.h"
18 #include "node_manager_impl.h"
19 #include "functional_tools.h"
20 
21 __USE_CORTEX_NAMESPACE
22 #define D_METHOD(x) //PRINT (x)
23 #define D_ROSTER(x) //PRINT (x)
24 #define D_LOCK(x) //PRINT (x)
25 
26 
27 
28 // -------------------------------------------------------- //
29 // *** ctor/dtor
30 // -------------------------------------------------------- //
31 
32 // free the group, including all nodes within it
33 // (this call will result in the eventual deletion of the object.)
34 // returns B_OK on success; B_NOT_ALLOWED if release() has
35 // already been called; other error codes if the Media Roster
36 // call fails.
37 // * THE MANAGER MUST BE LOCKED
38 
39 status_t NodeGroup::release() {
40 
41 	D_METHOD((
42 		"NodeGroup::release()\n"));
43 
44 	if(isReleased())
45 		return B_NOT_ALLOWED;
46 
47 	// clean up
48 	lock();
49 
50 	// halt all nodes
51 	_stop();
52 
53 	// remove & release all nodes
54 	// +++++ causes triply-nested lock: eww!
55 	while(m_nodes.size()) {
56 		NodeRef* last = m_nodes.back();
57 		removeNode(m_nodes.size()-1);
58 		last->release();
59 	}
60 
61 	unlock();
62 
63 // [e.moon 7nov99]
64 // removing the released group is now NodeManager's responsibility
65 //
66 	// remove from NodeManager
67 	if(!m_manager->lock()) {
68 		ASSERT(!"* m_manager->lock() failed.\n");
69 	}
70 	m_manager->_removeGroup(this);
71 	m_manager->unlock();
72 
73 	// hand off to IObservable
74 	return _inherited::release();
75 }
76 
77 // call release() rather than deleting NodeGroup objects
78 NodeGroup::~NodeGroup() {
79 
80 	Autolock _l(this);
81 	D_METHOD((
82 		"~NodeGroup()\n"));
83 
84 	ASSERT(!m_nodes.size());
85 
86 	if(m_timeSourceObj) {
87 		m_timeSourceObj->Release();
88 		m_timeSourceObj = 0;
89 	}
90 }
91 
92 
93 // -------------------------------------------------------- //
94 // *** accessors
95 // -------------------------------------------------------- //
96 
97 // [e.moon 13oct99] moved to header
98 //inline uint32 NodeGroup::id() const { return m_id; }
99 
100 // -------------------------------------------------------- //
101 // *** operations
102 // -------------------------------------------------------- //
103 
104 // name access
105 const char* NodeGroup::name() const {
106 	Autolock _l(this);
107 	return m_name.String();
108 }
109 
110 status_t NodeGroup::setName(const char* name) {
111 	Autolock _l(this);
112 	m_name = name;
113 	return B_OK;
114 }
115 
116 // content access
117 uint32 NodeGroup::countNodes() const {
118 	Autolock _l(this);
119 	return m_nodes.size();
120 }
121 
122 NodeRef* NodeGroup::nodeAt(
123 	uint32											index) const {
124 	Autolock _l(this);
125 	return (index < m_nodes.size()) ?
126 		m_nodes[index] :
127 		0;
128 }
129 
130 // add/remove nodes:
131 // - you may only add a node with no current group.
132 // - nodes added during playback will be started;
133 //   nodes removed during playback will be stopped (unless
134 //   the NO_START_STOP transport restriction flag is set
135 //   for a given node.)
136 
137 status_t NodeGroup::addNode(
138 	NodeRef*										node) {
139 
140 	D_METHOD((
141 		"NodeGroup::addNode()\n"));
142 
143 	// lock the manager first; if the node has no current group,
144 	// this locks the node.
145 	m_manager->lock();
146 
147 	Autolock _l(this);
148 
149 	// precondition: GROUP_LOCKED not set
150 	if(m_flags & GROUP_LOCKED)
151 		return B_NOT_ALLOWED;
152 
153 	// precondition: no current group
154 	if(node->m_group) {
155 		// [e.moon 28sep99] whoops, forgot one
156 		PRINT((
157 			"!!! node already in group '%s'\n", node->m_group->name()));
158 
159 		m_manager->unlock();
160 		return B_NOT_ALLOWED;
161 	}
162 
163 	// add it
164 	m_nodes.push_back(node);
165 	node->_setGroup(this);
166 
167 	// release the manager
168 	m_manager->unlock();
169 
170 	// first node? the transport is now ready to start
171 	if(m_nodes.size() == 1) {
172 		_changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
173 	}
174 //
175 //	if(m_syncNode == media_node::null) {
176 //		// assign as sync node
177 //		setSyncNode(node->node());
178 //	}
179 //
180 	// initialize the new node
181 	status_t err = node->_initTransportState();
182 	if(err < B_OK)
183 		return err;
184 
185 	// set time source
186 	node->_setTimeSource(m_timeSource.node);
187 
188 	// set run mode
189 	node->_setRunMode(m_runMode);
190 
191 	// add to cycle set if need be
192 	// +++++ should I call _cycleAddRef() instead?
193 	if(node->m_cycle)
194 		_refCycleChanged(node);
195 
196 	if(m_transportState == TRANSPORT_RUNNING) {
197 		// +++++ start if necessary!
198 	}
199 	// +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
200 
201 	// send notification
202 	if(!LockLooper()) {
203 		ASSERT(!"LockLooper() failed.");
204 	}
205 	BMessage m(M_NODE_ADDED);
206 	m.AddInt32("groupID", id());
207 	m.AddInt32("nodeID", node->id());
208 	notify(&m);
209 	UnlockLooper();
210 
211 	// success
212 	return B_OK;
213 }
214 
215 
216 status_t NodeGroup::removeNode(
217 	NodeRef*										node) {
218 
219 	D_METHOD((
220 		"NodeGroup::removeNode()\n"));
221 
222 	// lock the manager first; once the node is ungrouped,
223 	// the manager lock applies to it
224 	m_manager->lock();
225 
226 	Autolock _l(this);
227 
228 	// precondition: this must be the node's group
229 	if(node->m_group != this) {
230 		// [e.moon 28sep99] whoops, forgot one
231 		PRINT((
232 			"!!! node not in group '%s'\n", node->m_group->name()));
233 
234 		m_manager->unlock();
235 		return B_NOT_ALLOWED;
236 	}
237 
238 	// remove from the cycle set
239 	if(node->m_cycle)
240 		_cycleRemoveRef(node);
241 
242 	// remove it
243 	ASSERT(m_nodes.size());
244 	remove(
245 		m_nodes.begin(),
246 		m_nodes.end(),
247 		node);
248 
249 	// should have removed one and only one entry
250 	m_nodes.resize(m_nodes.size()-1);
251 
252 //	// 6aug99: the timesource is now the sync node...
253 //	// is this the sync node? reassign if so
254 //
255 //	if(node->node() == m_syncNode) {
256 //
257 //		// look for another sync-capable node
258 //		bool found = false;
259 //		for(int n = 0; !found && n < m_nodes.size(); ++n)
260 //			if(setSyncNode(m_nodes[n]->node()) == B_OK)
261 //				found = true;
262 //
263 //		// no luck? admit defeat:
264 //		if(!found) {
265 //			PRINT((
266 //				"* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
267 //
268 //			// +++++ stop & set to invalid state?
269 //
270 //			setSyncNode(media_node::null);
271 //		}
272 //	}
273 
274 	// stop the node if necessary
275 	status_t err = node->_stop();
276 	if(err < B_OK) {
277 		PRINT((
278 			"*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
279 			"    %s\n",
280 			node->name(),
281 			strerror(err)));
282 	}
283 
284 	// clear the node's group pointer
285 	node->_setGroup(0);
286 
287 	// release the manager lock; the node is now ungrouped and
288 	// unlocked
289 	m_manager->unlock();
290 
291 	// was that the last node? stop/disable the transport if so
292 	if(!m_nodes.size()) {
293 
294 		// +++++ kill sync thread(s)
295 
296 		_changeState(TRANSPORT_INVALID);
297 	}
298 
299 	// send notification
300 	if(!LockLooper()) {
301 		ASSERT(!"LockLooper() failed.");
302 	}
303 	BMessage m(M_NODE_REMOVED);
304 	m.AddInt32("groupID", id());
305 	m.AddInt32("nodeID", node->id());
306 	notify(&m);
307 	UnlockLooper();
308 
309 	// success
310 	return B_OK;
311 }
312 
313 status_t NodeGroup::removeNode(
314 	uint32											index) {
315 
316 	D_METHOD((
317 		"NodeGroup::removeNode(by index)\n"));
318 
319 	// +++++ icky nested lock
320 	Autolock _l(this);
321 
322 	ASSERT(m_nodes.size() > index);
323 	return removeNode(m_nodes[index]);
324 }
325 
326 uint32 NodeGroup::groupFlags() const {
327 	Autolock _l(this);
328 	return m_flags;
329 }
330 
331 status_t NodeGroup::setGroupFlags(
332 	uint32											flags) {
333 	Autolock _l(this);
334 	m_flags = flags;
335 	return B_OK;
336 }
337 
338 
339 // returns true if one or more nodes in the group have cycling
340 // enabled, and the start- and end-positions are valid
341 bool NodeGroup::canCycle() const {
342 	Autolock _l(this);
343 
344 	return
345 		m_cycleNodes.size() > 0 &&
346 		m_endPosition - m_startPosition > s_minCyclePeriod;
347 }
348 
349 // -------------------------------------------------------- //
350 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
351 // -------------------------------------------------------- //
352 
353 // Fetch the current transport state
354 
355 NodeGroup::transport_state_t NodeGroup::transportState() const {
356 	Autolock _l(this);
357 	return m_transportState;
358 }
359 
360 // Set the starting media time:
361 //   This is the point at which playback will begin in any media
362 //   files/documents being played by the nodes in this group.
363 //   When cycle mode is enabled, this is the point to which each
364 //   node will be seek'd at the end of each cycle (loop).
365 //
366 //   The starting time can't be changed in the B_OFFLINE run mode
367 //   (this call will return an error.)
368 
369 status_t NodeGroup::setStartPosition(
370 	bigtime_t										start) {
371 	Autolock _l(this);
372 
373 	D_METHOD((
374 		"NodeGroup::setStartPosition(%Ld)\n", start));
375 
376 	if(
377 		m_transportState == TRANSPORT_RUNNING ||
378 		m_transportState == TRANSPORT_ROLLING ||
379 		m_transportState == TRANSPORT_STARTING) {
380 
381 		if(m_runMode == BMediaNode::B_OFFLINE)
382 			return B_NOT_ALLOWED;
383 
384 		ASSERT(m_timeSourceObj);
385 
386 		if(_cycleValid()) {
387 			if(m_timeSourceObj->Now() >= m_cycleDeadline) {
388 				// too late to change start position; defer
389 //				PRINT((" - deferred\n"));
390 				m_newStartPosition = start;
391 				m_newStart = true;
392 				return B_OK;
393 			}
394 
395 			// not at deadline yet; fall through to set start position
396 		}
397 	}
398 
399 	m_startPosition = start;
400 
401 	// +++++ notify [e.moon 11oct99]
402 
403 	return B_OK;
404 }
405 
406 // Fetch the starting position:
407 
408 // +++++ if a previously-set start position was deferred, it won't be
409 //       returned yet
410 
411 bigtime_t NodeGroup::startPosition() const {
412 	Autolock _l(this);
413 
414 	return m_startPosition;
415 }
416 
417 // Set the ending media time:
418 //   This is the point at which playback will end relative to
419 //   media documents begin played by the nodes in this group;
420 //   in cycle mode, this specifies the loop point.  If the
421 //   ending time is less than or equal to the starting time,
422 //   the transport will continue until stopped manually.
423 //   If the end position is changed while the transport is playing,
424 //   it must take effect retroactively (if it's before the current
425 //   position and looping is enabled, all nodes must 'warp' to
426 //   the proper post-loop position.)
427 //
428 //   The ending time can't be changed if run mode is B_OFFLINE and
429 //   the transport is running (this call will return an error.)
430 
431 status_t NodeGroup::setEndPosition(
432 	bigtime_t										end) {
433 	Autolock _l(this);
434 
435 	D_METHOD((
436 		"NodeGroup::setEndPosition(%Ld)\n", end));
437 
438 	if(
439 		m_transportState == TRANSPORT_RUNNING ||
440 		m_transportState == TRANSPORT_ROLLING ||
441 		m_transportState == TRANSPORT_STARTING) {
442 
443 		if(m_runMode == BMediaNode::B_OFFLINE)
444 			return B_NOT_ALLOWED;
445 
446 		ASSERT(m_timeSourceObj);
447 
448 		bigtime_t endDelta = end - m_endPosition;
449 
450 		if(_cycleValid()) {
451 			if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
452 				// too late to change end position; defer
453 //				PRINT((" - deferred\n"));
454 				m_newEndPosition = end;
455 				m_newEnd = true;
456 				return B_OK;
457 			}
458 			else {
459 				// set new end position
460 				m_endPosition = end;
461 
462 				// inform thread
463 				ASSERT(m_cyclePort);
464 				write_port(
465 					m_cyclePort,
466 					_CYCLE_END_CHANGED,
467 					0,
468 					0);
469 //
470 //				// restart nodes' cycle threads with new end position
471 //				_cycleInit(m_cycleStart);
472 //				for(node_set::iterator it = m_cycleNodes.begin();
473 //					it != m_cycleNodes.end(); ++it) {
474 //					(*it)->_scheduleCycle(m_cycleBoundary);
475 //				}
476 //				return B_OK;
477 			}
478 		}
479 	}
480 
481 	m_endPosition = end;
482 
483 	// +++++ notify [e.moon 11oct99]
484 
485 	return B_OK;
486 }
487 
488 
489 // Fetch the end position:
490 //   Note that if the end position is less than or equal to the start
491 //   position, it's ignored.
492 
493 // +++++ if a previously-set end position was deferred, it won't be
494 //       returned yet
495 
496 bigtime_t NodeGroup::endPosition() const {
497 	Autolock _l(this);
498 	return m_endPosition;
499 }
500 
501 // -------------------------------------------------------- //
502 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
503 // -------------------------------------------------------- //
504 
505 // Preroll the group:
506 //   Seeks, then prerolls, each node in the group (honoring the
507 //   NO_SEEK and NO_PREROLL flags.)  This ensures that the group
508 //   can start as quickly as possible.
509 //
510 //   Does not return until all nodes in the group have been
511 //   prepared.
512 
513 status_t NodeGroup::preroll() {
514 	D_METHOD((
515 		"NodeGroup::preroll()\n"));
516 
517 	Autolock _l(this);
518 	return _preroll();
519 }
520 
521 // Start all nodes in the group:
522 //   Nodes with the NO_START_STOP flag aren't molested.
523 
524 status_t NodeGroup::start() {
525 	D_METHOD((
526 		"NodeGroup::start()\n"));
527 
528 	Autolock _l(this);
529 	return _start();
530 }
531 
532 // Stop all nodes in the group:
533 //   Nodes with the NO_START_STOP flag aren't molested.
534 
535 status_t NodeGroup::stop() {
536 	D_METHOD((
537 		"NodeGroup::stop()\n"));
538 
539 	Autolock _l(this);
540 	return _stop();
541 }
542 
543 // Roll all nodes in the group:
544 //   Queues a start and stop atomically (via BMediaRoster::RollNode()).
545 //   Returns B_NOT_ALLOWED if endPosition <= startPosition;
546 
547 status_t NodeGroup::roll() {
548 	D_METHOD((
549 		"NodeGroup::roll()\n"));
550 
551 	Autolock _l(this);
552 	return _roll();
553 }
554 
555 // -------------------------------------------------------- //
556 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
557 // -------------------------------------------------------- //
558 
559 // time source control:
560 //   getTimeSource():
561 //   returns B_ERROR if no time source has been set; otherwise,
562 //   returns the node ID of the current time source for all
563 //   nodes in the group.
564 //
565 //   setTimeSource():
566 //   Calls SetTimeSourceFor() on every node in the group.
567 //   The group must be stopped; B_NOT_ALLOWED will be returned
568 //   if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
569 
570 status_t NodeGroup::getTimeSource(
571 	media_node*									outTimeSource) const {
572 	Autolock _l(this);
573 
574 	if(m_timeSource != media_node::null) {
575 		*outTimeSource = m_timeSource;
576 		return B_OK;
577 	}
578 	return B_ERROR;
579 }
580 
581 status_t NodeGroup::setTimeSource(
582 	const media_node&						timeSource) {
583 
584 	Autolock _l(this);
585 
586 	if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
587 		return B_NOT_ALLOWED;
588 
589 	if(m_timeSourceObj)
590 		m_timeSourceObj->Release();
591 
592 	m_timeSource = timeSource;
593 
594 	// cache a BTimeSource*
595 	m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
596 	ASSERT(m_timeSourceObj);
597 
598 	// apply new time source to all nodes
599 	for_each(
600 		m_nodes.begin(),
601 		m_nodes.end(),
602 		bind2nd(
603 			mem_fun(&NodeRef::_setTimeSource),
604 			m_timeSource.node
605 		)
606 	);
607 
608 //	// try to set as sync node
609 //	err = setSyncNode(timeSource);
610 //	if(err < B_OK) {
611 //		PRINT((
612 //			"* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
613 //			strerror(err)));
614 //	}
615 
616 	// notify
617 	if(!LockLooper()) {
618 		ASSERT(!"LockLooper() failed.");
619 	}
620 	BMessage m(M_TIME_SOURCE_CHANGED);
621 	m.AddInt32("groupID", id());
622 	m.AddInt32("timeSourceID", timeSource.node);
623 	notify(&m);
624 	UnlockLooper();
625 
626 	return B_OK;
627 }
628 
629 // run mode access:
630 //   Sets the default run mode for the group.  This will be
631 //   applied to every node with a wildcard (0) run mode.
632 //
633 //   Special case: if the run mode is B_OFFLINE, it will be
634 //   applied to all nodes in the group.
635 
636 BMediaNode::run_mode NodeGroup::runMode() const {
637 	Autolock _l(this);
638 	return m_runMode;
639 }
640 
641 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
642 	Autolock _l(this);
643 
644 	m_runMode = mode;
645 
646 	// apply to all nodes
647 	for_each(
648 		m_nodes.begin(),
649 		m_nodes.end(),
650 		bind2nd(
651 			mem_fun(&NodeRef::_setRunModeAuto),
652 			m_runMode
653 		)
654 //		bound_method(
655 //			*this,
656 //			&NodeGroup::setRunModeFor)
657 	);
658 
659 
660 	return B_OK;
661 }
662 
663 // -------------------------------------------------------- //
664 // *** BHandler
665 // -------------------------------------------------------- //
666 
667 void NodeGroup::MessageReceived(
668 	BMessage*										message) {
669 
670 //	PRINT((
671 //		"NodeGroup::MessageReceived():\n"));
672 //	message->PrintToStream();
673 	status_t err;
674 
675 	switch(message->what) {
676 		case M_SET_TIME_SOURCE:
677 			{
678 				media_node timeSource;
679 				void* data;
680 				ssize_t dataSize;
681 				err = message->FindData(
682 					"timeSourceNode",
683 					B_RAW_TYPE,
684 					(const void**)&data,
685 					&dataSize);
686 				if(err < B_OK) {
687 					PRINT((
688 						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
689 						"  no timeSourceNode!\n"));
690 					break;
691 				}
692 				timeSource = *(media_node*)data;
693 
694 				setTimeSource(timeSource);
695 			}
696 			break;
697 
698 		case M_SET_RUN_MODE:
699 			{
700 				uint32 runMode;
701 				err = message->FindInt32("runMode", (int32*)&runMode);
702 				if(err < B_OK) {
703 					PRINT((
704 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
705 						"  no runMode!\n"));
706 					break;
707 				}
708 
709 				if(runMode < BMediaNode::B_OFFLINE ||
710 					runMode > BMediaNode::B_RECORDING) {
711 					PRINT((
712 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
713 						"  invalid run mode (%ld)\n", runMode));
714 					break;
715 				}
716 
717 				setRunMode((BMediaNode::run_mode)runMode);
718 			}
719 			break;
720 
721 		case M_SET_START_POSITION:
722 			{
723 				bigtime_t position;
724 				err = message->FindInt64("position", (int64*)&position);
725 				if(err < B_OK) {
726 					PRINT((
727 						"* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
728 						"  no position!\n"));
729 					break;
730 				}
731 				setStartPosition(position);
732 			}
733 			break;
734 
735 		case M_SET_END_POSITION:
736 			{
737 				bigtime_t position;
738 				err = message->FindInt64("position", (int64*)&position);
739 				if(err < B_OK) {
740 					PRINT((
741 						"* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
742 						"  no position!\n"));
743 					break;
744 				}
745 				setEndPosition(position);
746 			}
747 			break;
748 
749 		case M_PREROLL:
750 			preroll();
751 			break;
752 
753 		case M_START:
754 			start();
755 			break;
756 
757 		case M_STOP:
758 			stop();
759 			break;
760 
761 		case M_ROLL:
762 			roll();
763 			break;
764 
765 		default:
766 			_inherited::MessageReceived(message);
767 			break;
768 	}
769 }
770 
771 
772 // -------------------------------------------------------- //
773 // *** IPersistent
774 // -------------------------------------------------------- //
775 
776 // !
777 #if CORTEX_XML
778 // !
779 
780 // +++++
781 
782 // Default constructor
783 NodeGroup::NodeGroup() :
784 	m_manager(0) {} // +++++ finish initialization
785 
786 
787 // !
788 #endif /*CORTEX_XML*/
789 // !
790 
791 // -------------------------------------------------------- //
792 // *** IObservable:		[19aug99]
793 // -------------------------------------------------------- //
794 
795 void NodeGroup::observerAdded(
796 	const BMessenger&				observer) {
797 
798 	BMessage m(M_OBSERVER_ADDED);
799 	m.AddInt32("groupID", id());
800 	m.AddMessenger("target", BMessenger(this));
801 	observer.SendMessage(&m);
802 }
803 
804 void NodeGroup::observerRemoved(
805 	const BMessenger&				observer) {
806 
807 	BMessage m(M_OBSERVER_REMOVED);
808 	m.AddInt32("groupID", id());
809 	m.AddMessenger("target", BMessenger(this));
810 	observer.SendMessage(&m);
811 }
812 
813 void NodeGroup::notifyRelease() {
814 
815 	BMessage m(M_RELEASED);
816 	m.AddInt32("groupID", id());
817 	m.AddMessenger("target", BMessenger(this));
818 	notify(&m);
819 }
820 
821 void NodeGroup::releaseComplete() {
822 	// +++++
823 }
824 
825 // -------------------------------------------------------- //
826 // *** ILockable: pass lock requests to m_lock
827 // -------------------------------------------------------- //
828 
829 bool NodeGroup::lock(
830 	lock_t type,
831 	bigtime_t timeout) {
832 
833 	D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
834 
835 	ASSERT(type == WRITE);
836 	status_t err = m_lock.LockWithTimeout(timeout);
837 
838 	D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
839 
840 	return err == B_OK;
841 }
842 
843 bool NodeGroup::unlock(
844 	lock_t type) {
845 
846 	D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
847 
848 	ASSERT(type == WRITE);
849 	m_lock.Unlock();
850 
851 	D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
852 
853 	return true;
854 }
855 
856 bool NodeGroup::isLocked(
857 	lock_t type) const {
858 
859 	ASSERT(type == WRITE);
860 	return m_lock.IsLocked();
861 }
862 
863 // -------------------------------------------------------- //
864 // *** ctor (accessible to NodeManager)
865 // -------------------------------------------------------- //
866 
867 NodeGroup::NodeGroup(
868 	const char*									name,
869 	NodeManager*								manager,
870 	BMediaNode::run_mode				runMode) :
871 
872 	ObservableHandler(name),
873 	m_lock("NodeGroup::m_lock"),
874 	m_manager(manager),
875 	m_id(NextID()),
876 	m_name(name),
877 	m_flags(0),
878 	m_transportState(TRANSPORT_INVALID),
879 	m_runMode(runMode),
880 	m_timeSourceObj(0),
881 	m_released(false),
882 	m_cycleThread(0),
883 	m_cyclePort(0),
884 	m_startPosition(0LL),
885 	m_endPosition(0LL),
886 	m_newStart(false),
887 	m_newEnd(false) {
888 
889 	ASSERT(m_manager);
890 
891 	if(!m_manager->Lock()) {
892 		ASSERT(!"m_manager->Lock() failed");
893 	}
894 	m_manager->AddHandler(this);
895 	m_manager->Unlock();
896 
897 	// set default time source
898 	media_node ts;
899 	D_ROSTER(("# roster->GetTimeSource()\n"));
900 	status_t err = m_manager->roster->GetTimeSource(&ts);
901 	if(err < B_OK) {
902 		PRINT((
903 			"*** NodeGroup(): roster->GetTimeSource() failed:\n"
904 			"    %s\n", strerror(err)));
905 	}
906 	setTimeSource(ts);
907 }
908 
909 // -------------------------------------------------------- //
910 // *** internal operations
911 // -------------------------------------------------------- //
912 
913 uint32 NodeGroup::s_nextID = 1;
914 uint32 NodeGroup::NextID() {
915 	return atomic_add((int32*)&s_nextID, 1);
916 }
917 
918 // -------------------------------------------------------- //
919 // *** ref->group communication (LOCK REQUIRED)
920 // -------------------------------------------------------- //
921 
922 // when a NodeRef's cycle state (ie. looping or not looping)
923 // changes, it must pass that information on via this method
924 
925 void NodeGroup::_refCycleChanged(
926 	NodeRef*										ref) {
927 	assert_locked(this);
928 	D_METHOD((
929 		"NodeGroup::_refCycleChanged('%s')\n",
930 		ref->name()));
931 
932 	if(ref->m_cycle) {
933 		_cycleAddRef(ref);
934 	}	else {
935 		_cycleRemoveRef(ref);
936 	}
937 
938 	// +++++ if running & cycle valid, the node should be properly
939 	//       seek'd and start'd
940 }
941 
942 
943 // when a cycling node's latency changes, call this method.
944 
945 void NodeGroup::_refLatencyChanged(
946 	NodeRef*										ref) {
947 	assert_locked(this);
948 	D_METHOD((
949 		"NodeGroup::_refLatencyChanged('%s')\n",
950 		ref->name()));
951 
952 	if(!_cycleValid())
953 		return;
954 
955 	// remove & replace ref (positions it properly)
956 	_cycleRemoveRef(ref);
957 	_cycleAddRef(ref);
958 
959 	// slap my thread up
960 	ASSERT(m_cyclePort);
961 	write_port(
962 		m_cyclePort,
963 		_CYCLE_LATENCY_CHANGED,
964 		0,
965 		0);
966 
967 	// +++++ zat it?
968 }
969 
970 // when a NodeRef receives notification that it has been stopped,
971 // but is labeled as still running, it must call this method.
972 // [e.moon 11oct99: roll/B_OFFLINE support]
973 
974 void NodeGroup::_refStopped(
975 	NodeRef*										ref) {
976 	assert_locked(this);
977 	D_METHOD((
978 		"NodeGroup::_refStopped('%s')\n",
979 		ref->name()));
980 
981 	// roll/B_OFFLINE support [e.moon 11oct99]
982 	// (check to see if any other nodes in the group are still running;
983 	//  mark group stopped if not.)
984 	if(m_transportState == TRANSPORT_ROLLING) {
985 		bool nodesRunning = false;
986 		for(node_set::iterator it = m_nodes.begin();
987 			it != m_nodes.end(); ++it) {
988 			if((*it)->isRunning()) {
989 				nodesRunning = true;
990 				break;
991 			}
992 		}
993 		if(!nodesRunning)
994 			// the group has stopped; update transport state
995 			_changeState(TRANSPORT_STOPPED);
996 
997 	}
998 
999 }
1000 
1001 
1002 // -------------------------------------------------------- //
1003 // *** transport helpers (LOCK REQUIRED)
1004 // -------------------------------------------------------- //
1005 
1006 
1007 // Preroll all nodes in the group; this is the implementation
1008 // of preroll().
1009 // *** this method should not be called from the transport thread
1010 // (since preroll operations can block for a relatively long time.)
1011 
1012 status_t NodeGroup::_preroll() {
1013 	assert_locked(this);
1014 
1015 	D_METHOD((
1016 		"NodeGroup::_preroll()\n"));
1017 
1018 	if(
1019 		m_transportState == TRANSPORT_RUNNING ||
1020 		m_transportState == TRANSPORT_ROLLING)
1021 		// too late
1022 		return B_NOT_ALLOWED;
1023 
1024 	// * preroll all nodes to the start position
1025 
1026 	// +++++ currently, if an error is encountered it's ignored.
1027 	//       should the whole operation fail if one node couldn't
1028 	//       be prerolled?
1029 	//
1030 	//       My gut response is 'no', since the preroll step is
1031 	//       optional, but the caller should have some inkling that
1032 	//       one of its nodes didn't behave.
1033 
1034 // [e.moon 13oct99] making PPC compiler happy
1035 //	for_each(
1036 //		m_nodes.begin(),
1037 //		m_nodes.end(),
1038 //		bind2nd(
1039 //			mem_fun(&NodeRef::_preroll),
1040 //			m_startPosition
1041 //		)
1042 //	);
1043 	for(node_set::iterator it = m_nodes.begin();
1044 		it != m_nodes.end(); ++it) {
1045 		(*it)->_preroll(m_startPosition);
1046 	}
1047 
1048 //    replaces
1049 //		bind2nd(
1050 //			bound_method(*this, &NodeGroup::prerollNode),
1051 //			m_startPosition
1052 //		)
1053 
1054 	return B_OK;
1055 }
1056 
1057 
1058 //// functor: calculates latency of each node it's handed, caching
1059 //// the largest one found; includes initial latency if nodes report it.
1060 //
1061 //class NodeGroup::calcLatencyFn { public:
1062 //	bigtime_t& maxLatency;
1063 //
1064 //	calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1065 //
1066 //	void operator()(NodeRef* r) {
1067 //		ASSERT(r);
1068 //
1069 ////		PRINT((
1070 ////			"# calcLatencyFn(): '%s'\n",
1071 ////			r->name()));
1072 //
1073 //		if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1074 //			// node can't incur latency
1075 ////			PRINT((
1076 ////				"-   not a producer\n"));
1077 //			return;
1078 //		}
1079 //
1080 //		bigtime_t latency;
1081 //		status_t err =
1082 //			BMediaRoster::Roster()->GetLatencyFor(
1083 //				r->node(),
1084 //				&latency);
1085 //		if(err < B_OK) {
1086 //			PRINT((
1087 //				"* calcLatencyFn: GetLatencyFor() failed: %s\n",
1088 //				strerror(err)));
1089 //			return;
1090 //		}
1091 ////		PRINT(("-   %Ld\n", latency));
1092 //
1093 //		bigtime_t add;
1094 //		err = BMediaRoster::Roster()->GetInitialLatencyFor(
1095 //			r->node(),
1096 //			&add);
1097 ////		PRINT(("-   %Ld\n", add));
1098 //		if(err < B_OK) {
1099 //			PRINT((
1100 //				"* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1101 //				strerror(err)));
1102 //		}
1103 //		else
1104 //			latency += add;
1105 //
1106 //		if(latency > maxLatency)
1107 //			maxLatency = latency;
1108 //
1109 ////		PRINT((
1110 ////			"-   max latency: %Ld\n",
1111 ////			maxLatency));
1112 //	}
1113 //};
1114 
1115 // Start all nodes in the group; this is the implementation of
1116 // start().  Fails if the run mode is B_OFFLINE; use _roll() instead
1117 // in that case.
1118 //
1119 // (this may be called from the transport thread or from
1120 //  an API-implementation method.)
1121 
1122 status_t NodeGroup::_start() {
1123 	assert_locked(this);
1124 
1125 	D_METHOD((
1126 		"NodeGroup::_start()\n"));
1127 	status_t err;
1128 
1129 	if(m_transportState != TRANSPORT_STOPPED)
1130 		return B_NOT_ALLOWED;
1131 
1132 	if(m_runMode == BMediaNode::B_OFFLINE)
1133 		return B_NOT_ALLOWED;
1134 
1135 	ASSERT(m_nodes.size());
1136 
1137 	_changeState(TRANSPORT_STARTING);
1138 
1139 	// * Find the highest latency in the group
1140 
1141 	bigtime_t offset = 0LL;
1142 	calcLatencyFn _f(offset);
1143 	for_each(
1144 		m_nodes.begin(),
1145 		m_nodes.end(),
1146 		_f);
1147 
1148 	offset += s_rosterLatency;
1149 	PRINT((
1150 		"- offset: %Ld\n", offset));
1151 
1152 	// * Seek all nodes (in case one or more failed to preroll)
1153 
1154 	for(node_set::iterator it = m_nodes.begin();
1155 		it != m_nodes.end(); ++it) {
1156 		err = (*it)->_seekStopped(m_startPosition);
1157 		if(err < B_OK) {
1158 			PRINT((
1159 				"! NodeGroup('%s')::_start():\n"
1160 				"  ref('%s')->_seekStopped(%Ld) failed:\n"
1161 				"  %s\n",
1162 				name(), (*it)->name(), m_startPosition,
1163 				strerror(err)));
1164 
1165 			// +++++ continue?
1166 		}
1167 	}
1168 
1169 	// * Start all nodes, allowing for the max latency found
1170 
1171 	ASSERT(m_timeSourceObj);
1172 	bigtime_t when = m_timeSourceObj->Now() + offset;
1173 
1174 	// 10aug99: initialize cycle (loop) settings
1175 	if(_cycleValid()) {
1176 		_initCycleThread();
1177 		_cycleInit(when);
1178 	}
1179 
1180 	// start the nodes
1181 	for(node_set::iterator it = m_nodes.begin();
1182 		it != m_nodes.end(); ++it) {
1183 		err = (*it)->_start(when);
1184 		if(err < B_OK) {
1185 			PRINT((
1186 				"! NodeGroup('%s')::_start():\n"
1187 				"  ref('%s')->_start(%Ld) failed:\n"
1188 				"  %s\n",
1189 				name(), (*it)->name(), when,
1190 				strerror(err)));
1191 
1192 			// +++++ continue?
1193 		}
1194 	}
1195 
1196 	// notify observers
1197 	_changeState(TRANSPORT_RUNNING);
1198 	return B_OK;
1199 }
1200 
1201 // Stop all nodes in the group; this is the implementation of
1202 // stop().
1203 //
1204 // (this may be called from the transport thread or from
1205 //  an API-implementation method.)
1206 
1207 status_t NodeGroup::_stop() {
1208 
1209 	D_METHOD((
1210 		"NodeGroup::_stop()\n"));
1211 
1212 	assert_locked(this);
1213 
1214 	if(
1215 		m_transportState != TRANSPORT_RUNNING &&
1216 		m_transportState != TRANSPORT_ROLLING)
1217 		return B_NOT_ALLOWED;
1218 
1219 	_changeState(TRANSPORT_STOPPING);
1220 
1221 	// * stop the cycle thread if need be
1222 	_destroyCycleThread();
1223 
1224 	// * stop all nodes
1225 	//   +++++ error reports would be nice
1226 
1227 	for_each(
1228 		m_nodes.begin(),
1229 		m_nodes.end(),
1230 		mem_fun(&NodeRef::_stop)
1231 	);
1232 
1233 	// update transport state
1234 	_changeState(TRANSPORT_STOPPED);
1235 
1236 	return B_OK;
1237 }
1238 
1239 // Roll all nodes in the group; this is the implementation of
1240 // roll().
1241 //
1242 // (this may be called from the transport thread or from
1243 //  an API-implementation method.)
1244 
1245 status_t NodeGroup::_roll() {
1246 
1247 	D_METHOD((
1248 		"NodeGroup::_roll()\n"));
1249 	assert_locked(this);
1250 	status_t err;
1251 
1252 	if(m_transportState != TRANSPORT_STOPPED)
1253 		return B_NOT_ALLOWED;
1254 
1255 	bigtime_t period = m_endPosition - m_startPosition;
1256 	if(period <= 0LL)
1257 		return B_NOT_ALLOWED;
1258 
1259 	_changeState(TRANSPORT_STARTING);
1260 
1261 	bigtime_t tpStart = 0LL;
1262 	bigtime_t tpStop = period;
1263 
1264 	if(m_runMode != BMediaNode::B_OFFLINE) {
1265 
1266 		// * Find the highest latency in the group
1267 		bigtime_t offset = 0LL;
1268 		calcLatencyFn _f(offset);
1269 		for_each(
1270 			m_nodes.begin(),
1271 			m_nodes.end(),
1272 			_f);
1273 
1274 		offset += s_rosterLatency;
1275 		PRINT((
1276 			"- offset: %Ld\n", offset));
1277 
1278 		ASSERT(m_timeSourceObj);
1279 		tpStart = m_timeSourceObj->Now() + offset;
1280 		tpStop += tpStart;
1281 	}
1282 
1283 	// * Roll all nodes; watch for errors
1284 	bool allFailed = true;
1285 	err = B_OK;
1286 	for(
1287 		node_set::iterator it = m_nodes.begin();
1288 		it != m_nodes.end(); ++it) {
1289 
1290 		status_t e = (*it)->_roll(
1291 			tpStart,
1292 			tpStop,
1293 			m_startPosition);
1294 		if(e < B_OK)
1295 			err = e;
1296 		else
1297 			allFailed = false;
1298 	}
1299 
1300 	if(!allFailed)
1301 		// notify observers
1302 		_changeState(TRANSPORT_ROLLING);
1303 
1304 	return err;
1305 }
1306 
1307 
1308 // State transition; notify listeners
1309 // +++++ [18aug99] DANGER: should notification happen in the middle
1310 //                         of such an operation?
1311 inline void NodeGroup::_changeState(
1312 	transport_state_t			to) {
1313 
1314 	assert_locked(this);
1315 
1316 	m_transportState = to;
1317 
1318 	if(!LockLooper()) {
1319 		ASSERT(!"LockLooper() failed.");
1320 	}
1321 	BMessage m(M_TRANSPORT_STATE_CHANGED);
1322 	m.AddInt32("groupID", id());
1323 	m.AddInt32("transportState", m_transportState);
1324 	notify(&m);
1325 	UnlockLooper();
1326 }
1327 
1328 // Enforce a state transition, and notify listeners
1329 inline void NodeGroup::_changeState(
1330 	transport_state_t			from,
1331 	transport_state_t			to) {
1332 
1333 	assert_locked(this);
1334 	ASSERT(m_transportState == from);
1335 
1336 	_changeState(to);
1337 }
1338 
1339 
1340 // -------------------------------------------------------- //
1341 // *** cycle thread & helpers (LOCK REQUIRED)
1342 // -------------------------------------------------------- //
1343 
1344 // *** cycle port definitions
1345 
1346 const int32					_portLength			= 32;
1347 const char* const		_portName				= "NodeGroup::m_cyclePort";
1348 const size_t				_portMsgMaxSize	= 256;
1349 
1350 
1351 // set up the cycle thread (including its kernel port)
1352 status_t NodeGroup::_initCycleThread() {
1353 	assert_locked(this);
1354 	status_t err;
1355 	D_METHOD((
1356 		"NodeGroup::_initCycleThread()\n"));
1357 
1358 	if(m_cycleThread) {
1359 		// thread is still alive
1360 		err = _destroyCycleThread();
1361 		if(err < B_OK)
1362 			return err;
1363 	}
1364 
1365 	// create
1366 	m_cycleThreadDone = false;
1367 	m_cycleThread = spawn_thread(
1368 		&_CycleThread,
1369 		"NodeGroup[cycleThread]",
1370 		B_NORMAL_PRIORITY,
1371 		(void*)this);
1372 	if(m_cycleThread < B_OK) {
1373 		PRINT((
1374 			"* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1375 			"  %s\n",
1376 			strerror(m_cycleThread)));
1377 		return m_cycleThread;
1378 	}
1379 
1380 	// launch
1381 	return resume_thread(m_cycleThread);
1382 }
1383 
1384 // shut down the cycle thread/port
1385 status_t NodeGroup::_destroyCycleThread() {
1386 	assert_locked(this);
1387 	status_t err;
1388 	D_METHOD((
1389 		"NodeGroup::_destroyCycleThread()\n"));
1390 
1391 	if(!m_cycleThread)
1392 		return B_OK;
1393 
1394 	if(!m_cycleThreadDone) {
1395 		// kill the thread
1396 		ASSERT(m_cyclePort);
1397 		err = write_port_etc(
1398 			m_cyclePort,
1399 			_CYCLE_STOP,
1400 			0,
1401 			0,
1402 			B_TIMEOUT,
1403 			10000LL);
1404 
1405 		if(err < B_OK) {
1406 			// bad thread.  die, thread, die.
1407 			PRINT((
1408 				"* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1409 			delete_port(m_cyclePort);
1410 			m_cyclePort = 0;
1411 			kill_thread(m_cycleThread);
1412 			m_cycleThread = 0;
1413 			return B_OK;
1414 		}
1415 
1416 		// the thread got the message; wait for it to quit
1417 		unlock();
1418 		while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
1419 			PRINT((
1420 				"! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1421 		}
1422 		lock();
1423 	}
1424 
1425 	// it's up to the thread to close its port
1426 	ASSERT(!m_cyclePort);
1427 
1428 	m_cycleThread = 0;
1429 
1430 	return B_OK;
1431 }
1432 
1433 
1434 // 1) do the current positions specify a valid cycle region?
1435 // 2) are any nodes in the group cycle-enabled?
1436 
1437 bool NodeGroup::_cycleValid() {
1438 	assert_locked(this);
1439 	return
1440 		(m_transportState == TRANSPORT_RUNNING ||
1441 		 m_transportState == TRANSPORT_STARTING) &&
1442 		 canCycle();
1443 }
1444 
1445 // initialize the cycle members (call when starting)
1446 
1447 void NodeGroup::_cycleInit(
1448 	bigtime_t										startTime) {
1449 	assert_locked(this);
1450 	ASSERT(m_cycleNodes.size() > 0);
1451 	D_METHOD((
1452 		"NodeGroup::_cycleInit(%Ld)\n",
1453 		startTime));
1454 
1455 	// +++++ rescan latencies?
1456 
1457 	// figure new boundary & deadline from region length
1458 	bigtime_t cyclePeriod = m_endPosition - m_startPosition;
1459 
1460 	if(cyclePeriod <= 0) {
1461 		// cycle region is no longer valid
1462 		m_cycleBoundary = 0LL;
1463 		m_cycleDeadline = 0LL;
1464 
1465 //		no no no -- deadlocks when the thread calls this method
1466 //		// stop the thread
1467 //		_destroyCycleThread();
1468 		return;
1469 	}
1470 
1471 	m_cycleStart = startTime;
1472 	m_cycleBoundary = startTime + cyclePeriod;
1473 	m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
1474 }
1475 
1476 
1477 // add a ref to the cycle set (in proper order, based on latency)
1478 void NodeGroup::_cycleAddRef(
1479 	NodeRef*										ref) {
1480 	assert_locked(this);
1481 
1482 	// make sure it's not already there
1483 	ASSERT(find(
1484 		m_cycleNodes.begin(),
1485 		m_cycleNodes.end(),
1486 		ref) == m_cycleNodes.end());
1487 
1488 	// [re]calc latency if 0
1489 	if(!ref->m_latency)
1490 		ref->_updateLatency();
1491 
1492 	node_set::iterator it;
1493 	for(it = m_cycleNodes.begin();
1494 		it != m_cycleNodes.end(); ++it) {
1495 		if(ref->m_latency > (*it)->m_latency) {
1496 			m_cycleNodes.insert(it, ref);
1497 			break;
1498 		}
1499 	}
1500 
1501 	// not inserted? new ref belongs at the end
1502 	if(it == m_cycleNodes.end())
1503 		m_cycleNodes.insert(it, ref);
1504 }
1505 
1506 // remove a ref from the cycle set
1507 void NodeGroup::_cycleRemoveRef(
1508 	NodeRef*										ref) {
1509 	assert_locked(this);
1510 
1511 	node_set::iterator it = find(
1512 		m_cycleNodes.begin(),
1513 		m_cycleNodes.end(),
1514 		ref);
1515 	ASSERT(it != m_cycleNodes.end());
1516 	m_cycleNodes.erase(it);
1517 }
1518 
1519 bigtime_t NodeGroup::_cycleBoundary() const {
1520 	Autolock _l(this);
1521 	return m_cycleBoundary;
1522 }
1523 
1524 // cycle thread impl.
1525 /*static*/
1526 status_t NodeGroup::_CycleThread(void* user) {
1527 	((NodeGroup*)user)->_cycleThread();
1528 	return B_OK;
1529 }
1530 
1531 void NodeGroup::_cycleThread() {
1532 
1533 	status_t err;
1534 	int32 code;
1535 	int32 errorCount = 0;
1536 
1537 	// +++++ liability -- if the thread has to be killed, this buffer
1538 	//       won't be reclaimed
1539 	char* msgBuffer = new char[_portMsgMaxSize];
1540 	array_delete<char> _d(msgBuffer);
1541 
1542 	// create port
1543 	ASSERT(!m_cyclePort);
1544 	m_cyclePort = create_port(
1545 		_portLength,
1546 		_portName);
1547 	ASSERT(m_cyclePort >= B_OK);
1548 
1549 	// the message-handling loop
1550 	bool done = false;
1551 	while(!done) {
1552 
1553 		// *** wait until it's time to queue the next cycle, or until
1554 		// *** a message arrives
1555 
1556 		lock();				// **** BEGIN LOCKED SECTION ****
1557 		if(!_cycleValid()) {
1558 			unlock();
1559 			break;
1560 		}
1561 
1562 		ASSERT(m_cycleNodes.size() > 0);
1563 		ASSERT(m_timeSourceObj);
1564 
1565 		bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
1566 		bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
1567 			m_cycleBoundary, maxLatency + s_rosterLatency);
1568 		bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
1569 
1570 		if(timeout <= 0) {
1571 			// +++++ whoops, I'm late.
1572 			// +++++ adjust to compensate !!!
1573 			PRINT((
1574 				"*** NodeGroup::_cycleThread(): LATE\n"
1575 				"    by %Ld\n", -timeout));
1576 		}
1577 
1578 		// +++++ if timeout is very short, spin until the target time arrives
1579 
1580 		unlock();			// **** END LOCKED SECTION ****
1581 
1582 		// block until message arrives or it's time to wake up
1583 		err = read_port_etc(
1584 			m_cyclePort,
1585 			&code,
1586 			msgBuffer,
1587 			_portMsgMaxSize,
1588 			B_TIMEOUT,
1589 			timeout);
1590 
1591 		if(err == B_TIMED_OUT) {
1592 			// the time has come to seek my nodes
1593 			_handleCycleService();
1594 			continue;
1595 		}
1596 		else if(err < B_OK) {
1597 			// any other error is bad news
1598 			PRINT((
1599 				"* NodeGroup::_cycleThread(): read_port error:\n"
1600 				"  %s\n"
1601 				"  ABORTING\n\n", strerror(err)));
1602 			if(++errorCount > 10) {
1603 				PRINT((
1604 					"*** Too many errors; aborting.\n"));
1605 				break;
1606 			}
1607 			continue;
1608 		}
1609 
1610 		errorCount = 0;
1611 
1612 		// process the message
1613 		switch(code) {
1614 			case _CYCLE_STOP:
1615 				// bail
1616 				done = true;
1617 				break;
1618 
1619 			case _CYCLE_END_CHANGED:
1620 			case _CYCLE_LATENCY_CHANGED:
1621 				// fall through to next loop; for now, these messages
1622 				// serve only to slap me out of my stupor and reassess
1623 				// the timing situation...
1624 				break;
1625 
1626 			default:
1627 				PRINT((
1628 					"* NodeGroup::_cycleThread(): unknown message code '%ld'\n", code));
1629 				break;
1630 		}
1631 	} // while(!done)
1632 
1633 
1634 	// delete port
1635 	delete_port(m_cyclePort);
1636 	m_cyclePort = 0;
1637 
1638 	// done
1639 	m_cycleThreadDone = true;
1640 }
1641 
1642 // cycle service: seek all nodes & initiate next cycle
1643 void NodeGroup::_handleCycleService() {
1644 	Autolock _l(this);
1645 //	D_METHOD((
1646 //		"NodeGroup::_handleCycleService()\n"));
1647 	status_t err;
1648 
1649 	if(!_cycleValid()) {
1650 //		PRINT((
1651 //			"- _handleCycleService(): cycle not valid; quitting.\n"));
1652 		return;
1653 	}
1654 
1655 	// seek
1656 	for(node_set::iterator it = m_cycleNodes.begin();
1657 		it != m_cycleNodes.end(); ++it) {
1658 		err = (*it)->_seek(
1659 			m_startPosition,
1660 			m_cycleBoundary);
1661 		if(err < B_OK) {
1662 			PRINT((
1663 				"- _handleCycleService(): node('%s')::_seek() failed:\n"
1664 				"  %s\n",
1665 				(*it)->name(), strerror(err)));
1666 		}
1667 	}
1668 
1669 	// update cycle settings
1670 	if(m_newStart) {
1671 		m_newStart = false;
1672 		m_startPosition = m_newStartPosition;
1673 	}
1674 	if(m_newEnd) {
1675 		m_newEnd = false;
1676 		m_endPosition = m_newEndPosition;
1677 	}
1678 
1679 	// prepare next cycle
1680 	_cycleInit(m_cycleBoundary);
1681 }
1682 
1683 // END -- NodeGroup.cpp --
1684