xref: /haiku/src/servers/app/DelayedMessage.cpp (revision 1a76488fc88584bf66b9751d7fb9b6527ac20d87)
1 /*
2  * Copyright 2015, Haiku.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *			Joseph Groover <looncraz@looncraz.net>
7 */
8 
9 
10 #include "DelayedMessage.h"
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 
16 #include <Autolock.h>
17 #include <String.h>
18 
19 #include <LinkSender.h>
20 #include <ServerProtocol.h>
21 
22 
23 // DelayedMessageSender constants
24 static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
25 static const int32 kExitMessage = kWakeupMessage + 1;
26 
27 static const char* kName = "DMT is here for you, eventually...";
28 static int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
29 static int32 kPortCapacity = 10;
30 
31 
32 //! Data attachment structure.
33 struct Attachment {
34 								Attachment(const void* data, size_t size);
35 								~Attachment();
36 
37 			const void*			constData;
38 			void*				data;
39 			size_t				size;
40 };
41 
42 
43 typedef BObjectList<Attachment> AttachmentList;
44 
45 
46 /*!	\class ScheduledMessage
47 	\brief Responsible for sending of delayed message.
48 */
49 class ScheduledMessage {
50 public:
51 								ScheduledMessage(DelayedMessage& message);
52 								~ScheduledMessage();
53 
54 			int32				CountTargets() const;
55 
56 			void				Finalize();
57 			bigtime_t			ScheduledTime() const;
58 			int32				SendMessage();
59 			bool				IsValid() const;
60 			bool				Merge(DelayedMessage& message);
61 
62 			status_t			SendMessageToPort(port_id port);
63 			bool 				operator<(const ScheduledMessage& other) const;
64 
65 			DelayedMessageData*	fData;
66 };
67 
68 
69 /*!	\class DelayedMessageSender DelayedMessageSender.h
70 	\brief Responsible for scheduling and sending of delayed messages
71 */
72 class DelayedMessageSender {
73 public:
74 			explicit			DelayedMessageSender();
75 								~DelayedMessageSender();
76 
77 			status_t			ScheduleMessage	(DelayedMessage& message);
78 
79 			int32				CountDelayedMessages() const;
80 			int64				CountSentMessages() const;
81 
82 private:
83 			void				_MessageLoop();
84 			int32				_SendDelayedMessages();
85 	static	int32				_thread_func(void* sender);
86 			void				_Wakeup(bigtime_t whatTime);
87 
88 private:
89 	typedef BObjectList<ScheduledMessage> ScheduledList;
90 
91 	mutable	BLocker				fLock;
92 			ScheduledList		fMessages;
93 
94 			bigtime_t			fScheduledWakeup;
95 
96 			int32				fWakeupRetry;
97 			thread_id			fThread;
98 			port_id				fPort;
99 
100 	mutable	int64				fSentCount;
101 };
102 
103 
104 DelayedMessageSender gDelayedMessageSender;
105 
106 
107 /*!	\class DelayedMessageData DelayedMessageSender.h
108 	\brief Owns DelayedMessage data, allocates memory and copies data only
109 			when needed,
110 */
111 class DelayedMessageData {
112 	typedef BObjectList<port_id> PortList;
113 	typedef void(*FailureCallback)(int32 code, port_id port, void* data);
114 public:
115 								DelayedMessageData(int32 code, bigtime_t delay,
116 									bool isSpecificTime);
117 								~DelayedMessageData();
118 
119 			bool				AddTarget(port_id port);
120 			void				RemoveTarget(port_id port);
121 			int32				CountTargets() const;
122 
123 			void				MergeTargets(DelayedMessageData* other);
124 
125 			bool				CopyData();
126 			bool				MergeData(DelayedMessageData* other);
127 
128 			bool				IsValid() const;
129 				// Only valid after a successful CopyData().
130 
131 			status_t			Attach(const void* data, size_t size);
132 
133 			bool				Compare(Attachment* one, Attachment* two,
134 									int32 index);
135 
136 			void				SetMerge(DMMergeMode mode, uint32 mask);
137 			void				SendFailed(port_id port);
138 
139 			void				SetFailureCallback(FailureCallback callback,
140 									void* data);
141 
142 			// Accessors.
143 			int32&				Code() {return fCode;}
144 			const int32&		Code() const {return fCode;}
145 
146 			bigtime_t&			ScheduledTime() {return fScheduledTime;}
147 			const bigtime_t&	ScheduledTime() const {return fScheduledTime;}
148 
149 			AttachmentList&		Attachments() {return fAttachments;}
150 			const AttachmentList&	Attachments() const {return fAttachments;}
151 
152 			PortList&			Targets() {return fTargets;}
153 			const PortList&		Targets() const {return fTargets;}
154 
155 private:
156 		// Data members.
157 
158 			int32				fCode;
159 			bigtime_t			fScheduledTime;
160 			bool				fValid;
161 
162 			AttachmentList		fAttachments;
163 			PortList			fTargets;
164 
165 			DMMergeMode			fMergeMode;
166 			uint32				fMergeMask;
167 
168 			FailureCallback		fFailureCallback;
169 			void*				fFailureData;
170 };
171 
172 
173 // #pragma mark -
174 
175 
176 
177 DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
178 		bool isSpecificTime)
179 	:
180 	fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY
181 		? DM_MINIMUM_DELAY : delay, isSpecificTime)),
182 	fHandedOff(false)
183 {
184 }
185 
186 
187 DelayedMessage::~DelayedMessage()
188 {
189 	// Message is canceled without a handoff.
190 	if (!fHandedOff)
191 		delete fData;
192 }
193 
194 
195 bool
196 DelayedMessage::AddTarget(port_id port)
197 {
198 	if (fData == NULL || fHandedOff)
199 		return false;
200 
201 	return fData->AddTarget(port);
202 }
203 
204 
205 void
206 DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
207 {
208 	if (fData == NULL || fHandedOff)
209 		return;
210 
211 	fData->SetMerge(mode, match);
212 }
213 
214 
215 void
216 DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
217 	void* data)
218 {
219 	if (fData == NULL || fHandedOff)
220 		return;
221 
222 	fData->SetFailureCallback(callback, data);
223 }
224 
225 
226 //! Attach data to message. Memory is not allocated nor copied until handoff.
227 status_t
228 DelayedMessage::Attach(const void* data, size_t size)
229 {
230 	if (fData == NULL)
231 		return B_NO_MEMORY;
232 
233 	if (fHandedOff)
234 		return B_ERROR;
235 
236 	if (data == NULL || size == 0)
237 		return B_BAD_VALUE;
238 
239 	return	fData->Attach(data, size);
240 }
241 
242 
243 status_t
244 DelayedMessage::Flush()
245 {
246 	if (fData == NULL)
247 		return B_NO_MEMORY;
248 
249 	if (fHandedOff)
250 		return B_ERROR;
251 
252 	if (fData->CountTargets() == 0)
253 		return B_BAD_VALUE;
254 
255 	return gDelayedMessageSender.ScheduleMessage(*this);
256 }
257 
258 
259 /*!	The data handoff occurs upon scheduling and reduces copies to only
260 	when a message is actually scheduled. Canceled messages have low cost.
261 */
262 DelayedMessageData*
263 DelayedMessage::HandOff()
264 {
265 	if (fData == NULL || fHandedOff)
266 		return NULL;
267 
268 	if (fData->CopyData()) {
269 		fHandedOff = true;
270 		return fData;
271 	}
272 
273 	return NULL;
274 }
275 
276 
277 // #pragma mark -
278 
279 
280 Attachment::Attachment(const void* _data, size_t _size)
281 	:
282 	constData(_data),
283 	data(NULL),
284 	size(_size)
285 {
286 }
287 
288 
289 Attachment::~Attachment()
290 {
291 	free(data);
292 }
293 
294 
295 // #pragma mark -
296 
297 
298 DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
299 	bool isSpecificTime)
300 	:
301 	fCode(code),
302 	fScheduledTime(delay + (isSpecificTime ? 0 : system_time())),
303 	fValid(false),
304 
305 	fAttachments(3, true),
306 	fTargets(4, true),
307 
308 	fMergeMode(DM_NO_MERGE),
309 	fMergeMask(DM_DATA_DEFAULT),
310 
311 	fFailureCallback(NULL),
312 	fFailureData(NULL)
313 {
314 }
315 
316 
317 DelayedMessageData::~DelayedMessageData()
318 {
319 }
320 
321 
322 bool
323 DelayedMessageData::AddTarget(port_id port)
324 {
325 	if (port <= 0)
326 		return false;
327 
328 	// check for duplicates:
329 	for (int32 index = 0; index < fTargets.CountItems(); ++index) {
330 		if (port == *fTargets.ItemAt(index))
331 			return false;
332 	}
333 
334 	return fTargets.AddItem(new(std::nothrow) port_id(port));
335 }
336 
337 
338 void
339 DelayedMessageData::RemoveTarget(port_id port)
340 {
341 	if (port == B_BAD_PORT_ID)
342 		return;
343 
344 	// Search for a match by value.
345 	for (int32 index = 0; index < fTargets.CountItems(); ++index) {
346 		port_id* target = fTargets.ItemAt(index);
347 		if (port == *target) {
348 			fTargets.RemoveItem(target, true);
349 			return;
350 		}
351 	}
352 }
353 
354 
355 int32
356 DelayedMessageData::CountTargets() const
357 {
358 	return fTargets.CountItems();
359 }
360 
361 
362 void
363 DelayedMessageData::MergeTargets(DelayedMessageData* other)
364 {
365 	// Failure to add one target does not abort the loop!
366 	// It could just mean we already have the target.
367 	for (int32 index = 0; index < other->fTargets.CountItems(); ++index)
368 		AddTarget(*(other->fTargets.ItemAt(index)));
369 }
370 
371 
372 //! Copy data from original location - merging failed
373 bool
374 DelayedMessageData::CopyData()
375 {
376 	Attachment* attached = NULL;
377 
378 	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
379 		attached = fAttachments.ItemAt(index);
380 
381 		if (attached == NULL || attached->data != NULL)
382 			return false;
383 
384 		attached->data = malloc(attached->size);
385 		if (attached->data == NULL)
386 			return false;
387 
388 		memcpy(attached->data, attached->constData, attached->size);
389 	}
390 
391 	fValid = true;
392 	return true;
393 }
394 
395 
396 bool
397 DelayedMessageData::MergeData(DelayedMessageData* other)
398 {
399 	if (!fValid
400 		|| other == NULL
401 		|| other->fCode != fCode
402 		|| fMergeMode == DM_NO_MERGE
403 		|| other->fMergeMode == DM_NO_MERGE
404 		|| other->fMergeMode != fMergeMode
405 		|| other->fAttachments.CountItems() != fAttachments.CountItems())
406 		return false;
407 
408 	if (other->fMergeMode == DM_MERGE_CANCEL) {
409 		MergeTargets(other);
410 		return true;
411 	}
412 
413 	// Compare data
414 	Attachment* attached = NULL;
415 	Attachment* otherAttached = NULL;
416 
417 	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
418 		attached = fAttachments.ItemAt(index);
419 		otherAttached = other->fAttachments.ItemAt(index);
420 
421 		if (attached == NULL
422 			|| otherAttached == NULL
423 			|| attached->data == NULL
424 			|| otherAttached->constData == NULL
425 			|| attached->size != otherAttached->size)
426 			return false;
427 
428 		// Compares depending upon mode & flags
429 		if (!Compare(attached, otherAttached, index))
430 			return false;
431 	}
432 
433 	// add any targets not included in the existing message!
434 	MergeTargets(other);
435 
436 	// since these are duplicates, we need not copy anything...
437 	if (fMergeMode == DM_MERGE_DUPLICATES)
438 		return true;
439 
440 	// DM_MERGE_REPLACE:
441 
442 	// Import the new data!
443 	for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
444 		attached = fAttachments.ItemAt(index);
445 		otherAttached = other->fAttachments.ItemAt(index);
446 
447 		// We already have allocated our memory, but the other data
448 		// has not.  So this reduces memory allocations.
449 		memcpy(attached->data, otherAttached->constData, attached->size);
450 	}
451 
452 	return true;
453 }
454 
455 
456 bool
457 DelayedMessageData::IsValid() const
458 {
459 	return fValid;
460 }
461 
462 
463 status_t
464 DelayedMessageData::Attach(const void* data, size_t size)
465 {
466 	// Sanity checking already performed
467 	Attachment* attach = new(std::nothrow) Attachment(data, size);
468 
469 	if (attach == NULL)
470 		return B_NO_MEMORY;
471 
472 	if (fAttachments.AddItem(attach) == false) {
473 		delete attach;
474 		return B_ERROR;
475 	}
476 
477 	return B_OK;
478 }
479 
480 
481 bool
482 DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
483 {
484 	if (fMergeMode == DM_MERGE_DUPLICATES) {
485 
486 		// Default-policy: all data must match
487 		if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0)
488 			return memcmp(one->data, two->constData, one->size) == 0;
489 
490 	} else if (fMergeMode == DM_MERGE_REPLACE) {
491 
492 		// Default Policy: no data needs to match
493 		if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0)
494 			return memcmp(one->data, two->constData, one->size) == 0;
495 	}
496 
497 	return true;
498 }
499 
500 
501 void
502 DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
503 {
504 	fMergeMode = mode;
505 	fMergeMask = mask;
506 }
507 
508 
509 void
510 DelayedMessageData::SendFailed(port_id port)
511 {
512 	if (fFailureCallback != NULL)
513 		fFailureCallback(fCode, port, fFailureData);
514 }
515 
516 
517 void
518 DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
519 {
520 	fFailureCallback = callback;
521 	fFailureData = data;
522 }
523 
524 
525 // #pragma mark -
526 
527 
528 ScheduledMessage::ScheduledMessage(DelayedMessage& message)
529 	:
530 	fData(message.HandOff())
531 {
532 }
533 
534 
535 ScheduledMessage::~ScheduledMessage()
536 {
537 	delete fData;
538 }
539 
540 
541 int32
542 ScheduledMessage::CountTargets() const
543 {
544 	if (fData == NULL)
545 		return 0;
546 
547 	return fData->CountTargets();
548 }
549 
550 
551 bigtime_t
552 ScheduledMessage::ScheduledTime() const
553 {
554 	if (fData == NULL)
555 		return 0;
556 
557 	return fData->ScheduledTime();
558 }
559 
560 
561 //! Send our message and data to their intended target(s)
562 int32
563 ScheduledMessage::SendMessage()
564 {
565 	if (fData == NULL || !fData->IsValid())
566 		return 0;
567 
568 	int32 sent = 0;
569 	for (int32 index = 0; index < fData->Targets().CountItems(); ++index) {
570 		port_id port = *(fData->Targets().ItemAt(index));
571 		status_t error = SendMessageToPort(port);
572 
573 		if (error == B_OK) {
574 			++sent;
575 			continue;
576 		}
577 
578 		if (error != B_TIMED_OUT)
579 			fData->SendFailed(port);
580 	}
581 
582 	return sent;
583 }
584 
585 
586 status_t
587 ScheduledMessage::SendMessageToPort(port_id port)
588 {
589 	if (fData == NULL || !fData->IsValid())
590 		return B_BAD_DATA;
591 
592 	if (port == B_BAD_PORT_ID)
593 		return B_BAD_VALUE;
594 
595 	BPrivate::LinkSender sender(port);
596 	if (sender.StartMessage(fData->Code()) != B_OK)
597 		return B_ERROR;
598 
599 	AttachmentList& list = fData->Attachments();
600 	Attachment* attached = NULL;
601 	status_t error = B_OK;
602 
603 	// The data has been checked already, so we assume it is all good
604 	for (int32 index = 0; index < list.CountItems(); ++index) {
605 		attached = list.ItemAt(index);
606 
607 		error = sender.Attach(attached->data, attached->size);
608 		if (error != B_OK) {
609 			sender.CancelMessage();
610 			return error;
611 		}
612 	}
613 
614 	// We do not want to ever hold up the sender thread for too long, we
615 	// set a 1 second sending delay, which should be more than enough for
616 	// 99.992% of all cases.  Approximately.
617 	error = sender.Flush(1000000);
618 
619 	if (error == B_OK || error == B_BAD_PORT_ID)
620 		fData->RemoveTarget(port);
621 
622 	return error;
623 }
624 
625 
626 bool
627 ScheduledMessage::IsValid() const
628 {
629 	return fData != NULL && fData->IsValid();
630 }
631 
632 
633 bool
634 ScheduledMessage::Merge(DelayedMessage& other)
635 {
636 	if (!IsValid())
637 		return false;
638 
639 	return fData->MergeData(other.Data());
640 }
641 
642 
643 bool
644 ScheduledMessage::operator<(const ScheduledMessage& other) const
645 {
646 	if (!IsValid() || !other.IsValid())
647 		return false;
648 
649 	return fData->ScheduledTime() < other.fData->ScheduledTime();
650 }
651 
652 
653 int
654 CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
655 {
656 	return *one < *two;
657 }
658 
659 
660 // #pragma mark -
661 
662 
663 DelayedMessageSender::DelayedMessageSender()
664 	:
665 	fLock("DelayedMessageSender"),
666 	fMessages(20, true),
667 	fScheduledWakeup(B_INFINITE_TIMEOUT),
668 	fWakeupRetry(0),
669 	fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
670 	fPort(create_port(kPortCapacity, "DelayedMessageSender")),
671 	fSentCount(0)
672 {
673 	resume_thread(fThread);
674 }
675 
676 
677 DelayedMessageSender::~DelayedMessageSender()
678 {
679 	// write the exit message to our port
680 	write_port(fPort, kExitMessage, NULL, 0);
681 
682 	status_t status = B_OK;
683 	while (wait_for_thread(fThread, &status) == B_OK);
684 
685 	// We now know the thread has exited, it is safe to cleanup
686 	delete_port(fPort);
687 }
688 
689 
690 status_t
691 DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
692 {
693 	BAutolock _(fLock);
694 
695 	// Can we merge with a pending message?
696 	ScheduledMessage* pending = NULL;
697 	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
698 		pending = fMessages.ItemAt(index);
699 		if (pending->Merge(message))
700 			return B_OK;
701 	}
702 
703 	// Guess not, add it to our list!
704 	ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);
705 
706 	if (scheduled == NULL)
707 		return B_NO_MEMORY;
708 
709 	if (!scheduled->IsValid()) {
710 		delete scheduled;
711 		return B_BAD_DATA;
712 	}
713 
714 	if (fMessages.AddItem(scheduled)) {
715 		fMessages.SortItems(&CompareMessages);
716 		_Wakeup(scheduled->ScheduledTime());
717 		return B_OK;
718 	}
719 
720 	return B_ERROR;
721 }
722 
723 
724 int32
725 DelayedMessageSender::CountDelayedMessages() const
726 {
727 	BAutolock _(fLock);
728 	return fMessages.CountItems();
729 }
730 
731 
732 int64
733 DelayedMessageSender::CountSentMessages() const
734 {
735 	return atomic_get64(&fSentCount);
736 }
737 
738 
739 void
740 DelayedMessageSender::_MessageLoop()
741 {
742 	int32 code = -1;
743 	status_t status = B_TIMED_OUT;
744 	bigtime_t timeout = B_INFINITE_TIMEOUT;
745 
746 	while (true) {
747 		timeout = atomic_get64(&fScheduledWakeup) - (system_time()
748 			+ (DM_MINIMUM_DELAY / 2));
749 
750 		if (timeout > DM_MINIMUM_DELAY / 4) {
751 			status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
752 				timeout);
753 		} else
754 			status = B_TIMED_OUT;
755 
756 		if (status == B_INTERRUPTED)
757 			continue;
758 
759 		if (status == B_TIMED_OUT) {
760 			_SendDelayedMessages();
761 			continue;
762 		}
763 
764 		if (status == B_OK) {
765 			switch (code) {
766 				case kWakeupMessage:
767 					continue;
768 
769 				case kExitMessage:
770 					return;
771 
772 				// TODO: trace unhandled messages
773 				default:
774 					continue;
775 			}
776 		}
777 
778 		// port deleted?
779 		if (status < B_OK)
780 			break;
781 	}
782 }
783 
784 
785 int32
786 DelayedMessageSender::_thread_func(void* sender)
787 {
788 	(static_cast<DelayedMessageSender*>(sender))->_MessageLoop();
789 	return 0;
790 }
791 
792 
793 //! Sends pending messages, call ONLY from sender thread!
794 int32
795 DelayedMessageSender::_SendDelayedMessages()
796 {
797 	// avoid sending messages during times of contention
798 	if (fLock.LockWithTimeout(30000) != B_OK) {
799 		atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
800 		return 0;
801 	}
802 
803 	atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);
804 
805 	if (fMessages.CountItems() == 0) {
806 		fLock.Unlock();
807 		return 0;
808 	}
809 
810 	int32 sent = 0;
811 
812 	bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
813 		// capture any that may be on the verge of being sent.
814 
815 	BObjectList<ScheduledMessage> remove;
816 
817 	ScheduledMessage* message = NULL;
818 	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
819 		message = fMessages.ItemAt(index);
820 
821 		if (message->ScheduledTime() > time) {
822 			atomic_set64(&fScheduledWakeup, message->ScheduledTime());
823 			break;
824 		}
825 
826 		int32 sendCount = message->SendMessage();
827 		if (sendCount > 0)
828 			sent += sendCount;
829 
830 		if (message->CountTargets() == 0)
831 			remove.AddItem(message);
832 	}
833 
834 	// remove serviced messages
835 	for (int32 index = 0; index < remove.CountItems(); ++index)
836 		fMessages.RemoveItem(remove.ItemAt(index));
837 
838 	atomic_add64(&fSentCount, sent);
839 
840 	// catch any partly-failed messages (possibly late):
841 	if (fMessages.CountItems() > 0
842 		&& atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {
843 
844 		fMessages.SortItems(&CompareMessages);
845 		message = fMessages.ItemAt(0);
846 		bigtime_t timeout = message->ScheduledTime() - time;
847 
848 		if (timeout < 0)
849 			timeout = DM_MINIMUM_DELAY;
850 
851 		atomic_set64(&fScheduledWakeup, timeout);
852 	}
853 
854 	fLock.Unlock();
855 	return sent;
856 }
857 
858 
859 void
860 DelayedMessageSender::_Wakeup(bigtime_t when)
861 {
862 	if (atomic_get64(&fScheduledWakeup) < when
863 		&& atomic_get(&fWakeupRetry) == 0)
864 		return;
865 
866 	atomic_set64(&fScheduledWakeup, when);
867 
868 	BPrivate::LinkSender sender(fPort);
869 	sender.StartMessage(kWakeupMessage);
870 	status_t error = sender.Flush(30000);
871 	atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT);
872 }
873 
874