1 // NodeGroup.cpp 2 3 #include "NodeGroup.h" 4 //#include "NodeGroup_transport_thread.h" 5 6 #include "NodeManager.h" 7 #include "NodeRef.h" 8 9 #include <MediaRoster.h> 10 #include <OS.h> 11 #include <TimeSource.h> 12 13 #include <algorithm> 14 #include <functional> 15 16 #include "array_delete.h" 17 #include "BasicThread.h" 18 #include "node_manager_impl.h" 19 #include "functional_tools.h" 20 21 __USE_CORTEX_NAMESPACE 22 #define D_METHOD(x) //PRINT (x) 23 #define D_ROSTER(x) //PRINT (x) 24 #define D_LOCK(x) //PRINT (x) 25 26 27 28 // -------------------------------------------------------- // 29 // *** ctor/dtor 30 // -------------------------------------------------------- // 31 32 // free the group, including all nodes within it 33 // (this call will result in the eventual deletion of the object.) 34 // returns B_OK on success; B_NOT_ALLOWED if release() has 35 // already been called; other error codes if the Media Roster 36 // call fails. 37 // * THE MANAGER MUST BE LOCKED 38 39 status_t NodeGroup::release() { 40 41 D_METHOD(( 42 "NodeGroup::release()\n")); 43 44 if(isReleased()) 45 return B_NOT_ALLOWED; 46 47 // clean up 48 lock(); 49 50 // halt all nodes 51 _stop(); 52 53 // remove & release all nodes 54 // +++++ causes triply-nested lock: eww! 55 while(m_nodes.size()) { 56 NodeRef* last = m_nodes.back(); 57 removeNode(m_nodes.size()-1); 58 last->release(); 59 } 60 61 unlock(); 62 63 // [e.moon 7nov99] 64 // removing the released group is now NodeManager's responsibility 65 // 66 // remove from NodeManager 67 if(!m_manager->lock()) { 68 ASSERT(!"* m_manager->lock() failed.\n"); 69 } 70 m_manager->_removeGroup(this); 71 m_manager->unlock(); 72 73 // hand off to IObservable 74 return _inherited::release(); 75 } 76 77 // call release() rather than deleting NodeGroup objects 78 NodeGroup::~NodeGroup() { 79 80 Autolock _l(this); 81 D_METHOD(( 82 "~NodeGroup()\n")); 83 84 ASSERT(!m_nodes.size()); 85 86 if(m_timeSourceObj) { 87 m_timeSourceObj->Release(); 88 m_timeSourceObj = 0; 89 } 90 } 91 92 93 // -------------------------------------------------------- // 94 // *** accessors 95 // -------------------------------------------------------- // 96 97 // [e.moon 13oct99] moved to header 98 //inline uint32 NodeGroup::id() const { return m_id; } 99 100 // -------------------------------------------------------- // 101 // *** operations 102 // -------------------------------------------------------- // 103 104 // name access 105 const char* NodeGroup::name() const { 106 Autolock _l(this); 107 return m_name.String(); 108 } 109 110 status_t NodeGroup::setName(const char* name) { 111 Autolock _l(this); 112 m_name = name; 113 return B_OK; 114 } 115 116 // content access 117 uint32 NodeGroup::countNodes() const { 118 Autolock _l(this); 119 return m_nodes.size(); 120 } 121 122 NodeRef* NodeGroup::nodeAt( 123 uint32 index) const { 124 Autolock _l(this); 125 return (index < m_nodes.size()) ? 126 m_nodes[index] : 127 0; 128 } 129 130 // add/remove nodes: 131 // - you may only add a node with no current group. 132 // - nodes added during playback will be started; 133 // nodes removed during playback will be stopped (unless 134 // the NO_START_STOP transport restriction flag is set 135 // for a given node.) 136 137 status_t NodeGroup::addNode( 138 NodeRef* node) { 139 140 D_METHOD(( 141 "NodeGroup::addNode()\n")); 142 143 // lock the manager first; if the node has no current group, 144 // this locks the node. 145 m_manager->lock(); 146 147 Autolock _l(this); 148 149 // precondition: GROUP_LOCKED not set 150 if(m_flags & GROUP_LOCKED) 151 return B_NOT_ALLOWED; 152 153 // precondition: no current group 154 if(node->m_group) { 155 // [e.moon 28sep99] whoops, forgot one 156 PRINT(( 157 "!!! node already in group '%s'\n", node->m_group->name())); 158 159 m_manager->unlock(); 160 return B_NOT_ALLOWED; 161 } 162 163 // add it 164 m_nodes.push_back(node); 165 node->_setGroup(this); 166 167 // release the manager 168 m_manager->unlock(); 169 170 // first node? the transport is now ready to start 171 if(m_nodes.size() == 1) { 172 _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED); 173 } 174 // 175 // if(m_syncNode == media_node::null) { 176 // // assign as sync node 177 // setSyncNode(node->node()); 178 // } 179 // 180 // initialize the new node 181 status_t err = node->_initTransportState(); 182 if(err < B_OK) 183 return err; 184 185 // set time source 186 node->_setTimeSource(m_timeSource.node); 187 188 // set run mode 189 node->_setRunMode(m_runMode); 190 191 // add to cycle set if need be 192 // +++++ should I call _cycleAddRef() instead? 193 if(node->m_cycle) 194 _refCycleChanged(node); 195 196 if(m_transportState == TRANSPORT_RUNNING) { 197 // +++++ start if necessary! 198 } 199 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99] 200 201 // send notification 202 if(!LockLooper()) { 203 ASSERT(!"LockLooper() failed."); 204 } 205 BMessage m(M_NODE_ADDED); 206 m.AddInt32("groupID", id()); 207 m.AddInt32("nodeID", node->id()); 208 notify(&m); 209 UnlockLooper(); 210 211 // success 212 return B_OK; 213 } 214 215 216 status_t NodeGroup::removeNode( 217 NodeRef* node) { 218 219 D_METHOD(( 220 "NodeGroup::removeNode()\n")); 221 222 // lock the manager first; once the node is ungrouped, 223 // the manager lock applies to it 224 m_manager->lock(); 225 226 Autolock _l(this); 227 228 // precondition: this must be the node's group 229 if(node->m_group != this) { 230 // [e.moon 28sep99] whoops, forgot one 231 PRINT(( 232 "!!! node not in group '%s'\n", node->m_group->name())); 233 234 m_manager->unlock(); 235 return B_NOT_ALLOWED; 236 } 237 238 // remove from the cycle set 239 if(node->m_cycle) 240 _cycleRemoveRef(node); 241 242 // remove it 243 ASSERT(m_nodes.size()); 244 remove( 245 m_nodes.begin(), 246 m_nodes.end(), 247 node); 248 249 // should have removed one and only one entry 250 m_nodes.resize(m_nodes.size()-1); 251 252 // // 6aug99: the timesource is now the sync node... 253 // // is this the sync node? reassign if so 254 // 255 // if(node->node() == m_syncNode) { 256 // 257 // // look for another sync-capable node 258 // bool found = false; 259 // for(int n = 0; !found && n < m_nodes.size(); ++n) 260 // if(setSyncNode(m_nodes[n]->node()) == B_OK) 261 // found = true; 262 // 263 // // no luck? admit defeat: 264 // if(!found) { 265 // PRINT(( 266 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n")); 267 // 268 // // +++++ stop & set to invalid state? 269 // 270 // setSyncNode(media_node::null); 271 // } 272 // } 273 274 // stop the node if necessary 275 status_t err = node->_stop(); 276 if(err < B_OK) { 277 PRINT(( 278 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n" 279 " %s\n", 280 node->name(), 281 strerror(err))); 282 } 283 284 // clear the node's group pointer 285 node->_setGroup(0); 286 287 // release the manager lock; the node is now ungrouped and 288 // unlocked 289 m_manager->unlock(); 290 291 // was that the last node? stop/disable the transport if so 292 if(!m_nodes.size()) { 293 294 // +++++ kill sync thread(s) 295 296 _changeState(TRANSPORT_INVALID); 297 } 298 299 // send notification 300 if(!LockLooper()) { 301 ASSERT(!"LockLooper() failed."); 302 } 303 BMessage m(M_NODE_REMOVED); 304 m.AddInt32("groupID", id()); 305 m.AddInt32("nodeID", node->id()); 306 notify(&m); 307 UnlockLooper(); 308 309 // success 310 return B_OK; 311 } 312 313 status_t NodeGroup::removeNode( 314 uint32 index) { 315 316 D_METHOD(( 317 "NodeGroup::removeNode(by index)\n")); 318 319 // +++++ icky nested lock 320 Autolock _l(this); 321 322 ASSERT(m_nodes.size() > index); 323 return removeNode(m_nodes[index]); 324 } 325 326 uint32 NodeGroup::groupFlags() const { 327 Autolock _l(this); 328 return m_flags; 329 } 330 331 status_t NodeGroup::setGroupFlags( 332 uint32 flags) { 333 Autolock _l(this); 334 m_flags = flags; 335 return B_OK; 336 } 337 338 339 // returns true if one or more nodes in the group have cycling 340 // enabled, and the start- and end-positions are valid 341 bool NodeGroup::canCycle() const { 342 Autolock _l(this); 343 344 return 345 m_cycleNodes.size() > 0 && 346 m_endPosition - m_startPosition > s_minCyclePeriod; 347 } 348 349 // -------------------------------------------------------- // 350 // *** TRANSPORT POSITIONING (LOCK REQUIRED) 351 // -------------------------------------------------------- // 352 353 // Fetch the current transport state 354 355 NodeGroup::transport_state_t NodeGroup::transportState() const { 356 Autolock _l(this); 357 return m_transportState; 358 } 359 360 // Set the starting media time: 361 // This is the point at which playback will begin in any media 362 // files/documents being played by the nodes in this group. 363 // When cycle mode is enabled, this is the point to which each 364 // node will be seek'd at the end of each cycle (loop). 365 // 366 // The starting time can't be changed in the B_OFFLINE run mode 367 // (this call will return an error.) 368 369 status_t NodeGroup::setStartPosition( 370 bigtime_t start) { 371 Autolock _l(this); 372 373 D_METHOD(( 374 "NodeGroup::setStartPosition(%Ld)\n", start)); 375 376 if( 377 m_transportState == TRANSPORT_RUNNING || 378 m_transportState == TRANSPORT_ROLLING || 379 m_transportState == TRANSPORT_STARTING) { 380 381 if(m_runMode == BMediaNode::B_OFFLINE) 382 return B_NOT_ALLOWED; 383 384 ASSERT(m_timeSourceObj); 385 386 if(_cycleValid()) { 387 if(m_timeSourceObj->Now() >= m_cycleDeadline) { 388 // too late to change start position; defer 389 // PRINT((" - deferred\n")); 390 m_newStartPosition = start; 391 m_newStart = true; 392 return B_OK; 393 } 394 395 // not at deadline yet; fall through to set start position 396 } 397 } 398 399 m_startPosition = start; 400 401 // +++++ notify [e.moon 11oct99] 402 403 return B_OK; 404 } 405 406 // Fetch the starting position: 407 408 // +++++ if a previously-set start position was deferred, it won't be 409 // returned yet 410 411 bigtime_t NodeGroup::startPosition() const { 412 Autolock _l(this); 413 414 return m_startPosition; 415 } 416 417 // Set the ending media time: 418 // This is the point at which playback will end relative to 419 // media documents begin played by the nodes in this group; 420 // in cycle mode, this specifies the loop point. If the 421 // ending time is less than or equal to the starting time, 422 // the transport will continue until stopped manually. 423 // If the end position is changed while the transport is playing, 424 // it must take effect retroactively (if it's before the current 425 // position and looping is enabled, all nodes must 'warp' to 426 // the proper post-loop position.) 427 // 428 // The ending time can't be changed if run mode is B_OFFLINE and 429 // the transport is running (this call will return an error.) 430 431 status_t NodeGroup::setEndPosition( 432 bigtime_t end) { 433 Autolock _l(this); 434 435 D_METHOD(( 436 "NodeGroup::setEndPosition(%Ld)\n", end)); 437 438 if( 439 m_transportState == TRANSPORT_RUNNING || 440 m_transportState == TRANSPORT_ROLLING || 441 m_transportState == TRANSPORT_STARTING) { 442 443 if(m_runMode == BMediaNode::B_OFFLINE) 444 return B_NOT_ALLOWED; 445 446 ASSERT(m_timeSourceObj); 447 448 bigtime_t endDelta = end - m_endPosition; 449 450 if(_cycleValid()) { 451 if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) { 452 // too late to change end position; defer 453 // PRINT((" - deferred\n")); 454 m_newEndPosition = end; 455 m_newEnd = true; 456 return B_OK; 457 } 458 else { 459 // set new end position 460 m_endPosition = end; 461 462 // inform thread 463 ASSERT(m_cyclePort); 464 write_port( 465 m_cyclePort, 466 _CYCLE_END_CHANGED, 467 0, 468 0); 469 // 470 // // restart nodes' cycle threads with new end position 471 // _cycleInit(m_cycleStart); 472 // for(node_set::iterator it = m_cycleNodes.begin(); 473 // it != m_cycleNodes.end(); ++it) { 474 // (*it)->_scheduleCycle(m_cycleBoundary); 475 // } 476 // return B_OK; 477 } 478 } 479 } 480 481 m_endPosition = end; 482 483 // +++++ notify [e.moon 11oct99] 484 485 return B_OK; 486 } 487 488 489 // Fetch the end position: 490 // Note that if the end position is less than or equal to the start 491 // position, it's ignored. 492 493 // +++++ if a previously-set end position was deferred, it won't be 494 // returned yet 495 496 bigtime_t NodeGroup::endPosition() const { 497 Autolock _l(this); 498 return m_endPosition; 499 } 500 501 // -------------------------------------------------------- // 502 // *** TRANSPORT OPERATIONS (LOCK REQUIRED) 503 // -------------------------------------------------------- // 504 505 // Preroll the group: 506 // Seeks, then prerolls, each node in the group (honoring the 507 // NO_SEEK and NO_PREROLL flags.) This ensures that the group 508 // can start as quickly as possible. 509 // 510 // Does not return until all nodes in the group have been 511 // prepared. 512 513 status_t NodeGroup::preroll() { 514 D_METHOD(( 515 "NodeGroup::preroll()\n")); 516 517 Autolock _l(this); 518 return _preroll(); 519 } 520 521 // Start all nodes in the group: 522 // Nodes with the NO_START_STOP flag aren't molested. 523 524 status_t NodeGroup::start() { 525 D_METHOD(( 526 "NodeGroup::start()\n")); 527 528 Autolock _l(this); 529 return _start(); 530 } 531 532 // Stop all nodes in the group: 533 // Nodes with the NO_START_STOP flag aren't molested. 534 535 status_t NodeGroup::stop() { 536 D_METHOD(( 537 "NodeGroup::stop()\n")); 538 539 Autolock _l(this); 540 return _stop(); 541 } 542 543 // Roll all nodes in the group: 544 // Queues a start and stop atomically (via BMediaRoster::RollNode()). 545 // Returns B_NOT_ALLOWED if endPosition <= startPosition; 546 547 status_t NodeGroup::roll() { 548 D_METHOD(( 549 "NodeGroup::roll()\n")); 550 551 Autolock _l(this); 552 return _roll(); 553 } 554 555 // -------------------------------------------------------- // 556 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED) 557 // -------------------------------------------------------- // 558 559 // time source control: 560 // getTimeSource(): 561 // returns B_ERROR if no time source has been set; otherwise, 562 // returns the node ID of the current time source for all 563 // nodes in the group. 564 // 565 // setTimeSource(): 566 // Calls SetTimeSourceFor() on every node in the group. 567 // The group must be stopped; B_NOT_ALLOWED will be returned 568 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING. 569 570 status_t NodeGroup::getTimeSource( 571 media_node* outTimeSource) const { 572 Autolock _l(this); 573 574 if(m_timeSource != media_node::null) { 575 *outTimeSource = m_timeSource; 576 return B_OK; 577 } 578 return B_ERROR; 579 } 580 581 status_t NodeGroup::setTimeSource( 582 const media_node& timeSource) { 583 584 Autolock _l(this); 585 586 if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING) 587 return B_NOT_ALLOWED; 588 589 if(m_timeSourceObj) 590 m_timeSourceObj->Release(); 591 592 m_timeSource = timeSource; 593 594 // cache a BTimeSource* 595 m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource); 596 ASSERT(m_timeSourceObj); 597 598 // apply new time source to all nodes 599 for_each( 600 m_nodes.begin(), 601 m_nodes.end(), 602 bind2nd( 603 mem_fun(&NodeRef::_setTimeSource), 604 m_timeSource.node 605 ) 606 ); 607 608 // // try to set as sync node 609 // err = setSyncNode(timeSource); 610 // if(err < B_OK) { 611 // PRINT(( 612 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n", 613 // strerror(err))); 614 // } 615 616 // notify 617 if(!LockLooper()) { 618 ASSERT(!"LockLooper() failed."); 619 } 620 BMessage m(M_TIME_SOURCE_CHANGED); 621 m.AddInt32("groupID", id()); 622 m.AddInt32("timeSourceID", timeSource.node); 623 notify(&m); 624 UnlockLooper(); 625 626 return B_OK; 627 } 628 629 // run mode access: 630 // Sets the default run mode for the group. This will be 631 // applied to every node with a wildcard (0) run mode. 632 // 633 // Special case: if the run mode is B_OFFLINE, it will be 634 // applied to all nodes in the group. 635 636 BMediaNode::run_mode NodeGroup::runMode() const { 637 Autolock _l(this); 638 return m_runMode; 639 } 640 641 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) { 642 Autolock _l(this); 643 644 m_runMode = mode; 645 646 // apply to all nodes 647 for_each( 648 m_nodes.begin(), 649 m_nodes.end(), 650 bind2nd( 651 mem_fun(&NodeRef::_setRunModeAuto), 652 m_runMode 653 ) 654 // bound_method( 655 // *this, 656 // &NodeGroup::setRunModeFor) 657 ); 658 659 660 return B_OK; 661 } 662 663 // -------------------------------------------------------- // 664 // *** BHandler 665 // -------------------------------------------------------- // 666 667 void NodeGroup::MessageReceived( 668 BMessage* message) { 669 670 // PRINT(( 671 // "NodeGroup::MessageReceived():\n")); 672 // message->PrintToStream(); 673 status_t err; 674 675 switch(message->what) { 676 case M_SET_TIME_SOURCE: 677 { 678 media_node timeSource; 679 void* data; 680 ssize_t dataSize; 681 err = message->FindData( 682 "timeSourceNode", 683 B_RAW_TYPE, 684 (const void**)&data, 685 &dataSize); 686 if(err < B_OK) { 687 PRINT(( 688 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n" 689 " no timeSourceNode!\n")); 690 break; 691 } 692 timeSource = *(media_node*)data; 693 694 setTimeSource(timeSource); 695 } 696 break; 697 698 case M_SET_RUN_MODE: 699 { 700 uint32 runMode; 701 err = message->FindInt32("runMode", (int32*)&runMode); 702 if(err < B_OK) { 703 PRINT(( 704 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 705 " no runMode!\n")); 706 break; 707 } 708 709 if(runMode < BMediaNode::B_OFFLINE || 710 runMode > BMediaNode::B_RECORDING) { 711 PRINT(( 712 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 713 " invalid run mode (%ld)\n", runMode)); 714 break; 715 } 716 717 setRunMode((BMediaNode::run_mode)runMode); 718 } 719 break; 720 721 case M_SET_START_POSITION: 722 { 723 bigtime_t position; 724 err = message->FindInt64("position", (int64*)&position); 725 if(err < B_OK) { 726 PRINT(( 727 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n" 728 " no position!\n")); 729 break; 730 } 731 setStartPosition(position); 732 } 733 break; 734 735 case M_SET_END_POSITION: 736 { 737 bigtime_t position; 738 err = message->FindInt64("position", (int64*)&position); 739 if(err < B_OK) { 740 PRINT(( 741 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n" 742 " no position!\n")); 743 break; 744 } 745 setEndPosition(position); 746 } 747 break; 748 749 case M_PREROLL: 750 preroll(); 751 break; 752 753 case M_START: 754 start(); 755 break; 756 757 case M_STOP: 758 stop(); 759 break; 760 761 case M_ROLL: 762 roll(); 763 break; 764 765 default: 766 _inherited::MessageReceived(message); 767 break; 768 } 769 } 770 771 772 // -------------------------------------------------------- // 773 // *** IPersistent 774 // -------------------------------------------------------- // 775 776 // ! 777 #if CORTEX_XML 778 // ! 779 780 // +++++ 781 782 // Default constructor 783 NodeGroup::NodeGroup() : 784 m_manager(0) {} // +++++ finish initialization 785 786 787 // ! 788 #endif /*CORTEX_XML*/ 789 // ! 790 791 // -------------------------------------------------------- // 792 // *** IObservable: [19aug99] 793 // -------------------------------------------------------- // 794 795 void NodeGroup::observerAdded( 796 const BMessenger& observer) { 797 798 BMessage m(M_OBSERVER_ADDED); 799 m.AddInt32("groupID", id()); 800 m.AddMessenger("target", BMessenger(this)); 801 observer.SendMessage(&m); 802 } 803 804 void NodeGroup::observerRemoved( 805 const BMessenger& observer) { 806 807 BMessage m(M_OBSERVER_REMOVED); 808 m.AddInt32("groupID", id()); 809 m.AddMessenger("target", BMessenger(this)); 810 observer.SendMessage(&m); 811 } 812 813 void NodeGroup::notifyRelease() { 814 815 BMessage m(M_RELEASED); 816 m.AddInt32("groupID", id()); 817 m.AddMessenger("target", BMessenger(this)); 818 notify(&m); 819 } 820 821 void NodeGroup::releaseComplete() { 822 // +++++ 823 } 824 825 // -------------------------------------------------------- // 826 // *** ILockable: pass lock requests to m_lock 827 // -------------------------------------------------------- // 828 829 bool NodeGroup::lock( 830 lock_t type, 831 bigtime_t timeout) { 832 833 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0))); 834 835 ASSERT(type == WRITE); 836 status_t err = m_lock.LockWithTimeout(timeout); 837 838 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0))); 839 840 return err == B_OK; 841 } 842 843 bool NodeGroup::unlock( 844 lock_t type) { 845 846 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0))); 847 848 ASSERT(type == WRITE); 849 m_lock.Unlock(); 850 851 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0))); 852 853 return true; 854 } 855 856 bool NodeGroup::isLocked( 857 lock_t type) const { 858 859 ASSERT(type == WRITE); 860 return m_lock.IsLocked(); 861 } 862 863 // -------------------------------------------------------- // 864 // *** ctor (accessible to NodeManager) 865 // -------------------------------------------------------- // 866 867 NodeGroup::NodeGroup( 868 const char* name, 869 NodeManager* manager, 870 BMediaNode::run_mode runMode) : 871 872 ObservableHandler(name), 873 m_lock("NodeGroup::m_lock"), 874 m_manager(manager), 875 m_id(NextID()), 876 m_name(name), 877 m_flags(0), 878 m_transportState(TRANSPORT_INVALID), 879 m_runMode(runMode), 880 m_timeSourceObj(0), 881 m_released(false), 882 m_cycleThread(0), 883 m_cyclePort(0), 884 m_startPosition(0LL), 885 m_endPosition(0LL), 886 m_newStart(false), 887 m_newEnd(false) { 888 889 ASSERT(m_manager); 890 891 if(!m_manager->Lock()) { 892 ASSERT(!"m_manager->Lock() failed"); 893 } 894 m_manager->AddHandler(this); 895 m_manager->Unlock(); 896 897 // set default time source 898 media_node ts; 899 D_ROSTER(("# roster->GetTimeSource()\n")); 900 status_t err = m_manager->roster->GetTimeSource(&ts); 901 if(err < B_OK) { 902 PRINT(( 903 "*** NodeGroup(): roster->GetTimeSource() failed:\n" 904 " %s\n", strerror(err))); 905 } 906 setTimeSource(ts); 907 } 908 909 // -------------------------------------------------------- // 910 // *** internal operations 911 // -------------------------------------------------------- // 912 913 uint32 NodeGroup::s_nextID = 1; 914 uint32 NodeGroup::NextID() { 915 return atomic_add((int32*)&s_nextID, 1); 916 } 917 918 // -------------------------------------------------------- // 919 // *** ref->group communication (LOCK REQUIRED) 920 // -------------------------------------------------------- // 921 922 // when a NodeRef's cycle state (ie. looping or not looping) 923 // changes, it must pass that information on via this method 924 925 void NodeGroup::_refCycleChanged( 926 NodeRef* ref) { 927 assert_locked(this); 928 D_METHOD(( 929 "NodeGroup::_refCycleChanged('%s')\n", 930 ref->name())); 931 932 if(ref->m_cycle) { 933 _cycleAddRef(ref); 934 } else { 935 _cycleRemoveRef(ref); 936 } 937 938 // +++++ if running & cycle valid, the node should be properly 939 // seek'd and start'd 940 } 941 942 943 // when a cycling node's latency changes, call this method. 944 945 void NodeGroup::_refLatencyChanged( 946 NodeRef* ref) { 947 assert_locked(this); 948 D_METHOD(( 949 "NodeGroup::_refLatencyChanged('%s')\n", 950 ref->name())); 951 952 if(!_cycleValid()) 953 return; 954 955 // remove & replace ref (positions it properly) 956 _cycleRemoveRef(ref); 957 _cycleAddRef(ref); 958 959 // slap my thread up 960 ASSERT(m_cyclePort); 961 write_port( 962 m_cyclePort, 963 _CYCLE_LATENCY_CHANGED, 964 0, 965 0); 966 967 // +++++ zat it? 968 } 969 970 // when a NodeRef receives notification that it has been stopped, 971 // but is labeled as still running, it must call this method. 972 // [e.moon 11oct99: roll/B_OFFLINE support] 973 974 void NodeGroup::_refStopped( 975 NodeRef* ref) { 976 assert_locked(this); 977 D_METHOD(( 978 "NodeGroup::_refStopped('%s')\n", 979 ref->name())); 980 981 // roll/B_OFFLINE support [e.moon 11oct99] 982 // (check to see if any other nodes in the group are still running; 983 // mark group stopped if not.) 984 if(m_transportState == TRANSPORT_ROLLING) { 985 bool nodesRunning = false; 986 for(node_set::iterator it = m_nodes.begin(); 987 it != m_nodes.end(); ++it) { 988 if((*it)->isRunning()) { 989 nodesRunning = true; 990 break; 991 } 992 } 993 if(!nodesRunning) 994 // the group has stopped; update transport state 995 _changeState(TRANSPORT_STOPPED); 996 997 } 998 999 } 1000 1001 1002 // -------------------------------------------------------- // 1003 // *** transport helpers (LOCK REQUIRED) 1004 // -------------------------------------------------------- // 1005 1006 1007 // Preroll all nodes in the group; this is the implementation 1008 // of preroll(). 1009 // *** this method should not be called from the transport thread 1010 // (since preroll operations can block for a relatively long time.) 1011 1012 status_t NodeGroup::_preroll() { 1013 assert_locked(this); 1014 1015 D_METHOD(( 1016 "NodeGroup::_preroll()\n")); 1017 1018 if( 1019 m_transportState == TRANSPORT_RUNNING || 1020 m_transportState == TRANSPORT_ROLLING) 1021 // too late 1022 return B_NOT_ALLOWED; 1023 1024 // * preroll all nodes to the start position 1025 1026 // +++++ currently, if an error is encountered it's ignored. 1027 // should the whole operation fail if one node couldn't 1028 // be prerolled? 1029 // 1030 // My gut response is 'no', since the preroll step is 1031 // optional, but the caller should have some inkling that 1032 // one of its nodes didn't behave. 1033 1034 // [e.moon 13oct99] making PPC compiler happy 1035 // for_each( 1036 // m_nodes.begin(), 1037 // m_nodes.end(), 1038 // bind2nd( 1039 // mem_fun(&NodeRef::_preroll), 1040 // m_startPosition 1041 // ) 1042 // ); 1043 for(node_set::iterator it = m_nodes.begin(); 1044 it != m_nodes.end(); ++it) { 1045 (*it)->_preroll(m_startPosition); 1046 } 1047 1048 // replaces 1049 // bind2nd( 1050 // bound_method(*this, &NodeGroup::prerollNode), 1051 // m_startPosition 1052 // ) 1053 1054 return B_OK; 1055 } 1056 1057 1058 //// functor: calculates latency of each node it's handed, caching 1059 //// the largest one found; includes initial latency if nodes report it. 1060 // 1061 //class NodeGroup::calcLatencyFn { public: 1062 // bigtime_t& maxLatency; 1063 // 1064 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {} 1065 // 1066 // void operator()(NodeRef* r) { 1067 // ASSERT(r); 1068 // 1069 //// PRINT(( 1070 //// "# calcLatencyFn(): '%s'\n", 1071 //// r->name())); 1072 // 1073 // if(!(r->node().kind & B_BUFFER_PRODUCER)) { 1074 // // node can't incur latency 1075 //// PRINT(( 1076 //// "- not a producer\n")); 1077 // return; 1078 // } 1079 // 1080 // bigtime_t latency; 1081 // status_t err = 1082 // BMediaRoster::Roster()->GetLatencyFor( 1083 // r->node(), 1084 // &latency); 1085 // if(err < B_OK) { 1086 // PRINT(( 1087 // "* calcLatencyFn: GetLatencyFor() failed: %s\n", 1088 // strerror(err))); 1089 // return; 1090 // } 1091 //// PRINT(("- %Ld\n", latency)); 1092 // 1093 // bigtime_t add; 1094 // err = BMediaRoster::Roster()->GetInitialLatencyFor( 1095 // r->node(), 1096 // &add); 1097 //// PRINT(("- %Ld\n", add)); 1098 // if(err < B_OK) { 1099 // PRINT(( 1100 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n", 1101 // strerror(err))); 1102 // } 1103 // else 1104 // latency += add; 1105 // 1106 // if(latency > maxLatency) 1107 // maxLatency = latency; 1108 // 1109 //// PRINT(( 1110 //// "- max latency: %Ld\n", 1111 //// maxLatency)); 1112 // } 1113 //}; 1114 1115 // Start all nodes in the group; this is the implementation of 1116 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead 1117 // in that case. 1118 // 1119 // (this may be called from the transport thread or from 1120 // an API-implementation method.) 1121 1122 status_t NodeGroup::_start() { 1123 assert_locked(this); 1124 1125 D_METHOD(( 1126 "NodeGroup::_start()\n")); 1127 status_t err; 1128 1129 if(m_transportState != TRANSPORT_STOPPED) 1130 return B_NOT_ALLOWED; 1131 1132 if(m_runMode == BMediaNode::B_OFFLINE) 1133 return B_NOT_ALLOWED; 1134 1135 ASSERT(m_nodes.size()); 1136 1137 _changeState(TRANSPORT_STARTING); 1138 1139 // * Find the highest latency in the group 1140 1141 bigtime_t offset = 0LL; 1142 calcLatencyFn _f(offset); 1143 for_each( 1144 m_nodes.begin(), 1145 m_nodes.end(), 1146 _f); 1147 1148 offset += s_rosterLatency; 1149 PRINT(( 1150 "- offset: %Ld\n", offset)); 1151 1152 // * Seek all nodes (in case one or more failed to preroll) 1153 1154 for(node_set::iterator it = m_nodes.begin(); 1155 it != m_nodes.end(); ++it) { 1156 err = (*it)->_seekStopped(m_startPosition); 1157 if(err < B_OK) { 1158 PRINT(( 1159 "! NodeGroup('%s')::_start():\n" 1160 " ref('%s')->_seekStopped(%Ld) failed:\n" 1161 " %s\n", 1162 name(), (*it)->name(), m_startPosition, 1163 strerror(err))); 1164 1165 // +++++ continue? 1166 } 1167 } 1168 1169 // * Start all nodes, allowing for the max latency found 1170 1171 ASSERT(m_timeSourceObj); 1172 bigtime_t when = m_timeSourceObj->Now() + offset; 1173 1174 // 10aug99: initialize cycle (loop) settings 1175 if(_cycleValid()) { 1176 _initCycleThread(); 1177 _cycleInit(when); 1178 } 1179 1180 // start the nodes 1181 for(node_set::iterator it = m_nodes.begin(); 1182 it != m_nodes.end(); ++it) { 1183 err = (*it)->_start(when); 1184 if(err < B_OK) { 1185 PRINT(( 1186 "! NodeGroup('%s')::_start():\n" 1187 " ref('%s')->_start(%Ld) failed:\n" 1188 " %s\n", 1189 name(), (*it)->name(), when, 1190 strerror(err))); 1191 1192 // +++++ continue? 1193 } 1194 } 1195 1196 // notify observers 1197 _changeState(TRANSPORT_RUNNING); 1198 return B_OK; 1199 } 1200 1201 // Stop all nodes in the group; this is the implementation of 1202 // stop(). 1203 // 1204 // (this may be called from the transport thread or from 1205 // an API-implementation method.) 1206 1207 status_t NodeGroup::_stop() { 1208 1209 D_METHOD(( 1210 "NodeGroup::_stop()\n")); 1211 1212 assert_locked(this); 1213 1214 if( 1215 m_transportState != TRANSPORT_RUNNING && 1216 m_transportState != TRANSPORT_ROLLING) 1217 return B_NOT_ALLOWED; 1218 1219 _changeState(TRANSPORT_STOPPING); 1220 1221 // * stop the cycle thread if need be 1222 _destroyCycleThread(); 1223 1224 // * stop all nodes 1225 // +++++ error reports would be nice 1226 1227 for_each( 1228 m_nodes.begin(), 1229 m_nodes.end(), 1230 mem_fun(&NodeRef::_stop) 1231 ); 1232 1233 // update transport state 1234 _changeState(TRANSPORT_STOPPED); 1235 1236 return B_OK; 1237 } 1238 1239 // Roll all nodes in the group; this is the implementation of 1240 // roll(). 1241 // 1242 // (this may be called from the transport thread or from 1243 // an API-implementation method.) 1244 1245 status_t NodeGroup::_roll() { 1246 1247 D_METHOD(( 1248 "NodeGroup::_roll()\n")); 1249 assert_locked(this); 1250 status_t err; 1251 1252 if(m_transportState != TRANSPORT_STOPPED) 1253 return B_NOT_ALLOWED; 1254 1255 bigtime_t period = m_endPosition - m_startPosition; 1256 if(period <= 0LL) 1257 return B_NOT_ALLOWED; 1258 1259 _changeState(TRANSPORT_STARTING); 1260 1261 bigtime_t tpStart = 0LL; 1262 bigtime_t tpStop = period; 1263 1264 if(m_runMode != BMediaNode::B_OFFLINE) { 1265 1266 // * Find the highest latency in the group 1267 bigtime_t offset = 0LL; 1268 calcLatencyFn _f(offset); 1269 for_each( 1270 m_nodes.begin(), 1271 m_nodes.end(), 1272 _f); 1273 1274 offset += s_rosterLatency; 1275 PRINT(( 1276 "- offset: %Ld\n", offset)); 1277 1278 ASSERT(m_timeSourceObj); 1279 tpStart = m_timeSourceObj->Now() + offset; 1280 tpStop += tpStart; 1281 } 1282 1283 // * Roll all nodes; watch for errors 1284 bool allFailed = true; 1285 err = B_OK; 1286 for( 1287 node_set::iterator it = m_nodes.begin(); 1288 it != m_nodes.end(); ++it) { 1289 1290 status_t e = (*it)->_roll( 1291 tpStart, 1292 tpStop, 1293 m_startPosition); 1294 if(e < B_OK) 1295 err = e; 1296 else 1297 allFailed = false; 1298 } 1299 1300 if(!allFailed) 1301 // notify observers 1302 _changeState(TRANSPORT_ROLLING); 1303 1304 return err; 1305 } 1306 1307 1308 // State transition; notify listeners 1309 // +++++ [18aug99] DANGER: should notification happen in the middle 1310 // of such an operation? 1311 inline void NodeGroup::_changeState( 1312 transport_state_t to) { 1313 1314 assert_locked(this); 1315 1316 m_transportState = to; 1317 1318 if(!LockLooper()) { 1319 ASSERT(!"LockLooper() failed."); 1320 } 1321 BMessage m(M_TRANSPORT_STATE_CHANGED); 1322 m.AddInt32("groupID", id()); 1323 m.AddInt32("transportState", m_transportState); 1324 notify(&m); 1325 UnlockLooper(); 1326 } 1327 1328 // Enforce a state transition, and notify listeners 1329 inline void NodeGroup::_changeState( 1330 transport_state_t from, 1331 transport_state_t to) { 1332 1333 assert_locked(this); 1334 ASSERT(m_transportState == from); 1335 1336 _changeState(to); 1337 } 1338 1339 1340 // -------------------------------------------------------- // 1341 // *** cycle thread & helpers (LOCK REQUIRED) 1342 // -------------------------------------------------------- // 1343 1344 // *** cycle port definitions 1345 1346 const int32 _portLength = 32; 1347 const char* const _portName = "NodeGroup::m_cyclePort"; 1348 const size_t _portMsgMaxSize = 256; 1349 1350 1351 // set up the cycle thread (including its kernel port) 1352 status_t NodeGroup::_initCycleThread() { 1353 assert_locked(this); 1354 status_t err; 1355 D_METHOD(( 1356 "NodeGroup::_initCycleThread()\n")); 1357 1358 if(m_cycleThread) { 1359 // thread is still alive 1360 err = _destroyCycleThread(); 1361 if(err < B_OK) 1362 return err; 1363 } 1364 1365 // create 1366 m_cycleThreadDone = false; 1367 m_cycleThread = spawn_thread( 1368 &_CycleThread, 1369 "NodeGroup[cycleThread]", 1370 B_NORMAL_PRIORITY, 1371 (void*)this); 1372 if(m_cycleThread < B_OK) { 1373 PRINT(( 1374 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n" 1375 " %s\n", 1376 strerror(m_cycleThread))); 1377 return m_cycleThread; 1378 } 1379 1380 // launch 1381 return resume_thread(m_cycleThread); 1382 } 1383 1384 // shut down the cycle thread/port 1385 status_t NodeGroup::_destroyCycleThread() { 1386 assert_locked(this); 1387 status_t err; 1388 D_METHOD(( 1389 "NodeGroup::_destroyCycleThread()\n")); 1390 1391 if(!m_cycleThread) 1392 return B_OK; 1393 1394 if(!m_cycleThreadDone) { 1395 // kill the thread 1396 ASSERT(m_cyclePort); 1397 err = write_port_etc( 1398 m_cyclePort, 1399 _CYCLE_STOP, 1400 0, 1401 0, 1402 B_TIMEOUT, 1403 10000LL); 1404 1405 if(err < B_OK) { 1406 // bad thread. die, thread, die. 1407 PRINT(( 1408 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n")); 1409 delete_port(m_cyclePort); 1410 m_cyclePort = 0; 1411 kill_thread(m_cycleThread); 1412 m_cycleThread = 0; 1413 return B_OK; 1414 } 1415 1416 // the thread got the message; wait for it to quit 1417 unlock(); 1418 while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) { 1419 PRINT(( 1420 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n")); 1421 } 1422 lock(); 1423 } 1424 1425 // it's up to the thread to close its port 1426 ASSERT(!m_cyclePort); 1427 1428 m_cycleThread = 0; 1429 1430 return B_OK; 1431 } 1432 1433 1434 // 1) do the current positions specify a valid cycle region? 1435 // 2) are any nodes in the group cycle-enabled? 1436 1437 bool NodeGroup::_cycleValid() { 1438 assert_locked(this); 1439 return 1440 (m_transportState == TRANSPORT_RUNNING || 1441 m_transportState == TRANSPORT_STARTING) && 1442 canCycle(); 1443 } 1444 1445 // initialize the cycle members (call when starting) 1446 1447 void NodeGroup::_cycleInit( 1448 bigtime_t startTime) { 1449 assert_locked(this); 1450 ASSERT(m_cycleNodes.size() > 0); 1451 D_METHOD(( 1452 "NodeGroup::_cycleInit(%Ld)\n", 1453 startTime)); 1454 1455 // +++++ rescan latencies? 1456 1457 // figure new boundary & deadline from region length 1458 bigtime_t cyclePeriod = m_endPosition - m_startPosition; 1459 1460 if(cyclePeriod <= 0) { 1461 // cycle region is no longer valid 1462 m_cycleBoundary = 0LL; 1463 m_cycleDeadline = 0LL; 1464 1465 // no no no -- deadlocks when the thread calls this method 1466 // // stop the thread 1467 // _destroyCycleThread(); 1468 return; 1469 } 1470 1471 m_cycleStart = startTime; 1472 m_cycleBoundary = startTime + cyclePeriod; 1473 m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency); 1474 } 1475 1476 1477 // add a ref to the cycle set (in proper order, based on latency) 1478 void NodeGroup::_cycleAddRef( 1479 NodeRef* ref) { 1480 assert_locked(this); 1481 1482 // make sure it's not already there 1483 ASSERT(find( 1484 m_cycleNodes.begin(), 1485 m_cycleNodes.end(), 1486 ref) == m_cycleNodes.end()); 1487 1488 // [re]calc latency if 0 1489 if(!ref->m_latency) 1490 ref->_updateLatency(); 1491 1492 node_set::iterator it; 1493 for(it = m_cycleNodes.begin(); 1494 it != m_cycleNodes.end(); ++it) { 1495 if(ref->m_latency > (*it)->m_latency) { 1496 m_cycleNodes.insert(it, ref); 1497 break; 1498 } 1499 } 1500 1501 // not inserted? new ref belongs at the end 1502 if(it == m_cycleNodes.end()) 1503 m_cycleNodes.insert(it, ref); 1504 } 1505 1506 // remove a ref from the cycle set 1507 void NodeGroup::_cycleRemoveRef( 1508 NodeRef* ref) { 1509 assert_locked(this); 1510 1511 node_set::iterator it = find( 1512 m_cycleNodes.begin(), 1513 m_cycleNodes.end(), 1514 ref); 1515 ASSERT(it != m_cycleNodes.end()); 1516 m_cycleNodes.erase(it); 1517 } 1518 1519 bigtime_t NodeGroup::_cycleBoundary() const { 1520 Autolock _l(this); 1521 return m_cycleBoundary; 1522 } 1523 1524 // cycle thread impl. 1525 /*static*/ 1526 status_t NodeGroup::_CycleThread(void* user) { 1527 ((NodeGroup*)user)->_cycleThread(); 1528 return B_OK; 1529 } 1530 1531 void NodeGroup::_cycleThread() { 1532 1533 status_t err; 1534 int32 code; 1535 int32 errorCount = 0; 1536 1537 // +++++ liability -- if the thread has to be killed, this buffer 1538 // won't be reclaimed 1539 char* msgBuffer = new char[_portMsgMaxSize]; 1540 array_delete<char> _d(msgBuffer); 1541 1542 // create port 1543 ASSERT(!m_cyclePort); 1544 m_cyclePort = create_port( 1545 _portLength, 1546 _portName); 1547 ASSERT(m_cyclePort >= B_OK); 1548 1549 // the message-handling loop 1550 bool done = false; 1551 while(!done) { 1552 1553 // *** wait until it's time to queue the next cycle, or until 1554 // *** a message arrives 1555 1556 lock(); // **** BEGIN LOCKED SECTION **** 1557 if(!_cycleValid()) { 1558 unlock(); 1559 break; 1560 } 1561 1562 ASSERT(m_cycleNodes.size() > 0); 1563 ASSERT(m_timeSourceObj); 1564 1565 bigtime_t maxLatency = m_cycleNodes.front()->m_latency; 1566 bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor( 1567 m_cycleBoundary, maxLatency + s_rosterLatency); 1568 bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime(); 1569 1570 if(timeout <= 0) { 1571 // +++++ whoops, I'm late. 1572 // +++++ adjust to compensate !!! 1573 PRINT(( 1574 "*** NodeGroup::_cycleThread(): LATE\n" 1575 " by %Ld\n", -timeout)); 1576 } 1577 1578 // +++++ if timeout is very short, spin until the target time arrives 1579 1580 unlock(); // **** END LOCKED SECTION **** 1581 1582 // block until message arrives or it's time to wake up 1583 err = read_port_etc( 1584 m_cyclePort, 1585 &code, 1586 msgBuffer, 1587 _portMsgMaxSize, 1588 B_TIMEOUT, 1589 timeout); 1590 1591 if(err == B_TIMED_OUT) { 1592 // the time has come to seek my nodes 1593 _handleCycleService(); 1594 continue; 1595 } 1596 else if(err < B_OK) { 1597 // any other error is bad news 1598 PRINT(( 1599 "* NodeGroup::_cycleThread(): read_port error:\n" 1600 " %s\n" 1601 " ABORTING\n\n", strerror(err))); 1602 if(++errorCount > 10) { 1603 PRINT(( 1604 "*** Too many errors; aborting.\n")); 1605 break; 1606 } 1607 continue; 1608 } 1609 1610 errorCount = 0; 1611 1612 // process the message 1613 switch(code) { 1614 case _CYCLE_STOP: 1615 // bail 1616 done = true; 1617 break; 1618 1619 case _CYCLE_END_CHANGED: 1620 case _CYCLE_LATENCY_CHANGED: 1621 // fall through to next loop; for now, these messages 1622 // serve only to slap me out of my stupor and reassess 1623 // the timing situation... 1624 break; 1625 1626 default: 1627 PRINT(( 1628 "* NodeGroup::_cycleThread(): unknown message code '%ld'\n", code)); 1629 break; 1630 } 1631 } // while(!done) 1632 1633 1634 // delete port 1635 delete_port(m_cyclePort); 1636 m_cyclePort = 0; 1637 1638 // done 1639 m_cycleThreadDone = true; 1640 } 1641 1642 // cycle service: seek all nodes & initiate next cycle 1643 void NodeGroup::_handleCycleService() { 1644 Autolock _l(this); 1645 // D_METHOD(( 1646 // "NodeGroup::_handleCycleService()\n")); 1647 status_t err; 1648 1649 if(!_cycleValid()) { 1650 // PRINT(( 1651 // "- _handleCycleService(): cycle not valid; quitting.\n")); 1652 return; 1653 } 1654 1655 // seek 1656 for(node_set::iterator it = m_cycleNodes.begin(); 1657 it != m_cycleNodes.end(); ++it) { 1658 err = (*it)->_seek( 1659 m_startPosition, 1660 m_cycleBoundary); 1661 if(err < B_OK) { 1662 PRINT(( 1663 "- _handleCycleService(): node('%s')::_seek() failed:\n" 1664 " %s\n", 1665 (*it)->name(), strerror(err))); 1666 } 1667 } 1668 1669 // update cycle settings 1670 if(m_newStart) { 1671 m_newStart = false; 1672 m_startPosition = m_newStartPosition; 1673 } 1674 if(m_newEnd) { 1675 m_newEnd = false; 1676 m_endPosition = m_newEndPosition; 1677 } 1678 1679 // prepare next cycle 1680 _cycleInit(m_cycleBoundary); 1681 } 1682 1683 // END -- NodeGroup.cpp -- 1684