1493cced1SRene Gollent /*
2493cced1SRene Gollent * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de.
3493cced1SRene Gollent * Copyright 2016, Rene Gollent, rene@gollent.com.
4493cced1SRene Gollent */
5493cced1SRene Gollent
6493cced1SRene Gollent
7493cced1SRene Gollent #include <SocketMessenger.h>
8493cced1SRene Gollent
9493cced1SRene Gollent #include <Message.h>
10*b8a71696SRene Gollent #include <MessageQueue.h>
11493cced1SRene Gollent #include <Messenger.h>
12493cced1SRene Gollent
13493cced1SRene Gollent #include <AutoDeleter.h>
14*b8a71696SRene Gollent #include <AutoLocker.h>
15*b8a71696SRene Gollent #include <HashMap.h>
16*b8a71696SRene Gollent
17*b8a71696SRene Gollent
18*b8a71696SRene Gollent static const char* kReplySenderIDField = "socket_messenger:sender_reply_id";
19*b8a71696SRene Gollent static const char* kReplyReceiverIDField = "socket_messenger:reply_id";
20*b8a71696SRene Gollent
21*b8a71696SRene Gollent
22*b8a71696SRene Gollent // #pragma mark - BSocketMessenger::Private
23*b8a71696SRene Gollent
24*b8a71696SRene Gollent
25*b8a71696SRene Gollent struct BSocketMessenger::Private {
26*b8a71696SRene Gollent typedef SynchronizedHashMap<HashKey64<int64>,
27*b8a71696SRene Gollent BMessage> ReplyMessageMap;
28*b8a71696SRene Gollent
29*b8a71696SRene Gollent Private();
30*b8a71696SRene Gollent virtual ~Private();
31*b8a71696SRene Gollent
32*b8a71696SRene Gollent void ClearMessages();
33*b8a71696SRene Gollent
34*b8a71696SRene Gollent sem_id fMessageWaiters;
35*b8a71696SRene Gollent thread_id fReplyReader;
36*b8a71696SRene Gollent ReplyMessageMap fReceivedReplies;
37*b8a71696SRene Gollent BMessageQueue fReceivedMessages;
38*b8a71696SRene Gollent int64 fReplyIDCounter;
39*b8a71696SRene Gollent };
40*b8a71696SRene Gollent
41*b8a71696SRene Gollent
Private()42*b8a71696SRene Gollent BSocketMessenger::Private::Private()
43*b8a71696SRene Gollent :
44*b8a71696SRene Gollent fMessageWaiters(-1),
45*b8a71696SRene Gollent fReplyReader(-1),
46*b8a71696SRene Gollent fReceivedReplies(),
47*b8a71696SRene Gollent fReceivedMessages(),
48*b8a71696SRene Gollent fReplyIDCounter(0)
49*b8a71696SRene Gollent {
50*b8a71696SRene Gollent }
51*b8a71696SRene Gollent
52*b8a71696SRene Gollent
~Private()53*b8a71696SRene Gollent BSocketMessenger::Private::~Private()
54*b8a71696SRene Gollent {
55*b8a71696SRene Gollent if (fMessageWaiters > 0)
56*b8a71696SRene Gollent delete_sem(fMessageWaiters);
57*b8a71696SRene Gollent if (fReplyReader > 0)
58*b8a71696SRene Gollent wait_for_thread(fReplyReader, NULL);
59*b8a71696SRene Gollent
60*b8a71696SRene Gollent ClearMessages();
61*b8a71696SRene Gollent }
62*b8a71696SRene Gollent
63*b8a71696SRene Gollent
64*b8a71696SRene Gollent void
ClearMessages()65*b8a71696SRene Gollent BSocketMessenger::Private::ClearMessages()
66*b8a71696SRene Gollent {
67*b8a71696SRene Gollent fReceivedReplies.Clear();
68*b8a71696SRene Gollent AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
69*b8a71696SRene Gollent while (!fReceivedMessages.IsEmpty())
70*b8a71696SRene Gollent delete fReceivedMessages.NextMessage();
71*b8a71696SRene Gollent }
72*b8a71696SRene Gollent
73*b8a71696SRene Gollent
74*b8a71696SRene Gollent // #pragma mark - BSocketMessenger
75493cced1SRene Gollent
76493cced1SRene Gollent
BSocketMessenger()77493cced1SRene Gollent BSocketMessenger::BSocketMessenger()
78493cced1SRene Gollent :
79*b8a71696SRene Gollent fPrivateData(NULL),
80493cced1SRene Gollent fSocket(),
81493cced1SRene Gollent fInitStatus(B_NO_INIT)
82493cced1SRene Gollent {
83*b8a71696SRene Gollent _Init();
84493cced1SRene Gollent }
85493cced1SRene Gollent
86493cced1SRene Gollent
BSocketMessenger(const BNetworkAddress & address,bigtime_t timeout)87493cced1SRene Gollent BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
88493cced1SRene Gollent bigtime_t timeout)
89*b8a71696SRene Gollent :
90*b8a71696SRene Gollent fPrivateData(NULL),
91*b8a71696SRene Gollent fSocket(),
92*b8a71696SRene Gollent fInitStatus(B_NO_INIT)
93493cced1SRene Gollent {
94*b8a71696SRene Gollent _Init();
95493cced1SRene Gollent SetTo(address, timeout);
96493cced1SRene Gollent }
97493cced1SRene Gollent
98493cced1SRene Gollent
BSocketMessenger(const BSocket & socket)99493cced1SRene Gollent BSocketMessenger::BSocketMessenger(const BSocket& socket)
100493cced1SRene Gollent :
101*b8a71696SRene Gollent fPrivateData(NULL),
102*b8a71696SRene Gollent fSocket(socket),
103*b8a71696SRene Gollent fInitStatus(B_NO_INIT)
104493cced1SRene Gollent {
105*b8a71696SRene Gollent _Init();
106*b8a71696SRene Gollent if (fPrivateData == NULL)
107*b8a71696SRene Gollent return;
108*b8a71696SRene Gollent
109493cced1SRene Gollent fInitStatus = socket.InitCheck();
110*b8a71696SRene Gollent if (fInitStatus != B_OK)
111*b8a71696SRene Gollent return;
112*b8a71696SRene Gollent
113*b8a71696SRene Gollent fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
114*b8a71696SRene Gollent "Message Reader", B_NORMAL_PRIORITY, this);
115*b8a71696SRene Gollent if (fPrivateData->fReplyReader < 0)
116*b8a71696SRene Gollent fInitStatus = fPrivateData->fReplyReader;
117*b8a71696SRene Gollent if (fInitStatus != B_OK) {
118*b8a71696SRene Gollent exit_thread(fPrivateData->fReplyReader);
119*b8a71696SRene Gollent fPrivateData->fReplyReader = -1;
120*b8a71696SRene Gollent return;
121*b8a71696SRene Gollent }
122*b8a71696SRene Gollent
123*b8a71696SRene Gollent fInitStatus = resume_thread(fPrivateData->fReplyReader);
124493cced1SRene Gollent }
125493cced1SRene Gollent
126493cced1SRene Gollent
~BSocketMessenger()127493cced1SRene Gollent BSocketMessenger::~BSocketMessenger()
128493cced1SRene Gollent {
129493cced1SRene Gollent Unset();
130*b8a71696SRene Gollent
131*b8a71696SRene Gollent delete fPrivateData;
132493cced1SRene Gollent }
133493cced1SRene Gollent
134493cced1SRene Gollent
135493cced1SRene Gollent void
Unset()136493cced1SRene Gollent BSocketMessenger::Unset()
137493cced1SRene Gollent {
138*b8a71696SRene Gollent if (fPrivateData == NULL)
139*b8a71696SRene Gollent return;
140*b8a71696SRene Gollent
141493cced1SRene Gollent fSocket.Disconnect();
142*b8a71696SRene Gollent wait_for_thread(fPrivateData->fReplyReader, NULL);
143*b8a71696SRene Gollent fPrivateData->fReplyReader = -1;
144*b8a71696SRene Gollent fPrivateData->ClearMessages();
145*b8a71696SRene Gollent
146*b8a71696SRene Gollent release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
147*b8a71696SRene Gollent
148493cced1SRene Gollent fInitStatus = B_NO_INIT;
149493cced1SRene Gollent }
150493cced1SRene Gollent
151493cced1SRene Gollent
152493cced1SRene Gollent status_t
SetTo(const BNetworkAddress & address,bigtime_t timeout)153493cced1SRene Gollent BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
154493cced1SRene Gollent {
155*b8a71696SRene Gollent Unset();
156*b8a71696SRene Gollent
157*b8a71696SRene Gollent if (fPrivateData == NULL)
158*b8a71696SRene Gollent return B_NO_MEMORY;
159*b8a71696SRene Gollent
160*b8a71696SRene Gollent fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
161*b8a71696SRene Gollent "Message Reader", B_NORMAL_PRIORITY, this);
162*b8a71696SRene Gollent if (fPrivateData->fReplyReader < 0)
163*b8a71696SRene Gollent return fPrivateData->fReplyReader;
164*b8a71696SRene Gollent status_t error = fSocket.Connect(address, timeout);
165*b8a71696SRene Gollent if (error != B_OK) {
166*b8a71696SRene Gollent Unset();
167*b8a71696SRene Gollent return error;
168*b8a71696SRene Gollent }
169*b8a71696SRene Gollent
170*b8a71696SRene Gollent return fInitStatus = resume_thread(fPrivateData->fReplyReader);
171493cced1SRene Gollent }
172493cced1SRene Gollent
173493cced1SRene Gollent
174493cced1SRene Gollent status_t
SetTo(const BSocketMessenger & target,bigtime_t timeout)175493cced1SRene Gollent BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout)
176493cced1SRene Gollent {
177493cced1SRene Gollent return SetTo(target.Address(), timeout);
178493cced1SRene Gollent }
179493cced1SRene Gollent
180493cced1SRene Gollent
181493cced1SRene Gollent status_t
SendMessage(const BMessage & message)182493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message)
183493cced1SRene Gollent {
184493cced1SRene Gollent return _SendMessage(message);
185493cced1SRene Gollent }
186493cced1SRene Gollent
187493cced1SRene Gollent
188493cced1SRene Gollent status_t
SendMessage(const BMessage & message,BMessage & _reply,bigtime_t timeout)189493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
190493cced1SRene Gollent bigtime_t timeout)
191493cced1SRene Gollent {
192*b8a71696SRene Gollent int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
193*b8a71696SRene Gollent BMessage temp(message);
194*b8a71696SRene Gollent temp.AddInt64(kReplySenderIDField, replyID);
195*b8a71696SRene Gollent status_t error = _SendMessage(temp);
196493cced1SRene Gollent if (error != B_OK)
197493cced1SRene Gollent return error;
198493cced1SRene Gollent
199*b8a71696SRene Gollent return _ReadReply(replyID, _reply, timeout);
200493cced1SRene Gollent }
201493cced1SRene Gollent
202493cced1SRene Gollent
203493cced1SRene Gollent status_t
SendMessage(const BMessage & message,BMessenger & replyTarget,bigtime_t timeout)204493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message,
205493cced1SRene Gollent BMessenger& replyTarget, bigtime_t timeout)
206493cced1SRene Gollent {
207493cced1SRene Gollent BMessage reply;
208493cced1SRene Gollent status_t error = SendMessage(message, reply, timeout);
209493cced1SRene Gollent if (error != B_OK)
210493cced1SRene Gollent return error;
211493cced1SRene Gollent
212493cced1SRene Gollent return replyTarget.SendMessage(&reply);
213493cced1SRene Gollent }
214493cced1SRene Gollent
215493cced1SRene Gollent
216493cced1SRene Gollent status_t
SendReply(const BMessage & message,const BMessage & reply)217*b8a71696SRene Gollent BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
218*b8a71696SRene Gollent {
219*b8a71696SRene Gollent int64 replyID;
220*b8a71696SRene Gollent if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
221*b8a71696SRene Gollent return B_NOT_ALLOWED;
222*b8a71696SRene Gollent
223*b8a71696SRene Gollent BMessage replyMessage(reply);
224*b8a71696SRene Gollent replyMessage.AddInt64(kReplyReceiverIDField, replyID);
225*b8a71696SRene Gollent return SendMessage(replyMessage);
226*b8a71696SRene Gollent }
227*b8a71696SRene Gollent
228*b8a71696SRene Gollent
229*b8a71696SRene Gollent status_t
ReceiveMessage(BMessage & _message,bigtime_t timeout)230493cced1SRene Gollent BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
231493cced1SRene Gollent {
232*b8a71696SRene Gollent status_t error = B_OK;
233*b8a71696SRene Gollent AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
234*b8a71696SRene Gollent for (;;) {
235*b8a71696SRene Gollent if (!fPrivateData->fReceivedMessages.IsEmpty()) {
236*b8a71696SRene Gollent BMessage* nextMessage
237*b8a71696SRene Gollent = fPrivateData->fReceivedMessages.NextMessage();
238*b8a71696SRene Gollent _message = *nextMessage;
239*b8a71696SRene Gollent delete nextMessage;
240*b8a71696SRene Gollent break;
241*b8a71696SRene Gollent }
242*b8a71696SRene Gollent
243*b8a71696SRene Gollent queueLocker.Unlock();
244*b8a71696SRene Gollent error = _WaitForMessage(timeout);
245*b8a71696SRene Gollent if (error != B_OK)
246*b8a71696SRene Gollent break;
247*b8a71696SRene Gollent if (!fSocket.IsConnected()) {
248*b8a71696SRene Gollent error = B_CANCELED;
249*b8a71696SRene Gollent break;
250*b8a71696SRene Gollent }
251*b8a71696SRene Gollent queueLocker.Lock();
252*b8a71696SRene Gollent }
253*b8a71696SRene Gollent
254*b8a71696SRene Gollent return error;
255*b8a71696SRene Gollent }
256*b8a71696SRene Gollent
257*b8a71696SRene Gollent
258*b8a71696SRene Gollent void
_Init()259*b8a71696SRene Gollent BSocketMessenger::_Init()
260*b8a71696SRene Gollent {
261*b8a71696SRene Gollent if (fPrivateData != NULL)
262*b8a71696SRene Gollent return;
263*b8a71696SRene Gollent
264*b8a71696SRene Gollent BSocketMessenger::Private* data
265*b8a71696SRene Gollent = new(std::nothrow) BSocketMessenger::Private;
266*b8a71696SRene Gollent
267*b8a71696SRene Gollent if (data == NULL) {
268*b8a71696SRene Gollent fInitStatus = B_NO_MEMORY;
269*b8a71696SRene Gollent return;
270*b8a71696SRene Gollent }
271*b8a71696SRene Gollent
272*b8a71696SRene Gollent data->fMessageWaiters = create_sem(0, "message waiters");
273*b8a71696SRene Gollent if (data->fMessageWaiters < 0) {
274*b8a71696SRene Gollent fInitStatus = data->fMessageWaiters;
275*b8a71696SRene Gollent delete data;
276*b8a71696SRene Gollent return;
277*b8a71696SRene Gollent }
278*b8a71696SRene Gollent
279*b8a71696SRene Gollent fPrivateData = data;
280*b8a71696SRene Gollent }
281*b8a71696SRene Gollent
282*b8a71696SRene Gollent
283*b8a71696SRene Gollent status_t
_WaitForMessage(bigtime_t timeout)284*b8a71696SRene Gollent BSocketMessenger::_WaitForMessage(bigtime_t timeout)
285*b8a71696SRene Gollent {
286*b8a71696SRene Gollent for (;;) {
287*b8a71696SRene Gollent status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
288*b8a71696SRene Gollent B_RELATIVE_TIMEOUT, timeout);
289*b8a71696SRene Gollent if (error == B_INTERRUPTED) {
290*b8a71696SRene Gollent if (timeout != B_INFINITE_TIMEOUT)
291*b8a71696SRene Gollent timeout -= system_time();
292*b8a71696SRene Gollent continue;
293*b8a71696SRene Gollent }
294*b8a71696SRene Gollent if (error != B_OK)
295*b8a71696SRene Gollent return error;
296*b8a71696SRene Gollent break;
297*b8a71696SRene Gollent }
298*b8a71696SRene Gollent
299*b8a71696SRene Gollent return B_OK;
300493cced1SRene Gollent }
301493cced1SRene Gollent
302493cced1SRene Gollent
303493cced1SRene Gollent status_t
_SendMessage(const BMessage & message)304493cced1SRene Gollent BSocketMessenger::_SendMessage(const BMessage& message)
305493cced1SRene Gollent {
306493cced1SRene Gollent ssize_t flatSize = message.FlattenedSize();
3076d4e35f6SRene Gollent ssize_t totalSize = flatSize + sizeof(ssize_t);
308493cced1SRene Gollent
3096d4e35f6SRene Gollent char* buffer = new(std::nothrow) char[totalSize];
310493cced1SRene Gollent if (buffer == NULL)
311493cced1SRene Gollent return B_NO_MEMORY;
312493cced1SRene Gollent
313493cced1SRene Gollent ArrayDeleter<char> bufferDeleter(buffer);
314493cced1SRene Gollent *(ssize_t*)buffer = flatSize;
315493cced1SRene Gollent char* messageBuffer = buffer + sizeof(ssize_t);
316493cced1SRene Gollent status_t error = message.Flatten(messageBuffer, flatSize);
317493cced1SRene Gollent if (error != B_OK)
318493cced1SRene Gollent return error;
319493cced1SRene Gollent
3206d4e35f6SRene Gollent ssize_t size = fSocket.Write(buffer, totalSize);
321493cced1SRene Gollent if (size < 0)
322493cced1SRene Gollent return size;
323493cced1SRene Gollent
324493cced1SRene Gollent return B_OK;
325493cced1SRene Gollent }
326493cced1SRene Gollent
327493cced1SRene Gollent
328493cced1SRene Gollent status_t
_ReadMessage(BMessage & _message)329*b8a71696SRene Gollent BSocketMessenger::_ReadMessage(BMessage& _message)
330493cced1SRene Gollent {
331*b8a71696SRene Gollent status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
332493cced1SRene Gollent if (error != B_OK)
333493cced1SRene Gollent return error;
334493cced1SRene Gollent
335493cced1SRene Gollent ssize_t size = 0;
336493cced1SRene Gollent ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t));
337493cced1SRene Gollent if (readSize < 0)
338493cced1SRene Gollent return readSize;
3396dd98243SRene Gollent
3406dd98243SRene Gollent if (readSize != sizeof(ssize_t))
341493cced1SRene Gollent return B_BAD_VALUE;
342493cced1SRene Gollent
343493cced1SRene Gollent if (size <= 0)
344493cced1SRene Gollent return B_MISMATCHED_VALUES;
345493cced1SRene Gollent
346493cced1SRene Gollent char* buffer = new(std::nothrow) char[size];
347493cced1SRene Gollent if (buffer == NULL)
348493cced1SRene Gollent return B_NO_MEMORY;
349493cced1SRene Gollent
350493cced1SRene Gollent ArrayDeleter<char> bufferDeleter(buffer);
351493cced1SRene Gollent readSize = fSocket.Read(buffer, size);
352493cced1SRene Gollent if (readSize < 0)
353493cced1SRene Gollent return readSize;
3546dd98243SRene Gollent
3556dd98243SRene Gollent if (readSize != size)
356493cced1SRene Gollent return B_MISMATCHED_VALUES;
357493cced1SRene Gollent
358493cced1SRene Gollent return _message.Unflatten(buffer);
359493cced1SRene Gollent }
360*b8a71696SRene Gollent
361*b8a71696SRene Gollent
362*b8a71696SRene Gollent status_t
_ReadReply(const int64 replyID,BMessage & reply,bigtime_t timeout)363*b8a71696SRene Gollent BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
364*b8a71696SRene Gollent bigtime_t timeout)
365*b8a71696SRene Gollent {
366*b8a71696SRene Gollent status_t error = B_OK;
367*b8a71696SRene Gollent for (;;) {
368*b8a71696SRene Gollent if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
369*b8a71696SRene Gollent reply = fPrivateData->fReceivedReplies.Remove(replyID);
370*b8a71696SRene Gollent break;
371*b8a71696SRene Gollent }
372*b8a71696SRene Gollent
373*b8a71696SRene Gollent error = _WaitForMessage(timeout);
374*b8a71696SRene Gollent if (error != B_OK)
375*b8a71696SRene Gollent break;
376*b8a71696SRene Gollent if (!fSocket.IsConnected()) {
377*b8a71696SRene Gollent error = B_CANCELED;
378*b8a71696SRene Gollent break;
379*b8a71696SRene Gollent }
380*b8a71696SRene Gollent }
381*b8a71696SRene Gollent
382*b8a71696SRene Gollent return error;
383*b8a71696SRene Gollent }
384*b8a71696SRene Gollent
385*b8a71696SRene Gollent
386*b8a71696SRene Gollent status_t
_MessageReader(void * arg)387*b8a71696SRene Gollent BSocketMessenger::_MessageReader(void* arg)
388*b8a71696SRene Gollent {
389*b8a71696SRene Gollent BSocketMessenger* messenger = (BSocketMessenger*)arg;
390*b8a71696SRene Gollent BSocketMessenger::Private* data = messenger->fPrivateData;
391*b8a71696SRene Gollent status_t error = B_OK;
392*b8a71696SRene Gollent
393*b8a71696SRene Gollent for (;;) {
394*b8a71696SRene Gollent BMessage message;
395*b8a71696SRene Gollent error = messenger->_ReadMessage(message);
396*b8a71696SRene Gollent if (error != B_OK)
397*b8a71696SRene Gollent break;
398*b8a71696SRene Gollent
399*b8a71696SRene Gollent int64 replyID;
400*b8a71696SRene Gollent if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
401*b8a71696SRene Gollent error = data->fReceivedReplies.Put(replyID, message);
402*b8a71696SRene Gollent if (error != B_OK)
403*b8a71696SRene Gollent break;
404*b8a71696SRene Gollent } else {
405*b8a71696SRene Gollent BMessage* queueMessage = new(std::nothrow) BMessage(message);
406*b8a71696SRene Gollent if (queueMessage == NULL) {
407*b8a71696SRene Gollent error = B_NO_MEMORY;
408*b8a71696SRene Gollent break;
409*b8a71696SRene Gollent }
410*b8a71696SRene Gollent
411*b8a71696SRene Gollent AutoLocker<BMessageQueue> queueLocker(
412*b8a71696SRene Gollent data->fReceivedMessages);
413*b8a71696SRene Gollent data->fReceivedMessages.AddMessage(queueMessage);
414*b8a71696SRene Gollent }
415*b8a71696SRene Gollent
416*b8a71696SRene Gollent
417*b8a71696SRene Gollent release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
418*b8a71696SRene Gollent }
419*b8a71696SRene Gollent
420*b8a71696SRene Gollent // if we exit our message loop, ensure everybody wakes up and knows
421*b8a71696SRene Gollent // we're no longer receiving messages.
422*b8a71696SRene Gollent messenger->fSocket.Disconnect();
423*b8a71696SRene Gollent release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
424*b8a71696SRene Gollent return error;
425*b8a71696SRene Gollent }
426