1 /* 2 * Copyright 2015, Haiku. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Joseph Groover <looncraz@looncraz.net> 7 */ 8 9 10 #include "DelayedMessage.h" 11 12 #include <stdio.h> 13 #include <stdlib.h> 14 #include <string.h> 15 16 #include <Autolock.h> 17 #include <String.h> 18 19 #include <LinkSender.h> 20 #include <ServerProtocol.h> 21 22 23 // DelayedMessageSender constants 24 static const int32 kWakeupMessage = AS_LAST_CODE + 2048; 25 static const int32 kExitMessage = kWakeupMessage + 1; 26 27 static const char* kName = "DMT is here for you, eventually..."; 28 static int32 kPriority = B_URGENT_DISPLAY_PRIORITY; 29 static int32 kPortCapacity = 10; 30 31 32 //! Data attachment structure. 33 struct Attachment { 34 Attachment(const void* data, size_t size); 35 ~Attachment(); 36 37 const void* constData; 38 void* data; 39 size_t size; 40 }; 41 42 43 typedef BObjectList<Attachment> AttachmentList; 44 45 46 /*! \class ScheduledMessage 47 \brief Responsible for sending of delayed message. 48 */ 49 class ScheduledMessage { 50 public: 51 ScheduledMessage(DelayedMessage& message); 52 ~ScheduledMessage(); 53 54 int32 CountTargets() const; 55 56 void Finalize(); 57 bigtime_t ScheduledTime() const; 58 int32 SendMessage(); 59 bool IsValid() const; 60 bool Merge(DelayedMessage& message); 61 62 status_t SendMessageToPort(port_id port); 63 bool operator<(const ScheduledMessage& other) const; 64 65 DelayedMessageData* fData; 66 }; 67 68 69 /*! \class DelayedMessageSender DelayedMessageSender.h 70 \brief Responsible for scheduling and sending of delayed messages 71 */ 72 class DelayedMessageSender { 73 public: 74 explicit DelayedMessageSender(); 75 ~DelayedMessageSender(); 76 77 status_t ScheduleMessage (DelayedMessage& message); 78 79 int32 CountDelayedMessages() const; 80 int64 CountSentMessages() const; 81 82 private: 83 void _MessageLoop(); 84 int32 _SendDelayedMessages(); 85 static int32 _thread_func(void* sender); 86 void _Wakeup(bigtime_t whatTime); 87 88 private: 89 typedef BObjectList<ScheduledMessage> ScheduledList; 90 91 mutable BLocker fLock; 92 ScheduledList fMessages; 93 94 bigtime_t fScheduledWakeup; 95 96 int32 fWakeupRetry; 97 thread_id fThread; 98 port_id fPort; 99 100 mutable int64 fSentCount; 101 }; 102 103 104 DelayedMessageSender gDelayedMessageSender; 105 106 107 /*! \class DelayedMessageData DelayedMessageSender.h 108 \brief Owns DelayedMessage data, allocates memory and copies data only 109 when needed, 110 */ 111 class DelayedMessageData { 112 typedef BObjectList<port_id> PortList; 113 typedef void(*FailureCallback)(int32 code, port_id port, void* data); 114 public: 115 DelayedMessageData(int32 code, bigtime_t delay, 116 bool isSpecificTime); 117 ~DelayedMessageData(); 118 119 bool AddTarget(port_id port); 120 void RemoveTarget(port_id port); 121 int32 CountTargets() const; 122 123 void MergeTargets(DelayedMessageData* other); 124 125 bool CopyData(); 126 bool MergeData(DelayedMessageData* other); 127 128 bool IsValid() const; 129 // Only valid after a successful CopyData(). 130 131 status_t Attach(const void* data, size_t size); 132 133 bool Compare(Attachment* one, Attachment* two, 134 int32 index); 135 136 void SetMerge(DMMergeMode mode, uint32 mask); 137 void SendFailed(port_id port); 138 139 void SetFailureCallback(FailureCallback callback, 140 void* data); 141 142 // Accessors. 143 int32& Code() {return fCode;} 144 const int32& Code() const {return fCode;} 145 146 bigtime_t& ScheduledTime() {return fScheduledTime;} 147 const bigtime_t& ScheduledTime() const {return fScheduledTime;} 148 149 AttachmentList& Attachments() {return fAttachments;} 150 const AttachmentList& Attachments() const {return fAttachments;} 151 152 PortList& Targets() {return fTargets;} 153 const PortList& Targets() const {return fTargets;} 154 155 private: 156 // Data members. 157 158 int32 fCode; 159 bigtime_t fScheduledTime; 160 bool fValid; 161 162 AttachmentList fAttachments; 163 PortList fTargets; 164 165 DMMergeMode fMergeMode; 166 uint32 fMergeMask; 167 168 FailureCallback fFailureCallback; 169 void* fFailureData; 170 }; 171 172 173 // #pragma mark - 174 175 176 177 DelayedMessage::DelayedMessage(int32 code, bigtime_t delay, 178 bool isSpecificTime) 179 : 180 fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY 181 ? DM_MINIMUM_DELAY : delay, isSpecificTime)), 182 fHandedOff(false) 183 { 184 } 185 186 187 DelayedMessage::~DelayedMessage() 188 { 189 // Message is canceled without a handoff. 190 if (!fHandedOff) 191 delete fData; 192 } 193 194 195 bool 196 DelayedMessage::AddTarget(port_id port) 197 { 198 if (fData == NULL || fHandedOff) 199 return false; 200 201 return fData->AddTarget(port); 202 } 203 204 205 void 206 DelayedMessage::SetMerge(DMMergeMode mode, uint32 match) 207 { 208 if (fData == NULL || fHandedOff) 209 return; 210 211 fData->SetMerge(mode, match); 212 } 213 214 215 void 216 DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*), 217 void* data) 218 { 219 if (fData == NULL || fHandedOff) 220 return; 221 222 fData->SetFailureCallback(callback, data); 223 } 224 225 226 //! Attach data to message. Memory is not allocated nor copied until handoff. 227 status_t 228 DelayedMessage::Attach(const void* data, size_t size) 229 { 230 if (fData == NULL) 231 return B_NO_MEMORY; 232 233 if (fHandedOff) 234 return B_ERROR; 235 236 if (data == NULL || size == 0) 237 return B_BAD_VALUE; 238 239 return fData->Attach(data, size); 240 } 241 242 243 status_t 244 DelayedMessage::Flush() 245 { 246 if (fData == NULL) 247 return B_NO_MEMORY; 248 249 if (fHandedOff) 250 return B_ERROR; 251 252 if (fData->CountTargets() == 0) 253 return B_BAD_VALUE; 254 255 return gDelayedMessageSender.ScheduleMessage(*this); 256 } 257 258 259 /*! The data handoff occurs upon scheduling and reduces copies to only 260 when a message is actually scheduled. Canceled messages have low cost. 261 */ 262 DelayedMessageData* 263 DelayedMessage::HandOff() 264 { 265 if (fData == NULL || fHandedOff) 266 return NULL; 267 268 if (fData->CopyData()) { 269 fHandedOff = true; 270 return fData; 271 } 272 273 return NULL; 274 } 275 276 277 // #pragma mark - 278 279 280 Attachment::Attachment(const void* _data, size_t _size) 281 : 282 constData(_data), 283 data(NULL), 284 size(_size) 285 { 286 } 287 288 289 Attachment::~Attachment() 290 { 291 free(data); 292 } 293 294 295 // #pragma mark - 296 297 298 DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay, 299 bool isSpecificTime) 300 : 301 fCode(code), 302 fScheduledTime(delay + (isSpecificTime ? 0 : system_time())), 303 fValid(false), 304 305 fAttachments(3, true), 306 fTargets(4, true), 307 308 fMergeMode(DM_NO_MERGE), 309 fMergeMask(DM_DATA_DEFAULT), 310 311 fFailureCallback(NULL), 312 fFailureData(NULL) 313 { 314 } 315 316 317 DelayedMessageData::~DelayedMessageData() 318 { 319 } 320 321 322 bool 323 DelayedMessageData::AddTarget(port_id port) 324 { 325 if (port <= 0) 326 return false; 327 328 // check for duplicates: 329 for (int32 index = 0; index < fTargets.CountItems(); ++index) { 330 if (port == *fTargets.ItemAt(index)) 331 return false; 332 } 333 334 return fTargets.AddItem(new(std::nothrow) port_id(port)); 335 } 336 337 338 void 339 DelayedMessageData::RemoveTarget(port_id port) 340 { 341 if (port == B_BAD_PORT_ID) 342 return; 343 344 // Search for a match by value. 345 for (int32 index = 0; index < fTargets.CountItems(); ++index) { 346 port_id* target = fTargets.ItemAt(index); 347 if (port == *target) { 348 fTargets.RemoveItem(target, true); 349 return; 350 } 351 } 352 } 353 354 355 int32 356 DelayedMessageData::CountTargets() const 357 { 358 return fTargets.CountItems(); 359 } 360 361 362 void 363 DelayedMessageData::MergeTargets(DelayedMessageData* other) 364 { 365 // Failure to add one target does not abort the loop! 366 // It could just mean we already have the target. 367 for (int32 index = 0; index < other->fTargets.CountItems(); ++index) 368 AddTarget(*(other->fTargets.ItemAt(index))); 369 } 370 371 372 //! Copy data from original location - merging failed 373 bool 374 DelayedMessageData::CopyData() 375 { 376 Attachment* attached = NULL; 377 378 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 379 attached = fAttachments.ItemAt(index); 380 381 if (attached == NULL || attached->data != NULL) 382 return false; 383 384 attached->data = malloc(attached->size); 385 if (attached->data == NULL) 386 return false; 387 388 memcpy(attached->data, attached->constData, attached->size); 389 } 390 391 fValid = true; 392 return true; 393 } 394 395 396 bool 397 DelayedMessageData::MergeData(DelayedMessageData* other) 398 { 399 if (!fValid 400 || other == NULL 401 || other->fCode != fCode 402 || fMergeMode == DM_NO_MERGE 403 || other->fMergeMode == DM_NO_MERGE 404 || other->fMergeMode != fMergeMode 405 || other->fAttachments.CountItems() != fAttachments.CountItems()) 406 return false; 407 408 if (other->fMergeMode == DM_MERGE_CANCEL) { 409 MergeTargets(other); 410 return true; 411 } 412 413 // Compare data 414 Attachment* attached = NULL; 415 Attachment* otherAttached = NULL; 416 417 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 418 attached = fAttachments.ItemAt(index); 419 otherAttached = other->fAttachments.ItemAt(index); 420 421 if (attached == NULL 422 || otherAttached == NULL 423 || attached->data == NULL 424 || otherAttached->constData == NULL 425 || attached->size != otherAttached->size) 426 return false; 427 428 // Compares depending upon mode & flags 429 if (!Compare(attached, otherAttached, index)) 430 return false; 431 } 432 433 // add any targets not included in the existing message! 434 MergeTargets(other); 435 436 // since these are duplicates, we need not copy anything... 437 if (fMergeMode == DM_MERGE_DUPLICATES) 438 return true; 439 440 // DM_MERGE_REPLACE: 441 442 // Import the new data! 443 for (int32 index = 0; index < fAttachments.CountItems(); ++index) { 444 attached = fAttachments.ItemAt(index); 445 otherAttached = other->fAttachments.ItemAt(index); 446 447 // We already have allocated our memory, but the other data 448 // has not. So this reduces memory allocations. 449 memcpy(attached->data, otherAttached->constData, attached->size); 450 } 451 452 return true; 453 } 454 455 456 bool 457 DelayedMessageData::IsValid() const 458 { 459 return fValid; 460 } 461 462 463 status_t 464 DelayedMessageData::Attach(const void* data, size_t size) 465 { 466 // Sanity checking already performed 467 Attachment* attach = new(std::nothrow) Attachment(data, size); 468 469 if (attach == NULL) 470 return B_NO_MEMORY; 471 472 if (fAttachments.AddItem(attach) == false) { 473 delete attach; 474 return B_ERROR; 475 } 476 477 return B_OK; 478 } 479 480 481 bool 482 DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index) 483 { 484 if (fMergeMode == DM_MERGE_DUPLICATES) { 485 486 // Default-policy: all data must match 487 if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0) 488 return memcmp(one->data, two->constData, one->size) == 0; 489 490 } else if (fMergeMode == DM_MERGE_REPLACE) { 491 492 // Default Policy: no data needs to match 493 if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0) 494 return memcmp(one->data, two->constData, one->size) == 0; 495 } 496 497 return true; 498 } 499 500 501 void 502 DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask) 503 { 504 fMergeMode = mode; 505 fMergeMask = mask; 506 } 507 508 509 void 510 DelayedMessageData::SendFailed(port_id port) 511 { 512 if (fFailureCallback != NULL) 513 fFailureCallback(fCode, port, fFailureData); 514 } 515 516 517 void 518 DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data) 519 { 520 fFailureCallback = callback; 521 fFailureData = data; 522 } 523 524 525 // #pragma mark - 526 527 528 ScheduledMessage::ScheduledMessage(DelayedMessage& message) 529 : 530 fData(message.HandOff()) 531 { 532 } 533 534 535 ScheduledMessage::~ScheduledMessage() 536 { 537 delete fData; 538 } 539 540 541 int32 542 ScheduledMessage::CountTargets() const 543 { 544 if (fData == NULL) 545 return 0; 546 547 return fData->CountTargets(); 548 } 549 550 551 bigtime_t 552 ScheduledMessage::ScheduledTime() const 553 { 554 if (fData == NULL) 555 return 0; 556 557 return fData->ScheduledTime(); 558 } 559 560 561 //! Send our message and data to their intended target(s) 562 int32 563 ScheduledMessage::SendMessage() 564 { 565 if (fData == NULL || !fData->IsValid()) 566 return 0; 567 568 int32 sent = 0; 569 for (int32 index = 0; index < fData->Targets().CountItems(); ++index) { 570 port_id port = *(fData->Targets().ItemAt(index)); 571 status_t error = SendMessageToPort(port); 572 573 if (error == B_OK) { 574 ++sent; 575 continue; 576 } 577 578 if (error != B_TIMED_OUT) 579 fData->SendFailed(port); 580 } 581 582 return sent; 583 } 584 585 586 status_t 587 ScheduledMessage::SendMessageToPort(port_id port) 588 { 589 if (fData == NULL || !fData->IsValid()) 590 return B_BAD_DATA; 591 592 if (port == B_BAD_PORT_ID) 593 return B_BAD_VALUE; 594 595 BPrivate::LinkSender sender(port); 596 if (sender.StartMessage(fData->Code()) != B_OK) 597 return B_ERROR; 598 599 AttachmentList& list = fData->Attachments(); 600 Attachment* attached = NULL; 601 status_t error = B_OK; 602 603 // The data has been checked already, so we assume it is all good 604 for (int32 index = 0; index < list.CountItems(); ++index) { 605 attached = list.ItemAt(index); 606 607 error = sender.Attach(attached->data, attached->size); 608 if (error != B_OK) { 609 sender.CancelMessage(); 610 return error; 611 } 612 } 613 614 // We do not want to ever hold up the sender thread for too long, we 615 // set a 1 second sending delay, which should be more than enough for 616 // 99.992% of all cases. Approximately. 617 error = sender.Flush(1000000); 618 619 if (error == B_OK || error == B_BAD_PORT_ID) 620 fData->RemoveTarget(port); 621 622 return error; 623 } 624 625 626 bool 627 ScheduledMessage::IsValid() const 628 { 629 return fData != NULL && fData->IsValid(); 630 } 631 632 633 bool 634 ScheduledMessage::Merge(DelayedMessage& other) 635 { 636 if (!IsValid()) 637 return false; 638 639 return fData->MergeData(other.Data()); 640 } 641 642 643 bool 644 ScheduledMessage::operator<(const ScheduledMessage& other) const 645 { 646 if (!IsValid() || !other.IsValid()) 647 return false; 648 649 return fData->ScheduledTime() < other.fData->ScheduledTime(); 650 } 651 652 653 int 654 CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two) 655 { 656 return *one < *two; 657 } 658 659 660 // #pragma mark - 661 662 663 DelayedMessageSender::DelayedMessageSender() 664 : 665 fLock("DelayedMessageSender"), 666 fMessages(20, true), 667 fScheduledWakeup(B_INFINITE_TIMEOUT), 668 fWakeupRetry(0), 669 fThread(spawn_thread(&_thread_func, kName, kPriority, this)), 670 fPort(create_port(kPortCapacity, "DelayedMessageSender")), 671 fSentCount(0) 672 { 673 resume_thread(fThread); 674 } 675 676 677 DelayedMessageSender::~DelayedMessageSender() 678 { 679 // write the exit message to our port 680 write_port(fPort, kExitMessage, NULL, 0); 681 682 status_t status = B_OK; 683 while (wait_for_thread(fThread, &status) == B_OK); 684 685 // We now know the thread has exited, it is safe to cleanup 686 delete_port(fPort); 687 } 688 689 690 status_t 691 DelayedMessageSender::ScheduleMessage(DelayedMessage& message) 692 { 693 BAutolock _(fLock); 694 695 // Can we merge with a pending message? 696 ScheduledMessage* pending = NULL; 697 for (int32 index = 0; index < fMessages.CountItems(); ++index) { 698 pending = fMessages.ItemAt(index); 699 if (pending->Merge(message)) 700 return B_OK; 701 } 702 703 // Guess not, add it to our list! 704 ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message); 705 706 if (scheduled == NULL) 707 return B_NO_MEMORY; 708 709 if (!scheduled->IsValid()) { 710 delete scheduled; 711 return B_BAD_DATA; 712 } 713 714 if (fMessages.AddItem(scheduled)) { 715 fMessages.SortItems(&CompareMessages); 716 _Wakeup(scheduled->ScheduledTime()); 717 return B_OK; 718 } 719 720 return B_ERROR; 721 } 722 723 724 int32 725 DelayedMessageSender::CountDelayedMessages() const 726 { 727 BAutolock _(fLock); 728 return fMessages.CountItems(); 729 } 730 731 732 int64 733 DelayedMessageSender::CountSentMessages() const 734 { 735 return atomic_get64(&fSentCount); 736 } 737 738 739 void 740 DelayedMessageSender::_MessageLoop() 741 { 742 int32 code = -1; 743 status_t status = B_TIMED_OUT; 744 bigtime_t timeout = B_INFINITE_TIMEOUT; 745 746 while (true) { 747 timeout = atomic_get64(&fScheduledWakeup); 748 if (timeout != B_INFINITE_TIMEOUT) 749 timeout -= (system_time() + (DM_MINIMUM_DELAY / 2)); 750 751 if (timeout > DM_MINIMUM_DELAY / 4) { 752 status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT, 753 timeout); 754 } else 755 status = B_TIMED_OUT; 756 757 if (status == B_INTERRUPTED) 758 continue; 759 760 if (status == B_TIMED_OUT) { 761 _SendDelayedMessages(); 762 continue; 763 } 764 765 if (status == B_OK) { 766 switch (code) { 767 case kWakeupMessage: 768 continue; 769 770 case kExitMessage: 771 return; 772 773 // TODO: trace unhandled messages 774 default: 775 continue; 776 } 777 } 778 779 // port deleted? 780 if (status < B_OK) 781 break; 782 } 783 } 784 785 786 int32 787 DelayedMessageSender::_thread_func(void* sender) 788 { 789 (static_cast<DelayedMessageSender*>(sender))->_MessageLoop(); 790 return 0; 791 } 792 793 794 //! Sends pending messages, call ONLY from sender thread! 795 int32 796 DelayedMessageSender::_SendDelayedMessages() 797 { 798 // avoid sending messages during times of contention 799 if (fLock.LockWithTimeout(30000) != B_OK) { 800 atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY); 801 return 0; 802 } 803 804 atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT); 805 806 if (fMessages.CountItems() == 0) { 807 fLock.Unlock(); 808 return 0; 809 } 810 811 int32 sent = 0; 812 813 bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2; 814 // capture any that may be on the verge of being sent. 815 816 BObjectList<ScheduledMessage> remove; 817 818 ScheduledMessage* message = NULL; 819 for (int32 index = 0; index < fMessages.CountItems(); ++index) { 820 message = fMessages.ItemAt(index); 821 822 if (message->ScheduledTime() > time) { 823 atomic_set64(&fScheduledWakeup, message->ScheduledTime()); 824 break; 825 } 826 827 int32 sendCount = message->SendMessage(); 828 if (sendCount > 0) 829 sent += sendCount; 830 831 if (message->CountTargets() == 0) 832 remove.AddItem(message); 833 } 834 835 // remove serviced messages 836 for (int32 index = 0; index < remove.CountItems(); ++index) 837 fMessages.RemoveItem(remove.ItemAt(index)); 838 839 atomic_add64(&fSentCount, sent); 840 841 // catch any partly-failed messages (possibly late): 842 if (fMessages.CountItems() > 0 843 && atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) { 844 845 fMessages.SortItems(&CompareMessages); 846 message = fMessages.ItemAt(0); 847 bigtime_t timeout = message->ScheduledTime() - time; 848 849 if (timeout < 0) 850 timeout = DM_MINIMUM_DELAY; 851 852 atomic_set64(&fScheduledWakeup, timeout); 853 } 854 855 fLock.Unlock(); 856 return sent; 857 } 858 859 860 void 861 DelayedMessageSender::_Wakeup(bigtime_t when) 862 { 863 if (atomic_get64(&fScheduledWakeup) < when 864 && atomic_get(&fWakeupRetry) == 0) 865 return; 866 867 atomic_set64(&fScheduledWakeup, when); 868 869 BPrivate::LinkSender sender(fPort); 870 sender.StartMessage(kWakeupMessage); 871 status_t error = sender.Flush(30000); 872 atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT); 873 } 874 875