1 /*
2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 */
5
6 #include <map>
7 #include <new>
8 #include <set>
9 #include <string.h>
10
11 #include <AutoDeleter.h>
12 #include <Autolock.h>
13 #include <DataIO.h>
14 #include <MessagePrivate.h>
15 #include <MessengerPrivate.h>
16 #include <OS.h>
17 #include <TokenSpace.h>
18 #include <util/DoublyLinkedList.h>
19
20 #include <messaging.h>
21
22 #include "Debug.h"
23 #include "MessageDeliverer.h"
24 #include "Referenceable.h"
25
26 using std::map;
27 using std::nothrow;
28 using std::set;
29
30 // sDeliverer -- the singleton instance
31 MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
32
33 static const bigtime_t kRetryDelay = 100000; // 100 ms
34
35 // per port sanity limits
36 static const int32 kMaxMessagesPerPort = 10000;
37 static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB
38
39
40 // MessagingTargetSet
41
42 // destructor
~MessagingTargetSet()43 MessagingTargetSet::~MessagingTargetSet()
44 {
45 }
46
47
48 // #pragma mark -
49
50 // DefaultMessagingTargetSet
51
52 // constructor
DefaultMessagingTargetSet(const messaging_target * targets,int32 targetCount)53 DefaultMessagingTargetSet::DefaultMessagingTargetSet(
54 const messaging_target *targets, int32 targetCount)
55 : MessagingTargetSet(),
56 fTargets(targets),
57 fTargetCount(targetCount),
58 fNextIndex(0)
59 {
60 }
61
62 // destructor
~DefaultMessagingTargetSet()63 DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
64 {
65 }
66
67 // HasNext
68 bool
HasNext() const69 DefaultMessagingTargetSet::HasNext() const
70 {
71 return (fNextIndex < fTargetCount);
72 }
73
74 // Next
75 bool
Next(port_id & port,int32 & token)76 DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
77 {
78 if (fNextIndex >= fTargetCount)
79 return false;
80
81 port = fTargets[fNextIndex].port;
82 token = fTargets[fNextIndex].token;
83 fNextIndex++;
84
85 return true;
86 }
87
88 // Rewind
89 void
Rewind()90 DefaultMessagingTargetSet::Rewind()
91 {
92 fNextIndex = 0;
93 }
94
95
96 // #pragma mark -
97
98 // SingleMessagingTargetSet
99
100 // constructor
SingleMessagingTargetSet(BMessenger target)101 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
102 : MessagingTargetSet(),
103 fAtBeginning(true)
104 {
105 BMessenger::Private messengerPrivate(target);
106 fPort = messengerPrivate.Port();
107 fToken = (messengerPrivate.IsPreferredTarget()
108 ? B_PREFERRED_TOKEN : messengerPrivate.Token());
109 }
110
111 // constructor
SingleMessagingTargetSet(port_id port,int32 token)112 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
113 : MessagingTargetSet(),
114 fPort(port),
115 fToken(token),
116 fAtBeginning(true)
117 {
118 }
119
120 // destructor
~SingleMessagingTargetSet()121 SingleMessagingTargetSet::~SingleMessagingTargetSet()
122 {
123 }
124
125 // HasNext
126 bool
HasNext() const127 SingleMessagingTargetSet::HasNext() const
128 {
129 return fAtBeginning;
130 }
131
132 // Next
133 bool
Next(port_id & port,int32 & token)134 SingleMessagingTargetSet::Next(port_id &port, int32 &token)
135 {
136 if (!fAtBeginning)
137 return false;
138
139 port = fPort;
140 token = fToken;
141 fAtBeginning = false;
142
143 return true;
144 }
145
146 // Rewind
147 void
Rewind()148 SingleMessagingTargetSet::Rewind()
149 {
150 fAtBeginning = true;
151 }
152
153
154 // #pragma mark -
155
156 // Message
157 /*! \brief Encapsulates a message to be delivered.
158
159 Besides the flattened message it also stores the when the message was
160 created and when the delivery attempts shall time out.
161 */
162 class MessageDeliverer::Message : public BReferenceable {
163 public:
Message(void * data,int32 dataSize,bigtime_t timeout)164 Message(void *data, int32 dataSize, bigtime_t timeout)
165 : BReferenceable(),
166 fData(data),
167 fDataSize(dataSize),
168 fCreationTime(system_time()),
169 fBusy(false)
170 {
171 if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
172 fTimeoutTime = B_INFINITE_TIMEOUT;
173 else if (timeout <= 0)
174 fTimeoutTime = fCreationTime;
175 else
176 fTimeoutTime = fCreationTime + timeout;
177 }
178
~Message()179 ~Message()
180 {
181 free(fData);
182 }
183
Data() const184 void *Data() const
185 {
186 return fData;
187 }
188
DataSize() const189 int32 DataSize() const
190 {
191 return fDataSize;
192 }
193
CreationTime() const194 bigtime_t CreationTime() const
195 {
196 return fCreationTime;
197 }
198
TimeoutTime() const199 bigtime_t TimeoutTime() const
200 {
201 return fTimeoutTime;
202 }
203
HasTimeout() const204 bool HasTimeout() const
205 {
206 return (fTimeoutTime < B_INFINITE_TIMEOUT);
207 }
208
SetBusy(bool busy)209 void SetBusy(bool busy)
210 {
211 fBusy = busy;
212 }
213
IsBusy() const214 bool IsBusy() const
215 {
216 return fBusy;
217 }
218
219 private:
220 void *fData;
221 int32 fDataSize;
222 bigtime_t fCreationTime;
223 bigtime_t fTimeoutTime;
224 bool fBusy;
225 };
226
227 // TargetMessage
228 /*! \brief Encapsulates a Message to be sent to a specific handler.
229
230 A TargetMessage is always associated with (i.e. queued in) a TargetPort.
231 While a Message stores only the message data and some timing info, this
232 object adds the token of a the target BHandler.
233
234 A Message can be referred to by more than one TargetMessage (when
235 broadcasting), but a TargetMessage is referred to exactly once, by
236 the TargetPort.
237 */
238 class MessageDeliverer::TargetMessage
239 : public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
240 public:
TargetMessage(Message * message,int32 token)241 TargetMessage(Message *message, int32 token)
242 : fMessage(message),
243 fToken(token)
244 {
245 if (fMessage)
246 fMessage->AcquireReference();
247 }
248
~TargetMessage()249 ~TargetMessage()
250 {
251 if (fMessage)
252 fMessage->ReleaseReference();
253 }
254
GetMessage() const255 Message *GetMessage() const
256 {
257 return fMessage;
258 }
259
Token() const260 int32 Token() const
261 {
262 return fToken;
263 }
264
265 private:
266 Message *fMessage;
267 int32 fToken;
268 };
269
270 // TargetMessageHandle
271 /*! \brief A small wrapper for TargetMessage providing a complete order.
272
273 This class only exists to provide the comparison operators required to
274 put a TargetMessage into a set. The order implemented is by ascending by
275 timeout time (primary) and by TargetMessage pointer (secondary).
276 Hence TargetMessageHandles referring to the same TargetMessage are equal
277 (and only those).
278 */
279 class MessageDeliverer::TargetMessageHandle {
280 public:
TargetMessageHandle(TargetMessage * message)281 TargetMessageHandle(TargetMessage *message)
282 : fMessage(message)
283 {
284 }
285
TargetMessageHandle(const TargetMessageHandle & other)286 TargetMessageHandle(const TargetMessageHandle &other)
287 : fMessage(other.fMessage)
288 {
289 }
290
GetMessage() const291 TargetMessage *GetMessage() const
292 {
293 return fMessage;
294 }
295
operator =(const TargetMessageHandle & other)296 TargetMessageHandle &operator=(const TargetMessageHandle &other)
297 {
298 fMessage = other.fMessage;
299 return *this;
300 }
301
operator ==(const TargetMessageHandle & other) const302 bool operator==(const TargetMessageHandle &other) const
303 {
304 return (fMessage == other.fMessage);
305 }
306
operator !=(const TargetMessageHandle & other) const307 bool operator!=(const TargetMessageHandle &other) const
308 {
309 return (fMessage != other.fMessage);
310 }
311
operator <(const TargetMessageHandle & other) const312 bool operator<(const TargetMessageHandle &other) const
313 {
314 bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
315 bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
316 if (timeout < otherTimeout)
317 return true;
318 if (timeout > otherTimeout)
319 return false;
320 return (fMessage < other.fMessage);
321 }
322
323 private:
324 TargetMessage *fMessage;
325 };
326
327 // TargetPort
328 /*! \brief Represents a full target port, queuing the not yet delivered
329 messages.
330
331 A TargetPort internally queues TargetMessages in the order the are to be
332 delivered. Furthermore the object maintains an ordered set of
333 TargetMessages that can timeout (in ascending order of timeout time), so
334 that timed out messages can be dropped easily.
335 */
336 class MessageDeliverer::TargetPort {
337 public:
TargetPort(port_id portID)338 TargetPort(port_id portID)
339 : fPortID(portID),
340 fMessages(),
341 fMessageCount(0),
342 fMessageSize(0)
343 {
344 }
345
~TargetPort()346 ~TargetPort()
347 {
348 while (!fMessages.IsEmpty())
349 PopMessage();
350 }
351
PortID() const352 port_id PortID() const
353 {
354 return fPortID;
355 }
356
PushMessage(Message * message,int32 token)357 status_t PushMessage(Message *message, int32 token)
358 {
359 PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
360 B_PRId32 ")\n", fPortID, message, token);
361 // create a target message
362 TargetMessage *targetMessage
363 = new(nothrow) TargetMessage(message, token);
364 if (!targetMessage)
365 return B_NO_MEMORY;
366
367 // push it
368 fMessages.Insert(targetMessage);
369 fMessageCount++;
370 fMessageSize += targetMessage->GetMessage()->DataSize();
371
372 // add it to the timeoutable messages, if it has a timeout
373 if (message->HasTimeout())
374 fTimeoutableMessages.insert(targetMessage);
375
376 _EnforceLimits();
377
378 return B_OK;
379 }
380
PeekMessage(int32 & token) const381 Message *PeekMessage(int32 &token) const
382 {
383 if (!fMessages.Head())
384 return NULL;
385
386 token = fMessages.Head()->Token();
387 return fMessages.Head()->GetMessage();
388 }
389
PopMessage()390 void PopMessage()
391 {
392 if (fMessages.Head()) {
393 PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
394 fPortID, fMessages.Head()->GetMessage());
395 _RemoveMessage(fMessages.Head());
396 }
397 }
398
DropTimedOutMessages()399 void DropTimedOutMessages()
400 {
401 bigtime_t now = system_time();
402
403 while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
404 TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
405 if (message->GetMessage()->TimeoutTime() > now)
406 break;
407
408 PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
409 ": message %p timed out\n", fPortID, message->GetMessage());
410 _RemoveMessage(message);
411 }
412 }
413
IsEmpty() const414 bool IsEmpty() const
415 {
416 return fMessages.IsEmpty();
417 }
418
419 private:
_RemoveMessage(TargetMessage * message)420 void _RemoveMessage(TargetMessage *message)
421 {
422 fMessages.Remove(message);
423 fMessageCount--;
424 fMessageSize -= message->GetMessage()->DataSize();
425
426 if (message->GetMessage()->HasTimeout())
427 fTimeoutableMessages.erase(message);
428
429 delete message;
430 }
431
_EnforceLimits()432 void _EnforceLimits()
433 {
434 // message count
435 while (fMessageCount > kMaxMessagesPerPort) {
436 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
437 ": hit maximum message count limit.\n", fPortID);
438 PopMessage();
439 }
440
441 // message size
442 while (fMessageSize > kMaxDataPerPort) {
443 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
444 ": hit maximum message size limit.\n", fPortID);
445 PopMessage();
446 }
447 }
448
449 typedef DoublyLinkedList<TargetMessage> MessageList;
450
451 port_id fPortID;
452 MessageList fMessages;
453 int32 fMessageCount;
454 int32 fMessageSize;
455 set<TargetMessageHandle> fTimeoutableMessages;
456 };
457
458 // TargetPortMap
459 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
460 };
461
462
463 // #pragma mark -
464
465 /*! \class MessageDeliverer
466 \brief Service for delivering messages, which retries the delivery as long
467 as the target port is full.
468
469 For the user of the service only the MessageDeliverer::DeliverMessage()
470 will be of interest. Some of them allow broadcasting a message to several
471 recepients.
472
473 The class maintains a TargetPort for each target port which was full at the
474 time a message was to be delivered to it. A TargetPort has a queue of
475 undelivered messages. A separate worker thread retries periodically to send
476 the yet undelivered messages to the respective target ports.
477 */
478
479 // constructor
MessageDeliverer()480 MessageDeliverer::MessageDeliverer()
481 : fLock("message deliverer"),
482 fTargetPorts(NULL),
483 fDelivererThread(-1),
484 fTerminating(false)
485 {
486 }
487
488 // destructor
~MessageDeliverer()489 MessageDeliverer::~MessageDeliverer()
490 {
491 fTerminating = true;
492
493 if (fDelivererThread >= 0) {
494 int32 result;
495 wait_for_thread(fDelivererThread, &result);
496 }
497
498 delete fTargetPorts;
499 }
500
501 // Init
502 status_t
Init()503 MessageDeliverer::Init()
504 {
505 // create the target port map
506 fTargetPorts = new(nothrow) TargetPortMap;
507 if (!fTargetPorts)
508 return B_NO_MEMORY;
509
510 // spawn the deliverer thread
511 fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
512 "message deliverer", B_NORMAL_PRIORITY + 1, this);
513 if (fDelivererThread < 0)
514 return fDelivererThread;
515
516 // resume the deliverer thread
517 resume_thread(fDelivererThread);
518
519 return B_OK;
520 }
521
522 // CreateDefault
523 status_t
CreateDefault()524 MessageDeliverer::CreateDefault()
525 {
526 if (sDeliverer)
527 return B_OK;
528
529 // create the deliverer
530 MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
531 if (!deliverer)
532 return B_NO_MEMORY;
533
534 // init it
535 status_t error = deliverer->Init();
536 if (error != B_OK) {
537 delete deliverer;
538 return error;
539 }
540
541 sDeliverer = deliverer;
542 return B_OK;
543 }
544
545 // DeleteDefault
546 void
DeleteDefault()547 MessageDeliverer::DeleteDefault()
548 {
549 if (sDeliverer) {
550 delete sDeliverer;
551 sDeliverer = NULL;
552 }
553 }
554
555 // Default
556 MessageDeliverer *
Default()557 MessageDeliverer::Default()
558 {
559 return sDeliverer;
560 }
561
562 // DeliverMessage
563 /*! \brief Delivers a message to the supplied target.
564
565 The method tries to send the message right now (if there are not already
566 messages pending for the target port). If that fails due to a full target
567 port, the message is queued for later delivery.
568
569 \param message The message to be delivered.
570 \param target A BMessenger identifying the delivery target.
571 \param timeout If given, the message will be dropped, when it couldn't be
572 delivered after this amount of microseconds.
573 \return
574 - \c B_OK, if sending the message succeeded or if the target port was
575 full and the message has been queued,
576 - another error code otherwise.
577 */
578 status_t
DeliverMessage(BMessage * message,BMessenger target,bigtime_t timeout)579 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
580 bigtime_t timeout)
581 {
582 SingleMessagingTargetSet set(target);
583 return DeliverMessage(message, set, timeout);
584 }
585
586 // DeliverMessage
587 /*! \brief Delivers a message to the supplied targets.
588
589 The method tries to send the message right now to each of the given targets
590 (if there are not already messages pending for a target port). If that
591 fails due to a full target port, the message is queued for later delivery.
592
593 \param message The message to be delivered.
594 \param targets MessagingTargetSet providing the the delivery targets.
595 \param timeout If given, the message will be dropped, when it couldn't be
596 delivered after this amount of microseconds.
597 \return
598 - \c B_OK, if for each of the given targets sending the message succeeded
599 or if the target port was full and the message has been queued,
600 - another error code otherwise.
601 */
602 status_t
DeliverMessage(BMessage * message,MessagingTargetSet & targets,bigtime_t timeout)603 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
604 bigtime_t timeout)
605 {
606 if (!message)
607 return B_BAD_VALUE;
608
609 // flatten the message
610 BMallocIO mallocIO;
611 status_t error = message->Flatten(&mallocIO, NULL);
612 if (error < B_OK)
613 return error;
614
615 return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
616 timeout);
617 }
618
619 // DeliverMessage
620 /*! \brief Delivers a flattened message to the supplied targets.
621
622 The method tries to send the message right now to each of the given targets
623 (if there are not already messages pending for a target port). If that
624 fails due to a full target port, the message is queued for later delivery.
625
626 \param message The flattened message to be delivered. This may be a
627 flattened BMessage or KMessage.
628 \param messageSize The size of the flattened message buffer.
629 \param targets MessagingTargetSet providing the the delivery targets.
630 \param timeout If given, the message will be dropped, when it couldn't be
631 delivered after this amount of microseconds.
632 \return
633 - \c B_OK, if for each of the given targets sending the message succeeded
634 or if the target port was full and the message has been queued,
635 - another error code otherwise.
636 */
637 status_t
DeliverMessage(const void * messageData,int32 messageSize,MessagingTargetSet & targets,bigtime_t timeout)638 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
639 MessagingTargetSet &targets, bigtime_t timeout)
640 {
641 if (!messageData || messageSize <= 0)
642 return B_BAD_VALUE;
643
644 // clone the buffer
645 void *data = malloc(messageSize);
646 if (!data)
647 return B_NO_MEMORY;
648 memcpy(data, messageData, messageSize);
649
650 // create a Message
651 Message *message = new(nothrow) Message(data, messageSize, timeout);
652 if (!message) {
653 free(data);
654 return B_NO_MEMORY;
655 }
656 BReference<Message> _(message, true);
657
658 // add the message to the respective target ports
659 BAutolock locker(fLock);
660 for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
661 port_id portID;
662 int32 token;
663 targets.Next(portID, token);
664
665 // get the target port
666 TargetPort *port = _GetTargetPort(portID, true);
667 if (!port)
668 return B_NO_MEMORY;
669
670 // try sending the message, if there are no queued messages yet
671 if (port->IsEmpty()) {
672 status_t error = _SendMessage(message, portID, token);
673 // if the message was delivered OK, we're done with the target
674 if (error == B_OK) {
675 _PutTargetPort(port);
676 continue;
677 }
678
679 // if the port is not full, but an error occurred, we skip this target
680 if (error != B_WOULD_BLOCK) {
681 _PutTargetPort(port);
682 if (targetIndex == 0 && !targets.HasNext())
683 return error;
684 continue;
685 }
686 }
687
688 // add the message
689 status_t error = port->PushMessage(message, token);
690 _PutTargetPort(port);
691 if (error != B_OK)
692 return error;
693 }
694
695 return B_OK;
696 }
697
698 // _GetTargetPort
699 MessageDeliverer::TargetPort *
_GetTargetPort(port_id portID,bool create)700 MessageDeliverer::_GetTargetPort(port_id portID, bool create)
701 {
702 // get the port from the map
703 TargetPortMap::iterator it = fTargetPorts->find(portID);
704 if (it != fTargetPorts->end())
705 return it->second;
706
707 if (!create)
708 return NULL;
709
710 // create a port
711 TargetPort *port = new(nothrow) TargetPort(portID);
712 if (!port)
713 return NULL;
714 (*fTargetPorts)[portID] = port;
715
716 return port;
717 }
718
719 // _PutTargetPort
720 void
_PutTargetPort(TargetPort * port)721 MessageDeliverer::_PutTargetPort(TargetPort *port)
722 {
723 if (!port)
724 return;
725
726 if (port->IsEmpty()) {
727 fTargetPorts->erase(port->PortID());
728 delete port;
729 }
730 }
731
732 // _SendMessage
733 status_t
_SendMessage(Message * message,port_id portID,int32 token)734 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
735 {
736 status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
737 message->DataSize(), portID, token, 0);
738 //PRINT("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
739 //message, portID, token, error);
740 return error;
741 }
742
743 // _DelivererThreadEntry
744 int32
_DelivererThreadEntry(void * data)745 MessageDeliverer::_DelivererThreadEntry(void *data)
746 {
747 return ((MessageDeliverer*)data)->_DelivererThread();
748 }
749
750 // _DelivererThread
751 int32
_DelivererThread()752 MessageDeliverer::_DelivererThread()
753 {
754 while (!fTerminating) {
755 snooze(kRetryDelay);
756 if (fTerminating)
757 break;
758
759 // iterate through all target ports and try sending the messages
760 BAutolock _(fLock);
761 for (TargetPortMap::iterator it = fTargetPorts->begin();
762 it != fTargetPorts->end();) {
763 TargetPort *port = it->second;
764 bool portError = false;
765
766 port->DropTimedOutMessages();
767
768 // try sending all messages
769 int32 token;
770 while (Message *message = port->PeekMessage(token)) {
771 status_t error = B_OK;
772 // if (message->TimeoutTime() > system_time()) {
773 error = _SendMessage(message, port->PortID(), token);
774 // } else {
775 // // timeout, drop message
776 // PRINT("MessageDeliverer::_DelivererThread(): port %ld, "
777 // "message %p timed out\n", port->PortID(), message);
778 // }
779
780 if (error == B_OK) {
781 port->PopMessage();
782 } else if (error == B_WOULD_BLOCK) {
783 // no luck yet -- port is still full
784 break;
785 } else {
786 // unexpected error -- probably the port is gone
787 portError = true;
788 break;
789 }
790 }
791
792 // next port
793 if (portError || port->IsEmpty()) {
794 TargetPortMap::iterator oldIt = it;
795 ++it;
796 delete port;
797 fTargetPorts->erase(oldIt);
798 } else
799 ++it;
800 }
801 }
802
803 return 0;
804 }
805