xref: /haiku/src/kits/network/libnetapi/SocketMessenger.cpp (revision a55deaea91d64802ed655d4ffcb41a3519338144)
1 /*
2  * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de.
3  * Copyright 2016, Rene Gollent, rene@gollent.com.
4  */
5 
6 
7 #include <SocketMessenger.h>
8 
9 #include <Message.h>
10 #include <MessageQueue.h>
11 #include <Messenger.h>
12 
13 #include <AutoDeleter.h>
14 #include <AutoLocker.h>
15 #include <HashMap.h>
16 
17 
// Names of the int64 fields used to match a request with its reply.
// The sender ID field is added to outgoing requests that expect an answer;
// the receiver ID field is added to the answer and carries the same value.
// Both the characters and the pointers are constant so the names cannot be
// reassigned by accident.
static const char* const kReplySenderIDField
	= "socket_messenger:sender_reply_id";
static const char* const kReplyReceiverIDField
	= "socket_messenger:reply_id";
20 
21 
22 // #pragma mark - BSocketMessenger::Private
23 
24 
25 struct BSocketMessenger::Private {
26 			typedef SynchronizedHashMap<HashKey64<int64>,
27 									BMessage> ReplyMessageMap;
28 
29 								Private();
30 	virtual						~Private();
31 
32 			void				ClearMessages();
33 
34 			sem_id				fMessageWaiters;
35 			thread_id			fReplyReader;
36 			ReplyMessageMap		fReceivedReplies;
37 			BMessageQueue		fReceivedMessages;
38 			int64				fReplyIDCounter;
39 };
40 
41 
42 BSocketMessenger::Private::Private()
43 	:
44 	fMessageWaiters(-1),
45 	fReplyReader(-1),
46 	fReceivedReplies(),
47 	fReceivedMessages(),
48 	fReplyIDCounter(0)
49 {
50 }
51 
52 
53 BSocketMessenger::Private::~Private()
54 {
55 	if (fMessageWaiters > 0)
56 		delete_sem(fMessageWaiters);
57 	if (fReplyReader > 0)
58 		wait_for_thread(fReplyReader, NULL);
59 
60 	ClearMessages();
61 }
62 
63 
64 void
65 BSocketMessenger::Private::ClearMessages()
66 {
67 	fReceivedReplies.Clear();
68 	AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
69 	while (!fReceivedMessages.IsEmpty())
70 		delete fReceivedMessages.NextMessage();
71 }
72 
73 
74 // #pragma mark - BSocketMessenger
75 
76 
77 BSocketMessenger::BSocketMessenger()
78 	:
79 	fPrivateData(NULL),
80 	fSocket(),
81 	fInitStatus(B_NO_INIT)
82 {
83 	_Init();
84 }
85 
86 
87 BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
88 	bigtime_t timeout)
89 	:
90 	fPrivateData(NULL),
91 	fSocket(),
92 	fInitStatus(B_NO_INIT)
93 {
94 	_Init();
95 	SetTo(address, timeout);
96 }
97 
98 
99 BSocketMessenger::BSocketMessenger(const BSocket& socket)
100 	:
101 	fPrivateData(NULL),
102 	fSocket(socket),
103 	fInitStatus(B_NO_INIT)
104 {
105 	_Init();
106 	if (fPrivateData == NULL)
107 		return;
108 
109 	fInitStatus = socket.InitCheck();
110 	if (fInitStatus != B_OK)
111 		return;
112 
113 	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
114 		"Message Reader", B_NORMAL_PRIORITY, this);
115 	if (fPrivateData->fReplyReader < 0)
116 		fInitStatus = fPrivateData->fReplyReader;
117 	if (fInitStatus != B_OK) {
118 		exit_thread(fPrivateData->fReplyReader);
119 		fPrivateData->fReplyReader = -1;
120 		return;
121 	}
122 
123 	fInitStatus = resume_thread(fPrivateData->fReplyReader);
124 }
125 
126 
127 BSocketMessenger::~BSocketMessenger()
128 {
129 	Unset();
130 
131 	delete fPrivateData;
132 }
133 
134 
135 void
136 BSocketMessenger::Unset()
137 {
138 	if (fPrivateData == NULL)
139 		return;
140 
141 	fSocket.Disconnect();
142 	wait_for_thread(fPrivateData->fReplyReader, NULL);
143 	fPrivateData->fReplyReader = -1;
144 	fPrivateData->ClearMessages();
145 
146 	release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
147 
148 	fInitStatus = B_NO_INIT;
149 }
150 
151 
152 status_t
153 BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
154 {
155 	Unset();
156 
157 	if (fPrivateData == NULL)
158 		return B_NO_MEMORY;
159 
160 	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
161 		"Message Reader", B_NORMAL_PRIORITY, this);
162 	if (fPrivateData->fReplyReader < 0)
163 		return fPrivateData->fReplyReader;
164 	status_t error = fSocket.Connect(address, timeout);
165 	if (error != B_OK) {
166 		Unset();
167 		return error;
168 	}
169 
170 	return fInitStatus = resume_thread(fPrivateData->fReplyReader);
171 }
172 
173 
174 status_t
175 BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout)
176 {
177 	return SetTo(target.Address(), timeout);
178 }
179 
180 
181 status_t
182 BSocketMessenger::SendMessage(const BMessage& message)
183 {
184 	return _SendMessage(message);
185 }
186 
187 
188 status_t
189 BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
190 	bigtime_t timeout)
191 {
192 	int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
193 	BMessage temp(message);
194 	temp.AddInt64(kReplySenderIDField, replyID);
195 	status_t error = _SendMessage(temp);
196 	if (error != B_OK)
197 		return error;
198 
199 	return _ReadReply(replyID, _reply, timeout);
200 }
201 
202 
203 status_t
204 BSocketMessenger::SendMessage(const BMessage& message,
205 	BMessenger& replyTarget, bigtime_t timeout)
206 {
207 	BMessage reply;
208 	status_t error = SendMessage(message, reply, timeout);
209 	if (error != B_OK)
210 		return error;
211 
212 	return replyTarget.SendMessage(&reply);
213 }
214 
215 
216 status_t
217 BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
218 {
219 	int64 replyID;
220 	if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
221 		return B_NOT_ALLOWED;
222 
223 	BMessage replyMessage(reply);
224 	replyMessage.AddInt64(kReplyReceiverIDField, replyID);
225 	return SendMessage(replyMessage);
226 }
227 
228 
229 status_t
230 BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
231 {
232 	status_t error = B_OK;
233 	AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
234 	for (;;) {
235 		if (!fPrivateData->fReceivedMessages.IsEmpty()) {
236 			BMessage* nextMessage
237 				= fPrivateData->fReceivedMessages.NextMessage();
238 			_message = *nextMessage;
239 			delete nextMessage;
240 			break;
241 		}
242 
243 		queueLocker.Unlock();
244 		error = _WaitForMessage(timeout);
245 		if (error != B_OK)
246 			break;
247 		if (!fSocket.IsConnected()) {
248 			error = B_CANCELED;
249 			break;
250 		}
251 		queueLocker.Lock();
252 	}
253 
254 	return error;
255 }
256 
257 
258 void
259 BSocketMessenger::_Init()
260 {
261 	if (fPrivateData != NULL)
262 		return;
263 
264 	BSocketMessenger::Private* data
265 		= new(std::nothrow) BSocketMessenger::Private;
266 
267 	if (data == NULL) {
268 		fInitStatus = B_NO_MEMORY;
269 		return;
270 	}
271 
272 	data->fMessageWaiters = create_sem(0, "message waiters");
273 	if (data->fMessageWaiters < 0) {
274 		fInitStatus = data->fMessageWaiters;
275 		delete data;
276 		return;
277 	}
278 
279 	fPrivateData = data;
280 }
281 
282 
283 status_t
284 BSocketMessenger::_WaitForMessage(bigtime_t timeout)
285 {
286 	for (;;) {
287 		status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
288 			B_RELATIVE_TIMEOUT, timeout);
289 		if (error == B_INTERRUPTED) {
290 			if (timeout != B_INFINITE_TIMEOUT)
291 				timeout -= system_time();
292 			continue;
293 		}
294 		if (error != B_OK)
295 			return error;
296 		break;
297 	}
298 
299 	return B_OK;
300 }
301 
302 
303 status_t
304 BSocketMessenger::_SendMessage(const BMessage& message)
305 {
306 	ssize_t flatSize = message.FlattenedSize();
307 	ssize_t totalSize = flatSize + sizeof(ssize_t);
308 
309 	char* buffer = new(std::nothrow) char[totalSize];
310 	if (buffer == NULL)
311 		return B_NO_MEMORY;
312 
313 	ArrayDeleter<char> bufferDeleter(buffer);
314 	*(ssize_t*)buffer = flatSize;
315 	char* messageBuffer = buffer + sizeof(ssize_t);
316 	status_t error = message.Flatten(messageBuffer, flatSize);
317 	if (error != B_OK)
318 		return error;
319 
320 	ssize_t size = fSocket.Write(buffer, totalSize);
321 	if (size < 0)
322 		return size;
323 
324 	return B_OK;
325 }
326 
327 
328 status_t
329 BSocketMessenger::_ReadMessage(BMessage& _message)
330 {
331 	status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
332 	if (error != B_OK)
333 		return error;
334 
335 	ssize_t size = 0;
336 	ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t));
337 	if (readSize < 0)
338 		return readSize;
339 
340 	if (readSize != sizeof(ssize_t))
341 		return B_BAD_VALUE;
342 
343 	if (size <= 0)
344 		return B_MISMATCHED_VALUES;
345 
346 	char* buffer = new(std::nothrow) char[size];
347 	if (buffer == NULL)
348 		return B_NO_MEMORY;
349 
350 	ArrayDeleter<char> bufferDeleter(buffer);
351 	readSize = fSocket.Read(buffer, size);
352 	if (readSize < 0)
353 		return readSize;
354 
355 	if (readSize != size)
356 		return B_MISMATCHED_VALUES;
357 
358 	return _message.Unflatten(buffer);
359 }
360 
361 
362 status_t
363 BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
364 	bigtime_t timeout)
365 {
366 	status_t error = B_OK;
367 	for (;;) {
368 		if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
369 			reply = fPrivateData->fReceivedReplies.Remove(replyID);
370 			break;
371 		}
372 
373 		error = _WaitForMessage(timeout);
374 		if (error != B_OK)
375 			break;
376 		if (!fSocket.IsConnected()) {
377 			error = B_CANCELED;
378 			break;
379 		}
380 	}
381 
382 	return error;
383 }
384 
385 
386 status_t
387 BSocketMessenger::_MessageReader(void* arg)
388 {
389 	BSocketMessenger* messenger = (BSocketMessenger*)arg;
390 	BSocketMessenger::Private* data = messenger->fPrivateData;
391 	status_t error = B_OK;
392 
393 	for (;;) {
394 		BMessage message;
395 		error = messenger->_ReadMessage(message);
396 		if (error != B_OK)
397 			break;
398 
399 		int64 replyID;
400 		if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
401 			error = data->fReceivedReplies.Put(replyID, message);
402 			if (error != B_OK)
403 				break;
404 		} else {
405 			BMessage* queueMessage = new(std::nothrow) BMessage(message);
406 			if (queueMessage == NULL) {
407 				error = B_NO_MEMORY;
408 				break;
409 			}
410 
411 			AutoLocker<BMessageQueue> queueLocker(
412 				data->fReceivedMessages);
413 			data->fReceivedMessages.AddMessage(queueMessage);
414 		}
415 
416 
417 		release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
418 	}
419 
420 	// if we exit our message loop, ensure everybody wakes up and knows
421 	// we're no longer receiving messages.
422 	messenger->fSocket.Disconnect();
423 	release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
424 	return error;
425 }
426