1 /* 2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 #include <map> 7 #include <new> 8 #include <set> 9 #include <string.h> 10 11 #include <AutoDeleter.h> 12 #include <Autolock.h> 13 #include <DataIO.h> 14 #include <MessagePrivate.h> 15 #include <MessengerPrivate.h> 16 #include <OS.h> 17 #include <TokenSpace.h> 18 #include <util/DoublyLinkedList.h> 19 20 #include <messaging.h> 21 22 #include "Debug.h" 23 #include "MessageDeliverer.h" 24 #include "Referenceable.h" 25 26 using std::map; 27 using std::nothrow; 28 using std::set; 29 30 // sDeliverer -- the singleton instance 31 MessageDeliverer *MessageDeliverer::sDeliverer = NULL; 32 33 static const bigtime_t kRetryDelay = 100000; // 100 ms 34 35 // per port sanity limits 36 static const int32 kMaxMessagesPerPort = 10000; 37 static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB 38 39 40 // MessagingTargetSet 41 42 // destructor 43 MessagingTargetSet::~MessagingTargetSet() 44 { 45 } 46 47 48 // #pragma mark - 49 50 // DefaultMessagingTargetSet 51 52 // constructor 53 DefaultMessagingTargetSet::DefaultMessagingTargetSet( 54 const messaging_target *targets, int32 targetCount) 55 : MessagingTargetSet(), 56 fTargets(targets), 57 fTargetCount(targetCount), 58 fNextIndex(0) 59 { 60 } 61 62 // destructor 63 DefaultMessagingTargetSet::~DefaultMessagingTargetSet() 64 { 65 } 66 67 // HasNext 68 bool 69 DefaultMessagingTargetSet::HasNext() const 70 { 71 return (fNextIndex < fTargetCount); 72 } 73 74 // Next 75 bool 76 DefaultMessagingTargetSet::Next(port_id &port, int32 &token) 77 { 78 if (fNextIndex >= fTargetCount) 79 return false; 80 81 port = fTargets[fNextIndex].port; 82 token = fTargets[fNextIndex].token; 83 fNextIndex++; 84 85 return true; 86 } 87 88 // Rewind 89 void 90 DefaultMessagingTargetSet::Rewind() 91 { 92 fNextIndex = 0; 93 } 94 95 96 // #pragma mark - 97 98 // SingleMessagingTargetSet 99 100 // constructor 101 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target) 102 : MessagingTargetSet(), 103 fAtBeginning(true) 104 { 105 BMessenger::Private messengerPrivate(target); 106 fPort = messengerPrivate.Port(); 107 fToken = (messengerPrivate.IsPreferredTarget() 108 ? B_PREFERRED_TOKEN : messengerPrivate.Token()); 109 } 110 111 // constructor 112 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token) 113 : MessagingTargetSet(), 114 fPort(port), 115 fToken(token), 116 fAtBeginning(true) 117 { 118 } 119 120 // destructor 121 SingleMessagingTargetSet::~SingleMessagingTargetSet() 122 { 123 } 124 125 // HasNext 126 bool 127 SingleMessagingTargetSet::HasNext() const 128 { 129 return fAtBeginning; 130 } 131 132 // Next 133 bool 134 SingleMessagingTargetSet::Next(port_id &port, int32 &token) 135 { 136 if (!fAtBeginning) 137 return false; 138 139 port = fPort; 140 token = fToken; 141 fAtBeginning = false; 142 143 return true; 144 } 145 146 // Rewind 147 void 148 SingleMessagingTargetSet::Rewind() 149 { 150 fAtBeginning = true; 151 } 152 153 154 // #pragma mark - 155 156 // Message 157 /*! \brief Encapsulates a message to be delivered. 158 159 Besides the flattened message it also stores the when the message was 160 created and when the delivery attempts shall time out. 161 */ 162 class MessageDeliverer::Message : public Referenceable { 163 public: 164 Message(void *data, int32 dataSize, bigtime_t timeout) 165 : Referenceable(true), 166 fData(data), 167 fDataSize(dataSize), 168 fCreationTime(system_time()), 169 fBusy(false) 170 { 171 if (B_INFINITE_TIMEOUT - fCreationTime <= timeout) 172 fTimeoutTime = B_INFINITE_TIMEOUT; 173 else if (timeout <= 0) 174 fTimeoutTime = fCreationTime; 175 else 176 fTimeoutTime = fCreationTime + timeout; 177 } 178 179 ~Message() 180 { 181 free(fData); 182 } 183 184 void *Data() const 185 { 186 return fData; 187 } 188 189 int32 DataSize() const 190 { 191 return fDataSize; 192 } 193 194 bigtime_t CreationTime() const 195 { 196 return fCreationTime; 197 } 198 199 bigtime_t TimeoutTime() const 200 { 201 return fTimeoutTime; 202 } 203 204 bool HasTimeout() const 205 { 206 return (fTimeoutTime < B_INFINITE_TIMEOUT); 207 } 208 209 void SetBusy(bool busy) 210 { 211 fBusy = busy; 212 } 213 214 bool IsBusy() const 215 { 216 return fBusy; 217 } 218 219 private: 220 void *fData; 221 int32 fDataSize; 222 bigtime_t fCreationTime; 223 bigtime_t fTimeoutTime; 224 bool fBusy; 225 }; 226 227 // TargetMessage 228 /*! \brief Encapsulates a Message to be sent to a specific handler. 229 230 A TargetMessage is always associated with (i.e. queued in) a TargetPort. 231 While a Message stores only the message data and some timing info, this 232 object adds the token of a the target BHandler. 233 234 A Message can be referred to by more than one TargetMessage (when 235 broadcasting), but a TargetMessage is referred to exactly once, by 236 the TargetPort. 237 */ 238 class MessageDeliverer::TargetMessage 239 : public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> { 240 public: 241 TargetMessage(Message *message, int32 token) 242 : fMessage(message), 243 fToken(token) 244 { 245 if (fMessage) 246 fMessage->AddReference(); 247 } 248 249 ~TargetMessage() 250 { 251 if (fMessage) 252 fMessage->RemoveReference(); 253 } 254 255 Message *GetMessage() const 256 { 257 return fMessage; 258 } 259 260 int32 Token() const 261 { 262 return fToken; 263 } 264 265 private: 266 Message *fMessage; 267 int32 fToken; 268 }; 269 270 // TargetMessageHandle 271 /*! \brief A small wrapper for TargetMessage providing a complete order. 272 273 This class only exists to provide the comparison operators required to 274 put a TargetMessage into a set. The order implemented is by ascending by 275 timeout time (primary) and by TargetMessage pointer (secondary). 276 Hence TargetMessageHandles referring to the same TargetMessage are equal 277 (and only those). 278 */ 279 class MessageDeliverer::TargetMessageHandle { 280 public: 281 TargetMessageHandle(TargetMessage *message) 282 : fMessage(message) 283 { 284 } 285 286 TargetMessageHandle(const TargetMessageHandle &other) 287 : fMessage(other.fMessage) 288 { 289 } 290 291 TargetMessage *GetMessage() const 292 { 293 return fMessage; 294 } 295 296 TargetMessageHandle &operator=(const TargetMessageHandle &other) 297 { 298 fMessage = other.fMessage; 299 return *this; 300 } 301 302 bool operator==(const TargetMessageHandle &other) const 303 { 304 return (fMessage == other.fMessage); 305 } 306 307 bool operator!=(const TargetMessageHandle &other) const 308 { 309 return (fMessage != other.fMessage); 310 } 311 312 bool operator<(const TargetMessageHandle &other) const 313 { 314 bigtime_t timeout = fMessage->GetMessage()->TimeoutTime(); 315 bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime(); 316 if (timeout < otherTimeout) 317 return true; 318 if (timeout > otherTimeout) 319 return false; 320 return (fMessage < other.fMessage); 321 } 322 323 private: 324 TargetMessage *fMessage; 325 }; 326 327 // TargetPort 328 /*! \brief Represents a full target port, queuing the not yet delivered 329 messages. 330 331 A TargetPort internally queues TargetMessages in the order the are to be 332 delivered. Furthermore the object maintains an ordered set of 333 TargetMessages that can timeout (in ascending order of timeout time), so 334 that timed out messages can be dropped easily. 335 */ 336 class MessageDeliverer::TargetPort { 337 public: 338 TargetPort(port_id portID) 339 : fPortID(portID), 340 fMessages(), 341 fMessageCount(0), 342 fMessageSize(0) 343 { 344 } 345 346 ~TargetPort() 347 { 348 while (!fMessages.IsEmpty()) 349 PopMessage(); 350 } 351 352 port_id PortID() const 353 { 354 return fPortID; 355 } 356 357 status_t PushMessage(Message *message, int32 token) 358 { 359 PRINT(("MessageDeliverer::TargetPort::PushMessage(port: %ld, %p, %ld)\n", 360 fPortID, message, token)); 361 // create a target message 362 TargetMessage *targetMessage 363 = new(nothrow) TargetMessage(message, token); 364 if (!targetMessage) 365 return B_NO_MEMORY; 366 367 // push it 368 fMessages.Insert(targetMessage); 369 fMessageCount++; 370 fMessageSize += targetMessage->GetMessage()->DataSize(); 371 372 // add it to the timeoutable messages, if it has a timeout 373 if (message->HasTimeout()) 374 fTimeoutableMessages.insert(targetMessage); 375 376 _EnforceLimits(); 377 378 return B_OK; 379 } 380 381 Message *PeekMessage(int32 &token) const 382 { 383 if (!fMessages.Head()) 384 return NULL; 385 386 token = fMessages.Head()->Token(); 387 return fMessages.Head()->GetMessage(); 388 } 389 390 void PopMessage() 391 { 392 if (fMessages.Head()) { 393 PRINT(("MessageDeliverer::TargetPort::PopMessage(): port: %ld, %p\n", 394 fPortID, fMessages.Head()->GetMessage())); 395 _RemoveMessage(fMessages.Head()); 396 } 397 } 398 399 void DropTimedOutMessages() 400 { 401 bigtime_t now = system_time(); 402 403 while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) { 404 TargetMessage *message = fTimeoutableMessages.begin()->GetMessage(); 405 if (message->GetMessage()->TimeoutTime() > now) 406 break; 407 408 PRINT(("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %ld: " 409 "message %p timed out\n", fPortID, message->GetMessage())); 410 _RemoveMessage(message); 411 } 412 } 413 414 bool IsEmpty() const 415 { 416 return fMessages.IsEmpty(); 417 } 418 419 private: 420 void _RemoveMessage(TargetMessage *message) 421 { 422 fMessages.Remove(message); 423 fMessageCount--; 424 fMessageSize -= message->GetMessage()->DataSize(); 425 426 if (message->GetMessage()->HasTimeout()) 427 fTimeoutableMessages.erase(message); 428 429 delete message; 430 } 431 432 void _EnforceLimits() 433 { 434 // message count 435 while (fMessageCount > kMaxMessagesPerPort) { 436 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum " 437 "message count limit.\n", fPortID)); 438 PopMessage(); 439 } 440 441 // message size 442 while (fMessageSize > kMaxDataPerPort) { 443 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum " 444 "message size limit.\n", fPortID)); 445 PopMessage(); 446 } 447 } 448 449 typedef DoublyLinkedList<TargetMessage> MessageList; 450 451 port_id fPortID; 452 MessageList fMessages; 453 int32 fMessageCount; 454 int32 fMessageSize; 455 set<TargetMessageHandle> fTimeoutableMessages; 456 }; 457 458 // TargetPortMap 459 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> { 460 }; 461 462 463 // #pragma mark - 464 465 /*! \class MessageDeliverer 466 \brief Service for delivering messages, which retries the delivery as long 467 as the target port is full. 468 469 For the user of the service only the MessageDeliverer::DeliverMessage() 470 will be of interest. Some of them allow broadcasting a message to several 471 recepients. 472 473 The class maintains a TargetPort for each target port which was full at the 474 time a message was to be delivered to it. A TargetPort has a queue of 475 undelivered messages. A separate worker thread retries periodically to send 476 the yet undelivered messages to the respective target ports. 477 */ 478 479 // constructor 480 MessageDeliverer::MessageDeliverer() 481 : fLock("message deliverer"), 482 fTargetPorts(NULL), 483 fDelivererThread(-1), 484 fTerminating(false) 485 { 486 } 487 488 // destructor 489 MessageDeliverer::~MessageDeliverer() 490 { 491 fTerminating = true; 492 493 if (fDelivererThread >= 0) { 494 int32 result; 495 wait_for_thread(fDelivererThread, &result); 496 } 497 498 delete fTargetPorts; 499 } 500 501 // Init 502 status_t 503 MessageDeliverer::Init() 504 { 505 // create the target port map 506 fTargetPorts = new(nothrow) TargetPortMap; 507 if (!fTargetPorts) 508 return B_NO_MEMORY; 509 510 // spawn the deliverer thread 511 fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry, 512 "message deliverer", B_NORMAL_PRIORITY + 1, this); 513 if (fDelivererThread < 0) 514 return fDelivererThread; 515 516 // resume the deliverer thread 517 resume_thread(fDelivererThread); 518 519 return B_OK; 520 } 521 522 // CreateDefault 523 status_t 524 MessageDeliverer::CreateDefault() 525 { 526 if (sDeliverer) 527 return B_OK; 528 529 // create the deliverer 530 MessageDeliverer *deliverer = new(nothrow) MessageDeliverer; 531 if (!deliverer) 532 return B_NO_MEMORY; 533 534 // init it 535 status_t error = deliverer->Init(); 536 if (error != B_OK) { 537 delete deliverer; 538 return error; 539 } 540 541 sDeliverer = deliverer; 542 return B_OK; 543 } 544 545 // DeleteDefault 546 void 547 MessageDeliverer::DeleteDefault() 548 { 549 if (sDeliverer) { 550 delete sDeliverer; 551 sDeliverer = NULL; 552 } 553 } 554 555 // Default 556 MessageDeliverer * 557 MessageDeliverer::Default() 558 { 559 return sDeliverer; 560 } 561 562 // DeliverMessage 563 /*! \brief Delivers a message to the supplied target. 564 565 The method tries to send the message right now (if there are not already 566 messages pending for the target port). If that fails due to a full target 567 port, the message is queued for later delivery. 568 569 \param message The message to be delivered. 570 \param target A BMessenger identifying the delivery target. 571 \param timeout If given, the message will be dropped, when it couldn't be 572 delivered after this amount of microseconds. 573 \return 574 - \c B_OK, if sending the message succeeded or if the target port was 575 full and the message has been queued, 576 - another error code otherwise. 577 */ 578 status_t 579 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target, 580 bigtime_t timeout) 581 { 582 SingleMessagingTargetSet set(target); 583 return DeliverMessage(message, set, timeout); 584 } 585 586 // DeliverMessage 587 /*! \brief Delivers a message to the supplied targets. 588 589 The method tries to send the message right now to each of the given targets 590 (if there are not already messages pending for a target port). If that 591 fails due to a full target port, the message is queued for later delivery. 592 593 \param message The message to be delivered. 594 \param targets MessagingTargetSet providing the the delivery targets. 595 \param timeout If given, the message will be dropped, when it couldn't be 596 delivered after this amount of microseconds. 597 \return 598 - \c B_OK, if for each of the given targets sending the message succeeded 599 or if the target port was full and the message has been queued, 600 - another error code otherwise. 601 */ 602 status_t 603 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets, 604 bigtime_t timeout) 605 { 606 if (!message) 607 return B_BAD_VALUE; 608 609 // flatten the message 610 BMallocIO mallocIO; 611 status_t error = message->Flatten(&mallocIO, NULL); 612 if (error < B_OK) 613 return error; 614 615 return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets, 616 timeout); 617 } 618 619 // DeliverMessage 620 /*! \brief Delivers a flattened message to the supplied targets. 621 622 The method tries to send the message right now to each of the given targets 623 (if there are not already messages pending for a target port). If that 624 fails due to a full target port, the message is queued for later delivery. 625 626 \param message The flattened message to be delivered. This may be a 627 flattened BMessage or KMessage. 628 \param messageSize The size of the flattened message buffer. 629 \param targets MessagingTargetSet providing the the delivery targets. 630 \param timeout If given, the message will be dropped, when it couldn't be 631 delivered after this amount of microseconds. 632 \return 633 - \c B_OK, if for each of the given targets sending the message succeeded 634 or if the target port was full and the message has been queued, 635 - another error code otherwise. 636 */ 637 status_t 638 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize, 639 MessagingTargetSet &targets, bigtime_t timeout) 640 { 641 if (!messageData || messageSize <= 0) 642 return B_BAD_VALUE; 643 644 // clone the buffer 645 void *data = malloc(messageSize); 646 if (!data) 647 return B_NO_MEMORY; 648 memcpy(data, messageData, messageSize); 649 650 // create a Message 651 Message *message = new(nothrow) Message(data, messageSize, timeout); 652 if (!message) { 653 free(data); 654 return B_NO_MEMORY; 655 } 656 Reference<Message> _(message, true); 657 658 // add the message to the respective target ports 659 BAutolock locker(fLock); 660 for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) { 661 port_id portID; 662 int32 token; 663 targets.Next(portID, token); 664 665 // get the target port 666 TargetPort *port = _GetTargetPort(portID, true); 667 if (!port) 668 return B_NO_MEMORY; 669 670 // try sending the message, if there are no queued messages yet 671 if (port->IsEmpty()) { 672 status_t error = _SendMessage(message, portID, token); 673 // if the message was delivered OK, we're done with the target 674 if (error == B_OK) { 675 _PutTargetPort(port); 676 continue; 677 } 678 679 // if the port is not full, but an error occurred, we skip this target 680 if (error != B_WOULD_BLOCK) { 681 _PutTargetPort(port); 682 if (targetIndex == 0 && !targets.HasNext()) 683 return error; 684 continue; 685 } 686 } 687 688 // add the message 689 status_t error = port->PushMessage(message, token); 690 _PutTargetPort(port); 691 if (error != B_OK) 692 return error; 693 } 694 695 return B_OK; 696 } 697 698 // _GetTargetPort 699 MessageDeliverer::TargetPort * 700 MessageDeliverer::_GetTargetPort(port_id portID, bool create) 701 { 702 // get the port from the map 703 TargetPortMap::iterator it = fTargetPorts->find(portID); 704 if (it != fTargetPorts->end()) 705 return it->second; 706 707 if (!create) 708 return NULL; 709 710 // create a port 711 TargetPort *port = new(nothrow) TargetPort(portID); 712 if (!port) 713 return NULL; 714 (*fTargetPorts)[portID] = port; 715 716 return port; 717 } 718 719 // _PutTargetPort 720 void 721 MessageDeliverer::_PutTargetPort(TargetPort *port) 722 { 723 if (!port) 724 return; 725 726 if (port->IsEmpty()) { 727 fTargetPorts->erase(port->PortID()); 728 delete port; 729 } 730 } 731 732 // _SendMessage 733 status_t 734 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token) 735 { 736 status_t error = BMessage::Private::SendFlattenedMessage(message->Data(), 737 message->DataSize(), portID, token, 0); 738 //PRINT(("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n", 739 //message, portID, token, error)); 740 return error; 741 } 742 743 // _DelivererThreadEntry 744 int32 745 MessageDeliverer::_DelivererThreadEntry(void *data) 746 { 747 return ((MessageDeliverer*)data)->_DelivererThread(); 748 } 749 750 // _DelivererThread 751 int32 752 MessageDeliverer::_DelivererThread() 753 { 754 while (!fTerminating) { 755 snooze(kRetryDelay); 756 if (fTerminating) 757 break; 758 759 // iterate through all target ports and try sending the messages 760 BAutolock _(fLock); 761 for (TargetPortMap::iterator it = fTargetPorts->begin(); 762 it != fTargetPorts->end();) { 763 TargetPort *port = it->second; 764 bool portError = false; 765 766 port->DropTimedOutMessages(); 767 768 // try sending all messages 769 int32 token; 770 while (Message *message = port->PeekMessage(token)) { 771 status_t error = B_OK; 772 // if (message->TimeoutTime() > system_time()) { 773 error = _SendMessage(message, port->PortID(), token); 774 // } else { 775 // // timeout, drop message 776 // PRINT(("MessageDeliverer::_DelivererThread(): port %ld, " 777 // "message %p timed out\n", port->PortID(), message)); 778 // } 779 780 if (error == B_OK) { 781 port->PopMessage(); 782 } else if (error == B_WOULD_BLOCK) { 783 // no luck yet -- port is still full 784 break; 785 } else { 786 // unexpected error -- probably the port is gone 787 portError = true; 788 break; 789 } 790 } 791 792 // next port 793 if (portError || port->IsEmpty()) { 794 TargetPortMap::iterator oldIt = it; 795 ++it; 796 delete port; 797 fTargetPorts->erase(oldIt); 798 } else 799 ++it; 800 } 801 } 802 803 return 0; 804 } 805