1 /* 2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 #include <map> 7 #include <new> 8 #include <set> 9 10 #include <AutoDeleter.h> 11 #include <Autolock.h> 12 #include <DataIO.h> 13 #include <MessagePrivate.h> 14 #include <MessengerPrivate.h> 15 #include <OS.h> 16 #include <TokenSpace.h> 17 #include <util/DoublyLinkedList.h> 18 19 #include <messaging.h> 20 21 #include "Debug.h" 22 #include "MessageDeliverer.h" 23 #include "Referenceable.h" 24 25 using std::map; 26 using std::nothrow; 27 using std::set; 28 29 // sDeliverer -- the singleton instance 30 MessageDeliverer *MessageDeliverer::sDeliverer = NULL; 31 32 static const bigtime_t kRetryDelay = 100000; // 100 ms 33 34 // per port sanity limits 35 static const int32 kMaxMessagesPerPort = 10000; 36 static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB 37 38 39 // MessagingTargetSet 40 41 // destructor 42 MessagingTargetSet::~MessagingTargetSet() 43 { 44 } 45 46 47 // #pragma mark - 48 49 // DefaultMessagingTargetSet 50 51 // constructor 52 DefaultMessagingTargetSet::DefaultMessagingTargetSet( 53 const messaging_target *targets, int32 targetCount) 54 : MessagingTargetSet(), 55 fTargets(targets), 56 fTargetCount(targetCount), 57 fNextIndex(0) 58 { 59 } 60 61 // destructor 62 DefaultMessagingTargetSet::~DefaultMessagingTargetSet() 63 { 64 } 65 66 // HasNext 67 bool 68 DefaultMessagingTargetSet::HasNext() const 69 { 70 return (fNextIndex < fTargetCount); 71 } 72 73 // Next 74 bool 75 DefaultMessagingTargetSet::Next(port_id &port, int32 &token) 76 { 77 if (fNextIndex >= fTargetCount) 78 return false; 79 80 port = fTargets[fNextIndex].port; 81 token = fTargets[fNextIndex].token; 82 fNextIndex++; 83 84 return true; 85 } 86 87 // Rewind 88 void 89 DefaultMessagingTargetSet::Rewind() 90 { 91 fNextIndex = 0; 92 } 93 94 95 // #pragma mark - 96 97 // SingleMessagingTargetSet 98 99 // constructor 100 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target) 101 : MessagingTargetSet(), 102 fAtBeginning(true) 103 { 104 BMessenger::Private messengerPrivate(target); 105 fPort = messengerPrivate.Port(); 106 fToken = (messengerPrivate.IsPreferredTarget() 107 ? B_PREFERRED_TOKEN : messengerPrivate.Token()); 108 } 109 110 // constructor 111 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token) 112 : MessagingTargetSet(), 113 fPort(port), 114 fToken(token), 115 fAtBeginning(true) 116 { 117 } 118 119 // destructor 120 SingleMessagingTargetSet::~SingleMessagingTargetSet() 121 { 122 } 123 124 // HasNext 125 bool 126 SingleMessagingTargetSet::HasNext() const 127 { 128 return fAtBeginning; 129 } 130 131 // Next 132 bool 133 SingleMessagingTargetSet::Next(port_id &port, int32 &token) 134 { 135 if (!fAtBeginning) 136 return false; 137 138 port = fPort; 139 token = fToken; 140 fAtBeginning = false; 141 142 return true; 143 } 144 145 // Rewind 146 void 147 SingleMessagingTargetSet::Rewind() 148 { 149 fAtBeginning = true; 150 } 151 152 153 // #pragma mark - 154 155 // Message 156 /*! \brief Encapsulates a message to be delivered. 157 158 Besides the flattened message it also stores the when the message was 159 created and when the delivery attempts shall time out. 160 */ 161 class MessageDeliverer::Message : public Referenceable { 162 public: 163 Message(void *data, int32 dataSize, bigtime_t timeout) 164 : Referenceable(true), 165 fData(data), 166 fDataSize(dataSize), 167 fCreationTime(system_time()), 168 fBusy(false) 169 { 170 if (B_INFINITE_TIMEOUT - fCreationTime <= timeout) 171 fTimeoutTime = B_INFINITE_TIMEOUT; 172 else if (timeout <= 0) 173 fTimeoutTime = fCreationTime; 174 else 175 fTimeoutTime = fCreationTime + timeout; 176 } 177 178 ~Message() 179 { 180 free(fData); 181 } 182 183 void *Data() const 184 { 185 return fData; 186 } 187 188 int32 DataSize() const 189 { 190 return fDataSize; 191 } 192 193 bigtime_t CreationTime() const 194 { 195 return fCreationTime; 196 } 197 198 bigtime_t TimeoutTime() const 199 { 200 return fTimeoutTime; 201 } 202 203 bool HasTimeout() const 204 { 205 return (fTimeoutTime < B_INFINITE_TIMEOUT); 206 } 207 208 void SetBusy(bool busy) 209 { 210 fBusy = busy; 211 } 212 213 bool IsBusy() const 214 { 215 return fBusy; 216 } 217 218 private: 219 void *fData; 220 int32 fDataSize; 221 bigtime_t fCreationTime; 222 bigtime_t fTimeoutTime; 223 bool fBusy; 224 }; 225 226 // TargetMessage 227 /*! \brief Encapsulates a Message to be sent to a specific handler. 228 229 A TargetMessage is always associated with (i.e. queued in) a TargetPort. 230 While a Message stores only the message data and some timing info, this 231 object adds the token of a the target BHandler. 232 233 A Message can be referred to by more than one TargetMessage (when 234 broadcasting), but a TargetMessage is referred to exactly once, by 235 the TargetPort. 236 */ 237 class MessageDeliverer::TargetMessage 238 : public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> { 239 public: 240 TargetMessage(Message *message, int32 token) 241 : fMessage(message), 242 fToken(token) 243 { 244 if (fMessage) 245 fMessage->AddReference(); 246 } 247 248 ~TargetMessage() 249 { 250 if (fMessage) 251 fMessage->RemoveReference(); 252 } 253 254 Message *GetMessage() const 255 { 256 return fMessage; 257 } 258 259 int32 Token() const 260 { 261 return fToken; 262 } 263 264 private: 265 Message *fMessage; 266 int32 fToken; 267 }; 268 269 // TargetMessageHandle 270 /*! \brief A small wrapper for TargetMessage providing a complete order. 271 272 This class only exists to provide the comparison operators required to 273 put a TargetMessage into a set. The order implemented is by ascending by 274 timeout time (primary) and by TargetMessage pointer (secondary). 275 Hence TargetMessageHandles referring to the same TargetMessage are equal 276 (and only those). 277 */ 278 class MessageDeliverer::TargetMessageHandle { 279 public: 280 TargetMessageHandle(TargetMessage *message) 281 : fMessage(message) 282 { 283 } 284 285 TargetMessageHandle(const TargetMessageHandle &other) 286 : fMessage(other.fMessage) 287 { 288 } 289 290 TargetMessage *GetMessage() const 291 { 292 return fMessage; 293 } 294 295 TargetMessageHandle &operator=(const TargetMessageHandle &other) 296 { 297 fMessage = other.fMessage; 298 return *this; 299 } 300 301 bool operator==(const TargetMessageHandle &other) const 302 { 303 return (fMessage == other.fMessage); 304 } 305 306 bool operator!=(const TargetMessageHandle &other) const 307 { 308 return (fMessage != other.fMessage); 309 } 310 311 bool operator<(const TargetMessageHandle &other) const 312 { 313 bigtime_t timeout = fMessage->GetMessage()->TimeoutTime(); 314 bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime(); 315 if (timeout < otherTimeout) 316 return true; 317 if (timeout > otherTimeout) 318 return false; 319 return (fMessage < other.fMessage); 320 } 321 322 private: 323 TargetMessage *fMessage; 324 }; 325 326 // TargetPort 327 /*! \brief Represents a full target port, queuing the not yet delivered 328 messages. 329 330 A TargetPort internally queues TargetMessages in the order the are to be 331 delivered. Furthermore the object maintains an ordered set of 332 TargetMessages that can timeout (in ascending order of timeout time), so 333 that timed out messages can be dropped easily. 334 */ 335 class MessageDeliverer::TargetPort { 336 public: 337 TargetPort(port_id portID) 338 : fPortID(portID), 339 fMessages(), 340 fMessageCount(0), 341 fMessageSize(0) 342 { 343 } 344 345 ~TargetPort() 346 { 347 while (!fMessages.IsEmpty()) 348 PopMessage(); 349 } 350 351 port_id PortID() const 352 { 353 return fPortID; 354 } 355 356 status_t PushMessage(Message *message, int32 token) 357 { 358 PRINT(("MessageDeliverer::TargetPort::PushMessage(port: %ld, %p, %ld)\n", 359 fPortID, message, token)); 360 // create a target message 361 TargetMessage *targetMessage 362 = new(nothrow) TargetMessage(message, token); 363 if (!targetMessage) 364 return B_NO_MEMORY; 365 366 // push it 367 fMessages.Insert(targetMessage); 368 fMessageCount++; 369 fMessageSize += targetMessage->GetMessage()->DataSize(); 370 371 // add it to the timeoutable messages, if it has a timeout 372 if (message->HasTimeout()) 373 fTimeoutableMessages.insert(targetMessage); 374 375 _EnforceLimits(); 376 377 return B_OK; 378 } 379 380 Message *PeekMessage(int32 &token) const 381 { 382 if (!fMessages.Head()) 383 return NULL; 384 385 token = fMessages.Head()->Token(); 386 return fMessages.Head()->GetMessage(); 387 } 388 389 void PopMessage() 390 { 391 if (fMessages.Head()) { 392 PRINT(("MessageDeliverer::TargetPort::PopMessage(): port: %ld, %p\n", 393 fPortID, fMessages.Head()->GetMessage())); 394 _RemoveMessage(fMessages.Head()); 395 } 396 } 397 398 void DropTimedOutMessages() 399 { 400 bigtime_t now = system_time(); 401 402 while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) { 403 TargetMessage *message = fTimeoutableMessages.begin()->GetMessage(); 404 if (message->GetMessage()->TimeoutTime() > now) 405 break; 406 407 PRINT(("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %ld: " 408 "message %p timed out\n", fPortID, message->GetMessage())); 409 _RemoveMessage(message); 410 } 411 } 412 413 bool IsEmpty() const 414 { 415 return fMessages.IsEmpty(); 416 } 417 418 private: 419 void _RemoveMessage(TargetMessage *message) 420 { 421 fMessages.Remove(message); 422 fMessageCount--; 423 fMessageSize -= message->GetMessage()->DataSize(); 424 425 if (message->GetMessage()->HasTimeout()) 426 fTimeoutableMessages.erase(message); 427 428 delete message; 429 } 430 431 void _EnforceLimits() 432 { 433 // message count 434 while (fMessageCount > kMaxMessagesPerPort) { 435 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum " 436 "message count limit.\n", fPortID)); 437 PopMessage(); 438 } 439 440 // message size 441 while (fMessageSize > kMaxDataPerPort) { 442 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum " 443 "message size limit.\n", fPortID)); 444 PopMessage(); 445 } 446 } 447 448 typedef DoublyLinkedList<TargetMessage> MessageList; 449 450 port_id fPortID; 451 MessageList fMessages; 452 int32 fMessageCount; 453 int32 fMessageSize; 454 set<TargetMessageHandle> fTimeoutableMessages; 455 }; 456 457 // TargetPortMap 458 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> { 459 }; 460 461 462 // #pragma mark - 463 464 /*! \class MessageDeliverer 465 \brief Service for delivering messages, which retries the delivery as long 466 as the target port is full. 467 468 For the user of the service only the MessageDeliverer::DeliverMessage() 469 will be of interest. Some of them allow broadcasting a message to several 470 recepients. 471 472 The class maintains a TargetPort for each target port which was full at the 473 time a message was to be delivered to it. A TargetPort has a queue of 474 undelivered messages. A separate worker thread retries periodically to send 475 the yet undelivered messages to the respective target ports. 476 */ 477 478 // constructor 479 MessageDeliverer::MessageDeliverer() 480 : fLock("message deliverer"), 481 fTargetPorts(NULL), 482 fDelivererThread(-1), 483 fTerminating(false) 484 { 485 } 486 487 // destructor 488 MessageDeliverer::~MessageDeliverer() 489 { 490 fTerminating = true; 491 492 if (fDelivererThread >= 0) { 493 int32 result; 494 wait_for_thread(fDelivererThread, &result); 495 } 496 497 delete fTargetPorts; 498 } 499 500 // Init 501 status_t 502 MessageDeliverer::Init() 503 { 504 // create the target port map 505 fTargetPorts = new(nothrow) TargetPortMap; 506 if (!fTargetPorts) 507 return B_NO_MEMORY; 508 509 // spawn the deliverer thread 510 fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry, 511 "message deliverer", B_NORMAL_PRIORITY, this); 512 if (fDelivererThread < 0) 513 return fDelivererThread; 514 515 // resume the deliverer thread 516 resume_thread(fDelivererThread); 517 518 return B_OK; 519 } 520 521 // CreateDefault 522 status_t 523 MessageDeliverer::CreateDefault() 524 { 525 if (sDeliverer) 526 return B_OK; 527 528 // create the deliverer 529 MessageDeliverer *deliverer = new(nothrow) MessageDeliverer; 530 if (!deliverer) 531 return B_NO_MEMORY; 532 533 // init it 534 status_t error = deliverer->Init(); 535 if (error != B_OK) { 536 delete deliverer; 537 return error; 538 } 539 540 sDeliverer = deliverer; 541 return B_OK; 542 } 543 544 // DeleteDefault 545 void 546 MessageDeliverer::DeleteDefault() 547 { 548 if (sDeliverer) { 549 delete sDeliverer; 550 sDeliverer = NULL; 551 } 552 } 553 554 // Default 555 MessageDeliverer * 556 MessageDeliverer::Default() 557 { 558 return sDeliverer; 559 } 560 561 // DeliverMessage 562 /*! \brief Delivers a message to the supplied target. 563 564 The method tries to send the message right now (if there are not already 565 messages pending for the target port). If that fails due to a full target 566 port, the message is queued for later delivery. 567 568 \param message The message to be delivered. 569 \param target A BMessenger identifying the delivery target. 570 \param timeout If given, the message will be dropped, when it couldn't be 571 delivered after this amount of microseconds. 572 \return 573 - \c B_OK, if sending the message succeeded or if the target port was 574 full and the message has been queued, 575 - another error code otherwise. 576 */ 577 status_t 578 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target, 579 bigtime_t timeout) 580 { 581 SingleMessagingTargetSet set(target); 582 return DeliverMessage(message, set, timeout); 583 } 584 585 // DeliverMessage 586 /*! \brief Delivers a message to the supplied targets. 587 588 The method tries to send the message right now to each of the given targets 589 (if there are not already messages pending for a target port). If that 590 fails due to a full target port, the message is queued for later delivery. 591 592 \param message The message to be delivered. 593 \param targets MessagingTargetSet providing the the delivery targets. 594 \param timeout If given, the message will be dropped, when it couldn't be 595 delivered after this amount of microseconds. 596 \return 597 - \c B_OK, if for each of the given targets sending the message succeeded 598 or if the target port was full and the message has been queued, 599 - another error code otherwise. 600 */ 601 status_t 602 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets, 603 bigtime_t timeout) 604 { 605 if (!message) 606 return B_BAD_VALUE; 607 608 // flatten the message 609 BMallocIO mallocIO; 610 status_t error = BMessage::Private(message).NativeFlatten(&mallocIO, NULL); 611 if (error < B_OK) 612 return error; 613 614 return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets, 615 timeout); 616 } 617 618 // DeliverMessage 619 /*! \brief Delivers a flattened message to the supplied targets. 620 621 The method tries to send the message right now to each of the given targets 622 (if there are not already messages pending for a target port). If that 623 fails due to a full target port, the message is queued for later delivery. 624 625 \param message The flattened message to be delivered. This may be a 626 flattened BMessage or KMessage. 627 \param messageSize The size of the flattened message buffer. 628 \param targets MessagingTargetSet providing the the delivery targets. 629 \param timeout If given, the message will be dropped, when it couldn't be 630 delivered after this amount of microseconds. 631 \return 632 - \c B_OK, if for each of the given targets sending the message succeeded 633 or if the target port was full and the message has been queued, 634 - another error code otherwise. 635 */ 636 status_t 637 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize, 638 MessagingTargetSet &targets, bigtime_t timeout) 639 { 640 if (!messageData || messageSize <= 0) 641 return B_BAD_VALUE; 642 643 // clone the buffer 644 void *data = malloc(messageSize); 645 if (!data) 646 return B_NO_MEMORY; 647 memcpy(data, messageData, messageSize); 648 649 // create a Message 650 Message *message = new(nothrow) Message(data, messageSize, timeout); 651 if (!message) { 652 free(data); 653 return B_NO_MEMORY; 654 } 655 Reference<Message> _(message, true); 656 657 // add the message to the respective target ports 658 BAutolock locker(fLock); 659 for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) { 660 port_id portID; 661 int32 token; 662 targets.Next(portID, token); 663 664 // get the target port 665 TargetPort *port = _GetTargetPort(portID, true); 666 if (!port) 667 return B_NO_MEMORY; 668 669 // try sending the message, if there are no queued messages yet 670 if (port->IsEmpty()) { 671 status_t error = _SendMessage(message, portID, token); 672 // if the message was delivered OK, we're done with the target 673 if (error == B_OK) { 674 _PutTargetPort(port); 675 continue; 676 } 677 678 // if the port is not full, but an error occurred, we skip this target 679 if (error != B_WOULD_BLOCK) { 680 _PutTargetPort(port); 681 if (targetIndex == 0 && !targets.HasNext()) 682 return error; 683 continue; 684 } 685 } 686 687 // add the message 688 status_t error = port->PushMessage(message, token); 689 _PutTargetPort(port); 690 if (error != B_OK) 691 return error; 692 } 693 694 return B_OK; 695 } 696 697 // _GetTargetPort 698 MessageDeliverer::TargetPort * 699 MessageDeliverer::_GetTargetPort(port_id portID, bool create) 700 { 701 // get the port from the map 702 TargetPortMap::iterator it = fTargetPorts->find(portID); 703 if (it != fTargetPorts->end()) 704 return it->second; 705 706 if (!create) 707 return NULL; 708 709 // create a port 710 TargetPort *port = new(nothrow) TargetPort(portID); 711 if (!port) 712 return NULL; 713 (*fTargetPorts)[portID] = port; 714 715 return port; 716 } 717 718 // _PutTargetPort 719 void 720 MessageDeliverer::_PutTargetPort(TargetPort *port) 721 { 722 if (!port) 723 return; 724 725 if (port->IsEmpty()) { 726 fTargetPorts->erase(port->PortID()); 727 delete port; 728 } 729 } 730 731 // _SendMessage 732 status_t 733 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token) 734 { 735 status_t error = BMessage::Private::SendFlattenedMessage(message->Data(), 736 message->DataSize(), portID, token, 0); 737 //PRINT(("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n", 738 //message, portID, token, error)); 739 return error; 740 } 741 742 // _DelivererThreadEntry 743 int32 744 MessageDeliverer::_DelivererThreadEntry(void *data) 745 { 746 return ((MessageDeliverer*)data)->_DelivererThread(); 747 } 748 749 // _DelivererThread 750 int32 751 MessageDeliverer::_DelivererThread() 752 { 753 while (!fTerminating) { 754 snooze(kRetryDelay); 755 if (fTerminating) 756 break; 757 758 // iterate through all target ports and try sending the messages 759 BAutolock _(fLock); 760 for (TargetPortMap::iterator it = fTargetPorts->begin(); 761 it != fTargetPorts->end();) { 762 TargetPort *port = it->second; 763 bool portError = false; 764 765 port->DropTimedOutMessages(); 766 767 // try sending all messages 768 int32 token; 769 while (Message *message = port->PeekMessage(token)) { 770 status_t error = B_OK; 771 // if (message->TimeoutTime() > system_time()) { 772 error = _SendMessage(message, port->PortID(), token); 773 // } else { 774 // // timeout, drop message 775 // PRINT(("MessageDeliverer::_DelivererThread(): port %ld, " 776 // "message %p timed out\n", port->PortID(), message)); 777 // } 778 779 if (error == B_OK) { 780 port->PopMessage(); 781 } else if (error == B_WOULD_BLOCK) { 782 // no luck yet -- port is still full 783 break; 784 } else { 785 // unexpected error -- probably the port is gone 786 portError = true; 787 break; 788 } 789 } 790 791 // next port 792 if (portError || port->IsEmpty()) { 793 TargetPortMap::iterator oldIt = it; 794 ++it; 795 delete port; 796 fTargetPorts->erase(oldIt); 797 } else 798 ++it; 799 } 800 } 801 802 return 0; 803 } 804