xref: /haiku/src/apps/cortex/NodeManager/NodeGroup.cpp (revision e1c4049fed1047bdb957b0529e1921e97ef94770)
1 /*
2  * Copyright (c) 1999-2000, Eric Moon.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions, and the following disclaimer.
11  *
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions, and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * 3. The name of the author may not be used to endorse or promote products
17  *    derived from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
20  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21  * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
23  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27  * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 // NodeGroup.cpp
33 
34 #include "NodeGroup.h"
35 //#include "NodeGroup_transport_thread.h"
36 
37 #include "NodeManager.h"
38 #include "NodeRef.h"
39 
40 #include <MediaRoster.h>
41 #include <OS.h>
42 #include <TimeSource.h>
43 
44 #include <algorithm>
45 #include <functional>
46 
47 #include "array_delete.h"
48 #include "BasicThread.h"
49 #include "node_manager_impl.h"
50 #include "functional_tools.h"
51 
52 using namespace std;
53 
54 __USE_CORTEX_NAMESPACE
55 #define D_METHOD(x) //PRINT (x)
56 #define D_ROSTER(x) //PRINT (x)
57 #define D_LOCK(x) //PRINT (x)
58 
59 
60 
61 // -------------------------------------------------------- //
62 // *** ctor/dtor
63 // -------------------------------------------------------- //
64 
65 // free the group, including all nodes within it
66 // (this call will result in the eventual deletion of the object.)
67 // returns B_OK on success; B_NOT_ALLOWED if release() has
68 // already been called; other error codes if the Media Roster
69 // call fails.
70 // * THE MANAGER MUST BE LOCKED
71 
72 status_t NodeGroup::release() {
73 
74 	D_METHOD((
75 		"NodeGroup::release()\n"));
76 
77 	if(isReleased())
78 		return B_NOT_ALLOWED;
79 
80 	// clean up
81 	lock();
82 
83 	// halt all nodes
84 	_stop();
85 
86 	// remove & release all nodes
87 	// +++++ causes triply-nested lock: eww!
88 	while(m_nodes.size()) {
89 		NodeRef* last = m_nodes.back();
90 		removeNode(m_nodes.size()-1);
91 		last->release();
92 	}
93 
94 	unlock();
95 
96 // [e.moon 7nov99]
97 // removing the released group is now NodeManager's responsibility
98 //
99 	// remove from NodeManager
100 	if(!m_manager->lock()) {
101 		ASSERT(!"* m_manager->lock() failed.\n");
102 	}
103 	m_manager->_removeGroup(this);
104 	m_manager->unlock();
105 
106 	// hand off to IObservable
107 	return _inherited::release();
108 }
109 
110 // call release() rather than deleting NodeGroup objects
111 NodeGroup::~NodeGroup() {
112 
113 	Autolock _l(this);
114 	D_METHOD((
115 		"~NodeGroup()\n"));
116 
117 	ASSERT(!m_nodes.size());
118 
119 	if(m_timeSourceObj) {
120 		m_timeSourceObj->Release();
121 		m_timeSourceObj = 0;
122 	}
123 }
124 
125 
126 // -------------------------------------------------------- //
127 // *** accessors
128 // -------------------------------------------------------- //
129 
130 // [e.moon 13oct99] moved to header
131 //inline uint32 NodeGroup::id() const { return m_id; }
132 
133 // -------------------------------------------------------- //
134 // *** operations
135 // -------------------------------------------------------- //
136 
137 // name access
138 const char* NodeGroup::name() const {
139 	Autolock _l(this);
140 	return m_name.String();
141 }
142 
143 status_t NodeGroup::setName(const char* name) {
144 	Autolock _l(this);
145 	m_name = name;
146 	return B_OK;
147 }
148 
149 // content access
150 uint32 NodeGroup::countNodes() const {
151 	Autolock _l(this);
152 	return m_nodes.size();
153 }
154 
155 NodeRef* NodeGroup::nodeAt(
156 	uint32											index) const {
157 	Autolock _l(this);
158 	return (index < m_nodes.size()) ?
159 		m_nodes[index] :
160 		0;
161 }
162 
163 // add/remove nodes:
164 // - you may only add a node with no current group.
165 // - nodes added during playback will be started;
166 //   nodes removed during playback will be stopped (unless
167 //   the NO_START_STOP transport restriction flag is set
168 //   for a given node.)
169 
170 status_t NodeGroup::addNode(
171 	NodeRef*										node) {
172 
173 	D_METHOD((
174 		"NodeGroup::addNode()\n"));
175 
176 	// lock the manager first; if the node has no current group,
177 	// this locks the node.
178 	m_manager->lock();
179 
180 	Autolock _l(this);
181 
182 	// precondition: GROUP_LOCKED not set
183 	if(m_flags & GROUP_LOCKED)
184 		return B_NOT_ALLOWED;
185 
186 	// precondition: no current group
187 	if(node->m_group) {
188 		// [e.moon 28sep99] whoops, forgot one
189 		PRINT((
190 			"!!! node already in group '%s'\n", node->m_group->name()));
191 
192 		m_manager->unlock();
193 		return B_NOT_ALLOWED;
194 	}
195 
196 	// add it
197 	m_nodes.push_back(node);
198 	node->_setGroup(this);
199 
200 	// release the manager
201 	m_manager->unlock();
202 
203 	// first node? the transport is now ready to start
204 	if(m_nodes.size() == 1) {
205 		_changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
206 	}
207 //
208 //	if(m_syncNode == media_node::null) {
209 //		// assign as sync node
210 //		setSyncNode(node->node());
211 //	}
212 //
213 	// initialize the new node
214 	status_t err = node->_initTransportState();
215 	if(err < B_OK)
216 		return err;
217 
218 	// set time source
219 	node->_setTimeSource(m_timeSource.node);
220 
221 	// set run mode
222 	node->_setRunMode(m_runMode);
223 
224 	// add to cycle set if need be
225 	// +++++ should I call _cycleAddRef() instead?
226 	if(node->m_cycle)
227 		_refCycleChanged(node);
228 
229 	if(m_transportState == TRANSPORT_RUNNING) {
230 		// +++++ start if necessary!
231 	}
232 	// +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
233 
234 	// send notification
235 	if(!LockLooper()) {
236 		ASSERT(!"LockLooper() failed.");
237 	}
238 	BMessage m(M_NODE_ADDED);
239 	m.AddInt32("groupID", id());
240 	m.AddInt32("nodeID", node->id());
241 	notify(&m);
242 	UnlockLooper();
243 
244 	// success
245 	return B_OK;
246 }
247 
248 
249 status_t NodeGroup::removeNode(
250 	NodeRef*										node) {
251 
252 	D_METHOD((
253 		"NodeGroup::removeNode()\n"));
254 
255 	// lock the manager first; once the node is ungrouped,
256 	// the manager lock applies to it
257 	m_manager->lock();
258 
259 	Autolock _l(this);
260 
261 	// precondition: this must be the node's group
262 	if(node->m_group != this) {
263 		// [e.moon 28sep99] whoops, forgot one
264 		PRINT((
265 			"!!! node not in group '%s'\n", node->m_group->name()));
266 
267 		m_manager->unlock();
268 		return B_NOT_ALLOWED;
269 	}
270 
271 	// remove from the cycle set
272 	if(node->m_cycle)
273 		_cycleRemoveRef(node);
274 
275 	// remove it
276 	ASSERT(m_nodes.size());
277 	remove(
278 		m_nodes.begin(),
279 		m_nodes.end(),
280 		node);
281 
282 	// should have removed one and only one entry
283 	m_nodes.resize(m_nodes.size()-1);
284 
285 //	// 6aug99: the timesource is now the sync node...
286 //	// is this the sync node? reassign if so
287 //
288 //	if(node->node() == m_syncNode) {
289 //
290 //		// look for another sync-capable node
291 //		bool found = false;
292 //		for(int n = 0; !found && n < m_nodes.size(); ++n)
293 //			if(setSyncNode(m_nodes[n]->node()) == B_OK)
294 //				found = true;
295 //
296 //		// no luck? admit defeat:
297 //		if(!found) {
298 //			PRINT((
299 //				"* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
300 //
301 //			// +++++ stop & set to invalid state?
302 //
303 //			setSyncNode(media_node::null);
304 //		}
305 //	}
306 
307 	// stop the node if necessary
308 	status_t err = node->_stop();
309 	if(err < B_OK) {
310 		PRINT((
311 			"*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
312 			"    %s\n",
313 			node->name(),
314 			strerror(err)));
315 	}
316 
317 	// clear the node's group pointer
318 	node->_setGroup(0);
319 
320 	// release the manager lock; the node is now ungrouped and
321 	// unlocked
322 	m_manager->unlock();
323 
324 	// was that the last node? stop/disable the transport if so
325 	if(!m_nodes.size()) {
326 
327 		// +++++ kill sync thread(s)
328 
329 		_changeState(TRANSPORT_INVALID);
330 	}
331 
332 	// send notification
333 	if(!LockLooper()) {
334 		ASSERT(!"LockLooper() failed.");
335 	}
336 	BMessage m(M_NODE_REMOVED);
337 	m.AddInt32("groupID", id());
338 	m.AddInt32("nodeID", node->id());
339 	notify(&m);
340 	UnlockLooper();
341 
342 	// success
343 	return B_OK;
344 }
345 
346 status_t NodeGroup::removeNode(
347 	uint32											index) {
348 
349 	D_METHOD((
350 		"NodeGroup::removeNode(by index)\n"));
351 
352 	// +++++ icky nested lock
353 	Autolock _l(this);
354 
355 	ASSERT(m_nodes.size() > index);
356 	return removeNode(m_nodes[index]);
357 }
358 
359 uint32 NodeGroup::groupFlags() const {
360 	Autolock _l(this);
361 	return m_flags;
362 }
363 
364 status_t NodeGroup::setGroupFlags(
365 	uint32											flags) {
366 	Autolock _l(this);
367 	m_flags = flags;
368 	return B_OK;
369 }
370 
371 
372 // returns true if one or more nodes in the group have cycling
373 // enabled, and the start- and end-positions are valid
374 bool NodeGroup::canCycle() const {
375 	Autolock _l(this);
376 
377 	return
378 		m_cycleNodes.size() > 0 &&
379 		m_endPosition - m_startPosition > s_minCyclePeriod;
380 }
381 
382 // -------------------------------------------------------- //
383 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
384 // -------------------------------------------------------- //
385 
386 // Fetch the current transport state
387 
388 NodeGroup::transport_state_t NodeGroup::transportState() const {
389 	Autolock _l(this);
390 	return m_transportState;
391 }
392 
393 // Set the starting media time:
394 //   This is the point at which playback will begin in any media
395 //   files/documents being played by the nodes in this group.
396 //   When cycle mode is enabled, this is the point to which each
397 //   node will be seek'd at the end of each cycle (loop).
398 //
399 //   The starting time can't be changed in the B_OFFLINE run mode
400 //   (this call will return an error.)
401 
402 status_t NodeGroup::setStartPosition(
403 	bigtime_t										start) {
404 	Autolock _l(this);
405 
406 	D_METHOD((
407 		"NodeGroup::setStartPosition(%lld)\n", start));
408 
409 	if(
410 		m_transportState == TRANSPORT_RUNNING ||
411 		m_transportState == TRANSPORT_ROLLING ||
412 		m_transportState == TRANSPORT_STARTING) {
413 
414 		if(m_runMode == BMediaNode::B_OFFLINE)
415 			return B_NOT_ALLOWED;
416 
417 		ASSERT(m_timeSourceObj);
418 
419 		if(_cycleValid()) {
420 			if(m_timeSourceObj->Now() >= m_cycleDeadline) {
421 				// too late to change start position; defer
422 //				PRINT((" - deferred\n"));
423 				m_newStartPosition = start;
424 				m_newStart = true;
425 				return B_OK;
426 			}
427 
428 			// not at deadline yet; fall through to set start position
429 		}
430 	}
431 
432 	m_startPosition = start;
433 
434 	// +++++ notify [e.moon 11oct99]
435 
436 	return B_OK;
437 }
438 
439 // Fetch the starting position:
440 
441 // +++++ if a previously-set start position was deferred, it won't be
442 //       returned yet
443 
444 bigtime_t NodeGroup::startPosition() const {
445 	Autolock _l(this);
446 
447 	return m_startPosition;
448 }
449 
450 // Set the ending media time:
451 //   This is the point at which playback will end relative to
452 //   media documents begin played by the nodes in this group;
453 //   in cycle mode, this specifies the loop point.  If the
454 //   ending time is less than or equal to the starting time,
455 //   the transport will continue until stopped manually.
456 //   If the end position is changed while the transport is playing,
457 //   it must take effect retroactively (if it's before the current
458 //   position and looping is enabled, all nodes must 'warp' to
459 //   the proper post-loop position.)
460 //
461 //   The ending time can't be changed if run mode is B_OFFLINE and
462 //   the transport is running (this call will return an error.)
463 
464 status_t NodeGroup::setEndPosition(
465 	bigtime_t										end) {
466 	Autolock _l(this);
467 
468 	D_METHOD((
469 		"NodeGroup::setEndPosition(%lld)\n", end));
470 
471 	if(
472 		m_transportState == TRANSPORT_RUNNING ||
473 		m_transportState == TRANSPORT_ROLLING ||
474 		m_transportState == TRANSPORT_STARTING) {
475 
476 		if(m_runMode == BMediaNode::B_OFFLINE)
477 			return B_NOT_ALLOWED;
478 
479 		ASSERT(m_timeSourceObj);
480 
481 		bigtime_t endDelta = end - m_endPosition;
482 
483 		if(_cycleValid()) {
484 			if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
485 				// too late to change end position; defer
486 //				PRINT((" - deferred\n"));
487 				m_newEndPosition = end;
488 				m_newEnd = true;
489 				return B_OK;
490 			}
491 			else {
492 				// set new end position
493 				m_endPosition = end;
494 
495 				// inform thread
496 				ASSERT(m_cyclePort);
497 				write_port(
498 					m_cyclePort,
499 					_CYCLE_END_CHANGED,
500 					0,
501 					0);
502 //
503 //				// restart nodes' cycle threads with new end position
504 //				_cycleInit(m_cycleStart);
505 //				for(node_set::iterator it = m_cycleNodes.begin();
506 //					it != m_cycleNodes.end(); ++it) {
507 //					(*it)->_scheduleCycle(m_cycleBoundary);
508 //				}
509 //				return B_OK;
510 			}
511 		}
512 	}
513 
514 	m_endPosition = end;
515 
516 	// +++++ notify [e.moon 11oct99]
517 
518 	return B_OK;
519 }
520 
521 
522 // Fetch the end position:
523 //   Note that if the end position is less than or equal to the start
524 //   position, it's ignored.
525 
526 // +++++ if a previously-set end position was deferred, it won't be
527 //       returned yet
528 
529 bigtime_t NodeGroup::endPosition() const {
530 	Autolock _l(this);
531 	return m_endPosition;
532 }
533 
534 // -------------------------------------------------------- //
535 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
536 // -------------------------------------------------------- //
537 
538 // Preroll the group:
539 //   Seeks, then prerolls, each node in the group (honoring the
540 //   NO_SEEK and NO_PREROLL flags.)  This ensures that the group
541 //   can start as quickly as possible.
542 //
543 //   Does not return until all nodes in the group have been
544 //   prepared.
545 
546 status_t NodeGroup::preroll() {
547 	D_METHOD((
548 		"NodeGroup::preroll()\n"));
549 
550 	Autolock _l(this);
551 	return _preroll();
552 }
553 
554 // Start all nodes in the group:
555 //   Nodes with the NO_START_STOP flag aren't molested.
556 
557 status_t NodeGroup::start() {
558 	D_METHOD((
559 		"NodeGroup::start()\n"));
560 
561 	Autolock _l(this);
562 	return _start();
563 }
564 
565 // Stop all nodes in the group:
566 //   Nodes with the NO_START_STOP flag aren't molested.
567 
568 status_t NodeGroup::stop() {
569 	D_METHOD((
570 		"NodeGroup::stop()\n"));
571 
572 	Autolock _l(this);
573 	return _stop();
574 }
575 
576 // Roll all nodes in the group:
577 //   Queues a start and stop atomically (via BMediaRoster::RollNode()).
578 //   Returns B_NOT_ALLOWED if endPosition <= startPosition;
579 
580 status_t NodeGroup::roll() {
581 	D_METHOD((
582 		"NodeGroup::roll()\n"));
583 
584 	Autolock _l(this);
585 	return _roll();
586 }
587 
588 // -------------------------------------------------------- //
589 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
590 // -------------------------------------------------------- //
591 
592 // time source control:
593 //   getTimeSource():
594 //   returns B_ERROR if no time source has been set; otherwise,
595 //   returns the node ID of the current time source for all
596 //   nodes in the group.
597 //
598 //   setTimeSource():
599 //   Calls SetTimeSourceFor() on every node in the group.
600 //   The group must be stopped; B_NOT_ALLOWED will be returned
601 //   if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
602 
603 status_t NodeGroup::getTimeSource(
604 	media_node*									outTimeSource) const {
605 	Autolock _l(this);
606 
607 	if(m_timeSource != media_node::null) {
608 		*outTimeSource = m_timeSource;
609 		return B_OK;
610 	}
611 	return B_ERROR;
612 }
613 
614 status_t NodeGroup::setTimeSource(
615 	const media_node&						timeSource) {
616 
617 	Autolock _l(this);
618 
619 	if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
620 		return B_NOT_ALLOWED;
621 
622 	if(m_timeSourceObj)
623 		m_timeSourceObj->Release();
624 
625 	m_timeSource = timeSource;
626 
627 	// cache a BTimeSource*
628 	m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
629 	ASSERT(m_timeSourceObj);
630 
631 	// apply new time source to all nodes
632 	for_each(
633 		m_nodes.begin(),
634 		m_nodes.end(),
635 #if __GNUC__ <= 2
636 		bind2nd(
637 			mem_fun(&NodeRef::_setTimeSource),
638 			m_timeSource.node
639 		)
640 #else
641 		[this](NodeRef* nodeRef) { nodeRef->_setTimeSource(m_timeSource.node); }
642 #endif
643 	);
644 
645 //	// try to set as sync node
646 //	err = setSyncNode(timeSource);
647 //	if(err < B_OK) {
648 //		PRINT((
649 //			"* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
650 //			strerror(err)));
651 //	}
652 
653 	// notify
654 	if(!LockLooper()) {
655 		ASSERT(!"LockLooper() failed.");
656 	}
657 	BMessage m(M_TIME_SOURCE_CHANGED);
658 	m.AddInt32("groupID", id());
659 	m.AddInt32("timeSourceID", timeSource.node);
660 	notify(&m);
661 	UnlockLooper();
662 
663 	return B_OK;
664 }
665 
666 // run mode access:
667 //   Sets the default run mode for the group.  This will be
668 //   applied to every node with a wildcard (0) run mode.
669 //
670 //   Special case: if the run mode is B_OFFLINE, it will be
671 //   applied to all nodes in the group.
672 
673 BMediaNode::run_mode NodeGroup::runMode() const {
674 	Autolock _l(this);
675 	return m_runMode;
676 }
677 
678 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
679 	Autolock _l(this);
680 
681 	m_runMode = mode;
682 
683 	// apply to all nodes
684 	for_each(
685 		m_nodes.begin(),
686 		m_nodes.end(),
687 #if __GNUC__ <= 2
688 		bind2nd(
689 			mem_fun(&NodeRef::_setRunModeAuto),
690 			m_runMode
691 		)
692 #else
693 		[this](NodeRef* ref) { ref->_setRunModeAuto(this->m_runMode); }
694 #endif
695 //		bound_method(
696 //			*this,
697 //			&NodeGroup::setRunModeFor)
698 	);
699 
700 
701 	return B_OK;
702 }
703 
704 // -------------------------------------------------------- //
705 // *** BHandler
706 // -------------------------------------------------------- //
707 
708 void NodeGroup::MessageReceived(
709 	BMessage*										message) {
710 
711 //	PRINT((
712 //		"NodeGroup::MessageReceived():\n"));
713 //	message->PrintToStream();
714 	status_t err;
715 
716 	switch(message->what) {
717 		case M_SET_TIME_SOURCE:
718 			{
719 				media_node timeSource;
720 				void* data;
721 				ssize_t dataSize;
722 				err = message->FindData(
723 					"timeSourceNode",
724 					B_RAW_TYPE,
725 					(const void**)&data,
726 					&dataSize);
727 				if(err < B_OK) {
728 					PRINT((
729 						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
730 						"  no timeSourceNode!\n"));
731 					break;
732 				}
733 				timeSource = *(media_node*)data;
734 
735 				setTimeSource(timeSource);
736 			}
737 			break;
738 
739 		case M_SET_RUN_MODE:
740 			{
741 				uint32 runMode;
742 				err = message->FindInt32("runMode", (int32*)&runMode);
743 				if(err < B_OK) {
744 					PRINT((
745 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
746 						"  no runMode!\n"));
747 					break;
748 				}
749 
750 				if(runMode < BMediaNode::B_OFFLINE ||
751 					runMode > BMediaNode::B_RECORDING) {
752 					PRINT((
753 						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
754 						"  invalid run mode (%" B_PRIu32 ")\n", runMode));
755 					break;
756 				}
757 
758 				setRunMode((BMediaNode::run_mode)runMode);
759 			}
760 			break;
761 
762 		case M_SET_START_POSITION:
763 			{
764 				bigtime_t position;
765 				err = message->FindInt64("position", (int64*)&position);
766 				if(err < B_OK) {
767 					PRINT((
768 						"* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
769 						"  no position!\n"));
770 					break;
771 				}
772 				setStartPosition(position);
773 			}
774 			break;
775 
776 		case M_SET_END_POSITION:
777 			{
778 				bigtime_t position;
779 				err = message->FindInt64("position", (int64*)&position);
780 				if(err < B_OK) {
781 					PRINT((
782 						"* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
783 						"  no position!\n"));
784 					break;
785 				}
786 				setEndPosition(position);
787 			}
788 			break;
789 
790 		case M_PREROLL:
791 			preroll();
792 			break;
793 
794 		case M_START:
795 			start();
796 			break;
797 
798 		case M_STOP:
799 			stop();
800 			break;
801 
802 		case M_ROLL:
803 			roll();
804 			break;
805 
806 		default:
807 			_inherited::MessageReceived(message);
808 			break;
809 	}
810 }
811 
812 
813 // -------------------------------------------------------- //
814 // *** IPersistent
815 // -------------------------------------------------------- //
816 
817 // !
818 #if CORTEX_XML
819 // !
820 
821 // +++++
822 
823 // Default constructor
824 NodeGroup::NodeGroup() :
825 	m_manager(0) {} // +++++ finish initialization
826 
827 
828 // !
829 #endif /*CORTEX_XML*/
830 // !
831 
832 // -------------------------------------------------------- //
833 // *** IObservable:		[19aug99]
834 // -------------------------------------------------------- //
835 
836 void NodeGroup::observerAdded(
837 	const BMessenger&				observer) {
838 
839 	BMessage m(M_OBSERVER_ADDED);
840 	m.AddInt32("groupID", id());
841 	m.AddMessenger("target", BMessenger(this));
842 	observer.SendMessage(&m);
843 }
844 
845 void NodeGroup::observerRemoved(
846 	const BMessenger&				observer) {
847 
848 	BMessage m(M_OBSERVER_REMOVED);
849 	m.AddInt32("groupID", id());
850 	m.AddMessenger("target", BMessenger(this));
851 	observer.SendMessage(&m);
852 }
853 
854 void NodeGroup::notifyRelease() {
855 
856 	BMessage m(M_RELEASED);
857 	m.AddInt32("groupID", id());
858 	m.AddMessenger("target", BMessenger(this));
859 	notify(&m);
860 }
861 
862 void NodeGroup::releaseComplete() {
863 	// +++++
864 }
865 
866 // -------------------------------------------------------- //
867 // *** ILockable: pass lock requests to m_lock
868 // -------------------------------------------------------- //
869 
870 bool NodeGroup::lock(
871 	lock_t type,
872 	bigtime_t timeout) {
873 
874 	D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
875 
876 	ASSERT(type == WRITE);
877 	status_t err = m_lock.LockWithTimeout(timeout);
878 
879 	D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
880 
881 	return err == B_OK;
882 }
883 
884 bool NodeGroup::unlock(
885 	lock_t type) {
886 
887 	D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
888 
889 	ASSERT(type == WRITE);
890 	m_lock.Unlock();
891 
892 	D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
893 
894 	return true;
895 }
896 
897 bool NodeGroup::isLocked(
898 	lock_t type) const {
899 
900 	ASSERT(type == WRITE);
901 	return m_lock.IsLocked();
902 }
903 
904 // -------------------------------------------------------- //
905 // *** ctor (accessible to NodeManager)
906 // -------------------------------------------------------- //
907 
908 NodeGroup::NodeGroup(
909 	const char*									name,
910 	NodeManager*								manager,
911 	BMediaNode::run_mode				runMode) :
912 
913 	ObservableHandler(name),
914 	m_lock("NodeGroup::m_lock"),
915 	m_manager(manager),
916 	m_id(NextID()),
917 	m_name(name),
918 	m_flags(0),
919 	m_transportState(TRANSPORT_INVALID),
920 	m_runMode(runMode),
921 	m_timeSourceObj(0),
922 	m_released(false),
923 	m_cycleThread(0),
924 	m_cyclePort(0),
925 	m_startPosition(0LL),
926 	m_endPosition(0LL),
927 	m_newStart(false),
928 	m_newEnd(false) {
929 
930 	ASSERT(m_manager);
931 
932 	if(!m_manager->Lock()) {
933 		ASSERT(!"m_manager->Lock() failed");
934 	}
935 	m_manager->AddHandler(this);
936 	m_manager->Unlock();
937 
938 	// set default time source
939 	media_node ts;
940 	D_ROSTER(("# roster->GetTimeSource()\n"));
941 	status_t err = m_manager->roster->GetTimeSource(&ts);
942 	if(err < B_OK) {
943 		PRINT((
944 			"*** NodeGroup(): roster->GetTimeSource() failed:\n"
945 			"    %s\n", strerror(err)));
946 	}
947 	setTimeSource(ts);
948 }
949 
950 // -------------------------------------------------------- //
951 // *** internal operations
952 // -------------------------------------------------------- //
953 
954 uint32 NodeGroup::s_nextID = 1;
955 uint32 NodeGroup::NextID() {
956 	return atomic_add((int32*)&s_nextID, 1);
957 }
958 
959 // -------------------------------------------------------- //
960 // *** ref->group communication (LOCK REQUIRED)
961 // -------------------------------------------------------- //
962 
963 // when a NodeRef's cycle state (ie. looping or not looping)
964 // changes, it must pass that information on via this method
965 
966 void NodeGroup::_refCycleChanged(
967 	NodeRef*										ref) {
968 	assert_locked(this);
969 	D_METHOD((
970 		"NodeGroup::_refCycleChanged('%s')\n",
971 		ref->name()));
972 
973 	if(ref->m_cycle) {
974 		_cycleAddRef(ref);
975 	}	else {
976 		_cycleRemoveRef(ref);
977 	}
978 
979 	// +++++ if running & cycle valid, the node should be properly
980 	//       seek'd and start'd
981 }
982 
983 
984 // when a cycling node's latency changes, call this method.
985 
986 void NodeGroup::_refLatencyChanged(
987 	NodeRef*										ref) {
988 	assert_locked(this);
989 	D_METHOD((
990 		"NodeGroup::_refLatencyChanged('%s')\n",
991 		ref->name()));
992 
993 	if(!_cycleValid())
994 		return;
995 
996 	// remove & replace ref (positions it properly)
997 	_cycleRemoveRef(ref);
998 	_cycleAddRef(ref);
999 
1000 	// slap my thread up
1001 	ASSERT(m_cyclePort);
1002 	write_port(
1003 		m_cyclePort,
1004 		_CYCLE_LATENCY_CHANGED,
1005 		0,
1006 		0);
1007 
1008 	// +++++ zat it?
1009 }
1010 
1011 // when a NodeRef receives notification that it has been stopped,
1012 // but is labeled as still running, it must call this method.
1013 // [e.moon 11oct99: roll/B_OFFLINE support]
1014 
1015 void NodeGroup::_refStopped(
1016 	NodeRef*										ref) {
1017 	assert_locked(this);
1018 	D_METHOD((
1019 		"NodeGroup::_refStopped('%s')\n",
1020 		ref->name()));
1021 
1022 	// roll/B_OFFLINE support [e.moon 11oct99]
1023 	// (check to see if any other nodes in the group are still running;
1024 	//  mark group stopped if not.)
1025 	if(m_transportState == TRANSPORT_ROLLING) {
1026 		bool nodesRunning = false;
1027 		for(node_set::iterator it = m_nodes.begin();
1028 			it != m_nodes.end(); ++it) {
1029 			if((*it)->isRunning()) {
1030 				nodesRunning = true;
1031 				break;
1032 			}
1033 		}
1034 		if(!nodesRunning)
1035 			// the group has stopped; update transport state
1036 			_changeState(TRANSPORT_STOPPED);
1037 
1038 	}
1039 
1040 }
1041 
1042 
1043 // -------------------------------------------------------- //
1044 // *** transport helpers (LOCK REQUIRED)
1045 // -------------------------------------------------------- //
1046 
1047 
1048 // Preroll all nodes in the group; this is the implementation
1049 // of preroll().
1050 // *** this method should not be called from the transport thread
1051 // (since preroll operations can block for a relatively long time.)
1052 
1053 status_t NodeGroup::_preroll() {
1054 	assert_locked(this);
1055 
1056 	D_METHOD((
1057 		"NodeGroup::_preroll()\n"));
1058 
1059 	if(
1060 		m_transportState == TRANSPORT_RUNNING ||
1061 		m_transportState == TRANSPORT_ROLLING)
1062 		// too late
1063 		return B_NOT_ALLOWED;
1064 
1065 	// * preroll all nodes to the start position
1066 
1067 	// +++++ currently, if an error is encountered it's ignored.
1068 	//       should the whole operation fail if one node couldn't
1069 	//       be prerolled?
1070 	//
1071 	//       My gut response is 'no', since the preroll step is
1072 	//       optional, but the caller should have some inkling that
1073 	//       one of its nodes didn't behave.
1074 
1075 // [e.moon 13oct99] making PPC compiler happy
1076 //	for_each(
1077 //		m_nodes.begin(),
1078 //		m_nodes.end(),
1079 //		bind2nd(
1080 //			mem_fun(&NodeRef::_preroll),
1081 //			m_startPosition
1082 //		)
1083 //	);
1084 	for(node_set::iterator it = m_nodes.begin();
1085 		it != m_nodes.end(); ++it) {
1086 		(*it)->_preroll(m_startPosition);
1087 	}
1088 
1089 //    replaces
1090 //		bind2nd(
1091 //			bound_method(*this, &NodeGroup::prerollNode),
1092 //			m_startPosition
1093 //		)
1094 
1095 	return B_OK;
1096 }
1097 
1098 
1099 //// functor: calculates latency of each node it's handed, caching
1100 //// the largest one found; includes initial latency if nodes report it.
1101 //
1102 //class NodeGroup::calcLatencyFn { public:
1103 //	bigtime_t& maxLatency;
1104 //
1105 //	calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1106 //
1107 //	void operator()(NodeRef* r) {
1108 //		ASSERT(r);
1109 //
1110 ////		PRINT((
1111 ////			"# calcLatencyFn(): '%s'\n",
1112 ////			r->name()));
1113 //
1114 //		if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1115 //			// node can't incur latency
1116 ////			PRINT((
1117 ////				"-   not a producer\n"));
1118 //			return;
1119 //		}
1120 //
1121 //		bigtime_t latency;
1122 //		status_t err =
1123 //			BMediaRoster::Roster()->GetLatencyFor(
1124 //				r->node(),
1125 //				&latency);
1126 //		if(err < B_OK) {
1127 //			PRINT((
1128 //				"* calcLatencyFn: GetLatencyFor() failed: %s\n",
1129 //				strerror(err)));
1130 //			return;
1131 //		}
1132 ////		PRINT(("-   %lld\n", latency));
1133 //
1134 //		bigtime_t add;
1135 //		err = BMediaRoster::Roster()->GetInitialLatencyFor(
1136 //			r->node(),
1137 //			&add);
1138 ////		PRINT(("-   %lld\n", add));
1139 //		if(err < B_OK) {
1140 //			PRINT((
1141 //				"* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1142 //				strerror(err)));
1143 //		}
1144 //		else
1145 //			latency += add;
1146 //
1147 //		if(latency > maxLatency)
1148 //			maxLatency = latency;
1149 //
1150 ////		PRINT((
1151 ////			"-   max latency: %lld\n",
1152 ////			maxLatency));
1153 //	}
1154 //};
1155 
1156 // Start all nodes in the group; this is the implementation of
1157 // start().  Fails if the run mode is B_OFFLINE; use _roll() instead
1158 // in that case.
1159 //
1160 // (this may be called from the transport thread or from
1161 //  an API-implementation method.)
1162 
1163 status_t NodeGroup::_start() {
1164 	assert_locked(this);
1165 
1166 	D_METHOD((
1167 		"NodeGroup::_start()\n"));
1168 	status_t err;
1169 
1170 	if(m_transportState != TRANSPORT_STOPPED)
1171 		return B_NOT_ALLOWED;
1172 
1173 	if(m_runMode == BMediaNode::B_OFFLINE)
1174 		return B_NOT_ALLOWED;
1175 
1176 	ASSERT(m_nodes.size());
1177 
1178 	_changeState(TRANSPORT_STARTING);
1179 
1180 	// * Find the highest latency in the group
1181 
1182 	bigtime_t offset = 0LL;
1183 	calcLatencyFn _f(offset);
1184 	for_each(
1185 		m_nodes.begin(),
1186 		m_nodes.end(),
1187 		_f);
1188 
1189 	offset += s_rosterLatency;
1190 	PRINT((
1191 		"- offset: %" B_PRIdBIGTIME "\n", offset));
1192 
1193 	// * Seek all nodes (in case one or more failed to preroll)
1194 
1195 	for(node_set::iterator it = m_nodes.begin();
1196 		it != m_nodes.end(); ++it) {
1197 		err = (*it)->_seekStopped(m_startPosition);
1198 		if(err < B_OK) {
1199 			PRINT((
1200 				"! NodeGroup('%s')::_start():\n"
1201 				"  ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
1202 				"  %s\n",
1203 				name(), (*it)->name(), m_startPosition,
1204 				strerror(err)));
1205 
1206 			// +++++ continue?
1207 		}
1208 	}
1209 
1210 	// * Start all nodes, allowing for the max latency found
1211 
1212 	ASSERT(m_timeSourceObj);
1213 	bigtime_t when = m_timeSourceObj->Now() + offset;
1214 
1215 	// 10aug99: initialize cycle (loop) settings
1216 	if(_cycleValid()) {
1217 		_initCycleThread();
1218 		_cycleInit(when);
1219 	}
1220 
1221 	// start the nodes
1222 	for(node_set::iterator it = m_nodes.begin();
1223 		it != m_nodes.end(); ++it) {
1224 		err = (*it)->_start(when);
1225 		if(err < B_OK) {
1226 			PRINT((
1227 				"! NodeGroup('%s')::_start():\n"
1228 				"  ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
1229 				"  %s\n",
1230 				name(), (*it)->name(), when,
1231 				strerror(err)));
1232 
1233 			// +++++ continue?
1234 		}
1235 	}
1236 
1237 	// notify observers
1238 	_changeState(TRANSPORT_RUNNING);
1239 	return B_OK;
1240 }
1241 
1242 // Stop all nodes in the group; this is the implementation of
1243 // stop().
1244 //
1245 // (this may be called from the transport thread or from
1246 //  an API-implementation method.)
1247 
1248 status_t NodeGroup::_stop() {
1249 
1250 	D_METHOD((
1251 		"NodeGroup::_stop()\n"));
1252 
1253 	assert_locked(this);
1254 
1255 	if(
1256 		m_transportState != TRANSPORT_RUNNING &&
1257 		m_transportState != TRANSPORT_ROLLING)
1258 		return B_NOT_ALLOWED;
1259 
1260 	_changeState(TRANSPORT_STOPPING);
1261 
1262 	// * stop the cycle thread if need be
1263 	_destroyCycleThread();
1264 
1265 	// * stop all nodes
1266 	//   +++++ error reports would be nice
1267 
1268 	for_each(
1269 		m_nodes.begin(),
1270 		m_nodes.end(),
1271 #if __GNUC__ <= 2
1272 		mem_fun(&NodeRef::_stop)
1273 #else
1274 		[](NodeRef* ref) { ref->_stop(); }
1275 #endif
1276 	);
1277 
1278 	// update transport state
1279 	_changeState(TRANSPORT_STOPPED);
1280 
1281 	return B_OK;
1282 }
1283 
1284 // Roll all nodes in the group; this is the implementation of
1285 // roll().
1286 //
1287 // (this may be called from the transport thread or from
1288 //  an API-implementation method.)
1289 
1290 status_t NodeGroup::_roll() {
1291 
1292 	D_METHOD((
1293 		"NodeGroup::_roll()\n"));
1294 	assert_locked(this);
1295 	status_t err;
1296 
1297 	if(m_transportState != TRANSPORT_STOPPED)
1298 		return B_NOT_ALLOWED;
1299 
1300 	bigtime_t period = m_endPosition - m_startPosition;
1301 	if(period <= 0LL)
1302 		return B_NOT_ALLOWED;
1303 
1304 	_changeState(TRANSPORT_STARTING);
1305 
1306 	bigtime_t tpStart = 0LL;
1307 	bigtime_t tpStop = period;
1308 
1309 	if(m_runMode != BMediaNode::B_OFFLINE) {
1310 
1311 		// * Find the highest latency in the group
1312 		bigtime_t offset = 0LL;
1313 		calcLatencyFn _f(offset);
1314 		for_each(
1315 			m_nodes.begin(),
1316 			m_nodes.end(),
1317 			_f);
1318 
1319 		offset += s_rosterLatency;
1320 		PRINT((
1321 			"- offset: %" B_PRIdBIGTIME "\n", offset));
1322 
1323 		ASSERT(m_timeSourceObj);
1324 		tpStart = m_timeSourceObj->Now() + offset;
1325 		tpStop += tpStart;
1326 	}
1327 
1328 	// * Roll all nodes; watch for errors
1329 	bool allFailed = true;
1330 	err = B_OK;
1331 	for(
1332 		node_set::iterator it = m_nodes.begin();
1333 		it != m_nodes.end(); ++it) {
1334 
1335 		status_t e = (*it)->_roll(
1336 			tpStart,
1337 			tpStop,
1338 			m_startPosition);
1339 		if(e < B_OK)
1340 			err = e;
1341 		else
1342 			allFailed = false;
1343 	}
1344 
1345 	if(!allFailed)
1346 		// notify observers
1347 		_changeState(TRANSPORT_ROLLING);
1348 
1349 	return err;
1350 }
1351 
1352 
1353 // State transition; notify listeners
1354 // +++++ [18aug99] DANGER: should notification happen in the middle
1355 //                         of such an operation?
1356 inline void NodeGroup::_changeState(
1357 	transport_state_t			to) {
1358 
1359 	assert_locked(this);
1360 
1361 	m_transportState = to;
1362 
1363 	if(!LockLooper()) {
1364 		ASSERT(!"LockLooper() failed.");
1365 	}
1366 	BMessage m(M_TRANSPORT_STATE_CHANGED);
1367 	m.AddInt32("groupID", id());
1368 	m.AddInt32("transportState", m_transportState);
1369 	notify(&m);
1370 	UnlockLooper();
1371 }
1372 
1373 // Enforce a state transition, and notify listeners
1374 inline void NodeGroup::_changeState(
1375 	transport_state_t			from,
1376 	transport_state_t			to) {
1377 
1378 	assert_locked(this);
1379 	ASSERT(m_transportState == from);
1380 
1381 	_changeState(to);
1382 }
1383 
1384 
1385 // -------------------------------------------------------- //
1386 // *** cycle thread & helpers (LOCK REQUIRED)
1387 // -------------------------------------------------------- //
1388 
1389 // *** cycle port definitions
1390 
1391 const int32					_portLength			= 32;
1392 const char* const		_portName				= "NodeGroup::m_cyclePort";
1393 const size_t				_portMsgMaxSize	= 256;
1394 
1395 
1396 // set up the cycle thread (including its kernel port)
1397 status_t NodeGroup::_initCycleThread() {
1398 	assert_locked(this);
1399 	status_t err;
1400 	D_METHOD((
1401 		"NodeGroup::_initCycleThread()\n"));
1402 
1403 	if(m_cycleThread) {
1404 		// thread is still alive
1405 		err = _destroyCycleThread();
1406 		if(err < B_OK)
1407 			return err;
1408 	}
1409 
1410 	// create
1411 	m_cycleThreadDone = false;
1412 	m_cycleThread = spawn_thread(
1413 		&_CycleThread,
1414 		"NodeGroup[cycleThread]",
1415 		B_NORMAL_PRIORITY,
1416 		(void*)this);
1417 	if(m_cycleThread < B_OK) {
1418 		PRINT((
1419 			"* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1420 			"  %s\n",
1421 			strerror(m_cycleThread)));
1422 		return m_cycleThread;
1423 	}
1424 
1425 	// launch
1426 	return resume_thread(m_cycleThread);
1427 }
1428 
1429 // shut down the cycle thread/port
1430 status_t NodeGroup::_destroyCycleThread() {
1431 	assert_locked(this);
1432 	status_t err;
1433 	D_METHOD((
1434 		"NodeGroup::_destroyCycleThread()\n"));
1435 
1436 	if(!m_cycleThread)
1437 		return B_OK;
1438 
1439 	if(!m_cycleThreadDone) {
1440 		// kill the thread
1441 		ASSERT(m_cyclePort);
1442 		err = write_port_etc(
1443 			m_cyclePort,
1444 			_CYCLE_STOP,
1445 			0,
1446 			0,
1447 			B_TIMEOUT,
1448 			10000LL);
1449 
1450 		if(err < B_OK) {
1451 			// bad thread.  die, thread, die.
1452 			PRINT((
1453 				"* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1454 			delete_port(m_cyclePort);
1455 			m_cyclePort = 0;
1456 			kill_thread(m_cycleThread);
1457 			m_cycleThread = 0;
1458 			return B_OK;
1459 		}
1460 
1461 		// the thread got the message; wait for it to quit
1462 		unlock();
1463 		while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
1464 			PRINT((
1465 				"! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1466 		}
1467 		lock();
1468 	}
1469 
1470 	// it's up to the thread to close its port
1471 	ASSERT(!m_cyclePort);
1472 
1473 	m_cycleThread = 0;
1474 
1475 	return B_OK;
1476 }
1477 
1478 
1479 // 1) do the current positions specify a valid cycle region?
1480 // 2) are any nodes in the group cycle-enabled?
1481 
1482 bool NodeGroup::_cycleValid() {
1483 	assert_locked(this);
1484 	return
1485 		(m_transportState == TRANSPORT_RUNNING ||
1486 		 m_transportState == TRANSPORT_STARTING) &&
1487 		 canCycle();
1488 }
1489 
1490 // initialize the cycle members (call when starting)
1491 
1492 void NodeGroup::_cycleInit(
1493 	bigtime_t										startTime) {
1494 	assert_locked(this);
1495 	ASSERT(m_cycleNodes.size() > 0);
1496 	D_METHOD((
1497 		"NodeGroup::_cycleInit(%lld)\n",
1498 		startTime));
1499 
1500 	// +++++ rescan latencies?
1501 
1502 	// figure new boundary & deadline from region length
1503 	bigtime_t cyclePeriod = m_endPosition - m_startPosition;
1504 
1505 	if(cyclePeriod <= 0) {
1506 		// cycle region is no longer valid
1507 		m_cycleBoundary = 0LL;
1508 		m_cycleDeadline = 0LL;
1509 
1510 //		no no no -- deadlocks when the thread calls this method
1511 //		// stop the thread
1512 //		_destroyCycleThread();
1513 		return;
1514 	}
1515 
1516 	m_cycleStart = startTime;
1517 	m_cycleBoundary = startTime + cyclePeriod;
1518 	m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
1519 }
1520 
1521 
1522 // add a ref to the cycle set (in proper order, based on latency)
1523 void NodeGroup::_cycleAddRef(
1524 	NodeRef*										ref) {
1525 	assert_locked(this);
1526 
1527 	// make sure it's not already there
1528 	ASSERT(find(
1529 		m_cycleNodes.begin(),
1530 		m_cycleNodes.end(),
1531 		ref) == m_cycleNodes.end());
1532 
1533 	// [re]calc latency if 0
1534 	if(!ref->m_latency)
1535 		ref->_updateLatency();
1536 
1537 	node_set::iterator it;
1538 	for(it = m_cycleNodes.begin();
1539 		it != m_cycleNodes.end(); ++it) {
1540 		if(ref->m_latency > (*it)->m_latency) {
1541 			m_cycleNodes.insert(it, ref);
1542 			break;
1543 		}
1544 	}
1545 
1546 	// not inserted? new ref belongs at the end
1547 	if(it == m_cycleNodes.end())
1548 		m_cycleNodes.insert(it, ref);
1549 }
1550 
1551 // remove a ref from the cycle set
1552 void NodeGroup::_cycleRemoveRef(
1553 	NodeRef*										ref) {
1554 	assert_locked(this);
1555 
1556 	node_set::iterator it = find(
1557 		m_cycleNodes.begin(),
1558 		m_cycleNodes.end(),
1559 		ref);
1560 	ASSERT(it != m_cycleNodes.end());
1561 	m_cycleNodes.erase(it);
1562 }
1563 
1564 bigtime_t NodeGroup::_cycleBoundary() const {
1565 	Autolock _l(this);
1566 	return m_cycleBoundary;
1567 }
1568 
1569 // cycle thread impl.
1570 /*static*/
1571 status_t NodeGroup::_CycleThread(void* user) {
1572 	((NodeGroup*)user)->_cycleThread();
1573 	return B_OK;
1574 }
1575 
1576 void NodeGroup::_cycleThread() {
1577 
1578 	status_t err;
1579 	int32 code;
1580 	int32 errorCount = 0;
1581 
1582 	// +++++ liability -- if the thread has to be killed, this buffer
1583 	//       won't be reclaimed
1584 	char* msgBuffer = new char[_portMsgMaxSize];
1585 	array_delete<char> _d(msgBuffer);
1586 
1587 	// create port
1588 	ASSERT(!m_cyclePort);
1589 	m_cyclePort = create_port(
1590 		_portLength,
1591 		_portName);
1592 	ASSERT(m_cyclePort >= B_OK);
1593 
1594 	// the message-handling loop
1595 	bool done = false;
1596 	while(!done) {
1597 
1598 		// *** wait until it's time to queue the next cycle, or until
1599 		// *** a message arrives
1600 
1601 		lock();				// **** BEGIN LOCKED SECTION ****
1602 		if(!_cycleValid()) {
1603 			unlock();
1604 			break;
1605 		}
1606 
1607 		ASSERT(m_cycleNodes.size() > 0);
1608 		ASSERT(m_timeSourceObj);
1609 
1610 		bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
1611 		bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
1612 			m_cycleBoundary, maxLatency + s_rosterLatency);
1613 		bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
1614 
1615 		if(timeout <= 0) {
1616 			// +++++ whoops, I'm late.
1617 			// +++++ adjust to compensate !!!
1618 			PRINT((
1619 				"*** NodeGroup::_cycleThread(): LATE\n"
1620 				"    by %" B_PRIdBIGTIME "\n", -timeout));
1621 		}
1622 
1623 		// +++++ if timeout is very short, spin until the target time arrives
1624 
1625 		unlock();			// **** END LOCKED SECTION ****
1626 
1627 		// block until message arrives or it's time to wake up
1628 		err = read_port_etc(
1629 			m_cyclePort,
1630 			&code,
1631 			msgBuffer,
1632 			_portMsgMaxSize,
1633 			B_TIMEOUT,
1634 			timeout);
1635 
1636 		if(err == B_TIMED_OUT) {
1637 			// the time has come to seek my nodes
1638 			_handleCycleService();
1639 			continue;
1640 		}
1641 		else if(err < B_OK) {
1642 			// any other error is bad news
1643 			PRINT((
1644 				"* NodeGroup::_cycleThread(): read_port error:\n"
1645 				"  %s\n"
1646 				"  ABORTING\n\n", strerror(err)));
1647 			if(++errorCount > 10) {
1648 				PRINT((
1649 					"*** Too many errors; aborting.\n"));
1650 				break;
1651 			}
1652 			continue;
1653 		}
1654 
1655 		errorCount = 0;
1656 
1657 		// process the message
1658 		switch(code) {
1659 			case _CYCLE_STOP:
1660 				// bail
1661 				done = true;
1662 				break;
1663 
1664 			case _CYCLE_END_CHANGED:
1665 			case _CYCLE_LATENCY_CHANGED:
1666 				// fall through to next loop; for now, these messages
1667 				// serve only to slap me out of my stupor and reassess
1668 				// the timing situation...
1669 				break;
1670 
1671 			default:
1672 				PRINT((
1673 					"* NodeGroup::_cycleThread(): unknown message code '%"
1674 						B_PRId32 "'\n", code));
1675 				break;
1676 		}
1677 	} // while(!done)
1678 
1679 
1680 	// delete port
1681 	delete_port(m_cyclePort);
1682 	m_cyclePort = 0;
1683 
1684 	// done
1685 	m_cycleThreadDone = true;
1686 }
1687 
1688 // cycle service: seek all nodes & initiate next cycle
1689 void NodeGroup::_handleCycleService() {
1690 	Autolock _l(this);
1691 //	D_METHOD((
1692 //		"NodeGroup::_handleCycleService()\n"));
1693 	status_t err;
1694 
1695 	if(!_cycleValid()) {
1696 //		PRINT((
1697 //			"- _handleCycleService(): cycle not valid; quitting.\n"));
1698 		return;
1699 	}
1700 
1701 	// seek
1702 	for(node_set::iterator it = m_cycleNodes.begin();
1703 		it != m_cycleNodes.end(); ++it) {
1704 		err = (*it)->_seek(
1705 			m_startPosition,
1706 			m_cycleBoundary);
1707 		if(err < B_OK) {
1708 			PRINT((
1709 				"- _handleCycleService(): node('%s')::_seek() failed:\n"
1710 				"  %s\n",
1711 				(*it)->name(), strerror(err)));
1712 		}
1713 	}
1714 
1715 	// update cycle settings
1716 	if(m_newStart) {
1717 		m_newStart = false;
1718 		m_startPosition = m_newStartPosition;
1719 	}
1720 	if(m_newEnd) {
1721 		m_newEnd = false;
1722 		m_endPosition = m_newEndPosition;
1723 	}
1724 
1725 	// prepare next cycle
1726 	_cycleInit(m_cycleBoundary);
1727 }
1728 
1729 // END -- NodeGroup.cpp --
1730