1 // NodeGroup.cpp 2 3 #include "NodeGroup.h" 4 //#include "NodeGroup_transport_thread.h" 5 6 #include "NodeManager.h" 7 #include "NodeRef.h" 8 9 #include <MediaRoster.h> 10 #include <OS.h> 11 #include <TimeSource.h> 12 13 #include <algorithm> 14 #include <functional> 15 16 #include "array_delete.h" 17 #include "BasicThread.h" 18 #include "node_manager_impl.h" 19 #include "functional_tools.h" 20 21 using namespace std; 22 23 __USE_CORTEX_NAMESPACE 24 #define D_METHOD(x) //PRINT (x) 25 #define D_ROSTER(x) //PRINT (x) 26 #define D_LOCK(x) //PRINT (x) 27 28 29 30 // -------------------------------------------------------- // 31 // *** ctor/dtor 32 // -------------------------------------------------------- // 33 34 // free the group, including all nodes within it 35 // (this call will result in the eventual deletion of the object.) 36 // returns B_OK on success; B_NOT_ALLOWED if release() has 37 // already been called; other error codes if the Media Roster 38 // call fails. 39 // * THE MANAGER MUST BE LOCKED 40 41 status_t NodeGroup::release() { 42 43 D_METHOD(( 44 "NodeGroup::release()\n")); 45 46 if(isReleased()) 47 return B_NOT_ALLOWED; 48 49 // clean up 50 lock(); 51 52 // halt all nodes 53 _stop(); 54 55 // remove & release all nodes 56 // +++++ causes triply-nested lock: eww! 57 while(m_nodes.size()) { 58 NodeRef* last = m_nodes.back(); 59 removeNode(m_nodes.size()-1); 60 last->release(); 61 } 62 63 unlock(); 64 65 // [e.moon 7nov99] 66 // removing the released group is now NodeManager's responsibility 67 // 68 // remove from NodeManager 69 if(!m_manager->lock()) { 70 ASSERT(!"* m_manager->lock() failed.\n"); 71 } 72 m_manager->_removeGroup(this); 73 m_manager->unlock(); 74 75 // hand off to IObservable 76 return _inherited::release(); 77 } 78 79 // call release() rather than deleting NodeGroup objects 80 NodeGroup::~NodeGroup() { 81 82 Autolock _l(this); 83 D_METHOD(( 84 "~NodeGroup()\n")); 85 86 ASSERT(!m_nodes.size()); 87 88 if(m_timeSourceObj) { 89 m_timeSourceObj->Release(); 90 m_timeSourceObj = 0; 91 } 92 } 93 94 95 // -------------------------------------------------------- // 96 // *** accessors 97 // -------------------------------------------------------- // 98 99 // [e.moon 13oct99] moved to header 100 //inline uint32 NodeGroup::id() const { return m_id; } 101 102 // -------------------------------------------------------- // 103 // *** operations 104 // -------------------------------------------------------- // 105 106 // name access 107 const char* NodeGroup::name() const { 108 Autolock _l(this); 109 return m_name.String(); 110 } 111 112 status_t NodeGroup::setName(const char* name) { 113 Autolock _l(this); 114 m_name = name; 115 return B_OK; 116 } 117 118 // content access 119 uint32 NodeGroup::countNodes() const { 120 Autolock _l(this); 121 return m_nodes.size(); 122 } 123 124 NodeRef* NodeGroup::nodeAt( 125 uint32 index) const { 126 Autolock _l(this); 127 return (index < m_nodes.size()) ? 128 m_nodes[index] : 129 0; 130 } 131 132 // add/remove nodes: 133 // - you may only add a node with no current group. 134 // - nodes added during playback will be started; 135 // nodes removed during playback will be stopped (unless 136 // the NO_START_STOP transport restriction flag is set 137 // for a given node.) 138 139 status_t NodeGroup::addNode( 140 NodeRef* node) { 141 142 D_METHOD(( 143 "NodeGroup::addNode()\n")); 144 145 // lock the manager first; if the node has no current group, 146 // this locks the node. 147 m_manager->lock(); 148 149 Autolock _l(this); 150 151 // precondition: GROUP_LOCKED not set 152 if(m_flags & GROUP_LOCKED) 153 return B_NOT_ALLOWED; 154 155 // precondition: no current group 156 if(node->m_group) { 157 // [e.moon 28sep99] whoops, forgot one 158 PRINT(( 159 "!!! node already in group '%s'\n", node->m_group->name())); 160 161 m_manager->unlock(); 162 return B_NOT_ALLOWED; 163 } 164 165 // add it 166 m_nodes.push_back(node); 167 node->_setGroup(this); 168 169 // release the manager 170 m_manager->unlock(); 171 172 // first node? the transport is now ready to start 173 if(m_nodes.size() == 1) { 174 _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED); 175 } 176 // 177 // if(m_syncNode == media_node::null) { 178 // // assign as sync node 179 // setSyncNode(node->node()); 180 // } 181 // 182 // initialize the new node 183 status_t err = node->_initTransportState(); 184 if(err < B_OK) 185 return err; 186 187 // set time source 188 node->_setTimeSource(m_timeSource.node); 189 190 // set run mode 191 node->_setRunMode(m_runMode); 192 193 // add to cycle set if need be 194 // +++++ should I call _cycleAddRef() instead? 195 if(node->m_cycle) 196 _refCycleChanged(node); 197 198 if(m_transportState == TRANSPORT_RUNNING) { 199 // +++++ start if necessary! 200 } 201 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99] 202 203 // send notification 204 if(!LockLooper()) { 205 ASSERT(!"LockLooper() failed."); 206 } 207 BMessage m(M_NODE_ADDED); 208 m.AddInt32("groupID", id()); 209 m.AddInt32("nodeID", node->id()); 210 notify(&m); 211 UnlockLooper(); 212 213 // success 214 return B_OK; 215 } 216 217 218 status_t NodeGroup::removeNode( 219 NodeRef* node) { 220 221 D_METHOD(( 222 "NodeGroup::removeNode()\n")); 223 224 // lock the manager first; once the node is ungrouped, 225 // the manager lock applies to it 226 m_manager->lock(); 227 228 Autolock _l(this); 229 230 // precondition: this must be the node's group 231 if(node->m_group != this) { 232 // [e.moon 28sep99] whoops, forgot one 233 PRINT(( 234 "!!! node not in group '%s'\n", node->m_group->name())); 235 236 m_manager->unlock(); 237 return B_NOT_ALLOWED; 238 } 239 240 // remove from the cycle set 241 if(node->m_cycle) 242 _cycleRemoveRef(node); 243 244 // remove it 245 ASSERT(m_nodes.size()); 246 remove( 247 m_nodes.begin(), 248 m_nodes.end(), 249 node); 250 251 // should have removed one and only one entry 252 m_nodes.resize(m_nodes.size()-1); 253 254 // // 6aug99: the timesource is now the sync node... 255 // // is this the sync node? reassign if so 256 // 257 // if(node->node() == m_syncNode) { 258 // 259 // // look for another sync-capable node 260 // bool found = false; 261 // for(int n = 0; !found && n < m_nodes.size(); ++n) 262 // if(setSyncNode(m_nodes[n]->node()) == B_OK) 263 // found = true; 264 // 265 // // no luck? admit defeat: 266 // if(!found) { 267 // PRINT(( 268 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n")); 269 // 270 // // +++++ stop & set to invalid state? 271 // 272 // setSyncNode(media_node::null); 273 // } 274 // } 275 276 // stop the node if necessary 277 status_t err = node->_stop(); 278 if(err < B_OK) { 279 PRINT(( 280 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n" 281 " %s\n", 282 node->name(), 283 strerror(err))); 284 } 285 286 // clear the node's group pointer 287 node->_setGroup(0); 288 289 // release the manager lock; the node is now ungrouped and 290 // unlocked 291 m_manager->unlock(); 292 293 // was that the last node? stop/disable the transport if so 294 if(!m_nodes.size()) { 295 296 // +++++ kill sync thread(s) 297 298 _changeState(TRANSPORT_INVALID); 299 } 300 301 // send notification 302 if(!LockLooper()) { 303 ASSERT(!"LockLooper() failed."); 304 } 305 BMessage m(M_NODE_REMOVED); 306 m.AddInt32("groupID", id()); 307 m.AddInt32("nodeID", node->id()); 308 notify(&m); 309 UnlockLooper(); 310 311 // success 312 return B_OK; 313 } 314 315 status_t NodeGroup::removeNode( 316 uint32 index) { 317 318 D_METHOD(( 319 "NodeGroup::removeNode(by index)\n")); 320 321 // +++++ icky nested lock 322 Autolock _l(this); 323 324 ASSERT(m_nodes.size() > index); 325 return removeNode(m_nodes[index]); 326 } 327 328 uint32 NodeGroup::groupFlags() const { 329 Autolock _l(this); 330 return m_flags; 331 } 332 333 status_t NodeGroup::setGroupFlags( 334 uint32 flags) { 335 Autolock _l(this); 336 m_flags = flags; 337 return B_OK; 338 } 339 340 341 // returns true if one or more nodes in the group have cycling 342 // enabled, and the start- and end-positions are valid 343 bool NodeGroup::canCycle() const { 344 Autolock _l(this); 345 346 return 347 m_cycleNodes.size() > 0 && 348 m_endPosition - m_startPosition > s_minCyclePeriod; 349 } 350 351 // -------------------------------------------------------- // 352 // *** TRANSPORT POSITIONING (LOCK REQUIRED) 353 // -------------------------------------------------------- // 354 355 // Fetch the current transport state 356 357 NodeGroup::transport_state_t NodeGroup::transportState() const { 358 Autolock _l(this); 359 return m_transportState; 360 } 361 362 // Set the starting media time: 363 // This is the point at which playback will begin in any media 364 // files/documents being played by the nodes in this group. 365 // When cycle mode is enabled, this is the point to which each 366 // node will be seek'd at the end of each cycle (loop). 367 // 368 // The starting time can't be changed in the B_OFFLINE run mode 369 // (this call will return an error.) 370 371 status_t NodeGroup::setStartPosition( 372 bigtime_t start) { 373 Autolock _l(this); 374 375 D_METHOD(( 376 "NodeGroup::setStartPosition(%Ld)\n", start)); 377 378 if( 379 m_transportState == TRANSPORT_RUNNING || 380 m_transportState == TRANSPORT_ROLLING || 381 m_transportState == TRANSPORT_STARTING) { 382 383 if(m_runMode == BMediaNode::B_OFFLINE) 384 return B_NOT_ALLOWED; 385 386 ASSERT(m_timeSourceObj); 387 388 if(_cycleValid()) { 389 if(m_timeSourceObj->Now() >= m_cycleDeadline) { 390 // too late to change start position; defer 391 // PRINT((" - deferred\n")); 392 m_newStartPosition = start; 393 m_newStart = true; 394 return B_OK; 395 } 396 397 // not at deadline yet; fall through to set start position 398 } 399 } 400 401 m_startPosition = start; 402 403 // +++++ notify [e.moon 11oct99] 404 405 return B_OK; 406 } 407 408 // Fetch the starting position: 409 410 // +++++ if a previously-set start position was deferred, it won't be 411 // returned yet 412 413 bigtime_t NodeGroup::startPosition() const { 414 Autolock _l(this); 415 416 return m_startPosition; 417 } 418 419 // Set the ending media time: 420 // This is the point at which playback will end relative to 421 // media documents begin played by the nodes in this group; 422 // in cycle mode, this specifies the loop point. If the 423 // ending time is less than or equal to the starting time, 424 // the transport will continue until stopped manually. 425 // If the end position is changed while the transport is playing, 426 // it must take effect retroactively (if it's before the current 427 // position and looping is enabled, all nodes must 'warp' to 428 // the proper post-loop position.) 429 // 430 // The ending time can't be changed if run mode is B_OFFLINE and 431 // the transport is running (this call will return an error.) 432 433 status_t NodeGroup::setEndPosition( 434 bigtime_t end) { 435 Autolock _l(this); 436 437 D_METHOD(( 438 "NodeGroup::setEndPosition(%Ld)\n", end)); 439 440 if( 441 m_transportState == TRANSPORT_RUNNING || 442 m_transportState == TRANSPORT_ROLLING || 443 m_transportState == TRANSPORT_STARTING) { 444 445 if(m_runMode == BMediaNode::B_OFFLINE) 446 return B_NOT_ALLOWED; 447 448 ASSERT(m_timeSourceObj); 449 450 bigtime_t endDelta = end - m_endPosition; 451 452 if(_cycleValid()) { 453 if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) { 454 // too late to change end position; defer 455 // PRINT((" - deferred\n")); 456 m_newEndPosition = end; 457 m_newEnd = true; 458 return B_OK; 459 } 460 else { 461 // set new end position 462 m_endPosition = end; 463 464 // inform thread 465 ASSERT(m_cyclePort); 466 write_port( 467 m_cyclePort, 468 _CYCLE_END_CHANGED, 469 0, 470 0); 471 // 472 // // restart nodes' cycle threads with new end position 473 // _cycleInit(m_cycleStart); 474 // for(node_set::iterator it = m_cycleNodes.begin(); 475 // it != m_cycleNodes.end(); ++it) { 476 // (*it)->_scheduleCycle(m_cycleBoundary); 477 // } 478 // return B_OK; 479 } 480 } 481 } 482 483 m_endPosition = end; 484 485 // +++++ notify [e.moon 11oct99] 486 487 return B_OK; 488 } 489 490 491 // Fetch the end position: 492 // Note that if the end position is less than or equal to the start 493 // position, it's ignored. 494 495 // +++++ if a previously-set end position was deferred, it won't be 496 // returned yet 497 498 bigtime_t NodeGroup::endPosition() const { 499 Autolock _l(this); 500 return m_endPosition; 501 } 502 503 // -------------------------------------------------------- // 504 // *** TRANSPORT OPERATIONS (LOCK REQUIRED) 505 // -------------------------------------------------------- // 506 507 // Preroll the group: 508 // Seeks, then prerolls, each node in the group (honoring the 509 // NO_SEEK and NO_PREROLL flags.) This ensures that the group 510 // can start as quickly as possible. 511 // 512 // Does not return until all nodes in the group have been 513 // prepared. 514 515 status_t NodeGroup::preroll() { 516 D_METHOD(( 517 "NodeGroup::preroll()\n")); 518 519 Autolock _l(this); 520 return _preroll(); 521 } 522 523 // Start all nodes in the group: 524 // Nodes with the NO_START_STOP flag aren't molested. 525 526 status_t NodeGroup::start() { 527 D_METHOD(( 528 "NodeGroup::start()\n")); 529 530 Autolock _l(this); 531 return _start(); 532 } 533 534 // Stop all nodes in the group: 535 // Nodes with the NO_START_STOP flag aren't molested. 536 537 status_t NodeGroup::stop() { 538 D_METHOD(( 539 "NodeGroup::stop()\n")); 540 541 Autolock _l(this); 542 return _stop(); 543 } 544 545 // Roll all nodes in the group: 546 // Queues a start and stop atomically (via BMediaRoster::RollNode()). 547 // Returns B_NOT_ALLOWED if endPosition <= startPosition; 548 549 status_t NodeGroup::roll() { 550 D_METHOD(( 551 "NodeGroup::roll()\n")); 552 553 Autolock _l(this); 554 return _roll(); 555 } 556 557 // -------------------------------------------------------- // 558 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED) 559 // -------------------------------------------------------- // 560 561 // time source control: 562 // getTimeSource(): 563 // returns B_ERROR if no time source has been set; otherwise, 564 // returns the node ID of the current time source for all 565 // nodes in the group. 566 // 567 // setTimeSource(): 568 // Calls SetTimeSourceFor() on every node in the group. 569 // The group must be stopped; B_NOT_ALLOWED will be returned 570 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING. 571 572 status_t NodeGroup::getTimeSource( 573 media_node* outTimeSource) const { 574 Autolock _l(this); 575 576 if(m_timeSource != media_node::null) { 577 *outTimeSource = m_timeSource; 578 return B_OK; 579 } 580 return B_ERROR; 581 } 582 583 status_t NodeGroup::setTimeSource( 584 const media_node& timeSource) { 585 586 Autolock _l(this); 587 588 if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING) 589 return B_NOT_ALLOWED; 590 591 if(m_timeSourceObj) 592 m_timeSourceObj->Release(); 593 594 m_timeSource = timeSource; 595 596 // cache a BTimeSource* 597 m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource); 598 ASSERT(m_timeSourceObj); 599 600 // apply new time source to all nodes 601 for_each( 602 m_nodes.begin(), 603 m_nodes.end(), 604 bind2nd( 605 mem_fun(&NodeRef::_setTimeSource), 606 m_timeSource.node 607 ) 608 ); 609 610 // // try to set as sync node 611 // err = setSyncNode(timeSource); 612 // if(err < B_OK) { 613 // PRINT(( 614 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n", 615 // strerror(err))); 616 // } 617 618 // notify 619 if(!LockLooper()) { 620 ASSERT(!"LockLooper() failed."); 621 } 622 BMessage m(M_TIME_SOURCE_CHANGED); 623 m.AddInt32("groupID", id()); 624 m.AddInt32("timeSourceID", timeSource.node); 625 notify(&m); 626 UnlockLooper(); 627 628 return B_OK; 629 } 630 631 // run mode access: 632 // Sets the default run mode for the group. This will be 633 // applied to every node with a wildcard (0) run mode. 634 // 635 // Special case: if the run mode is B_OFFLINE, it will be 636 // applied to all nodes in the group. 637 638 BMediaNode::run_mode NodeGroup::runMode() const { 639 Autolock _l(this); 640 return m_runMode; 641 } 642 643 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) { 644 Autolock _l(this); 645 646 m_runMode = mode; 647 648 // apply to all nodes 649 for_each( 650 m_nodes.begin(), 651 m_nodes.end(), 652 bind2nd( 653 mem_fun(&NodeRef::_setRunModeAuto), 654 m_runMode 655 ) 656 // bound_method( 657 // *this, 658 // &NodeGroup::setRunModeFor) 659 ); 660 661 662 return B_OK; 663 } 664 665 // -------------------------------------------------------- // 666 // *** BHandler 667 // -------------------------------------------------------- // 668 669 void NodeGroup::MessageReceived( 670 BMessage* message) { 671 672 // PRINT(( 673 // "NodeGroup::MessageReceived():\n")); 674 // message->PrintToStream(); 675 status_t err; 676 677 switch(message->what) { 678 case M_SET_TIME_SOURCE: 679 { 680 media_node timeSource; 681 void* data; 682 ssize_t dataSize; 683 err = message->FindData( 684 "timeSourceNode", 685 B_RAW_TYPE, 686 (const void**)&data, 687 &dataSize); 688 if(err < B_OK) { 689 PRINT(( 690 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n" 691 " no timeSourceNode!\n")); 692 break; 693 } 694 timeSource = *(media_node*)data; 695 696 setTimeSource(timeSource); 697 } 698 break; 699 700 case M_SET_RUN_MODE: 701 { 702 uint32 runMode; 703 err = message->FindInt32("runMode", (int32*)&runMode); 704 if(err < B_OK) { 705 PRINT(( 706 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 707 " no runMode!\n")); 708 break; 709 } 710 711 if(runMode < BMediaNode::B_OFFLINE || 712 runMode > BMediaNode::B_RECORDING) { 713 PRINT(( 714 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n" 715 " invalid run mode (%ld)\n", runMode)); 716 break; 717 } 718 719 setRunMode((BMediaNode::run_mode)runMode); 720 } 721 break; 722 723 case M_SET_START_POSITION: 724 { 725 bigtime_t position; 726 err = message->FindInt64("position", (int64*)&position); 727 if(err < B_OK) { 728 PRINT(( 729 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n" 730 " no position!\n")); 731 break; 732 } 733 setStartPosition(position); 734 } 735 break; 736 737 case M_SET_END_POSITION: 738 { 739 bigtime_t position; 740 err = message->FindInt64("position", (int64*)&position); 741 if(err < B_OK) { 742 PRINT(( 743 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n" 744 " no position!\n")); 745 break; 746 } 747 setEndPosition(position); 748 } 749 break; 750 751 case M_PREROLL: 752 preroll(); 753 break; 754 755 case M_START: 756 start(); 757 break; 758 759 case M_STOP: 760 stop(); 761 break; 762 763 case M_ROLL: 764 roll(); 765 break; 766 767 default: 768 _inherited::MessageReceived(message); 769 break; 770 } 771 } 772 773 774 // -------------------------------------------------------- // 775 // *** IPersistent 776 // -------------------------------------------------------- // 777 778 // ! 779 #if CORTEX_XML 780 // ! 781 782 // +++++ 783 784 // Default constructor 785 NodeGroup::NodeGroup() : 786 m_manager(0) {} // +++++ finish initialization 787 788 789 // ! 790 #endif /*CORTEX_XML*/ 791 // ! 792 793 // -------------------------------------------------------- // 794 // *** IObservable: [19aug99] 795 // -------------------------------------------------------- // 796 797 void NodeGroup::observerAdded( 798 const BMessenger& observer) { 799 800 BMessage m(M_OBSERVER_ADDED); 801 m.AddInt32("groupID", id()); 802 m.AddMessenger("target", BMessenger(this)); 803 observer.SendMessage(&m); 804 } 805 806 void NodeGroup::observerRemoved( 807 const BMessenger& observer) { 808 809 BMessage m(M_OBSERVER_REMOVED); 810 m.AddInt32("groupID", id()); 811 m.AddMessenger("target", BMessenger(this)); 812 observer.SendMessage(&m); 813 } 814 815 void NodeGroup::notifyRelease() { 816 817 BMessage m(M_RELEASED); 818 m.AddInt32("groupID", id()); 819 m.AddMessenger("target", BMessenger(this)); 820 notify(&m); 821 } 822 823 void NodeGroup::releaseComplete() { 824 // +++++ 825 } 826 827 // -------------------------------------------------------- // 828 // *** ILockable: pass lock requests to m_lock 829 // -------------------------------------------------------- // 830 831 bool NodeGroup::lock( 832 lock_t type, 833 bigtime_t timeout) { 834 835 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0))); 836 837 ASSERT(type == WRITE); 838 status_t err = m_lock.LockWithTimeout(timeout); 839 840 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0))); 841 842 return err == B_OK; 843 } 844 845 bool NodeGroup::unlock( 846 lock_t type) { 847 848 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0))); 849 850 ASSERT(type == WRITE); 851 m_lock.Unlock(); 852 853 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0))); 854 855 return true; 856 } 857 858 bool NodeGroup::isLocked( 859 lock_t type) const { 860 861 ASSERT(type == WRITE); 862 return m_lock.IsLocked(); 863 } 864 865 // -------------------------------------------------------- // 866 // *** ctor (accessible to NodeManager) 867 // -------------------------------------------------------- // 868 869 NodeGroup::NodeGroup( 870 const char* name, 871 NodeManager* manager, 872 BMediaNode::run_mode runMode) : 873 874 ObservableHandler(name), 875 m_lock("NodeGroup::m_lock"), 876 m_manager(manager), 877 m_id(NextID()), 878 m_name(name), 879 m_flags(0), 880 m_transportState(TRANSPORT_INVALID), 881 m_runMode(runMode), 882 m_timeSourceObj(0), 883 m_released(false), 884 m_cycleThread(0), 885 m_cyclePort(0), 886 m_startPosition(0LL), 887 m_endPosition(0LL), 888 m_newStart(false), 889 m_newEnd(false) { 890 891 ASSERT(m_manager); 892 893 if(!m_manager->Lock()) { 894 ASSERT(!"m_manager->Lock() failed"); 895 } 896 m_manager->AddHandler(this); 897 m_manager->Unlock(); 898 899 // set default time source 900 media_node ts; 901 D_ROSTER(("# roster->GetTimeSource()\n")); 902 status_t err = m_manager->roster->GetTimeSource(&ts); 903 if(err < B_OK) { 904 PRINT(( 905 "*** NodeGroup(): roster->GetTimeSource() failed:\n" 906 " %s\n", strerror(err))); 907 } 908 setTimeSource(ts); 909 } 910 911 // -------------------------------------------------------- // 912 // *** internal operations 913 // -------------------------------------------------------- // 914 915 uint32 NodeGroup::s_nextID = 1; 916 uint32 NodeGroup::NextID() { 917 return atomic_add((int32*)&s_nextID, 1); 918 } 919 920 // -------------------------------------------------------- // 921 // *** ref->group communication (LOCK REQUIRED) 922 // -------------------------------------------------------- // 923 924 // when a NodeRef's cycle state (ie. looping or not looping) 925 // changes, it must pass that information on via this method 926 927 void NodeGroup::_refCycleChanged( 928 NodeRef* ref) { 929 assert_locked(this); 930 D_METHOD(( 931 "NodeGroup::_refCycleChanged('%s')\n", 932 ref->name())); 933 934 if(ref->m_cycle) { 935 _cycleAddRef(ref); 936 } else { 937 _cycleRemoveRef(ref); 938 } 939 940 // +++++ if running & cycle valid, the node should be properly 941 // seek'd and start'd 942 } 943 944 945 // when a cycling node's latency changes, call this method. 946 947 void NodeGroup::_refLatencyChanged( 948 NodeRef* ref) { 949 assert_locked(this); 950 D_METHOD(( 951 "NodeGroup::_refLatencyChanged('%s')\n", 952 ref->name())); 953 954 if(!_cycleValid()) 955 return; 956 957 // remove & replace ref (positions it properly) 958 _cycleRemoveRef(ref); 959 _cycleAddRef(ref); 960 961 // slap my thread up 962 ASSERT(m_cyclePort); 963 write_port( 964 m_cyclePort, 965 _CYCLE_LATENCY_CHANGED, 966 0, 967 0); 968 969 // +++++ zat it? 970 } 971 972 // when a NodeRef receives notification that it has been stopped, 973 // but is labeled as still running, it must call this method. 974 // [e.moon 11oct99: roll/B_OFFLINE support] 975 976 void NodeGroup::_refStopped( 977 NodeRef* ref) { 978 assert_locked(this); 979 D_METHOD(( 980 "NodeGroup::_refStopped('%s')\n", 981 ref->name())); 982 983 // roll/B_OFFLINE support [e.moon 11oct99] 984 // (check to see if any other nodes in the group are still running; 985 // mark group stopped if not.) 986 if(m_transportState == TRANSPORT_ROLLING) { 987 bool nodesRunning = false; 988 for(node_set::iterator it = m_nodes.begin(); 989 it != m_nodes.end(); ++it) { 990 if((*it)->isRunning()) { 991 nodesRunning = true; 992 break; 993 } 994 } 995 if(!nodesRunning) 996 // the group has stopped; update transport state 997 _changeState(TRANSPORT_STOPPED); 998 999 } 1000 1001 } 1002 1003 1004 // -------------------------------------------------------- // 1005 // *** transport helpers (LOCK REQUIRED) 1006 // -------------------------------------------------------- // 1007 1008 1009 // Preroll all nodes in the group; this is the implementation 1010 // of preroll(). 1011 // *** this method should not be called from the transport thread 1012 // (since preroll operations can block for a relatively long time.) 1013 1014 status_t NodeGroup::_preroll() { 1015 assert_locked(this); 1016 1017 D_METHOD(( 1018 "NodeGroup::_preroll()\n")); 1019 1020 if( 1021 m_transportState == TRANSPORT_RUNNING || 1022 m_transportState == TRANSPORT_ROLLING) 1023 // too late 1024 return B_NOT_ALLOWED; 1025 1026 // * preroll all nodes to the start position 1027 1028 // +++++ currently, if an error is encountered it's ignored. 1029 // should the whole operation fail if one node couldn't 1030 // be prerolled? 1031 // 1032 // My gut response is 'no', since the preroll step is 1033 // optional, but the caller should have some inkling that 1034 // one of its nodes didn't behave. 1035 1036 // [e.moon 13oct99] making PPC compiler happy 1037 // for_each( 1038 // m_nodes.begin(), 1039 // m_nodes.end(), 1040 // bind2nd( 1041 // mem_fun(&NodeRef::_preroll), 1042 // m_startPosition 1043 // ) 1044 // ); 1045 for(node_set::iterator it = m_nodes.begin(); 1046 it != m_nodes.end(); ++it) { 1047 (*it)->_preroll(m_startPosition); 1048 } 1049 1050 // replaces 1051 // bind2nd( 1052 // bound_method(*this, &NodeGroup::prerollNode), 1053 // m_startPosition 1054 // ) 1055 1056 return B_OK; 1057 } 1058 1059 1060 //// functor: calculates latency of each node it's handed, caching 1061 //// the largest one found; includes initial latency if nodes report it. 1062 // 1063 //class NodeGroup::calcLatencyFn { public: 1064 // bigtime_t& maxLatency; 1065 // 1066 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {} 1067 // 1068 // void operator()(NodeRef* r) { 1069 // ASSERT(r); 1070 // 1071 //// PRINT(( 1072 //// "# calcLatencyFn(): '%s'\n", 1073 //// r->name())); 1074 // 1075 // if(!(r->node().kind & B_BUFFER_PRODUCER)) { 1076 // // node can't incur latency 1077 //// PRINT(( 1078 //// "- not a producer\n")); 1079 // return; 1080 // } 1081 // 1082 // bigtime_t latency; 1083 // status_t err = 1084 // BMediaRoster::Roster()->GetLatencyFor( 1085 // r->node(), 1086 // &latency); 1087 // if(err < B_OK) { 1088 // PRINT(( 1089 // "* calcLatencyFn: GetLatencyFor() failed: %s\n", 1090 // strerror(err))); 1091 // return; 1092 // } 1093 //// PRINT(("- %Ld\n", latency)); 1094 // 1095 // bigtime_t add; 1096 // err = BMediaRoster::Roster()->GetInitialLatencyFor( 1097 // r->node(), 1098 // &add); 1099 //// PRINT(("- %Ld\n", add)); 1100 // if(err < B_OK) { 1101 // PRINT(( 1102 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n", 1103 // strerror(err))); 1104 // } 1105 // else 1106 // latency += add; 1107 // 1108 // if(latency > maxLatency) 1109 // maxLatency = latency; 1110 // 1111 //// PRINT(( 1112 //// "- max latency: %Ld\n", 1113 //// maxLatency)); 1114 // } 1115 //}; 1116 1117 // Start all nodes in the group; this is the implementation of 1118 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead 1119 // in that case. 1120 // 1121 // (this may be called from the transport thread or from 1122 // an API-implementation method.) 1123 1124 status_t NodeGroup::_start() { 1125 assert_locked(this); 1126 1127 D_METHOD(( 1128 "NodeGroup::_start()\n")); 1129 status_t err; 1130 1131 if(m_transportState != TRANSPORT_STOPPED) 1132 return B_NOT_ALLOWED; 1133 1134 if(m_runMode == BMediaNode::B_OFFLINE) 1135 return B_NOT_ALLOWED; 1136 1137 ASSERT(m_nodes.size()); 1138 1139 _changeState(TRANSPORT_STARTING); 1140 1141 // * Find the highest latency in the group 1142 1143 bigtime_t offset = 0LL; 1144 calcLatencyFn _f(offset); 1145 for_each( 1146 m_nodes.begin(), 1147 m_nodes.end(), 1148 _f); 1149 1150 offset += s_rosterLatency; 1151 PRINT(( 1152 "- offset: %Ld\n", offset)); 1153 1154 // * Seek all nodes (in case one or more failed to preroll) 1155 1156 for(node_set::iterator it = m_nodes.begin(); 1157 it != m_nodes.end(); ++it) { 1158 err = (*it)->_seekStopped(m_startPosition); 1159 if(err < B_OK) { 1160 PRINT(( 1161 "! NodeGroup('%s')::_start():\n" 1162 " ref('%s')->_seekStopped(%Ld) failed:\n" 1163 " %s\n", 1164 name(), (*it)->name(), m_startPosition, 1165 strerror(err))); 1166 1167 // +++++ continue? 1168 } 1169 } 1170 1171 // * Start all nodes, allowing for the max latency found 1172 1173 ASSERT(m_timeSourceObj); 1174 bigtime_t when = m_timeSourceObj->Now() + offset; 1175 1176 // 10aug99: initialize cycle (loop) settings 1177 if(_cycleValid()) { 1178 _initCycleThread(); 1179 _cycleInit(when); 1180 } 1181 1182 // start the nodes 1183 for(node_set::iterator it = m_nodes.begin(); 1184 it != m_nodes.end(); ++it) { 1185 err = (*it)->_start(when); 1186 if(err < B_OK) { 1187 PRINT(( 1188 "! NodeGroup('%s')::_start():\n" 1189 " ref('%s')->_start(%Ld) failed:\n" 1190 " %s\n", 1191 name(), (*it)->name(), when, 1192 strerror(err))); 1193 1194 // +++++ continue? 1195 } 1196 } 1197 1198 // notify observers 1199 _changeState(TRANSPORT_RUNNING); 1200 return B_OK; 1201 } 1202 1203 // Stop all nodes in the group; this is the implementation of 1204 // stop(). 1205 // 1206 // (this may be called from the transport thread or from 1207 // an API-implementation method.) 1208 1209 status_t NodeGroup::_stop() { 1210 1211 D_METHOD(( 1212 "NodeGroup::_stop()\n")); 1213 1214 assert_locked(this); 1215 1216 if( 1217 m_transportState != TRANSPORT_RUNNING && 1218 m_transportState != TRANSPORT_ROLLING) 1219 return B_NOT_ALLOWED; 1220 1221 _changeState(TRANSPORT_STOPPING); 1222 1223 // * stop the cycle thread if need be 1224 _destroyCycleThread(); 1225 1226 // * stop all nodes 1227 // +++++ error reports would be nice 1228 1229 for_each( 1230 m_nodes.begin(), 1231 m_nodes.end(), 1232 mem_fun(&NodeRef::_stop) 1233 ); 1234 1235 // update transport state 1236 _changeState(TRANSPORT_STOPPED); 1237 1238 return B_OK; 1239 } 1240 1241 // Roll all nodes in the group; this is the implementation of 1242 // roll(). 1243 // 1244 // (this may be called from the transport thread or from 1245 // an API-implementation method.) 1246 1247 status_t NodeGroup::_roll() { 1248 1249 D_METHOD(( 1250 "NodeGroup::_roll()\n")); 1251 assert_locked(this); 1252 status_t err; 1253 1254 if(m_transportState != TRANSPORT_STOPPED) 1255 return B_NOT_ALLOWED; 1256 1257 bigtime_t period = m_endPosition - m_startPosition; 1258 if(period <= 0LL) 1259 return B_NOT_ALLOWED; 1260 1261 _changeState(TRANSPORT_STARTING); 1262 1263 bigtime_t tpStart = 0LL; 1264 bigtime_t tpStop = period; 1265 1266 if(m_runMode != BMediaNode::B_OFFLINE) { 1267 1268 // * Find the highest latency in the group 1269 bigtime_t offset = 0LL; 1270 calcLatencyFn _f(offset); 1271 for_each( 1272 m_nodes.begin(), 1273 m_nodes.end(), 1274 _f); 1275 1276 offset += s_rosterLatency; 1277 PRINT(( 1278 "- offset: %Ld\n", offset)); 1279 1280 ASSERT(m_timeSourceObj); 1281 tpStart = m_timeSourceObj->Now() + offset; 1282 tpStop += tpStart; 1283 } 1284 1285 // * Roll all nodes; watch for errors 1286 bool allFailed = true; 1287 err = B_OK; 1288 for( 1289 node_set::iterator it = m_nodes.begin(); 1290 it != m_nodes.end(); ++it) { 1291 1292 status_t e = (*it)->_roll( 1293 tpStart, 1294 tpStop, 1295 m_startPosition); 1296 if(e < B_OK) 1297 err = e; 1298 else 1299 allFailed = false; 1300 } 1301 1302 if(!allFailed) 1303 // notify observers 1304 _changeState(TRANSPORT_ROLLING); 1305 1306 return err; 1307 } 1308 1309 1310 // State transition; notify listeners 1311 // +++++ [18aug99] DANGER: should notification happen in the middle 1312 // of such an operation? 1313 inline void NodeGroup::_changeState( 1314 transport_state_t to) { 1315 1316 assert_locked(this); 1317 1318 m_transportState = to; 1319 1320 if(!LockLooper()) { 1321 ASSERT(!"LockLooper() failed."); 1322 } 1323 BMessage m(M_TRANSPORT_STATE_CHANGED); 1324 m.AddInt32("groupID", id()); 1325 m.AddInt32("transportState", m_transportState); 1326 notify(&m); 1327 UnlockLooper(); 1328 } 1329 1330 // Enforce a state transition, and notify listeners 1331 inline void NodeGroup::_changeState( 1332 transport_state_t from, 1333 transport_state_t to) { 1334 1335 assert_locked(this); 1336 ASSERT(m_transportState == from); 1337 1338 _changeState(to); 1339 } 1340 1341 1342 // -------------------------------------------------------- // 1343 // *** cycle thread & helpers (LOCK REQUIRED) 1344 // -------------------------------------------------------- // 1345 1346 // *** cycle port definitions 1347 1348 const int32 _portLength = 32; 1349 const char* const _portName = "NodeGroup::m_cyclePort"; 1350 const size_t _portMsgMaxSize = 256; 1351 1352 1353 // set up the cycle thread (including its kernel port) 1354 status_t NodeGroup::_initCycleThread() { 1355 assert_locked(this); 1356 status_t err; 1357 D_METHOD(( 1358 "NodeGroup::_initCycleThread()\n")); 1359 1360 if(m_cycleThread) { 1361 // thread is still alive 1362 err = _destroyCycleThread(); 1363 if(err < B_OK) 1364 return err; 1365 } 1366 1367 // create 1368 m_cycleThreadDone = false; 1369 m_cycleThread = spawn_thread( 1370 &_CycleThread, 1371 "NodeGroup[cycleThread]", 1372 B_NORMAL_PRIORITY, 1373 (void*)this); 1374 if(m_cycleThread < B_OK) { 1375 PRINT(( 1376 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n" 1377 " %s\n", 1378 strerror(m_cycleThread))); 1379 return m_cycleThread; 1380 } 1381 1382 // launch 1383 return resume_thread(m_cycleThread); 1384 } 1385 1386 // shut down the cycle thread/port 1387 status_t NodeGroup::_destroyCycleThread() { 1388 assert_locked(this); 1389 status_t err; 1390 D_METHOD(( 1391 "NodeGroup::_destroyCycleThread()\n")); 1392 1393 if(!m_cycleThread) 1394 return B_OK; 1395 1396 if(!m_cycleThreadDone) { 1397 // kill the thread 1398 ASSERT(m_cyclePort); 1399 err = write_port_etc( 1400 m_cyclePort, 1401 _CYCLE_STOP, 1402 0, 1403 0, 1404 B_TIMEOUT, 1405 10000LL); 1406 1407 if(err < B_OK) { 1408 // bad thread. die, thread, die. 1409 PRINT(( 1410 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n")); 1411 delete_port(m_cyclePort); 1412 m_cyclePort = 0; 1413 kill_thread(m_cycleThread); 1414 m_cycleThread = 0; 1415 return B_OK; 1416 } 1417 1418 // the thread got the message; wait for it to quit 1419 unlock(); 1420 while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) { 1421 PRINT(( 1422 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n")); 1423 } 1424 lock(); 1425 } 1426 1427 // it's up to the thread to close its port 1428 ASSERT(!m_cyclePort); 1429 1430 m_cycleThread = 0; 1431 1432 return B_OK; 1433 } 1434 1435 1436 // 1) do the current positions specify a valid cycle region? 1437 // 2) are any nodes in the group cycle-enabled? 1438 1439 bool NodeGroup::_cycleValid() { 1440 assert_locked(this); 1441 return 1442 (m_transportState == TRANSPORT_RUNNING || 1443 m_transportState == TRANSPORT_STARTING) && 1444 canCycle(); 1445 } 1446 1447 // initialize the cycle members (call when starting) 1448 1449 void NodeGroup::_cycleInit( 1450 bigtime_t startTime) { 1451 assert_locked(this); 1452 ASSERT(m_cycleNodes.size() > 0); 1453 D_METHOD(( 1454 "NodeGroup::_cycleInit(%Ld)\n", 1455 startTime)); 1456 1457 // +++++ rescan latencies? 1458 1459 // figure new boundary & deadline from region length 1460 bigtime_t cyclePeriod = m_endPosition - m_startPosition; 1461 1462 if(cyclePeriod <= 0) { 1463 // cycle region is no longer valid 1464 m_cycleBoundary = 0LL; 1465 m_cycleDeadline = 0LL; 1466 1467 // no no no -- deadlocks when the thread calls this method 1468 // // stop the thread 1469 // _destroyCycleThread(); 1470 return; 1471 } 1472 1473 m_cycleStart = startTime; 1474 m_cycleBoundary = startTime + cyclePeriod; 1475 m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency); 1476 } 1477 1478 1479 // add a ref to the cycle set (in proper order, based on latency) 1480 void NodeGroup::_cycleAddRef( 1481 NodeRef* ref) { 1482 assert_locked(this); 1483 1484 // make sure it's not already there 1485 ASSERT(find( 1486 m_cycleNodes.begin(), 1487 m_cycleNodes.end(), 1488 ref) == m_cycleNodes.end()); 1489 1490 // [re]calc latency if 0 1491 if(!ref->m_latency) 1492 ref->_updateLatency(); 1493 1494 node_set::iterator it; 1495 for(it = m_cycleNodes.begin(); 1496 it != m_cycleNodes.end(); ++it) { 1497 if(ref->m_latency > (*it)->m_latency) { 1498 m_cycleNodes.insert(it, ref); 1499 break; 1500 } 1501 } 1502 1503 // not inserted? new ref belongs at the end 1504 if(it == m_cycleNodes.end()) 1505 m_cycleNodes.insert(it, ref); 1506 } 1507 1508 // remove a ref from the cycle set 1509 void NodeGroup::_cycleRemoveRef( 1510 NodeRef* ref) { 1511 assert_locked(this); 1512 1513 node_set::iterator it = find( 1514 m_cycleNodes.begin(), 1515 m_cycleNodes.end(), 1516 ref); 1517 ASSERT(it != m_cycleNodes.end()); 1518 m_cycleNodes.erase(it); 1519 } 1520 1521 bigtime_t NodeGroup::_cycleBoundary() const { 1522 Autolock _l(this); 1523 return m_cycleBoundary; 1524 } 1525 1526 // cycle thread impl. 1527 /*static*/ 1528 status_t NodeGroup::_CycleThread(void* user) { 1529 ((NodeGroup*)user)->_cycleThread(); 1530 return B_OK; 1531 } 1532 1533 void NodeGroup::_cycleThread() { 1534 1535 status_t err; 1536 int32 code; 1537 int32 errorCount = 0; 1538 1539 // +++++ liability -- if the thread has to be killed, this buffer 1540 // won't be reclaimed 1541 char* msgBuffer = new char[_portMsgMaxSize]; 1542 array_delete<char> _d(msgBuffer); 1543 1544 // create port 1545 ASSERT(!m_cyclePort); 1546 m_cyclePort = create_port( 1547 _portLength, 1548 _portName); 1549 ASSERT(m_cyclePort >= B_OK); 1550 1551 // the message-handling loop 1552 bool done = false; 1553 while(!done) { 1554 1555 // *** wait until it's time to queue the next cycle, or until 1556 // *** a message arrives 1557 1558 lock(); // **** BEGIN LOCKED SECTION **** 1559 if(!_cycleValid()) { 1560 unlock(); 1561 break; 1562 } 1563 1564 ASSERT(m_cycleNodes.size() > 0); 1565 ASSERT(m_timeSourceObj); 1566 1567 bigtime_t maxLatency = m_cycleNodes.front()->m_latency; 1568 bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor( 1569 m_cycleBoundary, maxLatency + s_rosterLatency); 1570 bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime(); 1571 1572 if(timeout <= 0) { 1573 // +++++ whoops, I'm late. 1574 // +++++ adjust to compensate !!! 1575 PRINT(( 1576 "*** NodeGroup::_cycleThread(): LATE\n" 1577 " by %Ld\n", -timeout)); 1578 } 1579 1580 // +++++ if timeout is very short, spin until the target time arrives 1581 1582 unlock(); // **** END LOCKED SECTION **** 1583 1584 // block until message arrives or it's time to wake up 1585 err = read_port_etc( 1586 m_cyclePort, 1587 &code, 1588 msgBuffer, 1589 _portMsgMaxSize, 1590 B_TIMEOUT, 1591 timeout); 1592 1593 if(err == B_TIMED_OUT) { 1594 // the time has come to seek my nodes 1595 _handleCycleService(); 1596 continue; 1597 } 1598 else if(err < B_OK) { 1599 // any other error is bad news 1600 PRINT(( 1601 "* NodeGroup::_cycleThread(): read_port error:\n" 1602 " %s\n" 1603 " ABORTING\n\n", strerror(err))); 1604 if(++errorCount > 10) { 1605 PRINT(( 1606 "*** Too many errors; aborting.\n")); 1607 break; 1608 } 1609 continue; 1610 } 1611 1612 errorCount = 0; 1613 1614 // process the message 1615 switch(code) { 1616 case _CYCLE_STOP: 1617 // bail 1618 done = true; 1619 break; 1620 1621 case _CYCLE_END_CHANGED: 1622 case _CYCLE_LATENCY_CHANGED: 1623 // fall through to next loop; for now, these messages 1624 // serve only to slap me out of my stupor and reassess 1625 // the timing situation... 1626 break; 1627 1628 default: 1629 PRINT(( 1630 "* NodeGroup::_cycleThread(): unknown message code '%ld'\n", code)); 1631 break; 1632 } 1633 } // while(!done) 1634 1635 1636 // delete port 1637 delete_port(m_cyclePort); 1638 m_cyclePort = 0; 1639 1640 // done 1641 m_cycleThreadDone = true; 1642 } 1643 1644 // cycle service: seek all nodes & initiate next cycle 1645 void NodeGroup::_handleCycleService() { 1646 Autolock _l(this); 1647 // D_METHOD(( 1648 // "NodeGroup::_handleCycleService()\n")); 1649 status_t err; 1650 1651 if(!_cycleValid()) { 1652 // PRINT(( 1653 // "- _handleCycleService(): cycle not valid; quitting.\n")); 1654 return; 1655 } 1656 1657 // seek 1658 for(node_set::iterator it = m_cycleNodes.begin(); 1659 it != m_cycleNodes.end(); ++it) { 1660 err = (*it)->_seek( 1661 m_startPosition, 1662 m_cycleBoundary); 1663 if(err < B_OK) { 1664 PRINT(( 1665 "- _handleCycleService(): node('%s')::_seek() failed:\n" 1666 " %s\n", 1667 (*it)->name(), strerror(err))); 1668 } 1669 } 1670 1671 // update cycle settings 1672 if(m_newStart) { 1673 m_newStart = false; 1674 m_startPosition = m_newStartPosition; 1675 } 1676 if(m_newEnd) { 1677 m_newEnd = false; 1678 m_endPosition = m_newEndPosition; 1679 } 1680 1681 // prepare next cycle 1682 _cycleInit(m_cycleBoundary); 1683 } 1684 1685 // END -- NodeGroup.cpp -- 1686