1 /*
2 * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de.
3 * Copyright 2016, Rene Gollent, rene@gollent.com.
4 */
5
6
7 #include <SocketMessenger.h>
8
9 #include <Message.h>
10 #include <MessageQueue.h>
11 #include <Messenger.h>
12
13 #include <AutoDeleter.h>
14 #include <AutoLocker.h>
15 #include <HashMap.h>
16
17
// Name of the int64 field added to a message that expects a reply; it
// carries the sender's reply ID and is read back by SendReply().
static const char* const kReplySenderIDField
	= "socket_messenger:sender_reply_id";
// Name of the int64 field added to a reply message; the reader thread looks
// for it to tell replies apart from ordinary incoming messages.
static const char* const kReplyReceiverIDField = "socket_messenger:reply_id";
20
21
22 // #pragma mark - BSocketMessenger::Private
23
24
25 struct BSocketMessenger::Private {
26 typedef SynchronizedHashMap<HashKey64<int64>,
27 BMessage> ReplyMessageMap;
28
29 Private();
30 virtual ~Private();
31
32 void ClearMessages();
33
34 sem_id fMessageWaiters;
35 thread_id fReplyReader;
36 ReplyMessageMap fReceivedReplies;
37 BMessageQueue fReceivedMessages;
38 int64 fReplyIDCounter;
39 };
40
41
Private()42 BSocketMessenger::Private::Private()
43 :
44 fMessageWaiters(-1),
45 fReplyReader(-1),
46 fReceivedReplies(),
47 fReceivedMessages(),
48 fReplyIDCounter(0)
49 {
50 }
51
52
~Private()53 BSocketMessenger::Private::~Private()
54 {
55 if (fMessageWaiters > 0)
56 delete_sem(fMessageWaiters);
57 if (fReplyReader > 0)
58 wait_for_thread(fReplyReader, NULL);
59
60 ClearMessages();
61 }
62
63
64 void
ClearMessages()65 BSocketMessenger::Private::ClearMessages()
66 {
67 fReceivedReplies.Clear();
68 AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
69 while (!fReceivedMessages.IsEmpty())
70 delete fReceivedMessages.NextMessage();
71 }
72
73
74 // #pragma mark - BSocketMessenger
75
76
BSocketMessenger()77 BSocketMessenger::BSocketMessenger()
78 :
79 fPrivateData(NULL),
80 fSocket(),
81 fInitStatus(B_NO_INIT)
82 {
83 _Init();
84 }
85
86
BSocketMessenger(const BNetworkAddress & address,bigtime_t timeout)87 BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
88 bigtime_t timeout)
89 :
90 fPrivateData(NULL),
91 fSocket(),
92 fInitStatus(B_NO_INIT)
93 {
94 _Init();
95 SetTo(address, timeout);
96 }
97
98
BSocketMessenger(const BSocket & socket)99 BSocketMessenger::BSocketMessenger(const BSocket& socket)
100 :
101 fPrivateData(NULL),
102 fSocket(socket),
103 fInitStatus(B_NO_INIT)
104 {
105 _Init();
106 if (fPrivateData == NULL)
107 return;
108
109 fInitStatus = socket.InitCheck();
110 if (fInitStatus != B_OK)
111 return;
112
113 fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
114 "Message Reader", B_NORMAL_PRIORITY, this);
115 if (fPrivateData->fReplyReader < 0)
116 fInitStatus = fPrivateData->fReplyReader;
117 if (fInitStatus != B_OK) {
118 exit_thread(fPrivateData->fReplyReader);
119 fPrivateData->fReplyReader = -1;
120 return;
121 }
122
123 fInitStatus = resume_thread(fPrivateData->fReplyReader);
124 }
125
126
~BSocketMessenger()127 BSocketMessenger::~BSocketMessenger()
128 {
129 Unset();
130
131 delete fPrivateData;
132 }
133
134
135 void
Unset()136 BSocketMessenger::Unset()
137 {
138 if (fPrivateData == NULL)
139 return;
140
141 fSocket.Disconnect();
142 wait_for_thread(fPrivateData->fReplyReader, NULL);
143 fPrivateData->fReplyReader = -1;
144 fPrivateData->ClearMessages();
145
146 release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
147
148 fInitStatus = B_NO_INIT;
149 }
150
151
152 status_t
SetTo(const BNetworkAddress & address,bigtime_t timeout)153 BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
154 {
155 Unset();
156
157 if (fPrivateData == NULL)
158 return B_NO_MEMORY;
159
160 fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
161 "Message Reader", B_NORMAL_PRIORITY, this);
162 if (fPrivateData->fReplyReader < 0)
163 return fPrivateData->fReplyReader;
164 status_t error = fSocket.Connect(address, timeout);
165 if (error != B_OK) {
166 Unset();
167 return error;
168 }
169
170 return fInitStatus = resume_thread(fPrivateData->fReplyReader);
171 }
172
173
174 status_t
SetTo(const BSocketMessenger & target,bigtime_t timeout)175 BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout)
176 {
177 return SetTo(target.Address(), timeout);
178 }
179
180
181 status_t
SendMessage(const BMessage & message)182 BSocketMessenger::SendMessage(const BMessage& message)
183 {
184 return _SendMessage(message);
185 }
186
187
188 status_t
SendMessage(const BMessage & message,BMessage & _reply,bigtime_t timeout)189 BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
190 bigtime_t timeout)
191 {
192 int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
193 BMessage temp(message);
194 temp.AddInt64(kReplySenderIDField, replyID);
195 status_t error = _SendMessage(temp);
196 if (error != B_OK)
197 return error;
198
199 return _ReadReply(replyID, _reply, timeout);
200 }
201
202
203 status_t
SendMessage(const BMessage & message,BMessenger & replyTarget,bigtime_t timeout)204 BSocketMessenger::SendMessage(const BMessage& message,
205 BMessenger& replyTarget, bigtime_t timeout)
206 {
207 BMessage reply;
208 status_t error = SendMessage(message, reply, timeout);
209 if (error != B_OK)
210 return error;
211
212 return replyTarget.SendMessage(&reply);
213 }
214
215
216 status_t
SendReply(const BMessage & message,const BMessage & reply)217 BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
218 {
219 int64 replyID;
220 if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
221 return B_NOT_ALLOWED;
222
223 BMessage replyMessage(reply);
224 replyMessage.AddInt64(kReplyReceiverIDField, replyID);
225 return SendMessage(replyMessage);
226 }
227
228
229 status_t
ReceiveMessage(BMessage & _message,bigtime_t timeout)230 BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
231 {
232 status_t error = B_OK;
233 AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
234 for (;;) {
235 if (!fPrivateData->fReceivedMessages.IsEmpty()) {
236 BMessage* nextMessage
237 = fPrivateData->fReceivedMessages.NextMessage();
238 _message = *nextMessage;
239 delete nextMessage;
240 break;
241 }
242
243 queueLocker.Unlock();
244 error = _WaitForMessage(timeout);
245 if (error != B_OK)
246 break;
247 if (!fSocket.IsConnected()) {
248 error = B_CANCELED;
249 break;
250 }
251 queueLocker.Lock();
252 }
253
254 return error;
255 }
256
257
258 void
_Init()259 BSocketMessenger::_Init()
260 {
261 if (fPrivateData != NULL)
262 return;
263
264 BSocketMessenger::Private* data
265 = new(std::nothrow) BSocketMessenger::Private;
266
267 if (data == NULL) {
268 fInitStatus = B_NO_MEMORY;
269 return;
270 }
271
272 data->fMessageWaiters = create_sem(0, "message waiters");
273 if (data->fMessageWaiters < 0) {
274 fInitStatus = data->fMessageWaiters;
275 delete data;
276 return;
277 }
278
279 fPrivateData = data;
280 }
281
282
283 status_t
_WaitForMessage(bigtime_t timeout)284 BSocketMessenger::_WaitForMessage(bigtime_t timeout)
285 {
286 for (;;) {
287 status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
288 B_RELATIVE_TIMEOUT, timeout);
289 if (error == B_INTERRUPTED) {
290 if (timeout != B_INFINITE_TIMEOUT)
291 timeout -= system_time();
292 continue;
293 }
294 if (error != B_OK)
295 return error;
296 break;
297 }
298
299 return B_OK;
300 }
301
302
303 status_t
_SendMessage(const BMessage & message)304 BSocketMessenger::_SendMessage(const BMessage& message)
305 {
306 ssize_t flatSize = message.FlattenedSize();
307 ssize_t totalSize = flatSize + sizeof(ssize_t);
308
309 char* buffer = new(std::nothrow) char[totalSize];
310 if (buffer == NULL)
311 return B_NO_MEMORY;
312
313 ArrayDeleter<char> bufferDeleter(buffer);
314 *(ssize_t*)buffer = flatSize;
315 char* messageBuffer = buffer + sizeof(ssize_t);
316 status_t error = message.Flatten(messageBuffer, flatSize);
317 if (error != B_OK)
318 return error;
319
320 ssize_t size = fSocket.Write(buffer, totalSize);
321 if (size < 0)
322 return size;
323
324 return B_OK;
325 }
326
327
328 status_t
_ReadMessage(BMessage & _message)329 BSocketMessenger::_ReadMessage(BMessage& _message)
330 {
331 status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
332 if (error != B_OK)
333 return error;
334
335 ssize_t size = 0;
336 ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t));
337 if (readSize < 0)
338 return readSize;
339
340 if (readSize != sizeof(ssize_t))
341 return B_BAD_VALUE;
342
343 if (size <= 0)
344 return B_MISMATCHED_VALUES;
345
346 char* buffer = new(std::nothrow) char[size];
347 if (buffer == NULL)
348 return B_NO_MEMORY;
349
350 ArrayDeleter<char> bufferDeleter(buffer);
351 readSize = fSocket.Read(buffer, size);
352 if (readSize < 0)
353 return readSize;
354
355 if (readSize != size)
356 return B_MISMATCHED_VALUES;
357
358 return _message.Unflatten(buffer);
359 }
360
361
362 status_t
_ReadReply(const int64 replyID,BMessage & reply,bigtime_t timeout)363 BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
364 bigtime_t timeout)
365 {
366 status_t error = B_OK;
367 for (;;) {
368 if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
369 reply = fPrivateData->fReceivedReplies.Remove(replyID);
370 break;
371 }
372
373 error = _WaitForMessage(timeout);
374 if (error != B_OK)
375 break;
376 if (!fSocket.IsConnected()) {
377 error = B_CANCELED;
378 break;
379 }
380 }
381
382 return error;
383 }
384
385
386 status_t
_MessageReader(void * arg)387 BSocketMessenger::_MessageReader(void* arg)
388 {
389 BSocketMessenger* messenger = (BSocketMessenger*)arg;
390 BSocketMessenger::Private* data = messenger->fPrivateData;
391 status_t error = B_OK;
392
393 for (;;) {
394 BMessage message;
395 error = messenger->_ReadMessage(message);
396 if (error != B_OK)
397 break;
398
399 int64 replyID;
400 if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
401 error = data->fReceivedReplies.Put(replyID, message);
402 if (error != B_OK)
403 break;
404 } else {
405 BMessage* queueMessage = new(std::nothrow) BMessage(message);
406 if (queueMessage == NULL) {
407 error = B_NO_MEMORY;
408 break;
409 }
410
411 AutoLocker<BMessageQueue> queueLocker(
412 data->fReceivedMessages);
413 data->fReceivedMessages.AddMessage(queueMessage);
414 }
415
416
417 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
418 }
419
420 // if we exit our message loop, ensure everybody wakes up and knows
421 // we're no longer receiving messages.
422 messenger->fSocket.Disconnect();
423 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
424 return error;
425 }
426