xref: /haiku/src/servers/registrar/MessageDeliverer.cpp (revision b55a57da7173b9af0432bd3e148d03f06161d036)
1 /*
2  * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include <map>
7 #include <new>
8 #include <set>
9 #include <string.h>
10 
11 #include <AutoDeleter.h>
12 #include <Autolock.h>
13 #include <DataIO.h>
14 #include <MessagePrivate.h>
15 #include <MessengerPrivate.h>
16 #include <OS.h>
17 #include <TokenSpace.h>
18 #include <util/DoublyLinkedList.h>
19 
20 #include <messaging.h>
21 
22 #include "Debug.h"
23 #include "MessageDeliverer.h"
24 #include "Referenceable.h"
25 
26 using std::map;
27 using std::nothrow;
28 using std::set;
29 
30 // sDeliverer -- the singleton instance
31 MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
32 
33 static const bigtime_t	kRetryDelay			= 100000;			// 100 ms
34 
35 // per port sanity limits
36 static const int32		kMaxMessagesPerPort	= 10000;
37 static const int32		kMaxDataPerPort		= 50 * 1024 * 1024;	// 50 MB
38 
39 
40 // MessagingTargetSet
41 
42 // destructor
43 MessagingTargetSet::~MessagingTargetSet()
44 {
45 }
46 
47 
48 // #pragma mark -
49 
50 // DefaultMessagingTargetSet
51 
52 // constructor
53 DefaultMessagingTargetSet::DefaultMessagingTargetSet(
54 		const messaging_target *targets, int32 targetCount)
55 	: MessagingTargetSet(),
56 	  fTargets(targets),
57 	  fTargetCount(targetCount),
58 	  fNextIndex(0)
59 {
60 }
61 
62 // destructor
63 DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
64 {
65 }
66 
67 // HasNext
68 bool
69 DefaultMessagingTargetSet::HasNext() const
70 {
71 	return (fNextIndex < fTargetCount);
72 }
73 
74 // Next
75 bool
76 DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
77 {
78 	if (fNextIndex >= fTargetCount)
79 		return false;
80 
81 	port = fTargets[fNextIndex].port;
82 	token = fTargets[fNextIndex].token;
83 	fNextIndex++;
84 
85 	return true;
86 }
87 
88 // Rewind
89 void
90 DefaultMessagingTargetSet::Rewind()
91 {
92 	fNextIndex = 0;
93 }
94 
95 
96 // #pragma mark -
97 
98 // SingleMessagingTargetSet
99 
100 // constructor
101 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
102 	: MessagingTargetSet(),
103 	  fAtBeginning(true)
104 {
105 	BMessenger::Private messengerPrivate(target);
106 	fPort = messengerPrivate.Port();
107 	fToken = (messengerPrivate.IsPreferredTarget()
108 		? B_PREFERRED_TOKEN : messengerPrivate.Token());
109 }
110 
111 // constructor
112 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
113 	: MessagingTargetSet(),
114 	  fPort(port),
115 	  fToken(token),
116 	  fAtBeginning(true)
117 {
118 }
119 
120 // destructor
121 SingleMessagingTargetSet::~SingleMessagingTargetSet()
122 {
123 }
124 
125 // HasNext
126 bool
127 SingleMessagingTargetSet::HasNext() const
128 {
129 	return fAtBeginning;
130 }
131 
132 // Next
133 bool
134 SingleMessagingTargetSet::Next(port_id &port, int32 &token)
135 {
136 	if (!fAtBeginning)
137 		return false;
138 
139 	port = fPort;
140 	token = fToken;
141 	fAtBeginning = false;
142 
143 	return true;
144 }
145 
146 // Rewind
147 void
148 SingleMessagingTargetSet::Rewind()
149 {
150 	fAtBeginning = true;
151 }
152 
153 
154 // #pragma mark -
155 
156 // Message
157 /*!	\brief Encapsulates a message to be delivered.
158 
159 	Besides the flattened message it also stores the when the message was
160 	created and when the delivery attempts shall time out.
161 */
162 class MessageDeliverer::Message : public Referenceable {
163 public:
164 	Message(void *data, int32 dataSize, bigtime_t timeout)
165 		: Referenceable(true),
166 		  fData(data),
167 		  fDataSize(dataSize),
168 		  fCreationTime(system_time()),
169 		  fBusy(false)
170 	{
171 		if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
172 			fTimeoutTime = B_INFINITE_TIMEOUT;
173 		else if (timeout <= 0)
174 			fTimeoutTime = fCreationTime;
175 		else
176 			fTimeoutTime = fCreationTime + timeout;
177 	}
178 
179 	~Message()
180 	{
181 		free(fData);
182 	}
183 
184 	void *Data() const
185 	{
186 		return fData;
187 	}
188 
189 	int32 DataSize() const
190 	{
191 		return fDataSize;
192 	}
193 
194 	bigtime_t CreationTime() const
195 	{
196 		return fCreationTime;
197 	}
198 
199 	bigtime_t TimeoutTime() const
200 	{
201 		return fTimeoutTime;
202 	}
203 
204 	bool HasTimeout() const
205 	{
206 		return (fTimeoutTime < B_INFINITE_TIMEOUT);
207 	}
208 
209 	void SetBusy(bool busy)
210 	{
211 		fBusy = busy;
212 	}
213 
214 	bool IsBusy() const
215 	{
216 		return fBusy;
217 	}
218 
219 private:
220 	void		*fData;
221 	int32		fDataSize;
222 	bigtime_t	fCreationTime;
223 	bigtime_t	fTimeoutTime;
224 	bool		fBusy;
225 };
226 
227 // TargetMessage
228 /*!	\brief Encapsulates a Message to be sent to a specific handler.
229 
230 	A TargetMessage is always associated with (i.e. queued in) a TargetPort.
231 	While a Message stores only the message data and some timing info, this
232 	object adds the token of a the target BHandler.
233 
234 	A Message can be referred to by more than one TargetMessage (when
235 	broadcasting), but a TargetMessage is referred to exactly once, by
236 	the TargetPort.
237 */
238 class MessageDeliverer::TargetMessage
239 	: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
240 public:
241 	TargetMessage(Message *message, int32 token)
242 		: fMessage(message),
243 		  fToken(token)
244 	{
245 		if (fMessage)
246 			fMessage->AddReference();
247 	}
248 
249 	~TargetMessage()
250 	{
251 		if (fMessage)
252 			fMessage->RemoveReference();
253 	}
254 
255 	Message *GetMessage() const
256 	{
257 		return fMessage;
258 	}
259 
260 	int32 Token() const
261 	{
262 		return fToken;
263 	}
264 
265 private:
266 	Message				*fMessage;
267 	int32				fToken;
268 };
269 
270 // TargetMessageHandle
271 /*!	\brief A small wrapper for TargetMessage providing a complete order.
272 
273 	This class only exists to provide the comparison operators required to
274 	put a TargetMessage into a set. The order implemented is by ascending by
275 	timeout time (primary) and by TargetMessage pointer (secondary).
276 	Hence TargetMessageHandles referring to the same TargetMessage are equal
277 	(and only those).
278 */
279 class MessageDeliverer::TargetMessageHandle {
280 public:
281 	TargetMessageHandle(TargetMessage *message)
282 		: fMessage(message)
283 	{
284 	}
285 
286 	TargetMessageHandle(const TargetMessageHandle &other)
287 		: fMessage(other.fMessage)
288 	{
289 	}
290 
291 	TargetMessage *GetMessage() const
292 	{
293 		return fMessage;
294 	}
295 
296 	TargetMessageHandle &operator=(const TargetMessageHandle &other)
297 	{
298 		fMessage = other.fMessage;
299 		return *this;
300 	}
301 
302 	bool operator==(const TargetMessageHandle &other) const
303 	{
304 		return (fMessage == other.fMessage);
305 	}
306 
307 	bool operator!=(const TargetMessageHandle &other) const
308 	{
309 		return (fMessage != other.fMessage);
310 	}
311 
312 	bool operator<(const TargetMessageHandle &other) const
313 	{
314 		bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
315 		bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
316 		if (timeout < otherTimeout)
317 			return true;
318 		if (timeout > otherTimeout)
319 			return false;
320 		return (fMessage < other.fMessage);
321 	}
322 
323 private:
324 	TargetMessage	*fMessage;
325 };
326 
327 // TargetPort
328 /*!	\brief Represents a full target port, queuing the not yet delivered
329 		   messages.
330 
331 	A TargetPort internally queues TargetMessages in the order the are to be
332 	delivered. Furthermore the object maintains an ordered set of
333 	TargetMessages that can timeout (in ascending order of timeout time), so
334 	that timed out messages can be dropped easily.
335 */
336 class MessageDeliverer::TargetPort {
337 public:
338 	TargetPort(port_id portID)
339 		: fPortID(portID),
340 		  fMessages(),
341 		  fMessageCount(0),
342 		  fMessageSize(0)
343 	{
344 	}
345 
346 	~TargetPort()
347 	{
348 		while (!fMessages.IsEmpty())
349 			PopMessage();
350 	}
351 
352 	port_id PortID() const
353 	{
354 		return fPortID;
355 	}
356 
357 	status_t PushMessage(Message *message, int32 token)
358 	{
359 PRINT(("MessageDeliverer::TargetPort::PushMessage(port: %ld, %p, %ld)\n",
360 fPortID, message, token));
361 		// create a target message
362 		TargetMessage *targetMessage
363 			= new(nothrow) TargetMessage(message, token);
364 		if (!targetMessage)
365 			return B_NO_MEMORY;
366 
367 		// push it
368 		fMessages.Insert(targetMessage);
369 		fMessageCount++;
370 		fMessageSize += targetMessage->GetMessage()->DataSize();
371 
372 		// add it to the timeoutable messages, if it has a timeout
373 		if (message->HasTimeout())
374 			fTimeoutableMessages.insert(targetMessage);
375 
376 		_EnforceLimits();
377 
378 		return B_OK;
379 	}
380 
381 	Message *PeekMessage(int32 &token) const
382 	{
383 		if (!fMessages.Head())
384 			return NULL;
385 
386 		token = fMessages.Head()->Token();
387 		return fMessages.Head()->GetMessage();
388 	}
389 
390 	void PopMessage()
391 	{
392 		if (fMessages.Head()) {
393 PRINT(("MessageDeliverer::TargetPort::PopMessage(): port: %ld, %p\n",
394 fPortID, fMessages.Head()->GetMessage()));
395 			_RemoveMessage(fMessages.Head());
396 		}
397 	}
398 
399 	void DropTimedOutMessages()
400 	{
401 		bigtime_t now = system_time();
402 
403 		while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
404 			TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
405 			if (message->GetMessage()->TimeoutTime() > now)
406 				break;
407 
408 PRINT(("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %ld: "
409 "message %p timed out\n", fPortID, message->GetMessage()));
410 			_RemoveMessage(message);
411 		}
412 	}
413 
414 	bool IsEmpty() const
415 	{
416 		return fMessages.IsEmpty();
417 	}
418 
419 private:
420 	void _RemoveMessage(TargetMessage *message)
421 	{
422 		fMessages.Remove(message);
423 		fMessageCount--;
424 		fMessageSize -= message->GetMessage()->DataSize();
425 
426 		if (message->GetMessage()->HasTimeout())
427 			fTimeoutableMessages.erase(message);
428 
429 		delete message;
430 	}
431 
432 	void _EnforceLimits()
433 	{
434 		// message count
435 		while (fMessageCount > kMaxMessagesPerPort) {
436 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
437 "message count limit.\n", fPortID));
438 			PopMessage();
439 		}
440 
441 		// message size
442 		while (fMessageSize > kMaxDataPerPort) {
443 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
444 "message size limit.\n", fPortID));
445 			PopMessage();
446 		}
447 	}
448 
449 	typedef DoublyLinkedList<TargetMessage>	MessageList;
450 
451 	port_id						fPortID;
452 	MessageList					fMessages;
453 	int32						fMessageCount;
454 	int32						fMessageSize;
455 	set<TargetMessageHandle>	fTimeoutableMessages;
456 };
457 
458 // TargetPortMap
459 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
460 };
461 
462 
463 // #pragma mark -
464 
465 /*!	\class MessageDeliverer
466 	\brief Service for delivering messages, which retries the delivery as long
467 		   as the target port is full.
468 
469 	For the user of the service only the MessageDeliverer::DeliverMessage()
470 	will be of interest. Some of them allow broadcasting a message to several
471 	recepients.
472 
473 	The class maintains a TargetPort for each target port which was full at the
474 	time a message was to be delivered to it. A TargetPort has a queue of
475 	undelivered messages. A separate worker thread retries periodically to send
476 	the yet undelivered messages to the respective target ports.
477 */
478 
479 // constructor
480 MessageDeliverer::MessageDeliverer()
481 	: fLock("message deliverer"),
482 	  fTargetPorts(NULL),
483 	  fDelivererThread(-1),
484 	  fTerminating(false)
485 {
486 }
487 
488 // destructor
489 MessageDeliverer::~MessageDeliverer()
490 {
491 	fTerminating = true;
492 
493 	if (fDelivererThread >= 0) {
494 		int32 result;
495 		wait_for_thread(fDelivererThread, &result);
496 	}
497 
498 	delete fTargetPorts;
499 }
500 
501 // Init
502 status_t
503 MessageDeliverer::Init()
504 {
505 	// create the target port map
506 	fTargetPorts = new(nothrow) TargetPortMap;
507 	if (!fTargetPorts)
508 		return B_NO_MEMORY;
509 
510 	// spawn the deliverer thread
511 	fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
512 		"message deliverer", B_NORMAL_PRIORITY + 1, this);
513 	if (fDelivererThread < 0)
514 		return fDelivererThread;
515 
516 	// resume the deliverer thread
517 	resume_thread(fDelivererThread);
518 
519 	return B_OK;
520 }
521 
522 // CreateDefault
523 status_t
524 MessageDeliverer::CreateDefault()
525 {
526 	if (sDeliverer)
527 		return B_OK;
528 
529 	// create the deliverer
530 	MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
531 	if (!deliverer)
532 		return B_NO_MEMORY;
533 
534 	// init it
535 	status_t error = deliverer->Init();
536 	if (error != B_OK) {
537 		delete deliverer;
538 		return error;
539 	}
540 
541 	sDeliverer = deliverer;
542 	return B_OK;
543 }
544 
545 // DeleteDefault
546 void
547 MessageDeliverer::DeleteDefault()
548 {
549 	if (sDeliverer) {
550 		delete sDeliverer;
551 		sDeliverer = NULL;
552 	}
553 }
554 
555 // Default
556 MessageDeliverer *
557 MessageDeliverer::Default()
558 {
559 	return sDeliverer;
560 }
561 
562 // DeliverMessage
563 /*!	\brief Delivers a message to the supplied target.
564 
565 	The method tries to send the message right now (if there are not already
566 	messages pending for the target port). If that fails due to a full target
567 	port, the message is queued for later delivery.
568 
569 	\param message The message to be delivered.
570 	\param target A BMessenger identifying the delivery target.
571 	\param timeout If given, the message will be dropped, when it couldn't be
572 		   delivered after this amount of microseconds.
573 	\return
574 	- \c B_OK, if sending the message succeeded or if the target port was
575 	  full and the message has been queued,
576 	- another error code otherwise.
577 */
578 status_t
579 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
580 	bigtime_t timeout)
581 {
582 	SingleMessagingTargetSet set(target);
583 	return DeliverMessage(message, set, timeout);
584 }
585 
586 // DeliverMessage
587 /*!	\brief Delivers a message to the supplied targets.
588 
589 	The method tries to send the message right now to each of the given targets
590 	(if there are not already messages pending for a target port). If that
591 	fails due to a full target port, the message is queued for later delivery.
592 
593 	\param message The message to be delivered.
594 	\param targets MessagingTargetSet providing the the delivery targets.
595 	\param timeout If given, the message will be dropped, when it couldn't be
596 		   delivered after this amount of microseconds.
597 	\return
598 	- \c B_OK, if for each of the given targets sending the message succeeded
599 	  or if the target port was full and the message has been queued,
600 	- another error code otherwise.
601 */
602 status_t
603 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
604 	bigtime_t timeout)
605 {
606 	if (!message)
607 		return B_BAD_VALUE;
608 
609 	// flatten the message
610 	BMallocIO mallocIO;
611 	status_t error = message->Flatten(&mallocIO, NULL);
612 	if (error < B_OK)
613 		return error;
614 
615 	return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
616 		timeout);
617 }
618 
619 // DeliverMessage
620 /*!	\brief Delivers a flattened message to the supplied targets.
621 
622 	The method tries to send the message right now to each of the given targets
623 	(if there are not already messages pending for a target port). If that
624 	fails due to a full target port, the message is queued for later delivery.
625 
626 	\param message The flattened message to be delivered. This may be a
627 		   flattened BMessage or KMessage.
628 	\param messageSize The size of the flattened message buffer.
629 	\param targets MessagingTargetSet providing the the delivery targets.
630 	\param timeout If given, the message will be dropped, when it couldn't be
631 		   delivered after this amount of microseconds.
632 	\return
633 	- \c B_OK, if for each of the given targets sending the message succeeded
634 	  or if the target port was full and the message has been queued,
635 	- another error code otherwise.
636 */
637 status_t
638 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
639 	MessagingTargetSet &targets, bigtime_t timeout)
640 {
641 	if (!messageData || messageSize <= 0)
642 		return B_BAD_VALUE;
643 
644 	// clone the buffer
645 	void *data = malloc(messageSize);
646 	if (!data)
647 		return B_NO_MEMORY;
648 	memcpy(data, messageData, messageSize);
649 
650 	// create a Message
651 	Message *message = new(nothrow) Message(data, messageSize, timeout);
652 	if (!message) {
653 		free(data);
654 		return B_NO_MEMORY;
655 	}
656 	Reference<Message> _(message, true);
657 
658 	// add the message to the respective target ports
659 	BAutolock locker(fLock);
660 	for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
661 		port_id portID;
662 		int32 token;
663 		targets.Next(portID, token);
664 
665 		// get the target port
666 		TargetPort *port = _GetTargetPort(portID, true);
667 		if (!port)
668 			return B_NO_MEMORY;
669 
670 		// try sending the message, if there are no queued messages yet
671 		if (port->IsEmpty()) {
672 			status_t error = _SendMessage(message, portID, token);
673 			// if the message was delivered OK, we're done with the target
674 			if (error == B_OK) {
675 				_PutTargetPort(port);
676 				continue;
677 			}
678 
679 			// if the port is not full, but an error occurred, we skip this target
680 			if (error != B_WOULD_BLOCK) {
681 				_PutTargetPort(port);
682 				if (targetIndex == 0 && !targets.HasNext())
683 					return error;
684 				continue;
685 			}
686 		}
687 
688 		// add the message
689 		status_t error = port->PushMessage(message, token);
690 		_PutTargetPort(port);
691 		if (error != B_OK)
692 			return error;
693 	}
694 
695 	return B_OK;
696 }
697 
698 // _GetTargetPort
699 MessageDeliverer::TargetPort *
700 MessageDeliverer::_GetTargetPort(port_id portID, bool create)
701 {
702 	// get the port from the map
703 	TargetPortMap::iterator it = fTargetPorts->find(portID);
704 	if (it != fTargetPorts->end())
705 		return it->second;
706 
707 	if (!create)
708 		return NULL;
709 
710 	// create a port
711 	TargetPort *port = new(nothrow) TargetPort(portID);
712 	if (!port)
713 		return NULL;
714 	(*fTargetPorts)[portID] = port;
715 
716 	return port;
717 }
718 
719 // _PutTargetPort
720 void
721 MessageDeliverer::_PutTargetPort(TargetPort *port)
722 {
723 	if (!port)
724 		return;
725 
726 	if (port->IsEmpty()) {
727 		fTargetPorts->erase(port->PortID());
728 		delete port;
729 	}
730 }
731 
732 // _SendMessage
733 status_t
734 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
735 {
736 	status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
737 		message->DataSize(), portID, token, 0);
738 //PRINT(("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
739 //message, portID, token, error));
740 	return error;
741 }
742 
743 // _DelivererThreadEntry
744 int32
745 MessageDeliverer::_DelivererThreadEntry(void *data)
746 {
747 	return ((MessageDeliverer*)data)->_DelivererThread();
748 }
749 
750 // _DelivererThread
751 int32
752 MessageDeliverer::_DelivererThread()
753 {
754 	while (!fTerminating) {
755 		snooze(kRetryDelay);
756 		if (fTerminating)
757 			break;
758 
759 		// iterate through all target ports and try sending the messages
760 		BAutolock _(fLock);
761 		for (TargetPortMap::iterator it = fTargetPorts->begin();
762 			 it != fTargetPorts->end();) {
763 			TargetPort *port = it->second;
764 			bool portError = false;
765 
766 			port->DropTimedOutMessages();
767 
768 			// try sending all messages
769 			int32 token;
770 			while (Message *message = port->PeekMessage(token)) {
771 				status_t error = B_OK;
772 //				if (message->TimeoutTime() > system_time()) {
773 					error = _SendMessage(message, port->PortID(), token);
774 //				} else {
775 //					// timeout, drop message
776 //					PRINT(("MessageDeliverer::_DelivererThread(): port %ld, "
777 //						"message %p timed out\n", port->PortID(), message));
778 //				}
779 
780 				if (error == B_OK) {
781 					port->PopMessage();
782 				} else if (error == B_WOULD_BLOCK) {
783 					// no luck yet -- port is still full
784 					break;
785 				} else {
786 					// unexpected error -- probably the port is gone
787 					portError = true;
788 					break;
789 				}
790 			}
791 
792 			// next port
793 			if (portError || port->IsEmpty()) {
794 				TargetPortMap::iterator oldIt = it;
795 				++it;
796 				delete port;
797 				fTargetPorts->erase(oldIt);
798 			} else
799 				++it;
800 		}
801 	}
802 
803 	return 0;
804 }
805