1 /* 2 * Copyright (c) 1999-2000, Eric Moon. 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions, and the following disclaimer. 11 * 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions, and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * 3. The name of the author may not be used to endorse or promote products 17 * derived from this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR 20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 21 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 26 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 27 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 32 // NodeGroup.cpp 33 34 #include "NodeGroup.h" 35 //#include "NodeGroup_transport_thread.h" 36 37 #include "NodeManager.h" 38 #include "NodeRef.h" 39 40 #include <MediaRoster.h> 41 #include <OS.h> 42 #include <TimeSource.h> 43 44 #include <algorithm> 45 #include <functional> 46 47 #include "array_delete.h" 48 #include "BasicThread.h" 49 #include "node_manager_impl.h" 50 #include "functional_tools.h" 51 52 using namespace std; 53 54 __USE_CORTEX_NAMESPACE 55 #define D_METHOD(x) //PRINT (x) 56 #define D_ROSTER(x) //PRINT (x) 57 #define D_LOCK(x) //PRINT (x) 58 59 60 61 // -------------------------------------------------------- // 62 // *** ctor/dtor 63 // -------------------------------------------------------- // 64 65 // free the group, including all nodes within it 66 // (this call will result in the eventual deletion of the object.) 67 // returns B_OK on success; B_NOT_ALLOWED if release() has 68 // already been called; other error codes if the Media Roster 69 // call fails. 70 // * THE MANAGER MUST BE LOCKED 71 72 status_t NodeGroup::release() { 73 74 D_METHOD(( 75 "NodeGroup::release()\n")); 76 77 if(isReleased()) 78 return B_NOT_ALLOWED; 79 80 // clean up 81 lock(); 82 83 // halt all nodes 84 _stop(); 85 86 // remove & release all nodes 87 // +++++ causes triply-nested lock: eww! 88 while(m_nodes.size()) { 89 NodeRef* last = m_nodes.back(); 90 removeNode(m_nodes.size()-1); 91 last->release(); 92 } 93 94 unlock(); 95 96 // [e.moon 7nov99] 97 // removing the released group is now NodeManager's responsibility 98 // 99 // remove from NodeManager 100 if(!m_manager->lock()) { 101 ASSERT(!"* m_manager->lock() failed.\n"); 102 } 103 m_manager->_removeGroup(this); 104 m_manager->unlock(); 105 106 // hand off to IObservable 107 return _inherited::release(); 108 } 109 110 // call release() rather than deleting NodeGroup objects 111 NodeGroup::~NodeGroup() { 112 113 Autolock _l(this); 114 D_METHOD(( 115 "~NodeGroup()\n")); 116 117 ASSERT(!m_nodes.size()); 118 119 if(m_timeSourceObj) { 120 m_timeSourceObj->Release(); 121 m_timeSourceObj = 0; 122 } 123 } 124 125 126 // -------------------------------------------------------- // 127 // *** accessors 128 // -------------------------------------------------------- // 129 130 // [e.moon 13oct99] moved to header 131 //inline uint32 NodeGroup::id() const { return m_id; } 132 133 // -------------------------------------------------------- // 134 // *** operations 135 // -------------------------------------------------------- // 136 137 // name access 138 const char* NodeGroup::name() const { 139 Autolock _l(this); 140 return m_name.String(); 141 } 142 143 status_t NodeGroup::setName(const char* name) { 144 Autolock _l(this); 145 m_name = name; 146 return B_OK; 147 } 148 149 // content access 150 uint32 NodeGroup::countNodes() const { 151 Autolock _l(this); 152 return m_nodes.size(); 153 } 154 155 NodeRef* NodeGroup::nodeAt( 156 uint32 index) const { 157 Autolock _l(this); 158 return (index < m_nodes.size()) ? 159 m_nodes[index] : 160 0; 161 } 162 163 // add/remove nodes: 164 // - you may only add a node with no current group. 165 // - nodes added during playback will be started; 166 // nodes removed during playback will be stopped (unless 167 // the NO_START_STOP transport restriction flag is set 168 // for a given node.) 169 170 status_t NodeGroup::addNode( 171 NodeRef* node) { 172 173 D_METHOD(( 174 "NodeGroup::addNode()\n")); 175 176 // lock the manager first; if the node has no current group, 177 // this locks the node. 178 m_manager->lock(); 179 180 Autolock _l(this); 181 182 // precondition: GROUP_LOCKED not set 183 if(m_flags & GROUP_LOCKED) 184 return B_NOT_ALLOWED; 185 186 // precondition: no current group 187 if(node->m_group) { 188 // [e.moon 28sep99] whoops, forgot one 189 PRINT(( 190 "!!! node already in group '%s'\n", node->m_group->name())); 191 192 m_manager->unlock(); 193 return B_NOT_ALLOWED; 194 } 195 196 // add it 197 m_nodes.push_back(node); 198 node->_setGroup(this); 199 200 // release the manager 201 m_manager->unlock(); 202 203 // first node? the transport is now ready to start 204 if(m_nodes.size() == 1) { 205 _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED); 206 } 207 // 208 // if(m_syncNode == media_node::null) { 209 // // assign as sync node 210 // setSyncNode(node->node()); 211 // } 212 // 213 // initialize the new node 214 status_t err = node->_initTransportState(); 215 if(err < B_OK) 216 return err; 217 218 // set time source 219 node->_setTimeSource(m_timeSource.node); 220 221 // set run mode 222 node->_setRunMode(m_runMode); 223 224 // add to cycle set if need be 225 // +++++ should I call _cycleAddRef() instead? 226 if(node->m_cycle) 227 _refCycleChanged(node); 228 229 if(m_transportState == TRANSPORT_RUNNING) { 230 // +++++ start if necessary! 231 } 232 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99] 233 234 // send notification 235 if(!LockLooper()) { 236 ASSERT(!"LockLooper() failed."); 237 } 238 BMessage m(M_NODE_ADDED); 239 m.AddInt32("groupID", id()); 240 m.AddInt32("nodeID", node->id()); 241 notify(&m); 242 UnlockLooper(); 243 244 // success 245 return B_OK; 246 } 247 248 249 status_t NodeGroup::removeNode( 250 NodeRef* node) { 251 252 D_METHOD(( 253 "NodeGroup::removeNode()\n")); 254 255 // lock the manager first; once the node is ungrouped, 256 // the manager lock applies to it 257 m_manager->lock(); 258 259 Autolock _l(this); 260 261 // precondition: this must be the node's group 262 if(node->m_group != this) { 263 // [e.moon 28sep99] whoops, forgot one 264 PRINT(( 265 "!!! node not in group '%s'\n", node->m_group->name())); 266 267 m_manager->unlock(); 268 return B_NOT_ALLOWED; 269 } 270 271 // remove from the cycle set 272 if(node->m_cycle) 273 _cycleRemoveRef(node); 274 275 // remove it 276 ASSERT(m_nodes.size()); 277 remove( 278 m_nodes.begin(), 279 m_nodes.end(), 280 node); 281 282 // should have removed one and only one entry 283 m_nodes.resize(m_nodes.size()-1); 284 285 // // 6aug99: the timesource is now the sync node... 286 // // is this the sync node? reassign if so 287 // 288 // if(node->node() == m_syncNode) { 289 // 290 // // look for another sync-capable node 291 // bool found = false; 292 // for(int n = 0; !found && n < m_nodes.size(); ++n) 293 // if(setSyncNode(m_nodes[n]->node()) == B_OK) 294 // found = true; 295 // 296 // // no luck? admit defeat: 297 // if(!found) { 298 // PRINT(( 299 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n")); 300 // 301 // // +++++ stop & set to invalid state? 302 // 303 // setSyncNode(media_node::null); 304 // } 305 // } 306 307 // stop the node if necessary 308 status_t err = node->_stop(); 309 if(err < B_OK) { 310 PRINT(( 311 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n" 312 " %s\n", 313 node->name(), 314 strerror(err))); 315 } 316 317 // clear the node's group pointer 318 node->_setGroup(0); 319 320 // release the manager lock; the node is now ungrouped and 321 // unlocked 322 m_manager->unlock(); 323 324 // was that the last node? stop/disable the transport if so 325 if(!m_nodes.size()) { 326 327 // +++++ kill sync thread(s) 328 329 _changeState(TRANSPORT_INVALID); 330 } 331 332 // send notification 333 if(!LockLooper()) { 334 ASSERT(!"LockLooper() failed."); 335 } 336 BMessage m(M_NODE_REMOVED); 337 m.AddInt32("groupID", id()); 338 m.AddInt32("nodeID", node->id()); 339 notify(&m); 340 UnlockLooper(); 341 342 // success 343 return B_OK; 344 } 345 346 status_t NodeGroup::removeNode( 347 uint32 index) { 348 349 D_METHOD(( 350 "NodeGroup::removeNode(by index)\n")); 351 352 // +++++ icky nested lock 353 Autolock _l(this); 354 355 ASSERT(m_nodes.size() > index); 356 return removeNode(m_nodes[index]); 357 } 358 359 uint32 NodeGroup::groupFlags() const { 360 Autolock _l(this); 361 return m_flags; 362 } 363 364 status_t NodeGroup::setGroupFlags( 365 uint32 flags) { 366 Autolock _l(this); 367 m_flags = flags; 368 return B_OK; 369 } 370 371 372 // returns true if one or more nodes in the group have cycling 373 // enabled, and the start- and end-positions are valid 374 bool NodeGroup::canCycle() const { 375 Autolock _l(this); 376 377 return 378 m_cycleNodes.size() > 0 && 379 m_endPosition - m_startPosition > s_minCyclePeriod; 380 } 381 382 // -------------------------------------------------------- // 383 // *** TRANSPORT POSITIONING (LOCK REQUIRED) 384 // -------------------------------------------------------- // 385 386 // Fetch the current transport state 387 388 NodeGroup::transport_state_t NodeGroup::transportState() const { 389 Autolock _l(this); 390 return m_transportState; 391 } 392 393 // Set the starting media time: 394 // This is the point at which playback will begin in any media 395 // files/documents being played by the nodes in this group. 396 // When cycle mode is enabled, this is the point to which each 397 // node will be seek'd at the end of each cycle (loop). 398 // 399 // The starting time can't be changed in the B_OFFLINE run mode 400 // (this call will return an error.) 401 402 status_t NodeGroup::setStartPosition( 403 bigtime_t start) { 404 Autolock _l(this); 405 406 D_METHOD(( 407 "NodeGroup::setStartPosition(%lld)\n", start)); 408 409 if( 410 m_transportState == TRANSPORT_RUNNING || 411 m_transportState == TRANSPORT_ROLLING || 412 m_transportState == TRANSPORT_STARTING) { 413 414 if(m_runMode == BMediaNode::B_OFFLINE) 415 return B_NOT_ALLOWED; 416 417 ASSERT(m_timeSourceObj); 418 419 if(_cycleValid()) { 420 if(m_timeSourceObj->Now() >= m_cycleDeadline) { 421 // too late to change start position; defer 422 // PRINT((" - deferred\n")); 423 m_newStartPosition = start; 424 m_newStart = true; 425 return B_OK; 426 } 427 428 // not at deadline yet; fall through to set start position 429 } 430 } 431 432 m_startPosition = start; 433 434 // +++++ notify [e.moon 11oct99] 435 436 return B_OK; 437 } 438 439 // Fetch the starting position: 440 441 // +++++ if a previously-set start position was deferred, it won't be 442 // returned yet 443 444 bigtime_t NodeGroup::startPosition() const { 445 Autolock _l(this); 446 447 return m_startPosition; 448 } 449 450 // Set the ending media time: 451 // This is the point at which playback will end relative to 452 // media documents begin played by the nodes in this group; 453 // in cycle mode, this specifies the loop point. If the 454 // ending time is less than or equal to the starting time, 455 // the transport will continue until stopped manually. 456 // If the end position is changed while the transport is playing, 457 // it must take effect retroactively (if it's before the current 458 // position and looping is enabled, all nodes must 'warp' to 459 // the proper post-loop position.) 460 // 461 // The ending time can't be changed if run mode is B_OFFLINE and 462 // the transport is running (this call will return an error.) 463 464 status_t NodeGroup::setEndPosition( 465 bigtime_t end) { 466 Autolock _l(this); 467 468 D_METHOD(( 469 "NodeGroup::setEndPosition(%lld)\n", end)); 470 471 if( 472 m_transportState == TRANSPORT_RUNNING || 473 m_transportState == TRANSPORT_ROLLING || 474 m_transportState == TRANSPORT_STARTING) { 475 476 if(m_runMode == BMediaNode::B_OFFLINE) 477 return B_NOT_ALLOWED; 478 479 ASSERT(m_timeSourceObj); 480 481 bigtime_t endDelta = end - m_endPosition; 482 483 if(_cycleValid()) { 484 if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) { 485 // too late to change end position; defer 486 // PRINT((" - deferred\n")); 487 m_newEndPosition = end; 488 m_newEnd = true; 489 return B_OK; 490 } 491 else { 492 // set new end position 493 m_endPosition = end; 494 495 // inform thread 496 ASSERT(m_cyclePort); 497 write_port( 498 m_cyclePort, 499 _CYCLE_END_CHANGED, 500 0, 501 0); 502 // 503 // // restart nodes' cycle threads with new end position 504 // _cycleInit(m_cycleStart); 505 // for(node_set::iterator it = m_cycleNodes.begin(); 506 // it != m_cycleNodes.end(); ++it) { 507 // (*it)->_scheduleCycle(m_cycleBoundary); 508 // } 509 // return B_OK; 510 } 511 } 512 } 513 514 m_endPosition = end; 515 516 // +++++ notify [e.moon 11oct99] 517 518 return B_OK; 519 } 520 521 522 // Fetch the end position: 523 // Note that if the end position is less than or equal to the start 524 // position, it's ignored. 525 526 // +++++ if a previously-set end position was deferred, it won't be 527 // returned yet 528 529 bigtime_t NodeGroup::endPosition() const { 530 Autolock _l(this); 531 return m_endPosition; 532 } 533 534 // -------------------------------------------------------- // 535 // *** TRANSPORT OPERATIONS (LOCK REQUIRED) 536 // -------------------------------------------------------- // 537 538 // Preroll the group: 539 // Seeks, then prerolls, each node in the group (honoring the 540 // NO_SEEK and NO_PREROLL flags.) This ensures that the group 541 // can start as quickly as possible. 542 // 543 // Does not return until all nodes in the group have been 544 // prepared. 545 546 status_t NodeGroup::preroll() { 547 D_METHOD(( 548 "NodeGroup::preroll()\n")); 549 550 Autolock _l(this); 551 return _preroll(); 552 } 553 554 // Start all nodes in the group: 555 // Nodes with the NO_START_STOP flag aren't molested. 556 557 status_t NodeGroup::start() { 558 D_METHOD(( 559 "NodeGroup::start()\n")); 560 561 Autolock _l(this); 562 return _start(); 563 } 564 565 // Stop all nodes in the group: 566 // Nodes with the NO_START_STOP flag aren't molested. 567 568 status_t NodeGroup::stop() { 569 D_METHOD(( 570 "NodeGroup::stop()\n")); 571 572 Autolock _l(this); 573 return _stop(); 574 } 575 576 // Roll all nodes in the group: 577 // Queues a start and stop atomically (via BMediaRoster::RollNode()). 578 // Returns B_NOT_ALLOWED if endPosition <= startPosition; 579 580 status_t NodeGroup::roll() { 581 D_METHOD(( 582 "NodeGroup::roll()\n")); 583 584 Autolock _l(this); 585 return _roll(); 586 } 587 588 // -------------------------------------------------------- // 589 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED) 590 // -------------------------------------------------------- // 591 592 // time source control: 593 // getTimeSource(): 594 // returns B_ERROR if no time source has been set; otherwise, 595 // returns the node ID of the current time source for all 596 // nodes in the group. 597 // 598 // setTimeSource(): 599 // Calls SetTimeSourceFor() on every node in the group. 600 // The group must be stopped; B_NOT_ALLOWED will be returned 601 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING. 602 603 status_t NodeGroup::getTimeSource( 604 media_node* outTimeSource) const { 605 Autolock _l(this); 606 607 if(m_timeSource != media_node::null) { 608 *outTimeSource = m_timeSource; 609 return B_OK; 610 } 611 return B_ERROR; 612 } 613 614 status_t NodeGroup::setTimeSource( 615 const media_node& timeSource) { 616 617 Autolock _l(this); 618 619 if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING) 620 return B_NOT_ALLOWED; 621 622 if(m_timeSourceObj) 623 m_timeSourceObj->Release(); 624 625 m_timeSource = timeSource; 626 627 // cache a BTimeSource* 628 m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource); 629 ASSERT(m_timeSourceObj); 630 631 // apply new time source to all nodes 632 for_each( 633 m_nodes.begin(), 634 m_nodes.end(), 635 #if __GNUC__ <= 2 636 bind2nd( 637 mem_fun(&NodeRef::_setTimeSource), 638 m_timeSource.node 639 ) 640 #else 641 [this](NodeRef* nodeRef) { nodeRef->_setTimeSource(m_timeSource.node); } 642 #endif 643 ); 644 645 // // try to set as sync node 646 // err = setSyncNode(timeSource); 647 // if(err < B_OK) { 648 // PRINT(( 649 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n", 650 // strerror(err))); 651 // } 652 653 // notify 654 if(!LockLooper()) { 655 ASSERT(!"LockLooper() failed."); 656 } 657 BMessage m(M_TIME_SOURCE_CHANGED); 658 m.AddInt32("groupID", id()); 659 m.AddInt32("timeSourceID", timeSource.node); 660 notify(&m); 661 UnlockLooper(); 662 663 return B_OK; 664 } 665 666 // run mode access: 667 // Sets the default run mode for the group. This will be 668 // applied to every node with a wildcard (0) run mode. 669 // 670 // Special case: if the run mode is B_OFFLINE, it will be 671 // applied to all nodes in the group. 672 673 BMediaNode::run_mode NodeGroup::runMode() const { 674 Autolock _l(this); 675 return m_runMode; 676 } 677 678 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) { 679 Autolock _l(this); 680 681 m_runMode = mode; 682 683 // apply to all nodes 684 for_each( 685 m_nodes.begin(), 686 m_nodes.end(), 687 #if __GNUC__ <= 2 688 bind2nd( 689 mem_fun(&NodeRef::_setRunModeAuto), 690 m_runMode 691 ) 692 #else 693 [this](NodeRef* ref) { ref->_setRunModeAuto(this->m_runMode); } 694 #endif 695 // bound_method( 696 // *this, 697 // &NodeGroup::setRunModeFor) 698 ); 699 700 701 return B_OK; 702 } 703 704 // -------------------------------------------------------- // 705 // *** BHandler 706 // -------------------------------------------------------- // 707 708 void NodeGroup::MessageReceived( 709 BMessage* message) { 710 711 // PRINT(( 712 // "NodeGroup::MessageReceived():\n")); 713 // message->PrintToStream(); 714 status_t err; 715 716 switch(message->what) { 717 case M_SET_TIME_SOURCE: 718 { 719 media_node timeSource; 720 void* data; 721 ssize_t dataSize; 722 err = message->FindData( 723 "timeSourceNode", 724 B_RAW_TYPE, 725 (const void**)&data, 726 &dataSize); 727 if(err < B_OK) { 728 PRINT(( 729 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n" 730 " no timeSourceNode!\n")); 731 break; 732 } 733 timeSource = *(media_node*)data; 734 735 setTimeSource(timeSource); 736 } 737 break; 738 739 case M_SET_RUN_MODE: 740 { 741 uint32 runMode; 742 err = message->FindInt32("runMode", (int32*)&runMode); 743 if(err < B_OK) { 744 PRINT(( 745 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 746 " no runMode!\n")); 747 break; 748 } 749 750 if(runMode < BMediaNode::B_OFFLINE || 751 runMode > BMediaNode::B_RECORDING) { 752 PRINT(( 753 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 754 " invalid run mode (%" B_PRIu32 ")\n", runMode)); 755 break; 756 } 757 758 setRunMode((BMediaNode::run_mode)runMode); 759 } 760 break; 761 762 case M_SET_START_POSITION: 763 { 764 bigtime_t position; 765 err = message->FindInt64("position", (int64*)&position); 766 if(err < B_OK) { 767 PRINT(( 768 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n" 769 " no position!\n")); 770 break; 771 } 772 setStartPosition(position); 773 } 774 break; 775 776 case M_SET_END_POSITION: 777 { 778 bigtime_t position; 779 err = message->FindInt64("position", (int64*)&position); 780 if(err < B_OK) { 781 PRINT(( 782 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n" 783 " no position!\n")); 784 break; 785 } 786 setEndPosition(position); 787 } 788 break; 789 790 case M_PREROLL: 791 preroll(); 792 break; 793 794 case M_START: 795 start(); 796 break; 797 798 case M_STOP: 799 stop(); 800 break; 801 802 case M_ROLL: 803 roll(); 804 break; 805 806 default: 807 _inherited::MessageReceived(message); 808 break; 809 } 810 } 811 812 813 // -------------------------------------------------------- // 814 // *** IPersistent 815 // -------------------------------------------------------- // 816 817 // ! 818 #if CORTEX_XML 819 // ! 820 821 // +++++ 822 823 // Default constructor 824 NodeGroup::NodeGroup() : 825 m_manager(0) {} // +++++ finish initialization 826 827 828 // ! 829 #endif /*CORTEX_XML*/ 830 // ! 831 832 // -------------------------------------------------------- // 833 // *** IObservable: [19aug99] 834 // -------------------------------------------------------- // 835 836 void NodeGroup::observerAdded( 837 const BMessenger& observer) { 838 839 BMessage m(M_OBSERVER_ADDED); 840 m.AddInt32("groupID", id()); 841 m.AddMessenger("target", BMessenger(this)); 842 observer.SendMessage(&m); 843 } 844 845 void NodeGroup::observerRemoved( 846 const BMessenger& observer) { 847 848 BMessage m(M_OBSERVER_REMOVED); 849 m.AddInt32("groupID", id()); 850 m.AddMessenger("target", BMessenger(this)); 851 observer.SendMessage(&m); 852 } 853 854 void NodeGroup::notifyRelease() { 855 856 BMessage m(M_RELEASED); 857 m.AddInt32("groupID", id()); 858 m.AddMessenger("target", BMessenger(this)); 859 notify(&m); 860 } 861 862 void NodeGroup::releaseComplete() { 863 // +++++ 864 } 865 866 // -------------------------------------------------------- // 867 // *** ILockable: pass lock requests to m_lock 868 // -------------------------------------------------------- // 869 870 bool NodeGroup::lock( 871 lock_t type, 872 bigtime_t timeout) { 873 874 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0))); 875 876 ASSERT(type == WRITE); 877 status_t err = m_lock.LockWithTimeout(timeout); 878 879 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0))); 880 881 return err == B_OK; 882 } 883 884 bool NodeGroup::unlock( 885 lock_t type) { 886 887 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0))); 888 889 ASSERT(type == WRITE); 890 m_lock.Unlock(); 891 892 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0))); 893 894 return true; 895 } 896 897 bool NodeGroup::isLocked( 898 lock_t type) const { 899 900 ASSERT(type == WRITE); 901 return m_lock.IsLocked(); 902 } 903 904 // -------------------------------------------------------- // 905 // *** ctor (accessible to NodeManager) 906 // -------------------------------------------------------- // 907 908 NodeGroup::NodeGroup( 909 const char* name, 910 NodeManager* manager, 911 BMediaNode::run_mode runMode) : 912 913 ObservableHandler(name), 914 m_lock("NodeGroup::m_lock"), 915 m_manager(manager), 916 m_id(NextID()), 917 m_name(name), 918 m_flags(0), 919 m_transportState(TRANSPORT_INVALID), 920 m_runMode(runMode), 921 m_timeSourceObj(0), 922 m_released(false), 923 m_cycleThread(0), 924 m_cyclePort(0), 925 m_startPosition(0LL), 926 m_endPosition(0LL), 927 m_newStart(false), 928 m_newEnd(false) { 929 930 ASSERT(m_manager); 931 932 if(!m_manager->Lock()) { 933 ASSERT(!"m_manager->Lock() failed"); 934 } 935 m_manager->AddHandler(this); 936 m_manager->Unlock(); 937 938 // set default time source 939 media_node ts; 940 D_ROSTER(("# roster->GetTimeSource()\n")); 941 status_t err = m_manager->roster->GetTimeSource(&ts); 942 if(err < B_OK) { 943 PRINT(( 944 "*** NodeGroup(): roster->GetTimeSource() failed:\n" 945 " %s\n", strerror(err))); 946 } 947 setTimeSource(ts); 948 } 949 950 // -------------------------------------------------------- // 951 // *** internal operations 952 // -------------------------------------------------------- // 953 954 uint32 NodeGroup::s_nextID = 1; 955 uint32 NodeGroup::NextID() { 956 return atomic_add((int32*)&s_nextID, 1); 957 } 958 959 // -------------------------------------------------------- // 960 // *** ref->group communication (LOCK REQUIRED) 961 // -------------------------------------------------------- // 962 963 // when a NodeRef's cycle state (ie. looping or not looping) 964 // changes, it must pass that information on via this method 965 966 void NodeGroup::_refCycleChanged( 967 NodeRef* ref) { 968 assert_locked(this); 969 D_METHOD(( 970 "NodeGroup::_refCycleChanged('%s')\n", 971 ref->name())); 972 973 if(ref->m_cycle) { 974 _cycleAddRef(ref); 975 } else { 976 _cycleRemoveRef(ref); 977 } 978 979 // +++++ if running & cycle valid, the node should be properly 980 // seek'd and start'd 981 } 982 983 984 // when a cycling node's latency changes, call this method. 985 986 void NodeGroup::_refLatencyChanged( 987 NodeRef* ref) { 988 assert_locked(this); 989 D_METHOD(( 990 "NodeGroup::_refLatencyChanged('%s')\n", 991 ref->name())); 992 993 if(!_cycleValid()) 994 return; 995 996 // remove & replace ref (positions it properly) 997 _cycleRemoveRef(ref); 998 _cycleAddRef(ref); 999 1000 // slap my thread up 1001 ASSERT(m_cyclePort); 1002 write_port( 1003 m_cyclePort, 1004 _CYCLE_LATENCY_CHANGED, 1005 0, 1006 0); 1007 1008 // +++++ zat it? 1009 } 1010 1011 // when a NodeRef receives notification that it has been stopped, 1012 // but is labeled as still running, it must call this method. 1013 // [e.moon 11oct99: roll/B_OFFLINE support] 1014 1015 void NodeGroup::_refStopped( 1016 NodeRef* ref) { 1017 assert_locked(this); 1018 D_METHOD(( 1019 "NodeGroup::_refStopped('%s')\n", 1020 ref->name())); 1021 1022 // roll/B_OFFLINE support [e.moon 11oct99] 1023 // (check to see if any other nodes in the group are still running; 1024 // mark group stopped if not.) 1025 if(m_transportState == TRANSPORT_ROLLING) { 1026 bool nodesRunning = false; 1027 for(node_set::iterator it = m_nodes.begin(); 1028 it != m_nodes.end(); ++it) { 1029 if((*it)->isRunning()) { 1030 nodesRunning = true; 1031 break; 1032 } 1033 } 1034 if(!nodesRunning) 1035 // the group has stopped; update transport state 1036 _changeState(TRANSPORT_STOPPED); 1037 1038 } 1039 1040 } 1041 1042 1043 // -------------------------------------------------------- // 1044 // *** transport helpers (LOCK REQUIRED) 1045 // -------------------------------------------------------- // 1046 1047 1048 // Preroll all nodes in the group; this is the implementation 1049 // of preroll(). 1050 // *** this method should not be called from the transport thread 1051 // (since preroll operations can block for a relatively long time.) 1052 1053 status_t NodeGroup::_preroll() { 1054 assert_locked(this); 1055 1056 D_METHOD(( 1057 "NodeGroup::_preroll()\n")); 1058 1059 if( 1060 m_transportState == TRANSPORT_RUNNING || 1061 m_transportState == TRANSPORT_ROLLING) 1062 // too late 1063 return B_NOT_ALLOWED; 1064 1065 // * preroll all nodes to the start position 1066 1067 // +++++ currently, if an error is encountered it's ignored. 1068 // should the whole operation fail if one node couldn't 1069 // be prerolled? 1070 // 1071 // My gut response is 'no', since the preroll step is 1072 // optional, but the caller should have some inkling that 1073 // one of its nodes didn't behave. 1074 1075 // [e.moon 13oct99] making PPC compiler happy 1076 // for_each( 1077 // m_nodes.begin(), 1078 // m_nodes.end(), 1079 // bind2nd( 1080 // mem_fun(&NodeRef::_preroll), 1081 // m_startPosition 1082 // ) 1083 // ); 1084 for(node_set::iterator it = m_nodes.begin(); 1085 it != m_nodes.end(); ++it) { 1086 (*it)->_preroll(m_startPosition); 1087 } 1088 1089 // replaces 1090 // bind2nd( 1091 // bound_method(*this, &NodeGroup::prerollNode), 1092 // m_startPosition 1093 // ) 1094 1095 return B_OK; 1096 } 1097 1098 1099 //// functor: calculates latency of each node it's handed, caching 1100 //// the largest one found; includes initial latency if nodes report it. 1101 // 1102 //class NodeGroup::calcLatencyFn { public: 1103 // bigtime_t& maxLatency; 1104 // 1105 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {} 1106 // 1107 // void operator()(NodeRef* r) { 1108 // ASSERT(r); 1109 // 1110 //// PRINT(( 1111 //// "# calcLatencyFn(): '%s'\n", 1112 //// r->name())); 1113 // 1114 // if(!(r->node().kind & B_BUFFER_PRODUCER)) { 1115 // // node can't incur latency 1116 //// PRINT(( 1117 //// "- not a producer\n")); 1118 // return; 1119 // } 1120 // 1121 // bigtime_t latency; 1122 // status_t err = 1123 // BMediaRoster::Roster()->GetLatencyFor( 1124 // r->node(), 1125 // &latency); 1126 // if(err < B_OK) { 1127 // PRINT(( 1128 // "* calcLatencyFn: GetLatencyFor() failed: %s\n", 1129 // strerror(err))); 1130 // return; 1131 // } 1132 //// PRINT(("- %lld\n", latency)); 1133 // 1134 // bigtime_t add; 1135 // err = BMediaRoster::Roster()->GetInitialLatencyFor( 1136 // r->node(), 1137 // &add); 1138 //// PRINT(("- %lld\n", add)); 1139 // if(err < B_OK) { 1140 // PRINT(( 1141 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n", 1142 // strerror(err))); 1143 // } 1144 // else 1145 // latency += add; 1146 // 1147 // if(latency > maxLatency) 1148 // maxLatency = latency; 1149 // 1150 //// PRINT(( 1151 //// "- max latency: %lld\n", 1152 //// maxLatency)); 1153 // } 1154 //}; 1155 1156 // Start all nodes in the group; this is the implementation of 1157 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead 1158 // in that case. 1159 // 1160 // (this may be called from the transport thread or from 1161 // an API-implementation method.) 1162 1163 status_t NodeGroup::_start() { 1164 assert_locked(this); 1165 1166 D_METHOD(( 1167 "NodeGroup::_start()\n")); 1168 status_t err; 1169 1170 if(m_transportState != TRANSPORT_STOPPED) 1171 return B_NOT_ALLOWED; 1172 1173 if(m_runMode == BMediaNode::B_OFFLINE) 1174 return B_NOT_ALLOWED; 1175 1176 ASSERT(m_nodes.size()); 1177 1178 _changeState(TRANSPORT_STARTING); 1179 1180 // * Find the highest latency in the group 1181 1182 bigtime_t offset = 0LL; 1183 calcLatencyFn _f(offset); 1184 for_each( 1185 m_nodes.begin(), 1186 m_nodes.end(), 1187 _f); 1188 1189 offset += s_rosterLatency; 1190 PRINT(( 1191 "- offset: %" B_PRIdBIGTIME "\n", offset)); 1192 1193 // * Seek all nodes (in case one or more failed to preroll) 1194 1195 for(node_set::iterator it = m_nodes.begin(); 1196 it != m_nodes.end(); ++it) { 1197 err = (*it)->_seekStopped(m_startPosition); 1198 if(err < B_OK) { 1199 PRINT(( 1200 "! NodeGroup('%s')::_start():\n" 1201 " ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n" 1202 " %s\n", 1203 name(), (*it)->name(), m_startPosition, 1204 strerror(err))); 1205 1206 // +++++ continue? 1207 } 1208 } 1209 1210 // * Start all nodes, allowing for the max latency found 1211 1212 ASSERT(m_timeSourceObj); 1213 bigtime_t when = m_timeSourceObj->Now() + offset; 1214 1215 // 10aug99: initialize cycle (loop) settings 1216 if(_cycleValid()) { 1217 _initCycleThread(); 1218 _cycleInit(when); 1219 } 1220 1221 // start the nodes 1222 for(node_set::iterator it = m_nodes.begin(); 1223 it != m_nodes.end(); ++it) { 1224 err = (*it)->_start(when); 1225 if(err < B_OK) { 1226 PRINT(( 1227 "! NodeGroup('%s')::_start():\n" 1228 " ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n" 1229 " %s\n", 1230 name(), (*it)->name(), when, 1231 strerror(err))); 1232 1233 // +++++ continue? 1234 } 1235 } 1236 1237 // notify observers 1238 _changeState(TRANSPORT_RUNNING); 1239 return B_OK; 1240 } 1241 1242 // Stop all nodes in the group; this is the implementation of 1243 // stop(). 1244 // 1245 // (this may be called from the transport thread or from 1246 // an API-implementation method.) 1247 1248 status_t NodeGroup::_stop() { 1249 1250 D_METHOD(( 1251 "NodeGroup::_stop()\n")); 1252 1253 assert_locked(this); 1254 1255 if( 1256 m_transportState != TRANSPORT_RUNNING && 1257 m_transportState != TRANSPORT_ROLLING) 1258 return B_NOT_ALLOWED; 1259 1260 _changeState(TRANSPORT_STOPPING); 1261 1262 // * stop the cycle thread if need be 1263 _destroyCycleThread(); 1264 1265 // * stop all nodes 1266 // +++++ error reports would be nice 1267 1268 for_each( 1269 m_nodes.begin(), 1270 m_nodes.end(), 1271 #if __GNUC__ <= 2 1272 mem_fun(&NodeRef::_stop) 1273 #else 1274 [](NodeRef* ref) { ref->_stop(); } 1275 #endif 1276 ); 1277 1278 // update transport state 1279 _changeState(TRANSPORT_STOPPED); 1280 1281 return B_OK; 1282 } 1283 1284 // Roll all nodes in the group; this is the implementation of 1285 // roll(). 1286 // 1287 // (this may be called from the transport thread or from 1288 // an API-implementation method.) 1289 1290 status_t NodeGroup::_roll() { 1291 1292 D_METHOD(( 1293 "NodeGroup::_roll()\n")); 1294 assert_locked(this); 1295 status_t err; 1296 1297 if(m_transportState != TRANSPORT_STOPPED) 1298 return B_NOT_ALLOWED; 1299 1300 bigtime_t period = m_endPosition - m_startPosition; 1301 if(period <= 0LL) 1302 return B_NOT_ALLOWED; 1303 1304 _changeState(TRANSPORT_STARTING); 1305 1306 bigtime_t tpStart = 0LL; 1307 bigtime_t tpStop = period; 1308 1309 if(m_runMode != BMediaNode::B_OFFLINE) { 1310 1311 // * Find the highest latency in the group 1312 bigtime_t offset = 0LL; 1313 calcLatencyFn _f(offset); 1314 for_each( 1315 m_nodes.begin(), 1316 m_nodes.end(), 1317 _f); 1318 1319 offset += s_rosterLatency; 1320 PRINT(( 1321 "- offset: %" B_PRIdBIGTIME "\n", offset)); 1322 1323 ASSERT(m_timeSourceObj); 1324 tpStart = m_timeSourceObj->Now() + offset; 1325 tpStop += tpStart; 1326 } 1327 1328 // * Roll all nodes; watch for errors 1329 bool allFailed = true; 1330 err = B_OK; 1331 for( 1332 node_set::iterator it = m_nodes.begin(); 1333 it != m_nodes.end(); ++it) { 1334 1335 status_t e = (*it)->_roll( 1336 tpStart, 1337 tpStop, 1338 m_startPosition); 1339 if(e < B_OK) 1340 err = e; 1341 else 1342 allFailed = false; 1343 } 1344 1345 if(!allFailed) 1346 // notify observers 1347 _changeState(TRANSPORT_ROLLING); 1348 1349 return err; 1350 } 1351 1352 1353 // State transition; notify listeners 1354 // +++++ [18aug99] DANGER: should notification happen in the middle 1355 // of such an operation? 1356 inline void NodeGroup::_changeState( 1357 transport_state_t to) { 1358 1359 assert_locked(this); 1360 1361 m_transportState = to; 1362 1363 if(!LockLooper()) { 1364 ASSERT(!"LockLooper() failed."); 1365 } 1366 BMessage m(M_TRANSPORT_STATE_CHANGED); 1367 m.AddInt32("groupID", id()); 1368 m.AddInt32("transportState", m_transportState); 1369 notify(&m); 1370 UnlockLooper(); 1371 } 1372 1373 // Enforce a state transition, and notify listeners 1374 inline void NodeGroup::_changeState( 1375 transport_state_t from, 1376 transport_state_t to) { 1377 1378 assert_locked(this); 1379 ASSERT(m_transportState == from); 1380 1381 _changeState(to); 1382 } 1383 1384 1385 // -------------------------------------------------------- // 1386 // *** cycle thread & helpers (LOCK REQUIRED) 1387 // -------------------------------------------------------- // 1388 1389 // *** cycle port definitions 1390 1391 const int32 _portLength = 32; 1392 const char* const _portName = "NodeGroup::m_cyclePort"; 1393 const size_t _portMsgMaxSize = 256; 1394 1395 1396 // set up the cycle thread (including its kernel port) 1397 status_t NodeGroup::_initCycleThread() { 1398 assert_locked(this); 1399 status_t err; 1400 D_METHOD(( 1401 "NodeGroup::_initCycleThread()\n")); 1402 1403 if(m_cycleThread) { 1404 // thread is still alive 1405 err = _destroyCycleThread(); 1406 if(err < B_OK) 1407 return err; 1408 } 1409 1410 // create 1411 m_cycleThreadDone = false; 1412 m_cycleThread = spawn_thread( 1413 &_CycleThread, 1414 "NodeGroup[cycleThread]", 1415 B_NORMAL_PRIORITY, 1416 (void*)this); 1417 if(m_cycleThread < B_OK) { 1418 PRINT(( 1419 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n" 1420 " %s\n", 1421 strerror(m_cycleThread))); 1422 return m_cycleThread; 1423 } 1424 1425 // launch 1426 return resume_thread(m_cycleThread); 1427 } 1428 1429 // shut down the cycle thread/port 1430 status_t NodeGroup::_destroyCycleThread() { 1431 assert_locked(this); 1432 status_t err; 1433 D_METHOD(( 1434 "NodeGroup::_destroyCycleThread()\n")); 1435 1436 if(!m_cycleThread) 1437 return B_OK; 1438 1439 if(!m_cycleThreadDone) { 1440 // kill the thread 1441 ASSERT(m_cyclePort); 1442 err = write_port_etc( 1443 m_cyclePort, 1444 _CYCLE_STOP, 1445 0, 1446 0, 1447 B_TIMEOUT, 1448 10000LL); 1449 1450 if(err < B_OK) { 1451 // bad thread. die, thread, die. 1452 PRINT(( 1453 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n")); 1454 delete_port(m_cyclePort); 1455 m_cyclePort = 0; 1456 kill_thread(m_cycleThread); 1457 m_cycleThread = 0; 1458 return B_OK; 1459 } 1460 1461 // the thread got the message; wait for it to quit 1462 unlock(); 1463 while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) { 1464 PRINT(( 1465 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n")); 1466 } 1467 lock(); 1468 } 1469 1470 // it's up to the thread to close its port 1471 ASSERT(!m_cyclePort); 1472 1473 m_cycleThread = 0; 1474 1475 return B_OK; 1476 } 1477 1478 1479 // 1) do the current positions specify a valid cycle region? 1480 // 2) are any nodes in the group cycle-enabled? 1481 1482 bool NodeGroup::_cycleValid() { 1483 assert_locked(this); 1484 return 1485 (m_transportState == TRANSPORT_RUNNING || 1486 m_transportState == TRANSPORT_STARTING) && 1487 canCycle(); 1488 } 1489 1490 // initialize the cycle members (call when starting) 1491 1492 void NodeGroup::_cycleInit( 1493 bigtime_t startTime) { 1494 assert_locked(this); 1495 ASSERT(m_cycleNodes.size() > 0); 1496 D_METHOD(( 1497 "NodeGroup::_cycleInit(%lld)\n", 1498 startTime)); 1499 1500 // +++++ rescan latencies? 1501 1502 // figure new boundary & deadline from region length 1503 bigtime_t cyclePeriod = m_endPosition - m_startPosition; 1504 1505 if(cyclePeriod <= 0) { 1506 // cycle region is no longer valid 1507 m_cycleBoundary = 0LL; 1508 m_cycleDeadline = 0LL; 1509 1510 // no no no -- deadlocks when the thread calls this method 1511 // // stop the thread 1512 // _destroyCycleThread(); 1513 return; 1514 } 1515 1516 m_cycleStart = startTime; 1517 m_cycleBoundary = startTime + cyclePeriod; 1518 m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency); 1519 } 1520 1521 1522 // add a ref to the cycle set (in proper order, based on latency) 1523 void NodeGroup::_cycleAddRef( 1524 NodeRef* ref) { 1525 assert_locked(this); 1526 1527 // make sure it's not already there 1528 ASSERT(find( 1529 m_cycleNodes.begin(), 1530 m_cycleNodes.end(), 1531 ref) == m_cycleNodes.end()); 1532 1533 // [re]calc latency if 0 1534 if(!ref->m_latency) 1535 ref->_updateLatency(); 1536 1537 node_set::iterator it; 1538 for(it = m_cycleNodes.begin(); 1539 it != m_cycleNodes.end(); ++it) { 1540 if(ref->m_latency > (*it)->m_latency) { 1541 m_cycleNodes.insert(it, ref); 1542 break; 1543 } 1544 } 1545 1546 // not inserted? new ref belongs at the end 1547 if(it == m_cycleNodes.end()) 1548 m_cycleNodes.insert(it, ref); 1549 } 1550 1551 // remove a ref from the cycle set 1552 void NodeGroup::_cycleRemoveRef( 1553 NodeRef* ref) { 1554 assert_locked(this); 1555 1556 node_set::iterator it = find( 1557 m_cycleNodes.begin(), 1558 m_cycleNodes.end(), 1559 ref); 1560 ASSERT(it != m_cycleNodes.end()); 1561 m_cycleNodes.erase(it); 1562 } 1563 1564 bigtime_t NodeGroup::_cycleBoundary() const { 1565 Autolock _l(this); 1566 return m_cycleBoundary; 1567 } 1568 1569 // cycle thread impl. 1570 /*static*/ 1571 status_t NodeGroup::_CycleThread(void* user) { 1572 ((NodeGroup*)user)->_cycleThread(); 1573 return B_OK; 1574 } 1575 1576 void NodeGroup::_cycleThread() { 1577 1578 status_t err; 1579 int32 code; 1580 int32 errorCount = 0; 1581 1582 // +++++ liability -- if the thread has to be killed, this buffer 1583 // won't be reclaimed 1584 char* msgBuffer = new char[_portMsgMaxSize]; 1585 array_delete<char> _d(msgBuffer); 1586 1587 // create port 1588 ASSERT(!m_cyclePort); 1589 m_cyclePort = create_port( 1590 _portLength, 1591 _portName); 1592 ASSERT(m_cyclePort >= B_OK); 1593 1594 // the message-handling loop 1595 bool done = false; 1596 while(!done) { 1597 1598 // *** wait until it's time to queue the next cycle, or until 1599 // *** a message arrives 1600 1601 lock(); // **** BEGIN LOCKED SECTION **** 1602 if(!_cycleValid()) { 1603 unlock(); 1604 break; 1605 } 1606 1607 ASSERT(m_cycleNodes.size() > 0); 1608 ASSERT(m_timeSourceObj); 1609 1610 bigtime_t maxLatency = m_cycleNodes.front()->m_latency; 1611 bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor( 1612 m_cycleBoundary, maxLatency + s_rosterLatency); 1613 bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime(); 1614 1615 if(timeout <= 0) { 1616 // +++++ whoops, I'm late. 1617 // +++++ adjust to compensate !!! 1618 PRINT(( 1619 "*** NodeGroup::_cycleThread(): LATE\n" 1620 " by %" B_PRIdBIGTIME "\n", -timeout)); 1621 } 1622 1623 // +++++ if timeout is very short, spin until the target time arrives 1624 1625 unlock(); // **** END LOCKED SECTION **** 1626 1627 // block until message arrives or it's time to wake up 1628 err = read_port_etc( 1629 m_cyclePort, 1630 &code, 1631 msgBuffer, 1632 _portMsgMaxSize, 1633 B_TIMEOUT, 1634 timeout); 1635 1636 if(err == B_TIMED_OUT) { 1637 // the time has come to seek my nodes 1638 _handleCycleService(); 1639 continue; 1640 } 1641 else if(err < B_OK) { 1642 // any other error is bad news 1643 PRINT(( 1644 "* NodeGroup::_cycleThread(): read_port error:\n" 1645 " %s\n" 1646 " ABORTING\n\n", strerror(err))); 1647 if(++errorCount > 10) { 1648 PRINT(( 1649 "*** Too many errors; aborting.\n")); 1650 break; 1651 } 1652 continue; 1653 } 1654 1655 errorCount = 0; 1656 1657 // process the message 1658 switch(code) { 1659 case _CYCLE_STOP: 1660 // bail 1661 done = true; 1662 break; 1663 1664 case _CYCLE_END_CHANGED: 1665 case _CYCLE_LATENCY_CHANGED: 1666 // fall through to next loop; for now, these messages 1667 // serve only to slap me out of my stupor and reassess 1668 // the timing situation... 1669 break; 1670 1671 default: 1672 PRINT(( 1673 "* NodeGroup::_cycleThread(): unknown message code '%" 1674 B_PRId32 "'\n", code)); 1675 break; 1676 } 1677 } // while(!done) 1678 1679 1680 // delete port 1681 delete_port(m_cyclePort); 1682 m_cyclePort = 0; 1683 1684 // done 1685 m_cycleThreadDone = true; 1686 } 1687 1688 // cycle service: seek all nodes & initiate next cycle 1689 void NodeGroup::_handleCycleService() { 1690 Autolock _l(this); 1691 // D_METHOD(( 1692 // "NodeGroup::_handleCycleService()\n")); 1693 status_t err; 1694 1695 if(!_cycleValid()) { 1696 // PRINT(( 1697 // "- _handleCycleService(): cycle not valid; quitting.\n")); 1698 return; 1699 } 1700 1701 // seek 1702 for(node_set::iterator it = m_cycleNodes.begin(); 1703 it != m_cycleNodes.end(); ++it) { 1704 err = (*it)->_seek( 1705 m_startPosition, 1706 m_cycleBoundary); 1707 if(err < B_OK) { 1708 PRINT(( 1709 "- _handleCycleService(): node('%s')::_seek() failed:\n" 1710 " %s\n", 1711 (*it)->name(), strerror(err))); 1712 } 1713 } 1714 1715 // update cycle settings 1716 if(m_newStart) { 1717 m_newStart = false; 1718 m_startPosition = m_newStartPosition; 1719 } 1720 if(m_newEnd) { 1721 m_newEnd = false; 1722 m_endPosition = m_newEndPosition; 1723 } 1724 1725 // prepare next cycle 1726 _cycleInit(m_cycleBoundary); 1727 } 1728 1729 // END -- NodeGroup.cpp -- 1730