xref: /haiku/src/kits/network/libnetapi/SocketMessenger.cpp (revision b8a716965a5647b9e991cfbaac4538446ac7f00e)
1493cced1SRene Gollent /*
2493cced1SRene Gollent  * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de.
3493cced1SRene Gollent  * Copyright 2016, Rene Gollent, rene@gollent.com.
4493cced1SRene Gollent  */
5493cced1SRene Gollent 
6493cced1SRene Gollent 
7493cced1SRene Gollent #include <SocketMessenger.h>
8493cced1SRene Gollent 
9493cced1SRene Gollent #include <Message.h>
10*b8a71696SRene Gollent #include <MessageQueue.h>
11493cced1SRene Gollent #include <Messenger.h>
12493cced1SRene Gollent 
13493cced1SRene Gollent #include <AutoDeleter.h>
14*b8a71696SRene Gollent #include <AutoLocker.h>
15*b8a71696SRene Gollent #include <HashMap.h>
16*b8a71696SRene Gollent 
17*b8a71696SRene Gollent 
18*b8a71696SRene Gollent static const char* kReplySenderIDField = "socket_messenger:sender_reply_id";
19*b8a71696SRene Gollent static const char* kReplyReceiverIDField = "socket_messenger:reply_id";
20*b8a71696SRene Gollent 
21*b8a71696SRene Gollent 
22*b8a71696SRene Gollent // #pragma mark - BSocketMessenger::Private
23*b8a71696SRene Gollent 
24*b8a71696SRene Gollent 
25*b8a71696SRene Gollent struct BSocketMessenger::Private {
26*b8a71696SRene Gollent 			typedef SynchronizedHashMap<HashKey64<int64>,
27*b8a71696SRene Gollent 									BMessage> ReplyMessageMap;
28*b8a71696SRene Gollent 
29*b8a71696SRene Gollent 								Private();
30*b8a71696SRene Gollent 	virtual						~Private();
31*b8a71696SRene Gollent 
32*b8a71696SRene Gollent 			void				ClearMessages();
33*b8a71696SRene Gollent 
34*b8a71696SRene Gollent 			sem_id				fMessageWaiters;
35*b8a71696SRene Gollent 			thread_id			fReplyReader;
36*b8a71696SRene Gollent 			ReplyMessageMap		fReceivedReplies;
37*b8a71696SRene Gollent 			BMessageQueue		fReceivedMessages;
38*b8a71696SRene Gollent 			int64				fReplyIDCounter;
39*b8a71696SRene Gollent };
40*b8a71696SRene Gollent 
41*b8a71696SRene Gollent 
Private()42*b8a71696SRene Gollent BSocketMessenger::Private::Private()
43*b8a71696SRene Gollent 	:
44*b8a71696SRene Gollent 	fMessageWaiters(-1),
45*b8a71696SRene Gollent 	fReplyReader(-1),
46*b8a71696SRene Gollent 	fReceivedReplies(),
47*b8a71696SRene Gollent 	fReceivedMessages(),
48*b8a71696SRene Gollent 	fReplyIDCounter(0)
49*b8a71696SRene Gollent {
50*b8a71696SRene Gollent }
51*b8a71696SRene Gollent 
52*b8a71696SRene Gollent 
~Private()53*b8a71696SRene Gollent BSocketMessenger::Private::~Private()
54*b8a71696SRene Gollent {
55*b8a71696SRene Gollent 	if (fMessageWaiters > 0)
56*b8a71696SRene Gollent 		delete_sem(fMessageWaiters);
57*b8a71696SRene Gollent 	if (fReplyReader > 0)
58*b8a71696SRene Gollent 		wait_for_thread(fReplyReader, NULL);
59*b8a71696SRene Gollent 
60*b8a71696SRene Gollent 	ClearMessages();
61*b8a71696SRene Gollent }
62*b8a71696SRene Gollent 
63*b8a71696SRene Gollent 
64*b8a71696SRene Gollent void
ClearMessages()65*b8a71696SRene Gollent BSocketMessenger::Private::ClearMessages()
66*b8a71696SRene Gollent {
67*b8a71696SRene Gollent 	fReceivedReplies.Clear();
68*b8a71696SRene Gollent 	AutoLocker<BMessageQueue> queueLocker(fReceivedMessages);
69*b8a71696SRene Gollent 	while (!fReceivedMessages.IsEmpty())
70*b8a71696SRene Gollent 		delete fReceivedMessages.NextMessage();
71*b8a71696SRene Gollent }
72*b8a71696SRene Gollent 
73*b8a71696SRene Gollent 
74*b8a71696SRene Gollent // #pragma mark - BSocketMessenger
75493cced1SRene Gollent 
76493cced1SRene Gollent 
BSocketMessenger()77493cced1SRene Gollent BSocketMessenger::BSocketMessenger()
78493cced1SRene Gollent 	:
79*b8a71696SRene Gollent 	fPrivateData(NULL),
80493cced1SRene Gollent 	fSocket(),
81493cced1SRene Gollent 	fInitStatus(B_NO_INIT)
82493cced1SRene Gollent {
83*b8a71696SRene Gollent 	_Init();
84493cced1SRene Gollent }
85493cced1SRene Gollent 
86493cced1SRene Gollent 
BSocketMessenger(const BNetworkAddress & address,bigtime_t timeout)87493cced1SRene Gollent BSocketMessenger::BSocketMessenger(const BNetworkAddress& address,
88493cced1SRene Gollent 	bigtime_t timeout)
89*b8a71696SRene Gollent 	:
90*b8a71696SRene Gollent 	fPrivateData(NULL),
91*b8a71696SRene Gollent 	fSocket(),
92*b8a71696SRene Gollent 	fInitStatus(B_NO_INIT)
93493cced1SRene Gollent {
94*b8a71696SRene Gollent 	_Init();
95493cced1SRene Gollent 	SetTo(address, timeout);
96493cced1SRene Gollent }
97493cced1SRene Gollent 
98493cced1SRene Gollent 
BSocketMessenger(const BSocket & socket)99493cced1SRene Gollent BSocketMessenger::BSocketMessenger(const BSocket& socket)
100493cced1SRene Gollent 	:
101*b8a71696SRene Gollent 	fPrivateData(NULL),
102*b8a71696SRene Gollent 	fSocket(socket),
103*b8a71696SRene Gollent 	fInitStatus(B_NO_INIT)
104493cced1SRene Gollent {
105*b8a71696SRene Gollent 	_Init();
106*b8a71696SRene Gollent 	if (fPrivateData == NULL)
107*b8a71696SRene Gollent 		return;
108*b8a71696SRene Gollent 
109493cced1SRene Gollent 	fInitStatus = socket.InitCheck();
110*b8a71696SRene Gollent 	if (fInitStatus != B_OK)
111*b8a71696SRene Gollent 		return;
112*b8a71696SRene Gollent 
113*b8a71696SRene Gollent 	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
114*b8a71696SRene Gollent 		"Message Reader", B_NORMAL_PRIORITY, this);
115*b8a71696SRene Gollent 	if (fPrivateData->fReplyReader < 0)
116*b8a71696SRene Gollent 		fInitStatus = fPrivateData->fReplyReader;
117*b8a71696SRene Gollent 	if (fInitStatus != B_OK) {
118*b8a71696SRene Gollent 		exit_thread(fPrivateData->fReplyReader);
119*b8a71696SRene Gollent 		fPrivateData->fReplyReader = -1;
120*b8a71696SRene Gollent 		return;
121*b8a71696SRene Gollent 	}
122*b8a71696SRene Gollent 
123*b8a71696SRene Gollent 	fInitStatus = resume_thread(fPrivateData->fReplyReader);
124493cced1SRene Gollent }
125493cced1SRene Gollent 
126493cced1SRene Gollent 
~BSocketMessenger()127493cced1SRene Gollent BSocketMessenger::~BSocketMessenger()
128493cced1SRene Gollent {
129493cced1SRene Gollent 	Unset();
130*b8a71696SRene Gollent 
131*b8a71696SRene Gollent 	delete fPrivateData;
132493cced1SRene Gollent }
133493cced1SRene Gollent 
134493cced1SRene Gollent 
135493cced1SRene Gollent void
Unset()136493cced1SRene Gollent BSocketMessenger::Unset()
137493cced1SRene Gollent {
138*b8a71696SRene Gollent 	if (fPrivateData == NULL)
139*b8a71696SRene Gollent 		return;
140*b8a71696SRene Gollent 
141493cced1SRene Gollent 	fSocket.Disconnect();
142*b8a71696SRene Gollent 	wait_for_thread(fPrivateData->fReplyReader, NULL);
143*b8a71696SRene Gollent 	fPrivateData->fReplyReader = -1;
144*b8a71696SRene Gollent 	fPrivateData->ClearMessages();
145*b8a71696SRene Gollent 
146*b8a71696SRene Gollent 	release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL);
147*b8a71696SRene Gollent 
148493cced1SRene Gollent 	fInitStatus = B_NO_INIT;
149493cced1SRene Gollent }
150493cced1SRene Gollent 
151493cced1SRene Gollent 
152493cced1SRene Gollent status_t
SetTo(const BNetworkAddress & address,bigtime_t timeout)153493cced1SRene Gollent BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout)
154493cced1SRene Gollent {
155*b8a71696SRene Gollent 	Unset();
156*b8a71696SRene Gollent 
157*b8a71696SRene Gollent 	if (fPrivateData == NULL)
158*b8a71696SRene Gollent 		return B_NO_MEMORY;
159*b8a71696SRene Gollent 
160*b8a71696SRene Gollent 	fPrivateData->fReplyReader = spawn_thread(&_MessageReader,
161*b8a71696SRene Gollent 		"Message Reader", B_NORMAL_PRIORITY, this);
162*b8a71696SRene Gollent 	if (fPrivateData->fReplyReader < 0)
163*b8a71696SRene Gollent 		return fPrivateData->fReplyReader;
164*b8a71696SRene Gollent 	status_t error = fSocket.Connect(address, timeout);
165*b8a71696SRene Gollent 	if (error != B_OK) {
166*b8a71696SRene Gollent 		Unset();
167*b8a71696SRene Gollent 		return error;
168*b8a71696SRene Gollent 	}
169*b8a71696SRene Gollent 
170*b8a71696SRene Gollent 	return fInitStatus = resume_thread(fPrivateData->fReplyReader);
171493cced1SRene Gollent }
172493cced1SRene Gollent 
173493cced1SRene Gollent 
174493cced1SRene Gollent status_t
SetTo(const BSocketMessenger & target,bigtime_t timeout)175493cced1SRene Gollent BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout)
176493cced1SRene Gollent {
177493cced1SRene Gollent 	return SetTo(target.Address(), timeout);
178493cced1SRene Gollent }
179493cced1SRene Gollent 
180493cced1SRene Gollent 
181493cced1SRene Gollent status_t
SendMessage(const BMessage & message)182493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message)
183493cced1SRene Gollent {
184493cced1SRene Gollent 	return _SendMessage(message);
185493cced1SRene Gollent }
186493cced1SRene Gollent 
187493cced1SRene Gollent 
188493cced1SRene Gollent status_t
SendMessage(const BMessage & message,BMessage & _reply,bigtime_t timeout)189493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply,
190493cced1SRene Gollent 	bigtime_t timeout)
191493cced1SRene Gollent {
192*b8a71696SRene Gollent 	int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1);
193*b8a71696SRene Gollent 	BMessage temp(message);
194*b8a71696SRene Gollent 	temp.AddInt64(kReplySenderIDField, replyID);
195*b8a71696SRene Gollent 	status_t error = _SendMessage(temp);
196493cced1SRene Gollent 	if (error != B_OK)
197493cced1SRene Gollent 		return error;
198493cced1SRene Gollent 
199*b8a71696SRene Gollent 	return _ReadReply(replyID, _reply, timeout);
200493cced1SRene Gollent }
201493cced1SRene Gollent 
202493cced1SRene Gollent 
203493cced1SRene Gollent status_t
SendMessage(const BMessage & message,BMessenger & replyTarget,bigtime_t timeout)204493cced1SRene Gollent BSocketMessenger::SendMessage(const BMessage& message,
205493cced1SRene Gollent 	BMessenger& replyTarget, bigtime_t timeout)
206493cced1SRene Gollent {
207493cced1SRene Gollent 	BMessage reply;
208493cced1SRene Gollent 	status_t error = SendMessage(message, reply, timeout);
209493cced1SRene Gollent 	if (error != B_OK)
210493cced1SRene Gollent 		return error;
211493cced1SRene Gollent 
212493cced1SRene Gollent 	return replyTarget.SendMessage(&reply);
213493cced1SRene Gollent }
214493cced1SRene Gollent 
215493cced1SRene Gollent 
216493cced1SRene Gollent status_t
SendReply(const BMessage & message,const BMessage & reply)217*b8a71696SRene Gollent BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply)
218*b8a71696SRene Gollent {
219*b8a71696SRene Gollent 	int64 replyID;
220*b8a71696SRene Gollent 	if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK)
221*b8a71696SRene Gollent 		return B_NOT_ALLOWED;
222*b8a71696SRene Gollent 
223*b8a71696SRene Gollent 	BMessage replyMessage(reply);
224*b8a71696SRene Gollent 	replyMessage.AddInt64(kReplyReceiverIDField, replyID);
225*b8a71696SRene Gollent 	return SendMessage(replyMessage);
226*b8a71696SRene Gollent }
227*b8a71696SRene Gollent 
228*b8a71696SRene Gollent 
229*b8a71696SRene Gollent status_t
ReceiveMessage(BMessage & _message,bigtime_t timeout)230493cced1SRene Gollent BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout)
231493cced1SRene Gollent {
232*b8a71696SRene Gollent 	status_t error = B_OK;
233*b8a71696SRene Gollent 	AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages);
234*b8a71696SRene Gollent 	for (;;) {
235*b8a71696SRene Gollent 		if (!fPrivateData->fReceivedMessages.IsEmpty()) {
236*b8a71696SRene Gollent 			BMessage* nextMessage
237*b8a71696SRene Gollent 				= fPrivateData->fReceivedMessages.NextMessage();
238*b8a71696SRene Gollent 			_message = *nextMessage;
239*b8a71696SRene Gollent 			delete nextMessage;
240*b8a71696SRene Gollent 			break;
241*b8a71696SRene Gollent 		}
242*b8a71696SRene Gollent 
243*b8a71696SRene Gollent 		queueLocker.Unlock();
244*b8a71696SRene Gollent 		error = _WaitForMessage(timeout);
245*b8a71696SRene Gollent 		if (error != B_OK)
246*b8a71696SRene Gollent 			break;
247*b8a71696SRene Gollent 		if (!fSocket.IsConnected()) {
248*b8a71696SRene Gollent 			error = B_CANCELED;
249*b8a71696SRene Gollent 			break;
250*b8a71696SRene Gollent 		}
251*b8a71696SRene Gollent 		queueLocker.Lock();
252*b8a71696SRene Gollent 	}
253*b8a71696SRene Gollent 
254*b8a71696SRene Gollent 	return error;
255*b8a71696SRene Gollent }
256*b8a71696SRene Gollent 
257*b8a71696SRene Gollent 
258*b8a71696SRene Gollent void
_Init()259*b8a71696SRene Gollent BSocketMessenger::_Init()
260*b8a71696SRene Gollent {
261*b8a71696SRene Gollent 	if (fPrivateData != NULL)
262*b8a71696SRene Gollent 		return;
263*b8a71696SRene Gollent 
264*b8a71696SRene Gollent 	BSocketMessenger::Private* data
265*b8a71696SRene Gollent 		= new(std::nothrow) BSocketMessenger::Private;
266*b8a71696SRene Gollent 
267*b8a71696SRene Gollent 	if (data == NULL) {
268*b8a71696SRene Gollent 		fInitStatus = B_NO_MEMORY;
269*b8a71696SRene Gollent 		return;
270*b8a71696SRene Gollent 	}
271*b8a71696SRene Gollent 
272*b8a71696SRene Gollent 	data->fMessageWaiters = create_sem(0, "message waiters");
273*b8a71696SRene Gollent 	if (data->fMessageWaiters < 0) {
274*b8a71696SRene Gollent 		fInitStatus = data->fMessageWaiters;
275*b8a71696SRene Gollent 		delete data;
276*b8a71696SRene Gollent 		return;
277*b8a71696SRene Gollent 	}
278*b8a71696SRene Gollent 
279*b8a71696SRene Gollent 	fPrivateData = data;
280*b8a71696SRene Gollent }
281*b8a71696SRene Gollent 
282*b8a71696SRene Gollent 
283*b8a71696SRene Gollent status_t
_WaitForMessage(bigtime_t timeout)284*b8a71696SRene Gollent BSocketMessenger::_WaitForMessage(bigtime_t timeout)
285*b8a71696SRene Gollent {
286*b8a71696SRene Gollent 	for (;;) {
287*b8a71696SRene Gollent 		status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1,
288*b8a71696SRene Gollent 			B_RELATIVE_TIMEOUT, timeout);
289*b8a71696SRene Gollent 		if (error == B_INTERRUPTED) {
290*b8a71696SRene Gollent 			if (timeout != B_INFINITE_TIMEOUT)
291*b8a71696SRene Gollent 				timeout -= system_time();
292*b8a71696SRene Gollent 			continue;
293*b8a71696SRene Gollent 		}
294*b8a71696SRene Gollent 		if (error != B_OK)
295*b8a71696SRene Gollent 			return error;
296*b8a71696SRene Gollent 		break;
297*b8a71696SRene Gollent 	}
298*b8a71696SRene Gollent 
299*b8a71696SRene Gollent 	return B_OK;
300493cced1SRene Gollent }
301493cced1SRene Gollent 
302493cced1SRene Gollent 
303493cced1SRene Gollent status_t
_SendMessage(const BMessage & message)304493cced1SRene Gollent BSocketMessenger::_SendMessage(const BMessage& message)
305493cced1SRene Gollent {
306493cced1SRene Gollent 	ssize_t flatSize = message.FlattenedSize();
3076d4e35f6SRene Gollent 	ssize_t totalSize = flatSize + sizeof(ssize_t);
308493cced1SRene Gollent 
3096d4e35f6SRene Gollent 	char* buffer = new(std::nothrow) char[totalSize];
310493cced1SRene Gollent 	if (buffer == NULL)
311493cced1SRene Gollent 		return B_NO_MEMORY;
312493cced1SRene Gollent 
313493cced1SRene Gollent 	ArrayDeleter<char> bufferDeleter(buffer);
314493cced1SRene Gollent 	*(ssize_t*)buffer = flatSize;
315493cced1SRene Gollent 	char* messageBuffer = buffer + sizeof(ssize_t);
316493cced1SRene Gollent 	status_t error = message.Flatten(messageBuffer, flatSize);
317493cced1SRene Gollent 	if (error != B_OK)
318493cced1SRene Gollent 		return error;
319493cced1SRene Gollent 
3206d4e35f6SRene Gollent 	ssize_t size = fSocket.Write(buffer, totalSize);
321493cced1SRene Gollent 	if (size < 0)
322493cced1SRene Gollent 		return size;
323493cced1SRene Gollent 
324493cced1SRene Gollent 	return B_OK;
325493cced1SRene Gollent }
326493cced1SRene Gollent 
327493cced1SRene Gollent 
328493cced1SRene Gollent status_t
_ReadMessage(BMessage & _message)329*b8a71696SRene Gollent BSocketMessenger::_ReadMessage(BMessage& _message)
330493cced1SRene Gollent {
331*b8a71696SRene Gollent 	status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT);
332493cced1SRene Gollent 	if (error != B_OK)
333493cced1SRene Gollent 		return error;
334493cced1SRene Gollent 
335493cced1SRene Gollent 	ssize_t size = 0;
336493cced1SRene Gollent 	ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t));
337493cced1SRene Gollent 	if (readSize < 0)
338493cced1SRene Gollent 		return readSize;
3396dd98243SRene Gollent 
3406dd98243SRene Gollent 	if (readSize != sizeof(ssize_t))
341493cced1SRene Gollent 		return B_BAD_VALUE;
342493cced1SRene Gollent 
343493cced1SRene Gollent 	if (size <= 0)
344493cced1SRene Gollent 		return B_MISMATCHED_VALUES;
345493cced1SRene Gollent 
346493cced1SRene Gollent 	char* buffer = new(std::nothrow) char[size];
347493cced1SRene Gollent 	if (buffer == NULL)
348493cced1SRene Gollent 		return B_NO_MEMORY;
349493cced1SRene Gollent 
350493cced1SRene Gollent 	ArrayDeleter<char> bufferDeleter(buffer);
351493cced1SRene Gollent 	readSize = fSocket.Read(buffer, size);
352493cced1SRene Gollent 	if (readSize < 0)
353493cced1SRene Gollent 		return readSize;
3546dd98243SRene Gollent 
3556dd98243SRene Gollent 	if (readSize != size)
356493cced1SRene Gollent 		return B_MISMATCHED_VALUES;
357493cced1SRene Gollent 
358493cced1SRene Gollent 	return _message.Unflatten(buffer);
359493cced1SRene Gollent }
360*b8a71696SRene Gollent 
361*b8a71696SRene Gollent 
362*b8a71696SRene Gollent status_t
_ReadReply(const int64 replyID,BMessage & reply,bigtime_t timeout)363*b8a71696SRene Gollent BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply,
364*b8a71696SRene Gollent 	bigtime_t timeout)
365*b8a71696SRene Gollent {
366*b8a71696SRene Gollent 	status_t error = B_OK;
367*b8a71696SRene Gollent 	for (;;) {
368*b8a71696SRene Gollent 		if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) {
369*b8a71696SRene Gollent 			reply = fPrivateData->fReceivedReplies.Remove(replyID);
370*b8a71696SRene Gollent 			break;
371*b8a71696SRene Gollent 		}
372*b8a71696SRene Gollent 
373*b8a71696SRene Gollent 		error = _WaitForMessage(timeout);
374*b8a71696SRene Gollent 		if (error != B_OK)
375*b8a71696SRene Gollent 			break;
376*b8a71696SRene Gollent 		if (!fSocket.IsConnected()) {
377*b8a71696SRene Gollent 			error = B_CANCELED;
378*b8a71696SRene Gollent 			break;
379*b8a71696SRene Gollent 		}
380*b8a71696SRene Gollent 	}
381*b8a71696SRene Gollent 
382*b8a71696SRene Gollent 	return error;
383*b8a71696SRene Gollent }
384*b8a71696SRene Gollent 
385*b8a71696SRene Gollent 
386*b8a71696SRene Gollent status_t
_MessageReader(void * arg)387*b8a71696SRene Gollent BSocketMessenger::_MessageReader(void* arg)
388*b8a71696SRene Gollent {
389*b8a71696SRene Gollent 	BSocketMessenger* messenger = (BSocketMessenger*)arg;
390*b8a71696SRene Gollent 	BSocketMessenger::Private* data = messenger->fPrivateData;
391*b8a71696SRene Gollent 	status_t error = B_OK;
392*b8a71696SRene Gollent 
393*b8a71696SRene Gollent 	for (;;) {
394*b8a71696SRene Gollent 		BMessage message;
395*b8a71696SRene Gollent 		error = messenger->_ReadMessage(message);
396*b8a71696SRene Gollent 		if (error != B_OK)
397*b8a71696SRene Gollent 			break;
398*b8a71696SRene Gollent 
399*b8a71696SRene Gollent 		int64 replyID;
400*b8a71696SRene Gollent 		if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) {
401*b8a71696SRene Gollent 			error = data->fReceivedReplies.Put(replyID, message);
402*b8a71696SRene Gollent 			if (error != B_OK)
403*b8a71696SRene Gollent 				break;
404*b8a71696SRene Gollent 		} else {
405*b8a71696SRene Gollent 			BMessage* queueMessage = new(std::nothrow) BMessage(message);
406*b8a71696SRene Gollent 			if (queueMessage == NULL) {
407*b8a71696SRene Gollent 				error = B_NO_MEMORY;
408*b8a71696SRene Gollent 				break;
409*b8a71696SRene Gollent 			}
410*b8a71696SRene Gollent 
411*b8a71696SRene Gollent 			AutoLocker<BMessageQueue> queueLocker(
412*b8a71696SRene Gollent 				data->fReceivedMessages);
413*b8a71696SRene Gollent 			data->fReceivedMessages.AddMessage(queueMessage);
414*b8a71696SRene Gollent 		}
415*b8a71696SRene Gollent 
416*b8a71696SRene Gollent 
417*b8a71696SRene Gollent 		release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
418*b8a71696SRene Gollent 	}
419*b8a71696SRene Gollent 
420*b8a71696SRene Gollent 	// if we exit our message loop, ensure everybody wakes up and knows
421*b8a71696SRene Gollent 	// we're no longer receiving messages.
422*b8a71696SRene Gollent 	messenger->fSocket.Disconnect();
423*b8a71696SRene Gollent 	release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL);
424*b8a71696SRene Gollent 	return error;
425*b8a71696SRene Gollent }
426