1 /* 2 * Copyright 2001-2007, Haiku. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Erik Jaesler (erik@cgsoftware.com) 7 * DarkWyrm (bpmagic@columbus.rr.com) 8 * Ingo Weinhold, bonefish@@users.sf.net 9 * Axel Dörfler, axeld@pinc-software.de 10 */ 11 12 /*! BLooper class spawns a thread that runs a message loop. */ 13 14 #include <AppMisc.h> 15 #include <AutoLocker.h> 16 #include <DirectMessageTarget.h> 17 #include <LooperList.h> 18 #include <MessagePrivate.h> 19 #include <TokenSpace.h> 20 21 #include <Autolock.h> 22 #include <Looper.h> 23 #include <Message.h> 24 #include <MessageFilter.h> 25 #include <MessageQueue.h> 26 #include <Messenger.h> 27 #include <PropertyInfo.h> 28 29 #include <new> 30 #include <stdio.h> 31 32 33 // debugging 34 //#define DBG(x) x 35 #define DBG(x) ; 36 #define PRINT(x) DBG({ printf("[%6ld] ", find_thread(NULL)); printf x; }) 37 38 /* 39 #include <Autolock.h> 40 #include <Locker.h> 41 static BLocker sDebugPrintLocker("BLooper debug print"); 42 #define PRINT(x) DBG({ \ 43 BAutolock _(sDebugPrintLocker); \ 44 debug_printf("[%6ld] ", find_thread(NULL)); \ 45 debug_printf x; \ 46 }) 47 */ 48 49 50 #define FILTER_LIST_BLOCK_SIZE 5 51 #define DATA_BLOCK_SIZE 5 52 53 // Globals --------------------------------------------------------------------- 54 using BPrivate::gDefaultTokens; 55 using BPrivate::gLooperList; 56 using BPrivate::BLooperList; 57 58 port_id _get_looper_port_(const BLooper* looper); 59 60 enum { 61 BLOOPER_PROCESS_INTERNALLY = 0, 62 BLOOPER_HANDLER_BY_INDEX 63 }; 64 65 static property_info gLooperPropInfo[] = { 66 { 67 "Handler", 68 {}, 69 {B_INDEX_SPECIFIER, B_REVERSE_INDEX_SPECIFIER}, 70 NULL, BLOOPER_HANDLER_BY_INDEX, 71 {}, 72 {}, 73 {} 74 }, 75 { 76 "Handlers", 77 {B_GET_PROPERTY}, 78 {B_DIRECT_SPECIFIER}, 79 NULL, BLOOPER_PROCESS_INTERNALLY, 80 {B_MESSENGER_TYPE}, 81 {}, 82 {} 83 }, 84 { 85 "Handler", 86 {B_COUNT_PROPERTIES}, 87 {B_DIRECT_SPECIFIER}, 88 NULL, BLOOPER_PROCESS_INTERNALLY, 89 {B_INT32_TYPE}, 90 {}, 91 {} 92 }, 93 {} 94 }; 95 96 struct _loop_data_ { 97 BLooper* looper; 98 thread_id thread; 99 }; 100 101 102 // #pragma mark - 103 104 105 BLooper::BLooper(const char* name, int32 priority, int32 port_capacity) 106 : BHandler(name) 107 { 108 _InitData(name, priority, port_capacity); 109 } 110 111 112 BLooper::~BLooper() 113 { 114 if (fRunCalled && !fTerminating) { 115 debugger("You can't call delete on a BLooper object " 116 "once it is running."); 117 } 118 119 Lock(); 120 121 // In case the looper thread calls Quit() fLastMessage is not deleted. 122 if (fLastMessage) { 123 delete fLastMessage; 124 fLastMessage = NULL; 125 } 126 127 // Close the message port and read and reply to the remaining messages. 128 if (fMsgPort >= 0) 129 close_port(fMsgPort); 130 131 // Clear the queue so our call to IsMessageWaiting() below doesn't give 132 // us bogus info 133 fDirectTarget->Close(); 134 135 BMessage *message; 136 while ((message = fDirectTarget->Queue()->NextMessage()) != NULL) { 137 delete message; 138 // msg will automagically post generic reply 139 } 140 141 do { 142 delete ReadMessageFromPort(0); 143 // msg will automagically post generic reply 144 } while (IsMessageWaiting()); 145 146 fDirectTarget->Release(); 147 delete_port(fMsgPort); 148 149 // Clean up our filters 150 SetCommonFilterList(NULL); 151 152 AutoLocker<BLooperList> ListLock(gLooperList); 153 RemoveHandler(this); 154 155 // Remove all the "child" handlers 156 BHandler *child; 157 while (CountHandlers()) { 158 child = HandlerAt(0); 159 if (child) 160 RemoveHandler(child); 161 } 162 163 Unlock(); 164 gLooperList.RemoveLooper(this); 165 delete_sem(fLockSem); 166 } 167 168 169 BLooper::BLooper(BMessage *data) 170 : BHandler(data) 171 { 172 int32 portCapacity; 173 if (data->FindInt32("_port_cap", &portCapacity) != B_OK 174 || portCapacity < 0) 175 portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY; 176 177 _InitData(Name(), B_NORMAL_PRIORITY, portCapacity); 178 } 179 180 181 BArchivable * 182 BLooper::Instantiate(BMessage *data) 183 { 184 if (validate_instantiation(data, "BLooper")) 185 return new BLooper(data); 186 187 return NULL; 188 } 189 190 191 status_t 192 BLooper::Archive(BMessage *data, bool deep) const 193 { 194 status_t status = BHandler::Archive(data, deep); 195 if (status < B_OK) 196 return status; 197 198 port_info info; 199 status = get_port_info(fMsgPort, &info); 200 if (status == B_OK) 201 status = data->AddInt32("_port_cap", info.capacity); 202 203 // TODO: what about the thread priority? 204 205 return status; 206 } 207 208 209 status_t 210 BLooper::PostMessage(uint32 command) 211 { 212 BMessage message(command); 213 return _PostMessage(&message, this, NULL); 214 } 215 216 217 status_t 218 BLooper::PostMessage(BMessage *message) 219 { 220 return _PostMessage(message, this, NULL); 221 } 222 223 224 status_t 225 BLooper::PostMessage(uint32 command, BHandler *handler, 226 BHandler *replyTo) 227 { 228 BMessage message(command); 229 return _PostMessage(&message, handler, replyTo); 230 } 231 232 233 status_t 234 BLooper::PostMessage(BMessage *message, BHandler *handler, 235 BHandler *replyTo) 236 { 237 return _PostMessage(message, handler, replyTo); 238 } 239 240 241 void 242 BLooper::DispatchMessage(BMessage *message, BHandler *handler) 243 { 244 PRINT(("BLooper::DispatchMessage(%.4s)\n", (char*)&message->what)); 245 246 switch (message->what) { 247 case _QUIT_: 248 // Can't call Quit() to do this, because of the slight chance 249 // another thread with have us locked between now and then. 250 fTerminating = true; 251 252 // After returning from DispatchMessage(), the looper will be 253 // deleted in _task0_() 254 break; 255 256 case B_QUIT_REQUESTED: 257 if (handler == this) { 258 _QuitRequested(message); 259 break; 260 } 261 262 // fall through 263 264 default: 265 handler->MessageReceived(message); 266 break; 267 } 268 PRINT(("BLooper::DispatchMessage() done\n")); 269 } 270 271 272 void 273 BLooper::MessageReceived(BMessage *msg) 274 { 275 // TODO: implement scripting support 276 BHandler::MessageReceived(msg); 277 } 278 279 280 BMessage* 281 BLooper::CurrentMessage() const 282 { 283 return fLastMessage; 284 } 285 286 287 BMessage* 288 BLooper::DetachCurrentMessage() 289 { 290 BMessage* msg = fLastMessage; 291 fLastMessage = NULL; 292 return msg; 293 } 294 295 296 BMessageQueue* 297 BLooper::MessageQueue() const 298 { 299 return fDirectTarget->Queue(); 300 } 301 302 303 bool 304 BLooper::IsMessageWaiting() const 305 { 306 AssertLocked(); 307 308 if (!fDirectTarget->Queue()->IsEmpty()) 309 return true; 310 311 int32 count; 312 do { 313 count = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, 0); 314 } while (count == B_INTERRUPTED); 315 316 return count > 0; 317 } 318 319 320 void 321 BLooper::AddHandler(BHandler* handler) 322 { 323 if (handler == NULL) 324 return; 325 326 AssertLocked(); 327 328 if (handler->Looper() == NULL) { 329 fHandlers.AddItem(handler); 330 handler->SetLooper(this); 331 if (handler != this) // avoid a cycle 332 handler->SetNextHandler(this); 333 } 334 } 335 336 337 bool 338 BLooper::RemoveHandler(BHandler* handler) 339 { 340 if (handler == NULL) 341 return false; 342 343 AssertLocked(); 344 345 if (handler->Looper() == this && fHandlers.RemoveItem(handler)) { 346 if (handler == fPreferred) 347 fPreferred = NULL; 348 349 handler->SetNextHandler(NULL); 350 handler->SetLooper(NULL); 351 return true; 352 } 353 354 return false; 355 } 356 357 358 int32 359 BLooper::CountHandlers() const 360 { 361 AssertLocked(); 362 363 return fHandlers.CountItems(); 364 } 365 366 367 BHandler* 368 BLooper::HandlerAt(int32 index) const 369 { 370 AssertLocked(); 371 372 return (BHandler*)fHandlers.ItemAt(index); 373 } 374 375 376 int32 377 BLooper::IndexOf(BHandler* handler) const 378 { 379 AssertLocked(); 380 381 return fHandlers.IndexOf(handler); 382 } 383 384 385 BHandler* 386 BLooper::PreferredHandler() const 387 { 388 return fPreferred; 389 } 390 391 392 void 393 BLooper::SetPreferredHandler(BHandler* handler) 394 { 395 if (handler && handler->Looper() == this && IndexOf(handler) >= 0) { 396 fPreferred = handler; 397 } else { 398 fPreferred = NULL; 399 } 400 } 401 402 403 thread_id 404 BLooper::Run() 405 { 406 AssertLocked(); 407 408 if (fRunCalled) { 409 // Not allowed to call Run() more than once 410 debugger("can't call BLooper::Run twice!"); 411 return fThread; 412 } 413 414 fThread = spawn_thread(_task0_, Name(), fInitPriority, this); 415 if (fThread < B_OK) 416 return fThread; 417 418 if (fMsgPort < B_OK) 419 return fMsgPort; 420 421 fRunCalled = true; 422 Unlock(); 423 424 status_t err = resume_thread(fThread); 425 if (err < B_OK) 426 return err; 427 428 return fThread; 429 } 430 431 432 void 433 BLooper::Quit() 434 { 435 PRINT(("BLooper::Quit()\n")); 436 437 if (!IsLocked()) { 438 printf("ERROR - you must Lock a looper before calling Quit(), " 439 "team=%ld, looper=%s\n", Team(), Name() ? Name() : "unnamed"); 440 } 441 442 // Try to lock 443 if (!Lock()) { 444 // We're toast already 445 return; 446 } 447 448 PRINT((" is locked\n")); 449 450 if (!fRunCalled) { 451 PRINT((" Run() has not been called yet\n")); 452 fTerminating = true; 453 delete this; 454 } else if (find_thread(NULL) == fThread) { 455 PRINT((" We are the looper thread\n")); 456 fTerminating = true; 457 delete this; 458 exit_thread(0); 459 } else { 460 PRINT((" Run() has already been called and we are not the looper thread\n")); 461 462 // As with sem in _Lock(), we need to cache this here in case the looper 463 // disappears before we get to the wait_for_thread() below 464 thread_id tid = Thread(); 465 466 // We need to unlock here. Otherwise the looper thread can't 467 // dispatch the _QUIT_ message we're going to post. 468 UnlockFully(); 469 470 // As per the BeBook, if we've been called by a thread other than 471 // our own, the rest of the message queue has to get processed. So 472 // we put this in the queue, and when it shows up, we'll call Quit() 473 // from our own thread. 474 // A little testing with BMessageFilter shows _QUIT_ is being used here. 475 // I got suspicious when my test QuitRequested() wasn't getting called 476 // when Quit() was invoked from another thread. Makes a nice proof that 477 // this is how it's handled, too. 478 479 while (PostMessage(_QUIT_) == B_WOULD_BLOCK) { 480 // There's a slight chance that PostMessage() will return B_WOULD_BLOCK 481 // because the port is full, so we'll wait a bit and re-post until 482 // we won't block. 483 snooze(25000); 484 } 485 486 // We have to wait until the looper is done processing any remaining messages. 487 int32 temp; 488 while (wait_for_thread(tid, &temp) == B_INTERRUPTED) 489 ; 490 } 491 492 PRINT(("BLooper::Quit() done\n")); 493 } 494 495 496 bool 497 BLooper::QuitRequested() 498 { 499 return true; 500 } 501 502 503 bool 504 BLooper::Lock() 505 { 506 // Defer to global _Lock(); see notes there 507 return _Lock(this, -1, B_INFINITE_TIMEOUT) == B_OK; 508 } 509 510 511 void 512 BLooper::Unlock() 513 { 514 PRINT(("BLooper::Unlock()\n")); 515 // Make sure we're locked to begin with 516 AssertLocked(); 517 518 // Decrement fOwnerCount 519 --fOwnerCount; 520 PRINT((" fOwnerCount now: %ld\n", fOwnerCount)); 521 // Check to see if the owner still wants a lock 522 if (fOwnerCount == 0) { 523 // Set fOwner to invalid thread_id (< 0) 524 fOwner = -1; 525 fCachedStack = 0; 526 527 #if DEBUG < 1 528 // Decrement requested lock count (using fAtomicCount for this) 529 int32 atomicCount = atomic_add(&fAtomicCount, -1); 530 PRINT((" fAtomicCount now: %ld\n", fAtomicCount)); 531 532 // Check if anyone is waiting for a lock 533 // and release if it's the case 534 if (atomicCount > 1) 535 #endif 536 release_sem(fLockSem); 537 } 538 PRINT(("BLooper::Unlock() done\n")); 539 } 540 541 542 bool 543 BLooper::IsLocked() const 544 { 545 if (!gLooperList.IsLooperValid(this)) { 546 // The looper is gone, so of course it's not locked 547 return false; 548 } 549 550 return find_thread(NULL) == fOwner; 551 } 552 553 554 status_t 555 BLooper::LockWithTimeout(bigtime_t timeout) 556 { 557 return _Lock(this, -1, timeout); 558 } 559 560 561 thread_id 562 BLooper::Thread() const 563 { 564 return fThread; 565 } 566 567 568 team_id 569 BLooper::Team() const 570 { 571 return BPrivate::current_team(); 572 } 573 574 575 BLooper* 576 BLooper::LooperForThread(thread_id thread) 577 { 578 return gLooperList.LooperForThread(thread); 579 } 580 581 582 thread_id 583 BLooper::LockingThread() const 584 { 585 return fOwner; 586 } 587 588 589 int32 590 BLooper::CountLocks() const 591 { 592 return fOwnerCount; 593 } 594 595 596 int32 597 BLooper::CountLockRequests() const 598 { 599 return fAtomicCount; 600 } 601 602 603 sem_id 604 BLooper::Sem() const 605 { 606 return fLockSem; 607 } 608 609 610 BHandler* 611 BLooper::ResolveSpecifier(BMessage* msg, int32 index, 612 BMessage* specifier, int32 form, const char* property) 613 { 614 /** 615 @note When I was first dumping the results of GetSupportedSuites() from 616 various classes, the use of the extra_data field was quite 617 mysterious to me. Then I dumped BApplication and compared the 618 result against the BeBook's docs for scripting BApplication. A 619 bunch of it isn't documented, but what is tipped me to the idea 620 that the extra_data is being used as a quick and dirty way to tell 621 what scripting "command" has been sent, e.g., for easy use in a 622 switch statement. Would certainly be a lot faster than a bunch of 623 string comparisons -- which wouldn't tell the whole story anyway, 624 because of the same name being used for multiple properties. 625 */ 626 BPropertyInfo propertyInfo(gLooperPropInfo); 627 uint32 data; 628 status_t err = B_OK; 629 const char* errMsg = ""; 630 if (propertyInfo.FindMatch(msg, index, specifier, form, property, &data) >= 0) { 631 switch (data) { 632 case BLOOPER_PROCESS_INTERNALLY: 633 return this; 634 635 case BLOOPER_HANDLER_BY_INDEX: 636 { 637 int32 index = specifier->FindInt32("index"); 638 if (form == B_REVERSE_INDEX_SPECIFIER) { 639 index = CountHandlers() - index; 640 } 641 BHandler* target = HandlerAt(index); 642 if (target) { 643 // Specifier has been fully handled 644 msg->PopSpecifier(); 645 return target; 646 } else { 647 err = B_BAD_INDEX; 648 errMsg = "handler index out of range"; 649 } 650 break; 651 } 652 653 default: 654 err = B_BAD_SCRIPT_SYNTAX; 655 errMsg = "Didn't understand the specifier(s)"; 656 break; 657 } 658 } else { 659 return BHandler::ResolveSpecifier(msg, index, specifier, form, 660 property); 661 } 662 663 BMessage reply(B_MESSAGE_NOT_UNDERSTOOD); 664 reply.AddInt32("error", err); 665 reply.AddString("message", errMsg); 666 msg->SendReply(&reply); 667 668 return NULL; 669 } 670 671 672 status_t 673 BLooper::GetSupportedSuites(BMessage* data) 674 { 675 if (data == NULL) 676 return B_BAD_VALUE; 677 678 status_t status = data->AddString("suites", "suite/vnd.Be-looper"); 679 if (status == B_OK) { 680 BPropertyInfo PropertyInfo(gLooperPropInfo); 681 status = data->AddFlat("messages", &PropertyInfo); 682 if (status == B_OK) 683 status = BHandler::GetSupportedSuites(data); 684 } 685 686 return status; 687 } 688 689 690 void 691 BLooper::AddCommonFilter(BMessageFilter* filter) 692 { 693 if (!filter) 694 return; 695 696 AssertLocked(); 697 698 if (filter->Looper()) { 699 debugger("A MessageFilter can only be used once."); 700 return; 701 } 702 703 if (!fCommonFilters) 704 fCommonFilters = new BList(FILTER_LIST_BLOCK_SIZE); 705 706 filter->SetLooper(this); 707 fCommonFilters->AddItem(filter); 708 } 709 710 711 bool 712 BLooper::RemoveCommonFilter(BMessageFilter* filter) 713 { 714 AssertLocked(); 715 716 if (!fCommonFilters) 717 return false; 718 719 bool result = fCommonFilters->RemoveItem(filter); 720 if (result) 721 filter->SetLooper(NULL); 722 723 return result; 724 } 725 726 727 void 728 BLooper::SetCommonFilterList(BList* filters) 729 { 730 // We have a somewhat serious problem here. It is entirely possible in R5 731 // to assign a given list of filters to *two* BLoopers simultaneously. This 732 // becomes problematic when the loopers are destroyed: the last looper 733 // destroyed will have a problem when it tries to delete a filter list that 734 // has already been deleted. In R5, this results in a general protection 735 // fault. We fix this by checking the filter list for ownership issues. 736 737 AssertLocked(); 738 739 BMessageFilter* filter; 740 if (filters) { 741 // Check for ownership issues 742 for (int32 i = 0; i < filters->CountItems(); ++i) { 743 filter = (BMessageFilter*)filters->ItemAt(i); 744 if (filter->Looper()) { 745 debugger("A MessageFilter can only be used once."); 746 return; 747 } 748 } 749 } 750 751 if (fCommonFilters) { 752 for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) { 753 delete (BMessageFilter*)fCommonFilters->ItemAt(i); 754 } 755 756 delete fCommonFilters; 757 fCommonFilters = NULL; 758 } 759 760 // Per the BeBook, we take ownership of the list 761 fCommonFilters = filters; 762 if (fCommonFilters) { 763 for (int32 i = 0; i < fCommonFilters->CountItems(); ++i) { 764 filter = (BMessageFilter*)fCommonFilters->ItemAt(i); 765 filter->SetLooper(this); 766 } 767 } 768 } 769 770 771 BList* 772 BLooper::CommonFilterList() const 773 { 774 return fCommonFilters; 775 } 776 777 778 status_t 779 BLooper::Perform(perform_code d, void* arg) 780 { 781 // This is sort of what we're doing for this function everywhere 782 return BHandler::Perform(d, arg); 783 } 784 785 786 BMessage* 787 BLooper::MessageFromPort(bigtime_t timeout) 788 { 789 return ReadMessageFromPort(timeout); 790 } 791 792 793 void BLooper::_ReservedLooper1() {} 794 void BLooper::_ReservedLooper2() {} 795 void BLooper::_ReservedLooper3() {} 796 void BLooper::_ReservedLooper4() {} 797 void BLooper::_ReservedLooper5() {} 798 void BLooper::_ReservedLooper6() {} 799 800 801 BLooper::BLooper(const BLooper&) 802 { 803 // Copy construction not allowed 804 } 805 806 807 BLooper& BLooper::operator=(const BLooper& ) 808 { 809 // Looper copying not allowed 810 return *this; 811 } 812 813 814 BLooper::BLooper(int32 priority, port_id port, const char* name) 815 { 816 // This must be a legacy constructor 817 fMsgPort = port; 818 _InitData(name, priority, B_LOOPER_PORT_DEFAULT_CAPACITY); 819 } 820 821 822 status_t 823 BLooper::_PostMessage(BMessage *msg, BHandler *handler, 824 BHandler *replyTo) 825 { 826 AutoLocker<BLooperList> listLocker(gLooperList); 827 if (!listLocker.IsLocked()) 828 return B_ERROR; 829 830 if (!gLooperList.IsLooperValid(this)) 831 return B_BAD_VALUE; 832 833 // Does handler belong to this looper? 834 if (handler && handler->Looper() != this) 835 return B_MISMATCHED_VALUES; 836 837 status_t status; 838 BMessenger messenger(handler, this, &status); 839 if (status == B_OK) 840 status = messenger.SendMessage(msg, replyTo, 0); 841 842 return status; 843 } 844 845 846 /*! 847 Locks a looper either by port or using a direct pointer to the looper. 848 849 \param looper looper to lock, if not NULL 850 \param port port to identify the looper in case \a looper is NULL 851 \param timeout timeout for acquiring the lock 852 */ 853 status_t 854 BLooper::_Lock(BLooper* looper, port_id port, bigtime_t timeout) 855 { 856 PRINT(("BLooper::_Lock(%p, %lx)\n", looper, port)); 857 858 // Check params (loop, port) 859 if (looper == NULL && port < 0) { 860 PRINT(("BLooper::_Lock() done 1\n")); 861 return B_BAD_VALUE; 862 } 863 864 thread_id currentThread = find_thread(NULL); 865 int32 oldCount; 866 sem_id sem; 867 868 { 869 AutoLocker<BLooperList> ListLock(gLooperList); 870 if (!ListLock.IsLocked()) 871 return B_BAD_VALUE; 872 873 // Look up looper by port_id, if necessary 874 if (looper == NULL) { 875 looper = gLooperList.LooperForPort(port); 876 if (looper == NULL) { 877 PRINT(("BLooper::_Lock() done 3\n")); 878 return B_BAD_VALUE; 879 } 880 } else if (!gLooperList.IsLooperValid(looper)) { 881 // Check looper validity 882 PRINT(("BLooper::_Lock() done 4\n")); 883 return B_BAD_VALUE; 884 } 885 886 // Check for nested lock attempt 887 if (currentThread == looper->fOwner) { 888 ++looper->fOwnerCount; 889 PRINT(("BLooper::_Lock() done 5: fOwnerCount: %ld\n", loop->fOwnerCount)); 890 return B_OK; 891 } 892 893 // Cache the semaphore, so that we can safely access it after having 894 // unlocked the looper list 895 sem = looper->fLockSem; 896 if (sem < 0) { 897 PRINT(("BLooper::_Lock() done 6\n")); 898 return B_BAD_VALUE; 899 } 900 901 // Bump the requested lock count (using fAtomicCount for this) 902 oldCount = atomic_add(&looper->fAtomicCount, 1); 903 } 904 905 return _LockComplete(looper, oldCount, currentThread, sem, timeout); 906 } 907 908 909 status_t 910 BLooper::_LockComplete(BLooper *looper, int32 oldCount, thread_id thread, 911 sem_id sem, bigtime_t timeout) 912 { 913 status_t err = B_OK; 914 915 #if DEBUG < 1 916 if (oldCount > 0) { 917 #endif 918 do { 919 err = acquire_sem_etc(sem, 1, B_RELATIVE_TIMEOUT, timeout); 920 } while (err == B_INTERRUPTED); 921 #if DEBUG < 1 922 } 923 #endif 924 if (err == B_OK) { 925 looper->fOwner = thread; 926 looper->fCachedStack = (addr_t)&err & ~(B_PAGE_SIZE - 1); 927 looper->fOwnerCount = 1; 928 } 929 930 PRINT(("BLooper::_LockComplete() done: %lx\n", err)); 931 return err; 932 } 933 934 935 void 936 BLooper::_InitData(const char *name, int32 priority, int32 portCapacity) 937 { 938 fOwner = B_ERROR; 939 fCachedStack = 0; 940 fRunCalled = false; 941 fDirectTarget = new (std::nothrow) BPrivate::BDirectMessageTarget(); 942 fCommonFilters = NULL; 943 fLastMessage = NULL; 944 fPreferred = NULL; 945 fThread = B_ERROR; 946 fTerminating = false; 947 fMsgPort = -1; 948 fAtomicCount = 0; 949 950 if (name == NULL) 951 name = "anonymous looper"; 952 953 #if DEBUG 954 fLockSem = create_sem(1, name); 955 #else 956 fLockSem = create_sem(0, name); 957 #endif 958 959 if (portCapacity <= 0) 960 portCapacity = B_LOOPER_PORT_DEFAULT_CAPACITY; 961 962 fMsgPort = create_port(portCapacity, name); 963 964 fInitPriority = priority; 965 966 gLooperList.AddLooper(this); 967 AddHandler(this); 968 } 969 970 971 void 972 BLooper::AddMessage(BMessage* message) 973 { 974 _AddMessagePriv(message); 975 976 // wakeup looper when being called from other threads if necessary 977 if (find_thread(NULL) != Thread() 978 && fDirectTarget->Queue()->IsNextMessage(message) && port_count(fMsgPort) <= 0) { 979 // there is currently no message waiting, and we need to wakeup the looper 980 write_port_etc(fMsgPort, 0, NULL, 0, B_RELATIVE_TIMEOUT, 0); 981 } 982 } 983 984 985 void 986 BLooper::_AddMessagePriv(BMessage* msg) 987 { 988 // ToDo: if no target token is specified, set to preferred handler 989 // Others may want to peek into our message queue, so the preferred 990 // handler must be set correctly already if no token was given 991 992 fDirectTarget->Queue()->AddMessage(msg); 993 } 994 995 996 status_t 997 BLooper::_task0_(void *arg) 998 { 999 BLooper *looper = (BLooper *)arg; 1000 1001 PRINT(("LOOPER: _task0_()\n")); 1002 1003 if (looper->Lock()) { 1004 PRINT(("LOOPER: looper locked\n")); 1005 looper->task_looper(); 1006 1007 delete looper; 1008 } 1009 1010 PRINT(("LOOPER: _task0_() done: thread %ld\n", find_thread(NULL))); 1011 return B_OK; 1012 } 1013 1014 1015 void * 1016 BLooper::ReadRawFromPort(int32 *msgCode, bigtime_t timeout) 1017 { 1018 PRINT(("BLooper::ReadRawFromPort()\n")); 1019 uint8 *buffer = NULL; 1020 ssize_t bufferSize; 1021 1022 do { 1023 bufferSize = port_buffer_size_etc(fMsgPort, B_RELATIVE_TIMEOUT, timeout); 1024 } while (bufferSize == B_INTERRUPTED); 1025 1026 if (bufferSize < B_OK) { 1027 PRINT(("BLooper::ReadRawFromPort(): failed: %ld\n", bufferSize)); 1028 return NULL; 1029 } 1030 1031 if (bufferSize > 0) 1032 buffer = (uint8 *)malloc(bufferSize); 1033 1034 // we don't want to wait again here, since that can only mean 1035 // that someone else has read our message and our bufferSize 1036 // is now probably wrong 1037 PRINT(("read_port()...\n")); 1038 bufferSize = read_port_etc(fMsgPort, msgCode, buffer, bufferSize, 1039 B_RELATIVE_TIMEOUT, 0); 1040 1041 if (bufferSize < B_OK) { 1042 free(buffer); 1043 return NULL; 1044 } 1045 1046 PRINT(("BLooper::ReadRawFromPort() read: %.4s, %p (%d bytes)\n", (char *)msgCode, buffer, bufferSize)); 1047 return buffer; 1048 } 1049 1050 1051 BMessage * 1052 BLooper::ReadMessageFromPort(bigtime_t timeout) 1053 { 1054 PRINT(("BLooper::ReadMessageFromPort()\n")); 1055 int32 msgCode; 1056 BMessage *message = NULL; 1057 1058 void *buffer = ReadRawFromPort(&msgCode, timeout); 1059 if (!buffer) 1060 return NULL; 1061 1062 message = ConvertToMessage(buffer, msgCode); 1063 free(buffer); 1064 1065 PRINT(("BLooper::ReadMessageFromPort() done: %p\n", message)); 1066 return message; 1067 } 1068 1069 1070 BMessage * 1071 BLooper::ConvertToMessage(void *buffer, int32 code) 1072 { 1073 PRINT(("BLooper::ConvertToMessage()\n")); 1074 if (!buffer) 1075 return NULL; 1076 1077 BMessage *message = new BMessage(); 1078 if (message->Unflatten((const char *)buffer) != B_OK) { 1079 PRINT(("BLooper::ConvertToMessage(): unflattening message failed\n")); 1080 delete message; 1081 message = NULL; 1082 } 1083 1084 PRINT(("BLooper::ConvertToMessage(): %p\n", message)); 1085 return message; 1086 } 1087 1088 1089 void 1090 BLooper::task_looper() 1091 { 1092 PRINT(("BLooper::task_looper()\n")); 1093 // Check that looper is locked (should be) 1094 AssertLocked(); 1095 // Unlock the looper 1096 Unlock(); 1097 1098 if (IsLocked()) 1099 debugger("looper must not be locked!"); 1100 1101 // loop: As long as we are not terminating. 1102 while (!fTerminating) { 1103 PRINT(("LOOPER: outer loop\n")); 1104 // TODO: timeout determination algo 1105 // Read from message port (how do we determine what the timeout is?) 1106 PRINT(("LOOPER: MessageFromPort()...\n")); 1107 BMessage *msg = MessageFromPort(); 1108 PRINT(("LOOPER: ...done\n")); 1109 1110 // Did we get a message? 1111 if (msg) 1112 _AddMessagePriv(msg); 1113 1114 // Get message count from port 1115 int32 msgCount = port_count(fMsgPort); 1116 for (int32 i = 0; i < msgCount; ++i) { 1117 // Read 'count' messages from port (so we will not block) 1118 // We use zero as our timeout since we know there is stuff there 1119 msg = MessageFromPort(0); 1120 if (msg) 1121 _AddMessagePriv(msg); 1122 } 1123 1124 // loop: As long as there are messages in the queue and the port is 1125 // empty... and we are not terminating, of course. 1126 bool dispatchNextMessage = true; 1127 while (!fTerminating && dispatchNextMessage) { 1128 PRINT(("LOOPER: inner loop\n")); 1129 // Get next message from queue (assign to fLastMessage) 1130 fLastMessage = fDirectTarget->Queue()->NextMessage(); 1131 1132 Lock(); 1133 1134 if (!fLastMessage) { 1135 // No more messages: Unlock the looper and terminate the 1136 // dispatch loop. 1137 dispatchNextMessage = false; 1138 } else { 1139 PRINT(("LOOPER: fLastMessage: 0x%lx: %.4s\n", fLastMessage->what, 1140 (char*)&fLastMessage->what)); 1141 DBG(fLastMessage->PrintToStream()); 1142 1143 // Get the target handler 1144 BHandler *handler = NULL; 1145 BMessage::Private messagePrivate(fLastMessage); 1146 bool usePreferred = messagePrivate.UsePreferredTarget(); 1147 1148 if (usePreferred) { 1149 PRINT(("LOOPER: use preferred target\n")); 1150 handler = fPreferred; 1151 if (handler == NULL) 1152 handler = this; 1153 } else { 1154 gDefaultTokens.GetToken(messagePrivate.GetTarget(), 1155 B_HANDLER_TOKEN, (void **)&handler); 1156 1157 // if this handler doesn't belong to us, we drop the message 1158 if (handler != NULL && handler->Looper() != this) 1159 handler = NULL; 1160 1161 PRINT(("LOOPER: use %ld, handler: %p, this: %p\n", 1162 messagePrivate.GetTarget(), handler, this)); 1163 } 1164 1165 // Is this a scripting message? (BMessage::HasSpecifiers()) 1166 if (handler != NULL && fLastMessage->HasSpecifiers()) { 1167 int32 index = 0; 1168 // Make sure the current specifier is kosher 1169 if (fLastMessage->GetCurrentSpecifier(&index) == B_OK) 1170 handler = resolve_specifier(handler, fLastMessage); 1171 } 1172 1173 if (handler) { 1174 // Do filtering 1175 handler = _TopLevelFilter(fLastMessage, handler); 1176 PRINT(("LOOPER: _TopLevelFilter(): %p\n", handler)); 1177 if (handler && handler->Looper() == this) 1178 DispatchMessage(fLastMessage, handler); 1179 } 1180 } 1181 1182 if (fTerminating) { 1183 // we leave the looper locked when we quit 1184 return; 1185 } 1186 1187 // Unlock the looper 1188 Unlock(); 1189 1190 // Delete the current message (fLastMessage) 1191 if (fLastMessage) { 1192 delete fLastMessage; 1193 fLastMessage = NULL; 1194 } 1195 1196 // Are any messages on the port? 1197 if (port_count(fMsgPort) > 0) { 1198 // Do outer loop 1199 dispatchNextMessage = false; 1200 } 1201 } 1202 } 1203 PRINT(("BLooper::task_looper() done\n")); 1204 } 1205 1206 1207 void 1208 BLooper::_QuitRequested(BMessage *msg) 1209 { 1210 bool isQuitting = QuitRequested(); 1211 1212 // We send a reply to the sender, when they're waiting for a reply or 1213 // if the request message contains a boolean "_shutdown_" field with value 1214 // true. In the latter case the message came from the registrar, asking 1215 // the application to shut down. 1216 bool shutdown; 1217 if (msg->IsSourceWaiting() 1218 || (msg->FindBool("_shutdown_", &shutdown) == B_OK && shutdown)) { 1219 BMessage replyMsg(B_REPLY); 1220 replyMsg.AddBool("result", isQuitting); 1221 replyMsg.AddInt32("thread", fThread); 1222 msg->SendReply(&replyMsg); 1223 } 1224 1225 if (isQuitting) 1226 Quit(); 1227 } 1228 1229 1230 bool 1231 BLooper::AssertLocked() const 1232 { 1233 if (!IsLocked()) { 1234 debugger("looper must be locked before proceeding\n"); 1235 return false; 1236 } 1237 1238 return true; 1239 } 1240 1241 1242 BHandler * 1243 BLooper::_TopLevelFilter(BMessage* msg, BHandler* target) 1244 { 1245 if (msg) { 1246 // Apply the common filters first 1247 target = _ApplyFilters(CommonFilterList(), msg, target); 1248 if (target) { 1249 if (target->Looper() != this) { 1250 debugger("Targeted handler does not belong to the looper."); 1251 target = NULL; 1252 } else { 1253 // Now apply handler-specific filters 1254 target = _HandlerFilter(msg, target); 1255 } 1256 } 1257 } 1258 1259 return target; 1260 } 1261 1262 1263 BHandler * 1264 BLooper::_HandlerFilter(BMessage* msg, BHandler* target) 1265 { 1266 // Keep running filters until our handler is NULL, or until the filtering 1267 // handler returns itself as the designated handler 1268 BHandler* previousTarget = NULL; 1269 while (target != NULL && target != previousTarget) { 1270 previousTarget = target; 1271 1272 target = _ApplyFilters(target->FilterList(), msg, target); 1273 if (target != NULL && target->Looper() != this) { 1274 debugger("Targeted handler does not belong to the looper."); 1275 target = NULL; 1276 } 1277 } 1278 1279 return target; 1280 } 1281 1282 1283 BHandler * 1284 BLooper::_ApplyFilters(BList* list, BMessage* msg, BHandler* target) 1285 { 1286 // This is where the action is! 1287 // Check the parameters 1288 if (!list || !msg) 1289 return target; 1290 1291 // For each filter in the provided list 1292 BMessageFilter* filter = NULL; 1293 for (int32 i = 0; i < list->CountItems(); ++i) { 1294 filter = (BMessageFilter*)list->ItemAt(i); 1295 1296 // Check command conditions 1297 if (filter->FiltersAnyCommand() || (filter->Command() == msg->what)) { 1298 // Check delivery conditions 1299 message_delivery delivery = filter->MessageDelivery(); 1300 bool dropped = msg->WasDropped(); 1301 if (delivery == B_ANY_DELIVERY 1302 || (delivery == B_DROPPED_DELIVERY && dropped) 1303 || (delivery == B_PROGRAMMED_DELIVERY && !dropped)) { 1304 // Check source conditions 1305 message_source source = filter->MessageSource(); 1306 bool remote = msg->IsSourceRemote(); 1307 if (source == B_ANY_SOURCE 1308 || (source == B_REMOTE_SOURCE && remote) 1309 || (source == B_LOCAL_SOURCE && !remote)) { 1310 // Are we using an "external" function? 1311 filter_result result; 1312 filter_hook func = filter->FilterFunction(); 1313 if (func) 1314 result = func(msg, &target, filter); 1315 else 1316 result = filter->Filter(msg, &target); 1317 1318 // Is further processing allowed? 1319 if (result == B_SKIP_MESSAGE) { 1320 // No; time to bail out 1321 return NULL; 1322 } 1323 } 1324 } 1325 } 1326 } 1327 1328 return target; 1329 } 1330 1331 1332 void 1333 BLooper::check_lock() 1334 { 1335 // This is a cheap variant of AssertLocked() 1336 // It is used in situations where it's clear that the looper is valid, 1337 // ie. from handlers 1338 uint32 stack; 1339 if (((uint32)&stack & ~(B_PAGE_SIZE - 1)) == fCachedStack 1340 || fOwner == find_thread(NULL)) 1341 return; 1342 1343 debugger("Looper must be locked."); 1344 } 1345 1346 1347 BHandler * 1348 BLooper::resolve_specifier(BHandler* target, BMessage* msg) 1349 { 1350 // Check params 1351 if (!target || !msg) 1352 return NULL; 1353 1354 int32 index; 1355 BMessage specifier; 1356 int32 form; 1357 const char* property; 1358 status_t err = B_OK; 1359 BHandler* newTarget = target; 1360 // Loop to deal with nested specifiers 1361 // (e.g., the 3rd button on the 4th view) 1362 do { 1363 err = msg->GetCurrentSpecifier(&index, &specifier, &form, &property); 1364 if (err) { 1365 BMessage reply(B_REPLY); 1366 reply.AddInt32("error", err); 1367 msg->SendReply(&reply); 1368 return NULL; 1369 } 1370 // Current target gets what was the new target 1371 target = newTarget; 1372 newTarget = target->ResolveSpecifier(msg, index, &specifier, form, 1373 property); 1374 // Check that new target is owned by looper; 1375 // use IndexOf() to avoid dereferencing newTarget 1376 // (possible race condition with object destruction 1377 // by another looper) 1378 if (!newTarget || IndexOf(newTarget) < 0) 1379 return NULL; 1380 1381 // Get current specifier index (may change in ResolveSpecifier()) 1382 err = msg->GetCurrentSpecifier(&index); 1383 } while (newTarget && newTarget != target && !err && index >= 0); 1384 1385 return newTarget; 1386 } 1387 1388 1389 void 1390 BLooper::UnlockFully() 1391 { 1392 AssertLocked(); 1393 1394 /** 1395 @note What we're doing here is completely undoing the current owner's lock 1396 on the looper. This is actually pretty easy, since the owner only 1397 has a single aquisition on the semaphore; every subsequent "lock" 1398 is just an increment to the owner count. The whole thing is quite 1399 similar to Unlock(), except that we clear the ownership variables, 1400 rather than merely decrementing them. 1401 */ 1402 // Clear the owner count 1403 fOwnerCount = 0; 1404 // Nobody owns the lock now 1405 fOwner = -1; 1406 fCachedStack = 0; 1407 #if DEBUG < 1 1408 // There is now one less thread holding a lock on this looper 1409 int32 atomicCount = atomic_add(&fAtomicCount, -1); 1410 if (atomicCount > 1) 1411 #endif 1412 release_sem(fLockSem); 1413 } 1414 1415 1416 // #pragma mark - 1417 1418 1419 port_id 1420 _get_looper_port_(const BLooper *looper) 1421 { 1422 return looper->fMsgPort; 1423 } 1424 1425