xref: /haiku/src/servers/registrar/MessageDeliverer.cpp (revision ed6250c95736c0b55da79d6e9dd01369532260c0)
1 /*
2  * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include <map>
7 #include <new>
8 #include <set>
9 
10 #include <AutoDeleter.h>
11 #include <Autolock.h>
12 #include <DataIO.h>
13 #include <MessagePrivate.h>
14 #include <MessengerPrivate.h>
15 #include <OS.h>
16 #include <TokenSpace.h>
17 #include <util/DoublyLinkedList.h>
18 
19 #include <messaging.h>
20 
21 #include "Debug.h"
22 #include "MessageDeliverer.h"
23 #include "Referenceable.h"
24 
25 using std::map;
26 using std::nothrow;
27 using std::set;
28 
29 // sDeliverer -- the singleton instance
30 MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
31 
32 static const bigtime_t	kRetryDelay			= 100000;			// 100 ms
33 
34 // per port sanity limits
35 static const int32		kMaxMessagesPerPort	= 10000;
36 static const int32		kMaxDataPerPort		= 50 * 1024 * 1024;	// 50 MB
37 
38 
39 // MessagingTargetSet
40 
41 // destructor
42 MessagingTargetSet::~MessagingTargetSet()
43 {
44 }
45 
46 
47 // #pragma mark -
48 
49 // DefaultMessagingTargetSet
50 
51 // constructor
52 DefaultMessagingTargetSet::DefaultMessagingTargetSet(
53 		const messaging_target *targets, int32 targetCount)
54 	: MessagingTargetSet(),
55 	  fTargets(targets),
56 	  fTargetCount(targetCount),
57 	  fNextIndex(0)
58 {
59 }
60 
61 // destructor
62 DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
63 {
64 }
65 
66 // HasNext
67 bool
68 DefaultMessagingTargetSet::HasNext() const
69 {
70 	return (fNextIndex < fTargetCount);
71 }
72 
73 // Next
74 bool
75 DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
76 {
77 	if (fNextIndex >= fTargetCount)
78 		return false;
79 
80 	port = fTargets[fNextIndex].port;
81 	token = fTargets[fNextIndex].token;
82 	fNextIndex++;
83 
84 	return true;
85 }
86 
87 // Rewind
88 void
89 DefaultMessagingTargetSet::Rewind()
90 {
91 	fNextIndex = 0;
92 }
93 
94 
95 // #pragma mark -
96 
97 // SingleMessagingTargetSet
98 
99 // constructor
100 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
101 	: MessagingTargetSet(),
102 	  fAtBeginning(true)
103 {
104 	BMessenger::Private messengerPrivate(target);
105 	fPort = messengerPrivate.Port();
106 	fToken = (messengerPrivate.IsPreferredTarget()
107 		? B_PREFERRED_TOKEN : messengerPrivate.Token());
108 }
109 
110 // constructor
111 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
112 	: MessagingTargetSet(),
113 	  fPort(port),
114 	  fToken(token),
115 	  fAtBeginning(true)
116 {
117 }
118 
119 // destructor
120 SingleMessagingTargetSet::~SingleMessagingTargetSet()
121 {
122 }
123 
124 // HasNext
125 bool
126 SingleMessagingTargetSet::HasNext() const
127 {
128 	return fAtBeginning;
129 }
130 
131 // Next
132 bool
133 SingleMessagingTargetSet::Next(port_id &port, int32 &token)
134 {
135 	if (!fAtBeginning)
136 		return false;
137 
138 	port = fPort;
139 	token = fToken;
140 	fAtBeginning = false;
141 
142 	return true;
143 }
144 
145 // Rewind
146 void
147 SingleMessagingTargetSet::Rewind()
148 {
149 	fAtBeginning = true;
150 }
151 
152 
153 // #pragma mark -
154 
155 // Message
156 /*!	\brief Encapsulates a message to be delivered.
157 
158 	Besides the flattened message it also stores the when the message was
159 	created and when the delivery attempts shall time out.
160 */
161 class MessageDeliverer::Message : public Referenceable {
162 public:
163 	Message(void *data, int32 dataSize, bigtime_t timeout)
164 		: Referenceable(true),
165 		  fData(data),
166 		  fDataSize(dataSize),
167 		  fCreationTime(system_time()),
168 		  fBusy(false)
169 	{
170 		if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
171 			fTimeoutTime = B_INFINITE_TIMEOUT;
172 		else if (timeout <= 0)
173 			fTimeoutTime = fCreationTime;
174 		else
175 			fTimeoutTime = fCreationTime + timeout;
176 	}
177 
178 	~Message()
179 	{
180 		free(fData);
181 	}
182 
183 	void *Data() const
184 	{
185 		return fData;
186 	}
187 
188 	int32 DataSize() const
189 	{
190 		return fDataSize;
191 	}
192 
193 	bigtime_t CreationTime() const
194 	{
195 		return fCreationTime;
196 	}
197 
198 	bigtime_t TimeoutTime() const
199 	{
200 		return fTimeoutTime;
201 	}
202 
203 	bool HasTimeout() const
204 	{
205 		return (fTimeoutTime < B_INFINITE_TIMEOUT);
206 	}
207 
208 	void SetBusy(bool busy)
209 	{
210 		fBusy = busy;
211 	}
212 
213 	bool IsBusy() const
214 	{
215 		return fBusy;
216 	}
217 
218 private:
219 	void		*fData;
220 	int32		fDataSize;
221 	bigtime_t	fCreationTime;
222 	bigtime_t	fTimeoutTime;
223 	bool		fBusy;
224 };
225 
226 // TargetMessage
227 /*!	\brief Encapsulates a Message to be sent to a specific handler.
228 
229 	A TargetMessage is always associated with (i.e. queued in) a TargetPort.
230 	While a Message stores only the message data and some timing info, this
231 	object adds the token of a the target BHandler.
232 
233 	A Message can be referred to by more than one TargetMessage (when
234 	broadcasting), but a TargetMessage is referred to exactly once, by
235 	the TargetPort.
236 */
237 class MessageDeliverer::TargetMessage
238 	: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
239 public:
240 	TargetMessage(Message *message, int32 token)
241 		: fMessage(message),
242 		  fToken(token)
243 	{
244 		if (fMessage)
245 			fMessage->AddReference();
246 	}
247 
248 	~TargetMessage()
249 	{
250 		if (fMessage)
251 			fMessage->RemoveReference();
252 	}
253 
254 	Message *GetMessage() const
255 	{
256 		return fMessage;
257 	}
258 
259 	int32 Token() const
260 	{
261 		return fToken;
262 	}
263 
264 private:
265 	Message				*fMessage;
266 	int32				fToken;
267 };
268 
269 // TargetMessageHandle
270 /*!	\brief A small wrapper for TargetMessage providing a complete order.
271 
272 	This class only exists to provide the comparison operators required to
273 	put a TargetMessage into a set. The order implemented is by ascending by
274 	timeout time (primary) and by TargetMessage pointer (secondary).
275 	Hence TargetMessageHandles referring to the same TargetMessage are equal
276 	(and only those).
277 */
278 class MessageDeliverer::TargetMessageHandle {
279 public:
280 	TargetMessageHandle(TargetMessage *message)
281 		: fMessage(message)
282 	{
283 	}
284 
285 	TargetMessageHandle(const TargetMessageHandle &other)
286 		: fMessage(other.fMessage)
287 	{
288 	}
289 
290 	TargetMessage *GetMessage() const
291 	{
292 		return fMessage;
293 	}
294 
295 	TargetMessageHandle &operator=(const TargetMessageHandle &other)
296 	{
297 		fMessage = other.fMessage;
298 		return *this;
299 	}
300 
301 	bool operator==(const TargetMessageHandle &other) const
302 	{
303 		return (fMessage == other.fMessage);
304 	}
305 
306 	bool operator!=(const TargetMessageHandle &other) const
307 	{
308 		return (fMessage != other.fMessage);
309 	}
310 
311 	bool operator<(const TargetMessageHandle &other) const
312 	{
313 		bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
314 		bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
315 		if (timeout < otherTimeout)
316 			return true;
317 		if (timeout > otherTimeout)
318 			return false;
319 		return (fMessage < other.fMessage);
320 	}
321 
322 private:
323 	TargetMessage	*fMessage;
324 };
325 
326 // TargetPort
327 /*!	\brief Represents a full target port, queuing the not yet delivered
328 		   messages.
329 
330 	A TargetPort internally queues TargetMessages in the order the are to be
331 	delivered. Furthermore the object maintains an ordered set of
332 	TargetMessages that can timeout (in ascending order of timeout time), so
333 	that timed out messages can be dropped easily.
334 */
335 class MessageDeliverer::TargetPort {
336 public:
337 	TargetPort(port_id portID)
338 		: fPortID(portID),
339 		  fMessages(),
340 		  fMessageCount(0),
341 		  fMessageSize(0)
342 	{
343 	}
344 
345 	~TargetPort()
346 	{
347 		while (!fMessages.IsEmpty())
348 			PopMessage();
349 	}
350 
351 	port_id PortID() const
352 	{
353 		return fPortID;
354 	}
355 
356 	status_t PushMessage(Message *message, int32 token)
357 	{
358 PRINT(("MessageDeliverer::TargetPort::PushMessage(port: %ld, %p, %ld)\n",
359 fPortID, message, token));
360 		// create a target message
361 		TargetMessage *targetMessage
362 			= new(nothrow) TargetMessage(message, token);
363 		if (!targetMessage)
364 			return B_NO_MEMORY;
365 
366 		// push it
367 		fMessages.Insert(targetMessage);
368 		fMessageCount++;
369 		fMessageSize += targetMessage->GetMessage()->DataSize();
370 
371 		// add it to the timeoutable messages, if it has a timeout
372 		if (message->HasTimeout())
373 			fTimeoutableMessages.insert(targetMessage);
374 
375 		_EnforceLimits();
376 
377 		return B_OK;
378 	}
379 
380 	Message *PeekMessage(int32 &token) const
381 	{
382 		if (!fMessages.Head())
383 			return NULL;
384 
385 		token = fMessages.Head()->Token();
386 		return fMessages.Head()->GetMessage();
387 	}
388 
389 	void PopMessage()
390 	{
391 		if (fMessages.Head()) {
392 PRINT(("MessageDeliverer::TargetPort::PopMessage(): port: %ld, %p\n",
393 fPortID, fMessages.Head()->GetMessage()));
394 			_RemoveMessage(fMessages.Head());
395 		}
396 	}
397 
398 	void DropTimedOutMessages()
399 	{
400 		bigtime_t now = system_time();
401 
402 		while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
403 			TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
404 			if (message->GetMessage()->TimeoutTime() > now)
405 				break;
406 
407 PRINT(("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %ld: "
408 "message %p timed out\n", fPortID, message->GetMessage()));
409 			_RemoveMessage(message);
410 		}
411 	}
412 
413 	bool IsEmpty() const
414 	{
415 		return fMessages.IsEmpty();
416 	}
417 
418 private:
419 	void _RemoveMessage(TargetMessage *message)
420 	{
421 		fMessages.Remove(message);
422 		fMessageCount--;
423 		fMessageSize -= message->GetMessage()->DataSize();
424 
425 		if (message->GetMessage()->HasTimeout())
426 			fTimeoutableMessages.erase(message);
427 
428 		delete message;
429 	}
430 
431 	void _EnforceLimits()
432 	{
433 		// message count
434 		while (fMessageCount > kMaxMessagesPerPort) {
435 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
436 "message count limit.\n", fPortID));
437 			PopMessage();
438 		}
439 
440 		// message size
441 		while (fMessageSize > kMaxDataPerPort) {
442 PRINT(("MessageDeliverer::TargetPort::_EnforceLimits(): port: %ld: hit maximum "
443 "message size limit.\n", fPortID));
444 			PopMessage();
445 		}
446 	}
447 
448 	typedef DoublyLinkedList<TargetMessage>	MessageList;
449 
450 	port_id						fPortID;
451 	MessageList					fMessages;
452 	int32						fMessageCount;
453 	int32						fMessageSize;
454 	set<TargetMessageHandle>	fTimeoutableMessages;
455 };
456 
457 // TargetPortMap
458 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
459 };
460 
461 
462 // #pragma mark -
463 
464 /*!	\class MessageDeliverer
465 	\brief Service for delivering messages, which retries the delivery as long
466 		   as the target port is full.
467 
468 	For the user of the service only the MessageDeliverer::DeliverMessage()
469 	will be of interest. Some of them allow broadcasting a message to several
470 	recepients.
471 
472 	The class maintains a TargetPort for each target port which was full at the
473 	time a message was to be delivered to it. A TargetPort has a queue of
474 	undelivered messages. A separate worker thread retries periodically to send
475 	the yet undelivered messages to the respective target ports.
476 */
477 
478 // constructor
479 MessageDeliverer::MessageDeliverer()
480 	: fLock("message deliverer"),
481 	  fTargetPorts(NULL),
482 	  fDelivererThread(-1),
483 	  fTerminating(false)
484 {
485 }
486 
487 // destructor
488 MessageDeliverer::~MessageDeliverer()
489 {
490 	fTerminating = true;
491 
492 	if (fDelivererThread >= 0) {
493 		int32 result;
494 		wait_for_thread(fDelivererThread, &result);
495 	}
496 
497 	delete fTargetPorts;
498 }
499 
500 // Init
501 status_t
502 MessageDeliverer::Init()
503 {
504 	// create the target port map
505 	fTargetPorts = new(nothrow) TargetPortMap;
506 	if (!fTargetPorts)
507 		return B_NO_MEMORY;
508 
509 	// spawn the deliverer thread
510 	fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
511 		"message deliverer", B_NORMAL_PRIORITY, this);
512 	if (fDelivererThread < 0)
513 		return fDelivererThread;
514 
515 	// resume the deliverer thread
516 	resume_thread(fDelivererThread);
517 
518 	return B_OK;
519 }
520 
521 // CreateDefault
522 status_t
523 MessageDeliverer::CreateDefault()
524 {
525 	if (sDeliverer)
526 		return B_OK;
527 
528 	// create the deliverer
529 	MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
530 	if (!deliverer)
531 		return B_NO_MEMORY;
532 
533 	// init it
534 	status_t error = deliverer->Init();
535 	if (error != B_OK) {
536 		delete deliverer;
537 		return error;
538 	}
539 
540 	sDeliverer = deliverer;
541 	return B_OK;
542 }
543 
544 // DeleteDefault
545 void
546 MessageDeliverer::DeleteDefault()
547 {
548 	if (sDeliverer) {
549 		delete sDeliverer;
550 		sDeliverer = NULL;
551 	}
552 }
553 
554 // Default
555 MessageDeliverer *
556 MessageDeliverer::Default()
557 {
558 	return sDeliverer;
559 }
560 
561 // DeliverMessage
562 /*!	\brief Delivers a message to the supplied target.
563 
564 	The method tries to send the message right now (if there are not already
565 	messages pending for the target port). If that fails due to a full target
566 	port, the message is queued for later delivery.
567 
568 	\param message The message to be delivered.
569 	\param target A BMessenger identifying the delivery target.
570 	\param timeout If given, the message will be dropped, when it couldn't be
571 		   delivered after this amount of microseconds.
572 	\return
573 	- \c B_OK, if sending the message succeeded or if the target port was
574 	  full and the message has been queued,
575 	- another error code otherwise.
576 */
577 status_t
578 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
579 	bigtime_t timeout)
580 {
581 	SingleMessagingTargetSet set(target);
582 	return DeliverMessage(message, set, timeout);
583 }
584 
585 // DeliverMessage
586 /*!	\brief Delivers a message to the supplied targets.
587 
588 	The method tries to send the message right now to each of the given targets
589 	(if there are not already messages pending for a target port). If that
590 	fails due to a full target port, the message is queued for later delivery.
591 
592 	\param message The message to be delivered.
593 	\param targets MessagingTargetSet providing the the delivery targets.
594 	\param timeout If given, the message will be dropped, when it couldn't be
595 		   delivered after this amount of microseconds.
596 	\return
597 	- \c B_OK, if for each of the given targets sending the message succeeded
598 	  or if the target port was full and the message has been queued,
599 	- another error code otherwise.
600 */
601 status_t
602 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
603 	bigtime_t timeout)
604 {
605 	if (!message)
606 		return B_BAD_VALUE;
607 
608 	// flatten the message
609 	BMallocIO mallocIO;
610 	status_t error = BMessage::Private(message).NativeFlatten(&mallocIO, NULL);
611 	if (error < B_OK)
612 		return error;
613 
614 	return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
615 		timeout);
616 }
617 
618 // DeliverMessage
619 /*!	\brief Delivers a flattened message to the supplied targets.
620 
621 	The method tries to send the message right now to each of the given targets
622 	(if there are not already messages pending for a target port). If that
623 	fails due to a full target port, the message is queued for later delivery.
624 
625 	\param message The flattened message to be delivered. This may be a
626 		   flattened BMessage or KMessage.
627 	\param messageSize The size of the flattened message buffer.
628 	\param targets MessagingTargetSet providing the the delivery targets.
629 	\param timeout If given, the message will be dropped, when it couldn't be
630 		   delivered after this amount of microseconds.
631 	\return
632 	- \c B_OK, if for each of the given targets sending the message succeeded
633 	  or if the target port was full and the message has been queued,
634 	- another error code otherwise.
635 */
636 status_t
637 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
638 	MessagingTargetSet &targets, bigtime_t timeout)
639 {
640 	if (!messageData || messageSize <= 0)
641 		return B_BAD_VALUE;
642 
643 	// clone the buffer
644 	void *data = malloc(messageSize);
645 	if (!data)
646 		return B_NO_MEMORY;
647 	memcpy(data, messageData, messageSize);
648 
649 	// create a Message
650 	Message *message = new(nothrow) Message(data, messageSize, timeout);
651 	if (!message) {
652 		free(data);
653 		return B_NO_MEMORY;
654 	}
655 	Reference<Message> _(message, true);
656 
657 	// add the message to the respective target ports
658 	BAutolock locker(fLock);
659 	for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
660 		port_id portID;
661 		int32 token;
662 		targets.Next(portID, token);
663 
664 		// get the target port
665 		TargetPort *port = _GetTargetPort(portID, true);
666 		if (!port)
667 			return B_NO_MEMORY;
668 
669 		// try sending the message, if there are no queued messages yet
670 		if (port->IsEmpty()) {
671 			status_t error = _SendMessage(message, portID, token);
672 			// if the message was delivered OK, we're done with the target
673 			if (error == B_OK) {
674 				_PutTargetPort(port);
675 				continue;
676 			}
677 
678 			// if the port is not full, but an error occurred, we skip this target
679 			if (error != B_WOULD_BLOCK) {
680 				_PutTargetPort(port);
681 				if (targetIndex == 0 && !targets.HasNext())
682 					return error;
683 				continue;
684 			}
685 		}
686 
687 		// add the message
688 		status_t error = port->PushMessage(message, token);
689 		_PutTargetPort(port);
690 		if (error != B_OK)
691 			return error;
692 	}
693 
694 	return B_OK;
695 }
696 
697 // _GetTargetPort
698 MessageDeliverer::TargetPort *
699 MessageDeliverer::_GetTargetPort(port_id portID, bool create)
700 {
701 	// get the port from the map
702 	TargetPortMap::iterator it = fTargetPorts->find(portID);
703 	if (it != fTargetPorts->end())
704 		return it->second;
705 
706 	if (!create)
707 		return NULL;
708 
709 	// create a port
710 	TargetPort *port = new(nothrow) TargetPort(portID);
711 	if (!port)
712 		return NULL;
713 	(*fTargetPorts)[portID] = port;
714 
715 	return port;
716 }
717 
718 // _PutTargetPort
719 void
720 MessageDeliverer::_PutTargetPort(TargetPort *port)
721 {
722 	if (!port)
723 		return;
724 
725 	if (port->IsEmpty()) {
726 		fTargetPorts->erase(port->PortID());
727 		delete port;
728 	}
729 }
730 
731 // _SendMessage
732 status_t
733 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
734 {
735 	status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
736 		message->DataSize(), portID, token, 0);
737 //PRINT(("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
738 //message, portID, token, error));
739 	return error;
740 }
741 
742 // _DelivererThreadEntry
743 int32
744 MessageDeliverer::_DelivererThreadEntry(void *data)
745 {
746 	return ((MessageDeliverer*)data)->_DelivererThread();
747 }
748 
749 // _DelivererThread
750 int32
751 MessageDeliverer::_DelivererThread()
752 {
753 	while (!fTerminating) {
754 		snooze(kRetryDelay);
755 		if (fTerminating)
756 			break;
757 
758 		// iterate through all target ports and try sending the messages
759 		BAutolock _(fLock);
760 		for (TargetPortMap::iterator it = fTargetPorts->begin();
761 			 it != fTargetPorts->end();) {
762 			TargetPort *port = it->second;
763 			bool portError = false;
764 
765 			port->DropTimedOutMessages();
766 
767 			// try sending all messages
768 			int32 token;
769 			while (Message *message = port->PeekMessage(token)) {
770 				status_t error = B_OK;
771 //				if (message->TimeoutTime() > system_time()) {
772 					error = _SendMessage(message, port->PortID(), token);
773 //				} else {
774 //					// timeout, drop message
775 //					PRINT(("MessageDeliverer::_DelivererThread(): port %ld, "
776 //						"message %p timed out\n", port->PortID(), message));
777 //				}
778 
779 				if (error == B_OK) {
780 					port->PopMessage();
781 				} else if (error == B_WOULD_BLOCK) {
782 					// no luck yet -- port is still full
783 					break;
784 				} else {
785 					// unexpected error -- probably the port is gone
786 					portError = true;
787 					break;
788 				}
789 			}
790 
791 			// next port
792 			if (portError || port->IsEmpty()) {
793 				TargetPortMap::iterator oldIt = it;
794 				++it;
795 				delete port;
796 				fTargetPorts->erase(oldIt);
797 			} else
798 				++it;
799 		}
800 	}
801 
802 	return 0;
803 }
804