1 /*
2 * Copyright 2015, Haiku.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Joseph Groover <looncraz@looncraz.net>
7 */
8
9
10 #include "DelayedMessage.h"
11
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15
16 #include <Autolock.h>
17 #include <String.h>
18
19 #include <LinkSender.h>
20 #include <ServerProtocol.h>
21
22
23 // DelayedMessageSender constants
24 static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
25 static const int32 kExitMessage = kWakeupMessage + 1;
26
27 static const char* kName = "DMT is here for you, eventually...";
28 static int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
29 static int32 kPortCapacity = 10;
30
31
32 //! Data attachment structure.
33 struct Attachment {
34 Attachment(const void* data, size_t size);
35 ~Attachment();
36
37 const void* constData;
38 void* data;
39 size_t size;
40 };
41
42
43 typedef BObjectList<Attachment> AttachmentList;
44
45
46 /*! \class ScheduledMessage
47 \brief Responsible for sending of delayed message.
48 */
49 class ScheduledMessage {
50 public:
51 ScheduledMessage(DelayedMessage& message);
52 ~ScheduledMessage();
53
54 int32 CountTargets() const;
55
56 void Finalize();
57 bigtime_t ScheduledTime() const;
58 int32 SendMessage();
59 bool IsValid() const;
60 bool Merge(DelayedMessage& message);
61
62 status_t SendMessageToPort(port_id port);
63 bool operator<(const ScheduledMessage& other) const;
64
65 DelayedMessageData* fData;
66 };
67
68
69 /*! \class DelayedMessageSender DelayedMessageSender.h
70 \brief Responsible for scheduling and sending of delayed messages
71 */
72 class DelayedMessageSender {
73 public:
74 explicit DelayedMessageSender();
75 ~DelayedMessageSender();
76
77 status_t ScheduleMessage (DelayedMessage& message);
78
79 int32 CountDelayedMessages() const;
80 int64 CountSentMessages() const;
81
82 private:
83 void _MessageLoop();
84 int32 _SendDelayedMessages();
85 static int32 _thread_func(void* sender);
86 void _Wakeup(bigtime_t whatTime);
87
88 private:
89 typedef BObjectList<ScheduledMessage> ScheduledList;
90
91 mutable BLocker fLock;
92 ScheduledList fMessages;
93
94 bigtime_t fScheduledWakeup;
95
96 int32 fWakeupRetry;
97 thread_id fThread;
98 port_id fPort;
99
100 mutable int64 fSentCount;
101 };
102
103
104 DelayedMessageSender gDelayedMessageSender;
105
106
107 /*! \class DelayedMessageData DelayedMessageSender.h
108 \brief Owns DelayedMessage data, allocates memory and copies data only
109 when needed,
110 */
111 class DelayedMessageData {
112 typedef BObjectList<port_id> PortList;
113 typedef void(*FailureCallback)(int32 code, port_id port, void* data);
114 public:
115 DelayedMessageData(int32 code, bigtime_t delay,
116 bool isSpecificTime);
117 ~DelayedMessageData();
118
119 bool AddTarget(port_id port);
120 void RemoveTarget(port_id port);
121 int32 CountTargets() const;
122
123 void MergeTargets(DelayedMessageData* other);
124
125 bool CopyData();
126 bool MergeData(DelayedMessageData* other);
127
128 bool IsValid() const;
129 // Only valid after a successful CopyData().
130
131 status_t Attach(const void* data, size_t size);
132
133 bool Compare(Attachment* one, Attachment* two,
134 int32 index);
135
136 void SetMerge(DMMergeMode mode, uint32 mask);
137 void SendFailed(port_id port);
138
139 void SetFailureCallback(FailureCallback callback,
140 void* data);
141
142 // Accessors.
Code()143 int32& Code() {return fCode;}
Code() const144 const int32& Code() const {return fCode;}
145
ScheduledTime()146 bigtime_t& ScheduledTime() {return fScheduledTime;}
ScheduledTime() const147 const bigtime_t& ScheduledTime() const {return fScheduledTime;}
148
Attachments()149 AttachmentList& Attachments() {return fAttachments;}
Attachments() const150 const AttachmentList& Attachments() const {return fAttachments;}
151
Targets()152 PortList& Targets() {return fTargets;}
Targets() const153 const PortList& Targets() const {return fTargets;}
154
155 private:
156 // Data members.
157
158 int32 fCode;
159 bigtime_t fScheduledTime;
160 bool fValid;
161
162 AttachmentList fAttachments;
163 PortList fTargets;
164
165 DMMergeMode fMergeMode;
166 uint32 fMergeMask;
167
168 FailureCallback fFailureCallback;
169 void* fFailureData;
170 };
171
172
173 // #pragma mark -
174
175
176
DelayedMessage(int32 code,bigtime_t delay,bool isSpecificTime)177 DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
178 bool isSpecificTime)
179 :
180 fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY
181 ? DM_MINIMUM_DELAY : delay, isSpecificTime)),
182 fHandedOff(false)
183 {
184 }
185
186
~DelayedMessage()187 DelayedMessage::~DelayedMessage()
188 {
189 // Message is canceled without a handoff.
190 if (!fHandedOff)
191 delete fData;
192 }
193
194
195 bool
AddTarget(port_id port)196 DelayedMessage::AddTarget(port_id port)
197 {
198 if (fData == NULL || fHandedOff)
199 return false;
200
201 return fData->AddTarget(port);
202 }
203
204
205 void
SetMerge(DMMergeMode mode,uint32 match)206 DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
207 {
208 if (fData == NULL || fHandedOff)
209 return;
210
211 fData->SetMerge(mode, match);
212 }
213
214
215 void
SetFailureCallback(void (* callback)(int32,port_id,void *),void * data)216 DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
217 void* data)
218 {
219 if (fData == NULL || fHandedOff)
220 return;
221
222 fData->SetFailureCallback(callback, data);
223 }
224
225
226 //! Attach data to message. Memory is not allocated nor copied until handoff.
227 status_t
Attach(const void * data,size_t size)228 DelayedMessage::Attach(const void* data, size_t size)
229 {
230 if (fData == NULL)
231 return B_NO_MEMORY;
232
233 if (fHandedOff)
234 return B_ERROR;
235
236 if (data == NULL || size == 0)
237 return B_BAD_VALUE;
238
239 return fData->Attach(data, size);
240 }
241
242
243 status_t
Flush()244 DelayedMessage::Flush()
245 {
246 if (fData == NULL)
247 return B_NO_MEMORY;
248
249 if (fHandedOff)
250 return B_ERROR;
251
252 if (fData->CountTargets() == 0)
253 return B_BAD_VALUE;
254
255 return gDelayedMessageSender.ScheduleMessage(*this);
256 }
257
258
259 /*! The data handoff occurs upon scheduling and reduces copies to only
260 when a message is actually scheduled. Canceled messages have low cost.
261 */
262 DelayedMessageData*
HandOff()263 DelayedMessage::HandOff()
264 {
265 if (fData == NULL || fHandedOff)
266 return NULL;
267
268 if (fData->CopyData()) {
269 fHandedOff = true;
270 return fData;
271 }
272
273 return NULL;
274 }
275
276
277 // #pragma mark -
278
279
Attachment(const void * _data,size_t _size)280 Attachment::Attachment(const void* _data, size_t _size)
281 :
282 constData(_data),
283 data(NULL),
284 size(_size)
285 {
286 }
287
288
~Attachment()289 Attachment::~Attachment()
290 {
291 free(data);
292 }
293
294
295 // #pragma mark -
296
297
DelayedMessageData(int32 code,bigtime_t delay,bool isSpecificTime)298 DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
299 bool isSpecificTime)
300 :
301 fCode(code),
302 fScheduledTime(delay + (isSpecificTime ? 0 : system_time())),
303 fValid(false),
304
305 fAttachments(3, true),
306 fTargets(4, true),
307
308 fMergeMode(DM_NO_MERGE),
309 fMergeMask(DM_DATA_DEFAULT),
310
311 fFailureCallback(NULL),
312 fFailureData(NULL)
313 {
314 }
315
316
~DelayedMessageData()317 DelayedMessageData::~DelayedMessageData()
318 {
319 }
320
321
322 bool
AddTarget(port_id port)323 DelayedMessageData::AddTarget(port_id port)
324 {
325 if (port <= 0)
326 return false;
327
328 // check for duplicates:
329 for (int32 index = 0; index < fTargets.CountItems(); ++index) {
330 if (port == *fTargets.ItemAt(index))
331 return false;
332 }
333
334 return fTargets.AddItem(new(std::nothrow) port_id(port));
335 }
336
337
338 void
RemoveTarget(port_id port)339 DelayedMessageData::RemoveTarget(port_id port)
340 {
341 if (port == B_BAD_PORT_ID)
342 return;
343
344 // Search for a match by value.
345 for (int32 index = 0; index < fTargets.CountItems(); ++index) {
346 port_id* target = fTargets.ItemAt(index);
347 if (port == *target) {
348 fTargets.RemoveItem(target, true);
349 return;
350 }
351 }
352 }
353
354
355 int32
CountTargets() const356 DelayedMessageData::CountTargets() const
357 {
358 return fTargets.CountItems();
359 }
360
361
362 void
MergeTargets(DelayedMessageData * other)363 DelayedMessageData::MergeTargets(DelayedMessageData* other)
364 {
365 // Failure to add one target does not abort the loop!
366 // It could just mean we already have the target.
367 for (int32 index = 0; index < other->fTargets.CountItems(); ++index)
368 AddTarget(*(other->fTargets.ItemAt(index)));
369 }
370
371
372 //! Copy data from original location - merging failed
373 bool
CopyData()374 DelayedMessageData::CopyData()
375 {
376 Attachment* attached = NULL;
377
378 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
379 attached = fAttachments.ItemAt(index);
380
381 if (attached == NULL || attached->data != NULL)
382 return false;
383
384 attached->data = malloc(attached->size);
385 if (attached->data == NULL)
386 return false;
387
388 memcpy(attached->data, attached->constData, attached->size);
389 }
390
391 fValid = true;
392 return true;
393 }
394
395
396 bool
MergeData(DelayedMessageData * other)397 DelayedMessageData::MergeData(DelayedMessageData* other)
398 {
399 if (!fValid
400 || other == NULL
401 || other->fCode != fCode
402 || fMergeMode == DM_NO_MERGE
403 || other->fMergeMode == DM_NO_MERGE
404 || other->fMergeMode != fMergeMode
405 || other->fAttachments.CountItems() != fAttachments.CountItems())
406 return false;
407
408 if (other->fMergeMode == DM_MERGE_CANCEL) {
409 MergeTargets(other);
410 return true;
411 }
412
413 // Compare data
414 Attachment* attached = NULL;
415 Attachment* otherAttached = NULL;
416
417 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
418 attached = fAttachments.ItemAt(index);
419 otherAttached = other->fAttachments.ItemAt(index);
420
421 if (attached == NULL
422 || otherAttached == NULL
423 || attached->data == NULL
424 || otherAttached->constData == NULL
425 || attached->size != otherAttached->size)
426 return false;
427
428 // Compares depending upon mode & flags
429 if (!Compare(attached, otherAttached, index))
430 return false;
431 }
432
433 // add any targets not included in the existing message!
434 MergeTargets(other);
435
436 // since these are duplicates, we need not copy anything...
437 if (fMergeMode == DM_MERGE_DUPLICATES)
438 return true;
439
440 // DM_MERGE_REPLACE:
441
442 // Import the new data!
443 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
444 attached = fAttachments.ItemAt(index);
445 otherAttached = other->fAttachments.ItemAt(index);
446
447 // We already have allocated our memory, but the other data
448 // has not. So this reduces memory allocations.
449 memcpy(attached->data, otherAttached->constData, attached->size);
450 }
451
452 return true;
453 }
454
455
456 bool
IsValid() const457 DelayedMessageData::IsValid() const
458 {
459 return fValid;
460 }
461
462
463 status_t
Attach(const void * data,size_t size)464 DelayedMessageData::Attach(const void* data, size_t size)
465 {
466 // Sanity checking already performed
467 Attachment* attach = new(std::nothrow) Attachment(data, size);
468
469 if (attach == NULL)
470 return B_NO_MEMORY;
471
472 if (fAttachments.AddItem(attach) == false) {
473 delete attach;
474 return B_ERROR;
475 }
476
477 return B_OK;
478 }
479
480
481 bool
Compare(Attachment * one,Attachment * two,int32 index)482 DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
483 {
484 if (fMergeMode == DM_MERGE_DUPLICATES) {
485
486 // Default-policy: all data must match
487 if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0)
488 return memcmp(one->data, two->constData, one->size) == 0;
489
490 } else if (fMergeMode == DM_MERGE_REPLACE) {
491
492 // Default Policy: no data needs to match
493 if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0)
494 return memcmp(one->data, two->constData, one->size) == 0;
495 }
496
497 return true;
498 }
499
500
501 void
SetMerge(DMMergeMode mode,uint32 mask)502 DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
503 {
504 fMergeMode = mode;
505 fMergeMask = mask;
506 }
507
508
509 void
SendFailed(port_id port)510 DelayedMessageData::SendFailed(port_id port)
511 {
512 if (fFailureCallback != NULL)
513 fFailureCallback(fCode, port, fFailureData);
514 }
515
516
517 void
SetFailureCallback(FailureCallback callback,void * data)518 DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
519 {
520 fFailureCallback = callback;
521 fFailureData = data;
522 }
523
524
525 // #pragma mark -
526
527
ScheduledMessage(DelayedMessage & message)528 ScheduledMessage::ScheduledMessage(DelayedMessage& message)
529 :
530 fData(message.HandOff())
531 {
532 }
533
534
~ScheduledMessage()535 ScheduledMessage::~ScheduledMessage()
536 {
537 delete fData;
538 }
539
540
541 int32
CountTargets() const542 ScheduledMessage::CountTargets() const
543 {
544 if (fData == NULL)
545 return 0;
546
547 return fData->CountTargets();
548 }
549
550
551 bigtime_t
ScheduledTime() const552 ScheduledMessage::ScheduledTime() const
553 {
554 if (fData == NULL)
555 return 0;
556
557 return fData->ScheduledTime();
558 }
559
560
561 //! Send our message and data to their intended target(s)
562 int32
SendMessage()563 ScheduledMessage::SendMessage()
564 {
565 if (fData == NULL || !fData->IsValid())
566 return 0;
567
568 int32 sent = 0;
569 for (int32 index = 0; index < fData->Targets().CountItems(); ++index) {
570 port_id port = *(fData->Targets().ItemAt(index));
571 status_t error = SendMessageToPort(port);
572
573 if (error == B_OK) {
574 ++sent;
575 continue;
576 }
577
578 if (error != B_TIMED_OUT)
579 fData->SendFailed(port);
580 }
581
582 return sent;
583 }
584
585
586 status_t
SendMessageToPort(port_id port)587 ScheduledMessage::SendMessageToPort(port_id port)
588 {
589 if (fData == NULL || !fData->IsValid())
590 return B_BAD_DATA;
591
592 if (port == B_BAD_PORT_ID)
593 return B_BAD_VALUE;
594
595 BPrivate::LinkSender sender(port);
596 if (sender.StartMessage(fData->Code()) != B_OK)
597 return B_ERROR;
598
599 AttachmentList& list = fData->Attachments();
600 Attachment* attached = NULL;
601 status_t error = B_OK;
602
603 // The data has been checked already, so we assume it is all good
604 for (int32 index = 0; index < list.CountItems(); ++index) {
605 attached = list.ItemAt(index);
606
607 error = sender.Attach(attached->data, attached->size);
608 if (error != B_OK) {
609 sender.CancelMessage();
610 return error;
611 }
612 }
613
614 // We do not want to ever hold up the sender thread for too long, we
615 // set a 1 second sending delay, which should be more than enough for
616 // 99.992% of all cases. Approximately.
617 error = sender.Flush(1000000);
618
619 if (error == B_OK || error == B_BAD_PORT_ID)
620 fData->RemoveTarget(port);
621
622 return error;
623 }
624
625
626 bool
IsValid() const627 ScheduledMessage::IsValid() const
628 {
629 return fData != NULL && fData->IsValid();
630 }
631
632
633 bool
Merge(DelayedMessage & other)634 ScheduledMessage::Merge(DelayedMessage& other)
635 {
636 if (!IsValid())
637 return false;
638
639 return fData->MergeData(other.Data());
640 }
641
642
643 bool
operator <(const ScheduledMessage & other) const644 ScheduledMessage::operator<(const ScheduledMessage& other) const
645 {
646 if (!IsValid() || !other.IsValid())
647 return false;
648
649 return fData->ScheduledTime() < other.fData->ScheduledTime();
650 }
651
652
653 int
CompareMessages(const ScheduledMessage * one,const ScheduledMessage * two)654 CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
655 {
656 return *one < *two;
657 }
658
659
660 // #pragma mark -
661
662
DelayedMessageSender()663 DelayedMessageSender::DelayedMessageSender()
664 :
665 fLock("DelayedMessageSender"),
666 fMessages(20, true),
667 fScheduledWakeup(B_INFINITE_TIMEOUT),
668 fWakeupRetry(0),
669 fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
670 fPort(create_port(kPortCapacity, "DelayedMessageSender")),
671 fSentCount(0)
672 {
673 resume_thread(fThread);
674 }
675
676
~DelayedMessageSender()677 DelayedMessageSender::~DelayedMessageSender()
678 {
679 // write the exit message to our port
680 write_port(fPort, kExitMessage, NULL, 0);
681
682 status_t status = B_OK;
683 while (wait_for_thread(fThread, &status) == B_OK);
684
685 // We now know the thread has exited, it is safe to cleanup
686 delete_port(fPort);
687 }
688
689
690 status_t
ScheduleMessage(DelayedMessage & message)691 DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
692 {
693 BAutolock _(fLock);
694
695 // Can we merge with a pending message?
696 ScheduledMessage* pending = NULL;
697 for (int32 index = 0; index < fMessages.CountItems(); ++index) {
698 pending = fMessages.ItemAt(index);
699 if (pending->Merge(message))
700 return B_OK;
701 }
702
703 // Guess not, add it to our list!
704 ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);
705
706 if (scheduled == NULL)
707 return B_NO_MEMORY;
708
709 if (!scheduled->IsValid()) {
710 delete scheduled;
711 return B_BAD_DATA;
712 }
713
714 if (fMessages.AddItem(scheduled)) {
715 fMessages.SortItems(&CompareMessages);
716 _Wakeup(scheduled->ScheduledTime());
717 return B_OK;
718 }
719
720 return B_ERROR;
721 }
722
723
724 int32
CountDelayedMessages() const725 DelayedMessageSender::CountDelayedMessages() const
726 {
727 BAutolock _(fLock);
728 return fMessages.CountItems();
729 }
730
731
732 int64
CountSentMessages() const733 DelayedMessageSender::CountSentMessages() const
734 {
735 return atomic_get64(&fSentCount);
736 }
737
738
739 void
_MessageLoop()740 DelayedMessageSender::_MessageLoop()
741 {
742 int32 code = -1;
743 status_t status = B_TIMED_OUT;
744 bigtime_t timeout = B_INFINITE_TIMEOUT;
745
746 while (true) {
747 timeout = atomic_get64(&fScheduledWakeup);
748 if (timeout != B_INFINITE_TIMEOUT)
749 timeout -= (system_time() + (DM_MINIMUM_DELAY / 2));
750
751 if (timeout > DM_MINIMUM_DELAY / 4) {
752 status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
753 timeout);
754 } else
755 status = B_TIMED_OUT;
756
757 if (status == B_INTERRUPTED)
758 continue;
759
760 if (status == B_TIMED_OUT) {
761 _SendDelayedMessages();
762 continue;
763 }
764
765 if (status == B_OK) {
766 switch (code) {
767 case kWakeupMessage:
768 continue;
769
770 case kExitMessage:
771 return;
772
773 // TODO: trace unhandled messages
774 default:
775 continue;
776 }
777 }
778
779 // port deleted?
780 if (status < B_OK)
781 break;
782 }
783 }
784
785
786 int32
_thread_func(void * sender)787 DelayedMessageSender::_thread_func(void* sender)
788 {
789 (static_cast<DelayedMessageSender*>(sender))->_MessageLoop();
790 return 0;
791 }
792
793
794 //! Sends pending messages, call ONLY from sender thread!
795 int32
_SendDelayedMessages()796 DelayedMessageSender::_SendDelayedMessages()
797 {
798 // avoid sending messages during times of contention
799 if (fLock.LockWithTimeout(30000) != B_OK) {
800 atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
801 return 0;
802 }
803
804 atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);
805
806 if (fMessages.CountItems() == 0) {
807 fLock.Unlock();
808 return 0;
809 }
810
811 int32 sent = 0;
812
813 bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
814 // capture any that may be on the verge of being sent.
815
816 BObjectList<ScheduledMessage> remove;
817
818 ScheduledMessage* message = NULL;
819 for (int32 index = 0; index < fMessages.CountItems(); ++index) {
820 message = fMessages.ItemAt(index);
821
822 if (message->ScheduledTime() > time) {
823 atomic_set64(&fScheduledWakeup, message->ScheduledTime());
824 break;
825 }
826
827 int32 sendCount = message->SendMessage();
828 if (sendCount > 0)
829 sent += sendCount;
830
831 if (message->CountTargets() == 0)
832 remove.AddItem(message);
833 }
834
835 // remove serviced messages
836 for (int32 index = 0; index < remove.CountItems(); ++index)
837 fMessages.RemoveItem(remove.ItemAt(index));
838
839 atomic_add64(&fSentCount, sent);
840
841 // catch any partly-failed messages (possibly late):
842 if (fMessages.CountItems() > 0
843 && atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {
844
845 fMessages.SortItems(&CompareMessages);
846 message = fMessages.ItemAt(0);
847 bigtime_t timeout = message->ScheduledTime() - time;
848
849 if (timeout < 0)
850 timeout = DM_MINIMUM_DELAY;
851
852 atomic_set64(&fScheduledWakeup, timeout);
853 }
854
855 fLock.Unlock();
856 return sent;
857 }
858
859
860 void
_Wakeup(bigtime_t when)861 DelayedMessageSender::_Wakeup(bigtime_t when)
862 {
863 if (atomic_get64(&fScheduledWakeup) < when
864 && atomic_get(&fWakeupRetry) == 0)
865 return;
866
867 atomic_set64(&fScheduledWakeup, when);
868
869 BPrivate::LinkSender sender(fPort);
870 sender.StartMessage(kWakeupMessage);
871 status_t error = sender.Flush(30000);
872 atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT);
873 }
874
875