1 /*
2 * Copyright (c) 1999-2000, Eric Moon.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions, and the following disclaimer.
11 *
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions, and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * 3. The name of the author may not be used to endorse or promote products
17 * derived from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31
32 // NodeGroup.cpp
33
34 #include "NodeGroup.h"
35 //#include "NodeGroup_transport_thread.h"
36
37 #include "NodeManager.h"
38 #include "NodeRef.h"
39
40 #include <MediaRoster.h>
41 #include <OS.h>
42 #include <TimeSource.h>
43
44 #include <algorithm>
45 #include <functional>
46
47 #include "array_delete.h"
48 #include "BasicThread.h"
49 #include "node_manager_impl.h"
50 #include "functional_tools.h"
51
52 using namespace std;
53
54 __USE_CORTEX_NAMESPACE
55 #define D_METHOD(x) //PRINT (x)
56 #define D_ROSTER(x) //PRINT (x)
57 #define D_LOCK(x) //PRINT (x)
58
59
60
61 // -------------------------------------------------------- //
62 // *** ctor/dtor
63 // -------------------------------------------------------- //
64
65 // free the group, including all nodes within it
66 // (this call will result in the eventual deletion of the object.)
67 // returns B_OK on success; B_NOT_ALLOWED if release() has
68 // already been called; other error codes if the Media Roster
69 // call fails.
70 // * THE MANAGER MUST BE LOCKED
71
release()72 status_t NodeGroup::release() {
73
74 D_METHOD((
75 "NodeGroup::release()\n"));
76
77 if(isReleased())
78 return B_NOT_ALLOWED;
79
80 // clean up
81 lock();
82
83 // halt all nodes
84 _stop();
85
86 // remove & release all nodes
87 // +++++ causes triply-nested lock: eww!
88 while(m_nodes.size()) {
89 NodeRef* last = m_nodes.back();
90 removeNode(m_nodes.size()-1);
91 last->release();
92 }
93
94 unlock();
95
96 // [e.moon 7nov99]
97 // removing the released group is now NodeManager's responsibility
98 //
99 // remove from NodeManager
100 if(!m_manager->lock()) {
101 ASSERT(!"* m_manager->lock() failed.\n");
102 }
103 m_manager->_removeGroup(this);
104 m_manager->unlock();
105
106 // hand off to IObservable
107 return _inherited::release();
108 }
109
110 // call release() rather than deleting NodeGroup objects
~NodeGroup()111 NodeGroup::~NodeGroup() {
112
113 Autolock _l(this);
114 D_METHOD((
115 "~NodeGroup()\n"));
116
117 ASSERT(!m_nodes.size());
118
119 if(m_timeSourceObj) {
120 m_timeSourceObj->Release();
121 m_timeSourceObj = 0;
122 }
123 }
124
125
126 // -------------------------------------------------------- //
127 // *** accessors
128 // -------------------------------------------------------- //
129
130 // [e.moon 13oct99] moved to header
131 //inline uint32 NodeGroup::id() const { return m_id; }
132
133 // -------------------------------------------------------- //
134 // *** operations
135 // -------------------------------------------------------- //
136
137 // name access
name() const138 const char* NodeGroup::name() const {
139 Autolock _l(this);
140 return m_name.String();
141 }
142
setName(const char * name)143 status_t NodeGroup::setName(const char* name) {
144 Autolock _l(this);
145 m_name = name;
146 return B_OK;
147 }
148
149 // content access
countNodes() const150 uint32 NodeGroup::countNodes() const {
151 Autolock _l(this);
152 return m_nodes.size();
153 }
154
nodeAt(uint32 index) const155 NodeRef* NodeGroup::nodeAt(
156 uint32 index) const {
157 Autolock _l(this);
158 return (index < m_nodes.size()) ?
159 m_nodes[index] :
160 0;
161 }
162
163 // add/remove nodes:
164 // - you may only add a node with no current group.
165 // - nodes added during playback will be started;
166 // nodes removed during playback will be stopped (unless
167 // the NO_START_STOP transport restriction flag is set
168 // for a given node.)
169
addNode(NodeRef * node)170 status_t NodeGroup::addNode(
171 NodeRef* node) {
172
173 D_METHOD((
174 "NodeGroup::addNode()\n"));
175
176 // lock the manager first; if the node has no current group,
177 // this locks the node.
178 m_manager->lock();
179
180 Autolock _l(this);
181
182 // precondition: GROUP_LOCKED not set
183 if(m_flags & GROUP_LOCKED)
184 return B_NOT_ALLOWED;
185
186 // precondition: no current group
187 if(node->m_group) {
188 // [e.moon 28sep99] whoops, forgot one
189 PRINT((
190 "!!! node already in group '%s'\n", node->m_group->name()));
191
192 m_manager->unlock();
193 return B_NOT_ALLOWED;
194 }
195
196 // add it
197 m_nodes.push_back(node);
198 node->_setGroup(this);
199
200 // release the manager
201 m_manager->unlock();
202
203 // first node? the transport is now ready to start
204 if(m_nodes.size() == 1) {
205 _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
206 }
207 //
208 // if(m_syncNode == media_node::null) {
209 // // assign as sync node
210 // setSyncNode(node->node());
211 // }
212 //
213 // initialize the new node
214 status_t err = node->_initTransportState();
215 if(err < B_OK)
216 return err;
217
218 // set time source
219 node->_setTimeSource(m_timeSource.node);
220
221 // set run mode
222 node->_setRunMode(m_runMode);
223
224 // add to cycle set if need be
225 // +++++ should I call _cycleAddRef() instead?
226 if(node->m_cycle)
227 _refCycleChanged(node);
228
229 if(m_transportState == TRANSPORT_RUNNING) {
230 // +++++ start if necessary!
231 }
232 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
233
234 // send notification
235 if(!LockLooper()) {
236 ASSERT(!"LockLooper() failed.");
237 }
238 BMessage m(M_NODE_ADDED);
239 m.AddInt32("groupID", id());
240 m.AddInt32("nodeID", node->id());
241 notify(&m);
242 UnlockLooper();
243
244 // success
245 return B_OK;
246 }
247
248
removeNode(NodeRef * node)249 status_t NodeGroup::removeNode(
250 NodeRef* node) {
251
252 D_METHOD((
253 "NodeGroup::removeNode()\n"));
254
255 // lock the manager first; once the node is ungrouped,
256 // the manager lock applies to it
257 m_manager->lock();
258
259 Autolock _l(this);
260
261 // precondition: this must be the node's group
262 if(node->m_group != this) {
263 // [e.moon 28sep99] whoops, forgot one
264 PRINT((
265 "!!! node not in group '%s'\n", node->m_group->name()));
266
267 m_manager->unlock();
268 return B_NOT_ALLOWED;
269 }
270
271 // remove from the cycle set
272 if(node->m_cycle)
273 _cycleRemoveRef(node);
274
275 // remove it
276 ASSERT(m_nodes.size());
277 remove(
278 m_nodes.begin(),
279 m_nodes.end(),
280 node);
281
282 // should have removed one and only one entry
283 m_nodes.resize(m_nodes.size()-1);
284
285 // // 6aug99: the timesource is now the sync node...
286 // // is this the sync node? reassign if so
287 //
288 // if(node->node() == m_syncNode) {
289 //
290 // // look for another sync-capable node
291 // bool found = false;
292 // for(int n = 0; !found && n < m_nodes.size(); ++n)
293 // if(setSyncNode(m_nodes[n]->node()) == B_OK)
294 // found = true;
295 //
296 // // no luck? admit defeat:
297 // if(!found) {
298 // PRINT((
299 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
300 //
301 // // +++++ stop & set to invalid state?
302 //
303 // setSyncNode(media_node::null);
304 // }
305 // }
306
307 // stop the node if necessary
308 status_t err = node->_stop();
309 if(err < B_OK) {
310 PRINT((
311 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
312 " %s\n",
313 node->name(),
314 strerror(err)));
315 }
316
317 // clear the node's group pointer
318 node->_setGroup(0);
319
320 // release the manager lock; the node is now ungrouped and
321 // unlocked
322 m_manager->unlock();
323
324 // was that the last node? stop/disable the transport if so
325 if(!m_nodes.size()) {
326
327 // +++++ kill sync thread(s)
328
329 _changeState(TRANSPORT_INVALID);
330 }
331
332 // send notification
333 if(!LockLooper()) {
334 ASSERT(!"LockLooper() failed.");
335 }
336 BMessage m(M_NODE_REMOVED);
337 m.AddInt32("groupID", id());
338 m.AddInt32("nodeID", node->id());
339 notify(&m);
340 UnlockLooper();
341
342 // success
343 return B_OK;
344 }
345
removeNode(uint32 index)346 status_t NodeGroup::removeNode(
347 uint32 index) {
348
349 D_METHOD((
350 "NodeGroup::removeNode(by index)\n"));
351
352 // +++++ icky nested lock
353 Autolock _l(this);
354
355 ASSERT(m_nodes.size() > index);
356 return removeNode(m_nodes[index]);
357 }
358
groupFlags() const359 uint32 NodeGroup::groupFlags() const {
360 Autolock _l(this);
361 return m_flags;
362 }
363
setGroupFlags(uint32 flags)364 status_t NodeGroup::setGroupFlags(
365 uint32 flags) {
366 Autolock _l(this);
367 m_flags = flags;
368 return B_OK;
369 }
370
371
372 // returns true if one or more nodes in the group have cycling
373 // enabled, and the start- and end-positions are valid
canCycle() const374 bool NodeGroup::canCycle() const {
375 Autolock _l(this);
376
377 return
378 m_cycleNodes.size() > 0 &&
379 m_endPosition - m_startPosition > s_minCyclePeriod;
380 }
381
382 // -------------------------------------------------------- //
383 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
384 // -------------------------------------------------------- //
385
386 // Fetch the current transport state
387
transportState() const388 NodeGroup::transport_state_t NodeGroup::transportState() const {
389 Autolock _l(this);
390 return m_transportState;
391 }
392
393 // Set the starting media time:
394 // This is the point at which playback will begin in any media
395 // files/documents being played by the nodes in this group.
396 // When cycle mode is enabled, this is the point to which each
397 // node will be seek'd at the end of each cycle (loop).
398 //
399 // The starting time can't be changed in the B_OFFLINE run mode
400 // (this call will return an error.)
401
setStartPosition(bigtime_t start)402 status_t NodeGroup::setStartPosition(
403 bigtime_t start) {
404 Autolock _l(this);
405
406 D_METHOD((
407 "NodeGroup::setStartPosition(%lld)\n", start));
408
409 if(
410 m_transportState == TRANSPORT_RUNNING ||
411 m_transportState == TRANSPORT_ROLLING ||
412 m_transportState == TRANSPORT_STARTING) {
413
414 if(m_runMode == BMediaNode::B_OFFLINE)
415 return B_NOT_ALLOWED;
416
417 ASSERT(m_timeSourceObj);
418
419 if(_cycleValid()) {
420 if(m_timeSourceObj->Now() >= m_cycleDeadline) {
421 // too late to change start position; defer
422 // PRINT((" - deferred\n"));
423 m_newStartPosition = start;
424 m_newStart = true;
425 return B_OK;
426 }
427
428 // not at deadline yet; fall through to set start position
429 }
430 }
431
432 m_startPosition = start;
433
434 // +++++ notify [e.moon 11oct99]
435
436 return B_OK;
437 }
438
439 // Fetch the starting position:
440
441 // +++++ if a previously-set start position was deferred, it won't be
442 // returned yet
443
startPosition() const444 bigtime_t NodeGroup::startPosition() const {
445 Autolock _l(this);
446
447 return m_startPosition;
448 }
449
450 // Set the ending media time:
451 // This is the point at which playback will end relative to
452 // media documents begin played by the nodes in this group;
453 // in cycle mode, this specifies the loop point. If the
454 // ending time is less than or equal to the starting time,
455 // the transport will continue until stopped manually.
456 // If the end position is changed while the transport is playing,
457 // it must take effect retroactively (if it's before the current
458 // position and looping is enabled, all nodes must 'warp' to
459 // the proper post-loop position.)
460 //
461 // The ending time can't be changed if run mode is B_OFFLINE and
462 // the transport is running (this call will return an error.)
463
setEndPosition(bigtime_t end)464 status_t NodeGroup::setEndPosition(
465 bigtime_t end) {
466 Autolock _l(this);
467
468 D_METHOD((
469 "NodeGroup::setEndPosition(%lld)\n", end));
470
471 if(
472 m_transportState == TRANSPORT_RUNNING ||
473 m_transportState == TRANSPORT_ROLLING ||
474 m_transportState == TRANSPORT_STARTING) {
475
476 if(m_runMode == BMediaNode::B_OFFLINE)
477 return B_NOT_ALLOWED;
478
479 ASSERT(m_timeSourceObj);
480
481 bigtime_t endDelta = end - m_endPosition;
482
483 if(_cycleValid()) {
484 if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
485 // too late to change end position; defer
486 // PRINT((" - deferred\n"));
487 m_newEndPosition = end;
488 m_newEnd = true;
489 return B_OK;
490 }
491 else {
492 // set new end position
493 m_endPosition = end;
494
495 // inform thread
496 ASSERT(m_cyclePort);
497 write_port(
498 m_cyclePort,
499 _CYCLE_END_CHANGED,
500 0,
501 0);
502 //
503 // // restart nodes' cycle threads with new end position
504 // _cycleInit(m_cycleStart);
505 // for(node_set::iterator it = m_cycleNodes.begin();
506 // it != m_cycleNodes.end(); ++it) {
507 // (*it)->_scheduleCycle(m_cycleBoundary);
508 // }
509 // return B_OK;
510 }
511 }
512 }
513
514 m_endPosition = end;
515
516 // +++++ notify [e.moon 11oct99]
517
518 return B_OK;
519 }
520
521
522 // Fetch the end position:
523 // Note that if the end position is less than or equal to the start
524 // position, it's ignored.
525
526 // +++++ if a previously-set end position was deferred, it won't be
527 // returned yet
528
endPosition() const529 bigtime_t NodeGroup::endPosition() const {
530 Autolock _l(this);
531 return m_endPosition;
532 }
533
534 // -------------------------------------------------------- //
535 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
536 // -------------------------------------------------------- //
537
538 // Preroll the group:
539 // Seeks, then prerolls, each node in the group (honoring the
540 // NO_SEEK and NO_PREROLL flags.) This ensures that the group
541 // can start as quickly as possible.
542 //
543 // Does not return until all nodes in the group have been
544 // prepared.
545
preroll()546 status_t NodeGroup::preroll() {
547 D_METHOD((
548 "NodeGroup::preroll()\n"));
549
550 Autolock _l(this);
551 return _preroll();
552 }
553
554 // Start all nodes in the group:
555 // Nodes with the NO_START_STOP flag aren't molested.
556
start()557 status_t NodeGroup::start() {
558 D_METHOD((
559 "NodeGroup::start()\n"));
560
561 Autolock _l(this);
562 return _start();
563 }
564
565 // Stop all nodes in the group:
566 // Nodes with the NO_START_STOP flag aren't molested.
567
stop()568 status_t NodeGroup::stop() {
569 D_METHOD((
570 "NodeGroup::stop()\n"));
571
572 Autolock _l(this);
573 return _stop();
574 }
575
576 // Roll all nodes in the group:
577 // Queues a start and stop atomically (via BMediaRoster::RollNode()).
578 // Returns B_NOT_ALLOWED if endPosition <= startPosition;
579
roll()580 status_t NodeGroup::roll() {
581 D_METHOD((
582 "NodeGroup::roll()\n"));
583
584 Autolock _l(this);
585 return _roll();
586 }
587
588 // -------------------------------------------------------- //
589 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
590 // -------------------------------------------------------- //
591
592 // time source control:
593 // getTimeSource():
594 // returns B_ERROR if no time source has been set; otherwise,
595 // returns the node ID of the current time source for all
596 // nodes in the group.
597 //
598 // setTimeSource():
599 // Calls SetTimeSourceFor() on every node in the group.
600 // The group must be stopped; B_NOT_ALLOWED will be returned
601 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
602
getTimeSource(media_node * outTimeSource) const603 status_t NodeGroup::getTimeSource(
604 media_node* outTimeSource) const {
605 Autolock _l(this);
606
607 if(m_timeSource != media_node::null) {
608 *outTimeSource = m_timeSource;
609 return B_OK;
610 }
611 return B_ERROR;
612 }
613
setTimeSource(const media_node & timeSource)614 status_t NodeGroup::setTimeSource(
615 const media_node& timeSource) {
616
617 Autolock _l(this);
618
619 if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
620 return B_NOT_ALLOWED;
621
622 if(m_timeSourceObj)
623 m_timeSourceObj->Release();
624
625 m_timeSource = timeSource;
626
627 // cache a BTimeSource*
628 m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
629 ASSERT(m_timeSourceObj);
630
631 // apply new time source to all nodes
632 for_each(
633 m_nodes.begin(),
634 m_nodes.end(),
635 #if __GNUC__ <= 2
636 bind2nd(
637 mem_fun(&NodeRef::_setTimeSource),
638 m_timeSource.node
639 )
640 #else
641 [this](NodeRef* nodeRef) { nodeRef->_setTimeSource(m_timeSource.node); }
642 #endif
643 );
644
645 // // try to set as sync node
646 // err = setSyncNode(timeSource);
647 // if(err < B_OK) {
648 // PRINT((
649 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
650 // strerror(err)));
651 // }
652
653 // notify
654 if(!LockLooper()) {
655 ASSERT(!"LockLooper() failed.");
656 }
657 BMessage m(M_TIME_SOURCE_CHANGED);
658 m.AddInt32("groupID", id());
659 m.AddInt32("timeSourceID", timeSource.node);
660 notify(&m);
661 UnlockLooper();
662
663 return B_OK;
664 }
665
666 // run mode access:
667 // Sets the default run mode for the group. This will be
668 // applied to every node with a wildcard (0) run mode.
669 //
670 // Special case: if the run mode is B_OFFLINE, it will be
671 // applied to all nodes in the group.
672
runMode() const673 BMediaNode::run_mode NodeGroup::runMode() const {
674 Autolock _l(this);
675 return m_runMode;
676 }
677
setRunMode(BMediaNode::run_mode mode)678 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
679 Autolock _l(this);
680
681 m_runMode = mode;
682
683 // apply to all nodes
684 for_each(
685 m_nodes.begin(),
686 m_nodes.end(),
687 #if __GNUC__ <= 2
688 bind2nd(
689 mem_fun(&NodeRef::_setRunModeAuto),
690 m_runMode
691 )
692 #else
693 [this](NodeRef* ref) { ref->_setRunModeAuto(this->m_runMode); }
694 #endif
695 // bound_method(
696 // *this,
697 // &NodeGroup::setRunModeFor)
698 );
699
700
701 return B_OK;
702 }
703
704 // -------------------------------------------------------- //
705 // *** BHandler
706 // -------------------------------------------------------- //
707
MessageReceived(BMessage * message)708 void NodeGroup::MessageReceived(
709 BMessage* message) {
710
711 // PRINT((
712 // "NodeGroup::MessageReceived():\n"));
713 // message->PrintToStream();
714 status_t err;
715
716 switch(message->what) {
717 case M_SET_TIME_SOURCE:
718 {
719 media_node timeSource;
720 void* data;
721 ssize_t dataSize;
722 err = message->FindData(
723 "timeSourceNode",
724 B_RAW_TYPE,
725 (const void**)&data,
726 &dataSize);
727 if(err < B_OK) {
728 PRINT((
729 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
730 " no timeSourceNode!\n"));
731 break;
732 }
733 timeSource = *(media_node*)data;
734
735 setTimeSource(timeSource);
736 }
737 break;
738
739 case M_SET_RUN_MODE:
740 {
741 uint32 runMode;
742 err = message->FindInt32("runMode", (int32*)&runMode);
743 if(err < B_OK) {
744 PRINT((
745 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
746 " no runMode!\n"));
747 break;
748 }
749
750 if(runMode < BMediaNode::B_OFFLINE ||
751 runMode > BMediaNode::B_RECORDING) {
752 PRINT((
753 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
754 " invalid run mode (%" B_PRIu32 ")\n", runMode));
755 break;
756 }
757
758 setRunMode((BMediaNode::run_mode)runMode);
759 }
760 break;
761
762 case M_SET_START_POSITION:
763 {
764 bigtime_t position;
765 err = message->FindInt64("position", (int64*)&position);
766 if(err < B_OK) {
767 PRINT((
768 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
769 " no position!\n"));
770 break;
771 }
772 setStartPosition(position);
773 }
774 break;
775
776 case M_SET_END_POSITION:
777 {
778 bigtime_t position;
779 err = message->FindInt64("position", (int64*)&position);
780 if(err < B_OK) {
781 PRINT((
782 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
783 " no position!\n"));
784 break;
785 }
786 setEndPosition(position);
787 }
788 break;
789
790 case M_PREROLL:
791 preroll();
792 break;
793
794 case M_START:
795 start();
796 break;
797
798 case M_STOP:
799 stop();
800 break;
801
802 case M_ROLL:
803 roll();
804 break;
805
806 default:
807 _inherited::MessageReceived(message);
808 break;
809 }
810 }
811
812
813 // -------------------------------------------------------- //
814 // *** IPersistent
815 // -------------------------------------------------------- //
816
817 // !
818 #if CORTEX_XML
819 // !
820
821 // +++++
822
823 // Default constructor
NodeGroup()824 NodeGroup::NodeGroup() :
825 m_manager(0) {} // +++++ finish initialization
826
827
828 // !
829 #endif /*CORTEX_XML*/
830 // !
831
832 // -------------------------------------------------------- //
833 // *** IObservable: [19aug99]
834 // -------------------------------------------------------- //
835
observerAdded(const BMessenger & observer)836 void NodeGroup::observerAdded(
837 const BMessenger& observer) {
838
839 BMessage m(M_OBSERVER_ADDED);
840 m.AddInt32("groupID", id());
841 m.AddMessenger("target", BMessenger(this));
842 observer.SendMessage(&m);
843 }
844
observerRemoved(const BMessenger & observer)845 void NodeGroup::observerRemoved(
846 const BMessenger& observer) {
847
848 BMessage m(M_OBSERVER_REMOVED);
849 m.AddInt32("groupID", id());
850 m.AddMessenger("target", BMessenger(this));
851 observer.SendMessage(&m);
852 }
853
notifyRelease()854 void NodeGroup::notifyRelease() {
855
856 BMessage m(M_RELEASED);
857 m.AddInt32("groupID", id());
858 m.AddMessenger("target", BMessenger(this));
859 notify(&m);
860 }
861
releaseComplete()862 void NodeGroup::releaseComplete() {
863 // +++++
864 }
865
866 // -------------------------------------------------------- //
867 // *** ILockable: pass lock requests to m_lock
868 // -------------------------------------------------------- //
869
lock(lock_t type,bigtime_t timeout)870 bool NodeGroup::lock(
871 lock_t type,
872 bigtime_t timeout) {
873
874 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
875
876 ASSERT(type == WRITE);
877 status_t err = m_lock.LockWithTimeout(timeout);
878
879 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
880
881 return err == B_OK;
882 }
883
unlock(lock_t type)884 bool NodeGroup::unlock(
885 lock_t type) {
886
887 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
888
889 ASSERT(type == WRITE);
890 m_lock.Unlock();
891
892 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
893
894 return true;
895 }
896
isLocked(lock_t type) const897 bool NodeGroup::isLocked(
898 lock_t type) const {
899
900 ASSERT(type == WRITE);
901 return m_lock.IsLocked();
902 }
903
904 // -------------------------------------------------------- //
905 // *** ctor (accessible to NodeManager)
906 // -------------------------------------------------------- //
907
NodeGroup(const char * name,NodeManager * manager,BMediaNode::run_mode runMode)908 NodeGroup::NodeGroup(
909 const char* name,
910 NodeManager* manager,
911 BMediaNode::run_mode runMode) :
912
913 ObservableHandler(name),
914 m_lock("NodeGroup::m_lock"),
915 m_manager(manager),
916 m_id(NextID()),
917 m_name(name),
918 m_flags(0),
919 m_transportState(TRANSPORT_INVALID),
920 m_runMode(runMode),
921 m_timeSourceObj(0),
922 m_released(false),
923 m_cycleThread(0),
924 m_cyclePort(0),
925 m_startPosition(0LL),
926 m_endPosition(0LL),
927 m_newStart(false),
928 m_newEnd(false) {
929
930 ASSERT(m_manager);
931
932 if(!m_manager->Lock()) {
933 ASSERT(!"m_manager->Lock() failed");
934 }
935 m_manager->AddHandler(this);
936 m_manager->Unlock();
937
938 // set default time source
939 media_node ts;
940 D_ROSTER(("# roster->GetTimeSource()\n"));
941 status_t err = m_manager->roster->GetTimeSource(&ts);
942 if(err < B_OK) {
943 PRINT((
944 "*** NodeGroup(): roster->GetTimeSource() failed:\n"
945 " %s\n", strerror(err)));
946 }
947 setTimeSource(ts);
948 }
949
950 // -------------------------------------------------------- //
951 // *** internal operations
952 // -------------------------------------------------------- //
953
954 uint32 NodeGroup::s_nextID = 1;
NextID()955 uint32 NodeGroup::NextID() {
956 return atomic_add((int32*)&s_nextID, 1);
957 }
958
959 // -------------------------------------------------------- //
960 // *** ref->group communication (LOCK REQUIRED)
961 // -------------------------------------------------------- //
962
963 // when a NodeRef's cycle state (ie. looping or not looping)
964 // changes, it must pass that information on via this method
965
_refCycleChanged(NodeRef * ref)966 void NodeGroup::_refCycleChanged(
967 NodeRef* ref) {
968 assert_locked(this);
969 D_METHOD((
970 "NodeGroup::_refCycleChanged('%s')\n",
971 ref->name()));
972
973 if(ref->m_cycle) {
974 _cycleAddRef(ref);
975 } else {
976 _cycleRemoveRef(ref);
977 }
978
979 // +++++ if running & cycle valid, the node should be properly
980 // seek'd and start'd
981 }
982
983
984 // when a cycling node's latency changes, call this method.
985
_refLatencyChanged(NodeRef * ref)986 void NodeGroup::_refLatencyChanged(
987 NodeRef* ref) {
988 assert_locked(this);
989 D_METHOD((
990 "NodeGroup::_refLatencyChanged('%s')\n",
991 ref->name()));
992
993 if(!_cycleValid())
994 return;
995
996 // remove & replace ref (positions it properly)
997 _cycleRemoveRef(ref);
998 _cycleAddRef(ref);
999
1000 // slap my thread up
1001 ASSERT(m_cyclePort);
1002 write_port(
1003 m_cyclePort,
1004 _CYCLE_LATENCY_CHANGED,
1005 0,
1006 0);
1007
1008 // +++++ zat it?
1009 }
1010
1011 // when a NodeRef receives notification that it has been stopped,
1012 // but is labeled as still running, it must call this method.
1013 // [e.moon 11oct99: roll/B_OFFLINE support]
1014
_refStopped(NodeRef * ref)1015 void NodeGroup::_refStopped(
1016 NodeRef* ref) {
1017 assert_locked(this);
1018 D_METHOD((
1019 "NodeGroup::_refStopped('%s')\n",
1020 ref->name()));
1021
1022 // roll/B_OFFLINE support [e.moon 11oct99]
1023 // (check to see if any other nodes in the group are still running;
1024 // mark group stopped if not.)
1025 if(m_transportState == TRANSPORT_ROLLING) {
1026 bool nodesRunning = false;
1027 for(node_set::iterator it = m_nodes.begin();
1028 it != m_nodes.end(); ++it) {
1029 if((*it)->isRunning()) {
1030 nodesRunning = true;
1031 break;
1032 }
1033 }
1034 if(!nodesRunning)
1035 // the group has stopped; update transport state
1036 _changeState(TRANSPORT_STOPPED);
1037
1038 }
1039
1040 }
1041
1042
1043 // -------------------------------------------------------- //
1044 // *** transport helpers (LOCK REQUIRED)
1045 // -------------------------------------------------------- //
1046
1047
1048 // Preroll all nodes in the group; this is the implementation
1049 // of preroll().
1050 // *** this method should not be called from the transport thread
1051 // (since preroll operations can block for a relatively long time.)
1052
_preroll()1053 status_t NodeGroup::_preroll() {
1054 assert_locked(this);
1055
1056 D_METHOD((
1057 "NodeGroup::_preroll()\n"));
1058
1059 if(
1060 m_transportState == TRANSPORT_RUNNING ||
1061 m_transportState == TRANSPORT_ROLLING)
1062 // too late
1063 return B_NOT_ALLOWED;
1064
1065 // * preroll all nodes to the start position
1066
1067 // +++++ currently, if an error is encountered it's ignored.
1068 // should the whole operation fail if one node couldn't
1069 // be prerolled?
1070 //
1071 // My gut response is 'no', since the preroll step is
1072 // optional, but the caller should have some inkling that
1073 // one of its nodes didn't behave.
1074
1075 // [e.moon 13oct99] making PPC compiler happy
1076 // for_each(
1077 // m_nodes.begin(),
1078 // m_nodes.end(),
1079 // bind2nd(
1080 // mem_fun(&NodeRef::_preroll),
1081 // m_startPosition
1082 // )
1083 // );
1084 for(node_set::iterator it = m_nodes.begin();
1085 it != m_nodes.end(); ++it) {
1086 (*it)->_preroll(m_startPosition);
1087 }
1088
1089 // replaces
1090 // bind2nd(
1091 // bound_method(*this, &NodeGroup::prerollNode),
1092 // m_startPosition
1093 // )
1094
1095 return B_OK;
1096 }
1097
1098
1099 //// functor: calculates latency of each node it's handed, caching
1100 //// the largest one found; includes initial latency if nodes report it.
1101 //
1102 //class NodeGroup::calcLatencyFn { public:
1103 // bigtime_t& maxLatency;
1104 //
1105 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1106 //
1107 // void operator()(NodeRef* r) {
1108 // ASSERT(r);
1109 //
1110 //// PRINT((
1111 //// "# calcLatencyFn(): '%s'\n",
1112 //// r->name()));
1113 //
1114 // if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1115 // // node can't incur latency
1116 //// PRINT((
1117 //// "- not a producer\n"));
1118 // return;
1119 // }
1120 //
1121 // bigtime_t latency;
1122 // status_t err =
1123 // BMediaRoster::Roster()->GetLatencyFor(
1124 // r->node(),
1125 // &latency);
1126 // if(err < B_OK) {
1127 // PRINT((
1128 // "* calcLatencyFn: GetLatencyFor() failed: %s\n",
1129 // strerror(err)));
1130 // return;
1131 // }
1132 //// PRINT(("- %lld\n", latency));
1133 //
1134 // bigtime_t add;
1135 // err = BMediaRoster::Roster()->GetInitialLatencyFor(
1136 // r->node(),
1137 // &add);
1138 //// PRINT(("- %lld\n", add));
1139 // if(err < B_OK) {
1140 // PRINT((
1141 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1142 // strerror(err)));
1143 // }
1144 // else
1145 // latency += add;
1146 //
1147 // if(latency > maxLatency)
1148 // maxLatency = latency;
1149 //
1150 //// PRINT((
1151 //// "- max latency: %lld\n",
1152 //// maxLatency));
1153 // }
1154 //};
1155
1156 // Start all nodes in the group; this is the implementation of
1157 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead
1158 // in that case.
1159 //
1160 // (this may be called from the transport thread or from
1161 // an API-implementation method.)
1162
_start()1163 status_t NodeGroup::_start() {
1164 assert_locked(this);
1165
1166 D_METHOD((
1167 "NodeGroup::_start()\n"));
1168 status_t err;
1169
1170 if(m_transportState != TRANSPORT_STOPPED)
1171 return B_NOT_ALLOWED;
1172
1173 if(m_runMode == BMediaNode::B_OFFLINE)
1174 return B_NOT_ALLOWED;
1175
1176 ASSERT(m_nodes.size());
1177
1178 _changeState(TRANSPORT_STARTING);
1179
1180 // * Find the highest latency in the group
1181
1182 bigtime_t offset = 0LL;
1183 calcLatencyFn _f(offset);
1184 for_each(
1185 m_nodes.begin(),
1186 m_nodes.end(),
1187 _f);
1188
1189 offset += s_rosterLatency;
1190 PRINT((
1191 "- offset: %" B_PRIdBIGTIME "\n", offset));
1192
1193 // * Seek all nodes (in case one or more failed to preroll)
1194
1195 for(node_set::iterator it = m_nodes.begin();
1196 it != m_nodes.end(); ++it) {
1197 err = (*it)->_seekStopped(m_startPosition);
1198 if(err < B_OK) {
1199 PRINT((
1200 "! NodeGroup('%s')::_start():\n"
1201 " ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
1202 " %s\n",
1203 name(), (*it)->name(), m_startPosition,
1204 strerror(err)));
1205
1206 // +++++ continue?
1207 }
1208 }
1209
1210 // * Start all nodes, allowing for the max latency found
1211
1212 ASSERT(m_timeSourceObj);
1213 bigtime_t when = m_timeSourceObj->Now() + offset;
1214
1215 // 10aug99: initialize cycle (loop) settings
1216 if(_cycleValid()) {
1217 _initCycleThread();
1218 _cycleInit(when);
1219 }
1220
1221 // start the nodes
1222 for(node_set::iterator it = m_nodes.begin();
1223 it != m_nodes.end(); ++it) {
1224 err = (*it)->_start(when);
1225 if(err < B_OK) {
1226 PRINT((
1227 "! NodeGroup('%s')::_start():\n"
1228 " ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
1229 " %s\n",
1230 name(), (*it)->name(), when,
1231 strerror(err)));
1232
1233 // +++++ continue?
1234 }
1235 }
1236
1237 // notify observers
1238 _changeState(TRANSPORT_RUNNING);
1239 return B_OK;
1240 }
1241
1242 // Stop all nodes in the group; this is the implementation of
1243 // stop().
1244 //
1245 // (this may be called from the transport thread or from
1246 // an API-implementation method.)
1247
_stop()1248 status_t NodeGroup::_stop() {
1249
1250 D_METHOD((
1251 "NodeGroup::_stop()\n"));
1252
1253 assert_locked(this);
1254
1255 if(
1256 m_transportState != TRANSPORT_RUNNING &&
1257 m_transportState != TRANSPORT_ROLLING)
1258 return B_NOT_ALLOWED;
1259
1260 _changeState(TRANSPORT_STOPPING);
1261
1262 // * stop the cycle thread if need be
1263 _destroyCycleThread();
1264
1265 // * stop all nodes
1266 // +++++ error reports would be nice
1267
1268 for_each(
1269 m_nodes.begin(),
1270 m_nodes.end(),
1271 #if __GNUC__ <= 2
1272 mem_fun(&NodeRef::_stop)
1273 #else
1274 [](NodeRef* ref) { ref->_stop(); }
1275 #endif
1276 );
1277
1278 // update transport state
1279 _changeState(TRANSPORT_STOPPED);
1280
1281 return B_OK;
1282 }
1283
1284 // Roll all nodes in the group; this is the implementation of
1285 // roll().
1286 //
1287 // (this may be called from the transport thread or from
1288 // an API-implementation method.)
1289
_roll()1290 status_t NodeGroup::_roll() {
1291
1292 D_METHOD((
1293 "NodeGroup::_roll()\n"));
1294 assert_locked(this);
1295 status_t err;
1296
1297 if(m_transportState != TRANSPORT_STOPPED)
1298 return B_NOT_ALLOWED;
1299
1300 bigtime_t period = m_endPosition - m_startPosition;
1301 if(period <= 0LL)
1302 return B_NOT_ALLOWED;
1303
1304 _changeState(TRANSPORT_STARTING);
1305
1306 bigtime_t tpStart = 0LL;
1307 bigtime_t tpStop = period;
1308
1309 if(m_runMode != BMediaNode::B_OFFLINE) {
1310
1311 // * Find the highest latency in the group
1312 bigtime_t offset = 0LL;
1313 calcLatencyFn _f(offset);
1314 for_each(
1315 m_nodes.begin(),
1316 m_nodes.end(),
1317 _f);
1318
1319 offset += s_rosterLatency;
1320 PRINT((
1321 "- offset: %" B_PRIdBIGTIME "\n", offset));
1322
1323 ASSERT(m_timeSourceObj);
1324 tpStart = m_timeSourceObj->Now() + offset;
1325 tpStop += tpStart;
1326 }
1327
1328 // * Roll all nodes; watch for errors
1329 bool allFailed = true;
1330 err = B_OK;
1331 for(
1332 node_set::iterator it = m_nodes.begin();
1333 it != m_nodes.end(); ++it) {
1334
1335 status_t e = (*it)->_roll(
1336 tpStart,
1337 tpStop,
1338 m_startPosition);
1339 if(e < B_OK)
1340 err = e;
1341 else
1342 allFailed = false;
1343 }
1344
1345 if(!allFailed)
1346 // notify observers
1347 _changeState(TRANSPORT_ROLLING);
1348
1349 return err;
1350 }
1351
1352
1353 // State transition; notify listeners
1354 // +++++ [18aug99] DANGER: should notification happen in the middle
1355 // of such an operation?
_changeState(transport_state_t to)1356 inline void NodeGroup::_changeState(
1357 transport_state_t to) {
1358
1359 assert_locked(this);
1360
1361 m_transportState = to;
1362
1363 if(!LockLooper()) {
1364 ASSERT(!"LockLooper() failed.");
1365 }
1366 BMessage m(M_TRANSPORT_STATE_CHANGED);
1367 m.AddInt32("groupID", id());
1368 m.AddInt32("transportState", m_transportState);
1369 notify(&m);
1370 UnlockLooper();
1371 }
1372
1373 // Enforce a state transition, and notify listeners
_changeState(transport_state_t from,transport_state_t to)1374 inline void NodeGroup::_changeState(
1375 transport_state_t from,
1376 transport_state_t to) {
1377
1378 assert_locked(this);
1379 ASSERT(m_transportState == from);
1380
1381 _changeState(to);
1382 }
1383
1384
1385 // -------------------------------------------------------- //
1386 // *** cycle thread & helpers (LOCK REQUIRED)
1387 // -------------------------------------------------------- //
1388
1389 // *** cycle port definitions
1390
1391 const int32 _portLength = 32;
1392 const char* const _portName = "NodeGroup::m_cyclePort";
1393 const size_t _portMsgMaxSize = 256;
1394
1395
1396 // set up the cycle thread (including its kernel port)
_initCycleThread()1397 status_t NodeGroup::_initCycleThread() {
1398 assert_locked(this);
1399 status_t err;
1400 D_METHOD((
1401 "NodeGroup::_initCycleThread()\n"));
1402
1403 if(m_cycleThread) {
1404 // thread is still alive
1405 err = _destroyCycleThread();
1406 if(err < B_OK)
1407 return err;
1408 }
1409
1410 // create
1411 m_cycleThreadDone = false;
1412 m_cycleThread = spawn_thread(
1413 &_CycleThread,
1414 "NodeGroup[cycleThread]",
1415 B_NORMAL_PRIORITY,
1416 (void*)this);
1417 if(m_cycleThread < B_OK) {
1418 PRINT((
1419 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1420 " %s\n",
1421 strerror(m_cycleThread)));
1422 return m_cycleThread;
1423 }
1424
1425 // launch
1426 return resume_thread(m_cycleThread);
1427 }
1428
1429 // shut down the cycle thread/port
_destroyCycleThread()1430 status_t NodeGroup::_destroyCycleThread() {
1431 assert_locked(this);
1432 status_t err;
1433 D_METHOD((
1434 "NodeGroup::_destroyCycleThread()\n"));
1435
1436 if(!m_cycleThread)
1437 return B_OK;
1438
1439 if(!m_cycleThreadDone) {
1440 // kill the thread
1441 ASSERT(m_cyclePort);
1442 err = write_port_etc(
1443 m_cyclePort,
1444 _CYCLE_STOP,
1445 0,
1446 0,
1447 B_TIMEOUT,
1448 10000LL);
1449
1450 if(err < B_OK) {
1451 // bad thread. die, thread, die.
1452 PRINT((
1453 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1454 delete_port(m_cyclePort);
1455 m_cyclePort = 0;
1456 kill_thread(m_cycleThread);
1457 m_cycleThread = 0;
1458 return B_OK;
1459 }
1460
1461 // the thread got the message; wait for it to quit
1462 unlock();
1463 while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
1464 PRINT((
1465 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1466 }
1467 lock();
1468 }
1469
1470 // it's up to the thread to close its port
1471 ASSERT(!m_cyclePort);
1472
1473 m_cycleThread = 0;
1474
1475 return B_OK;
1476 }
1477
1478
1479 // 1) do the current positions specify a valid cycle region?
1480 // 2) are any nodes in the group cycle-enabled?
1481
_cycleValid()1482 bool NodeGroup::_cycleValid() {
1483 assert_locked(this);
1484 return
1485 (m_transportState == TRANSPORT_RUNNING ||
1486 m_transportState == TRANSPORT_STARTING) &&
1487 canCycle();
1488 }
1489
1490 // initialize the cycle members (call when starting)
1491
_cycleInit(bigtime_t startTime)1492 void NodeGroup::_cycleInit(
1493 bigtime_t startTime) {
1494 assert_locked(this);
1495 ASSERT(m_cycleNodes.size() > 0);
1496 D_METHOD((
1497 "NodeGroup::_cycleInit(%lld)\n",
1498 startTime));
1499
1500 // +++++ rescan latencies?
1501
1502 // figure new boundary & deadline from region length
1503 bigtime_t cyclePeriod = m_endPosition - m_startPosition;
1504
1505 if(cyclePeriod <= 0) {
1506 // cycle region is no longer valid
1507 m_cycleBoundary = 0LL;
1508 m_cycleDeadline = 0LL;
1509
1510 // no no no -- deadlocks when the thread calls this method
1511 // // stop the thread
1512 // _destroyCycleThread();
1513 return;
1514 }
1515
1516 m_cycleStart = startTime;
1517 m_cycleBoundary = startTime + cyclePeriod;
1518 m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
1519 }
1520
1521
1522 // add a ref to the cycle set (in proper order, based on latency)
_cycleAddRef(NodeRef * ref)1523 void NodeGroup::_cycleAddRef(
1524 NodeRef* ref) {
1525 assert_locked(this);
1526
1527 // make sure it's not already there
1528 ASSERT(find(
1529 m_cycleNodes.begin(),
1530 m_cycleNodes.end(),
1531 ref) == m_cycleNodes.end());
1532
1533 // [re]calc latency if 0
1534 if(!ref->m_latency)
1535 ref->_updateLatency();
1536
1537 node_set::iterator it;
1538 for(it = m_cycleNodes.begin();
1539 it != m_cycleNodes.end(); ++it) {
1540 if(ref->m_latency > (*it)->m_latency) {
1541 m_cycleNodes.insert(it, ref);
1542 break;
1543 }
1544 }
1545
1546 // not inserted? new ref belongs at the end
1547 if(it == m_cycleNodes.end())
1548 m_cycleNodes.insert(it, ref);
1549 }
1550
1551 // remove a ref from the cycle set
_cycleRemoveRef(NodeRef * ref)1552 void NodeGroup::_cycleRemoveRef(
1553 NodeRef* ref) {
1554 assert_locked(this);
1555
1556 node_set::iterator it = find(
1557 m_cycleNodes.begin(),
1558 m_cycleNodes.end(),
1559 ref);
1560 ASSERT(it != m_cycleNodes.end());
1561 m_cycleNodes.erase(it);
1562 }
1563
_cycleBoundary() const1564 bigtime_t NodeGroup::_cycleBoundary() const {
1565 Autolock _l(this);
1566 return m_cycleBoundary;
1567 }
1568
1569 // cycle thread impl.
1570 /*static*/
_CycleThread(void * user)1571 status_t NodeGroup::_CycleThread(void* user) {
1572 ((NodeGroup*)user)->_cycleThread();
1573 return B_OK;
1574 }
1575
_cycleThread()1576 void NodeGroup::_cycleThread() {
1577
1578 status_t err;
1579 int32 code;
1580 int32 errorCount = 0;
1581
1582 // +++++ liability -- if the thread has to be killed, this buffer
1583 // won't be reclaimed
1584 char* msgBuffer = new char[_portMsgMaxSize];
1585 array_delete<char> _d(msgBuffer);
1586
1587 // create port
1588 ASSERT(!m_cyclePort);
1589 m_cyclePort = create_port(
1590 _portLength,
1591 _portName);
1592 ASSERT(m_cyclePort >= B_OK);
1593
1594 // the message-handling loop
1595 bool done = false;
1596 while(!done) {
1597
1598 // *** wait until it's time to queue the next cycle, or until
1599 // *** a message arrives
1600
1601 lock(); // **** BEGIN LOCKED SECTION ****
1602 if(!_cycleValid()) {
1603 unlock();
1604 break;
1605 }
1606
1607 ASSERT(m_cycleNodes.size() > 0);
1608 ASSERT(m_timeSourceObj);
1609
1610 bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
1611 bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
1612 m_cycleBoundary, maxLatency + s_rosterLatency);
1613 bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
1614
1615 if(timeout <= 0) {
1616 // +++++ whoops, I'm late.
1617 // +++++ adjust to compensate !!!
1618 PRINT((
1619 "*** NodeGroup::_cycleThread(): LATE\n"
1620 " by %" B_PRIdBIGTIME "\n", -timeout));
1621 }
1622
1623 // +++++ if timeout is very short, spin until the target time arrives
1624
1625 unlock(); // **** END LOCKED SECTION ****
1626
1627 // block until message arrives or it's time to wake up
1628 err = read_port_etc(
1629 m_cyclePort,
1630 &code,
1631 msgBuffer,
1632 _portMsgMaxSize,
1633 B_TIMEOUT,
1634 timeout);
1635
1636 if(err == B_TIMED_OUT) {
1637 // the time has come to seek my nodes
1638 _handleCycleService();
1639 continue;
1640 }
1641 else if(err < B_OK) {
1642 // any other error is bad news
1643 PRINT((
1644 "* NodeGroup::_cycleThread(): read_port error:\n"
1645 " %s\n"
1646 " ABORTING\n\n", strerror(err)));
1647 if(++errorCount > 10) {
1648 PRINT((
1649 "*** Too many errors; aborting.\n"));
1650 break;
1651 }
1652 continue;
1653 }
1654
1655 errorCount = 0;
1656
1657 // process the message
1658 switch(code) {
1659 case _CYCLE_STOP:
1660 // bail
1661 done = true;
1662 break;
1663
1664 case _CYCLE_END_CHANGED:
1665 case _CYCLE_LATENCY_CHANGED:
1666 // fall through to next loop; for now, these messages
1667 // serve only to slap me out of my stupor and reassess
1668 // the timing situation...
1669 break;
1670
1671 default:
1672 PRINT((
1673 "* NodeGroup::_cycleThread(): unknown message code '%"
1674 B_PRId32 "'\n", code));
1675 break;
1676 }
1677 } // while(!done)
1678
1679
1680 // delete port
1681 delete_port(m_cyclePort);
1682 m_cyclePort = 0;
1683
1684 // done
1685 m_cycleThreadDone = true;
1686 }
1687
1688 // cycle service: seek all nodes & initiate next cycle
_handleCycleService()1689 void NodeGroup::_handleCycleService() {
1690 Autolock _l(this);
1691 // D_METHOD((
1692 // "NodeGroup::_handleCycleService()\n"));
1693 status_t err;
1694
1695 if(!_cycleValid()) {
1696 // PRINT((
1697 // "- _handleCycleService(): cycle not valid; quitting.\n"));
1698 return;
1699 }
1700
1701 // seek
1702 for(node_set::iterator it = m_cycleNodes.begin();
1703 it != m_cycleNodes.end(); ++it) {
1704 err = (*it)->_seek(
1705 m_startPosition,
1706 m_cycleBoundary);
1707 if(err < B_OK) {
1708 PRINT((
1709 "- _handleCycleService(): node('%s')::_seek() failed:\n"
1710 " %s\n",
1711 (*it)->name(), strerror(err)));
1712 }
1713 }
1714
1715 // update cycle settings
1716 if(m_newStart) {
1717 m_newStart = false;
1718 m_startPosition = m_newStartPosition;
1719 }
1720 if(m_newEnd) {
1721 m_newEnd = false;
1722 m_endPosition = m_newEndPosition;
1723 }
1724
1725 // prepare next cycle
1726 _cycleInit(m_cycleBoundary);
1727 }
1728
1729 // END -- NodeGroup.cpp --
1730