xref: /haiku/src/system/kernel/posix/xsi_message_queue.cpp (revision e1c4049fed1047bdb957b0529e1921e97ef94770)
1 /*
2  * Copyright 2008-2023, Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7  */
8 
9 #include <posix/xsi_message_queue.h>
10 
11 #include <new>
12 
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15 
16 #include <OS.h>
17 
18 #include <kernel.h>
19 #include <syscall_restart.h>
20 
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25 
26 
// Debug tracing switch. While TRACE_XSI_MSG_QUEUE is defined, TRACE() forwards
// its (double-parenthesized) argument list to dprintf(); otherwise TRACE()
// expands to nothing. TRACE_ERROR() forwards to dprintf() in both cases.
// NOTE(review): the switch is defined unconditionally here, so every TRACE()
// in this file is active -- confirm this is intended outside of debugging.
#define TRACE_XSI_MSG_QUEUE
#ifdef TRACE_XSI_MSG_QUEUE
#	define TRACE(x)			dprintf x
#	define TRACE_ERROR(x)	dprintf x
#else
#	define TRACE(x)			/* nothing */
#	define TRACE_ERROR(x)	dprintf x
#endif
35 
36 
37 namespace {
38 
39 
40 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
41 	queued_message(const void *_message, ssize_t _length)
42 		:
43 		initOK(false),
44 		length(_length)
45 	{
46 		message = (char *)malloc(sizeof(char) * _length);
47 		if (message == NULL)
48 			return;
49 
50 		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
51 			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
52 			_length) != B_OK) {
53 			free(message);
54 			return;
55 		}
56 		initOK = true;
57 	}
58 
59 	~queued_message()
60 	{
61 		if (initOK)
62 			free(message);
63 	}
64 
65 	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
66 	{
67 		if (_length > length)
68 			_length = length;
69 
70 		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
71 			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
72 			_length) != B_OK)
73 			return B_ERROR;
74 		return _length;
75 	}
76 
77 	bool		initOK;
78 	ssize_t		length;
79 	char		*message;
80 	long		type;
81 };
82 
// List of the messages queued on one XSI message queue
typedef DoublyLinkedList<queued_message> MessageQueue;

// Arbitrary limit: the initial msg_qbytes of every queue, also used by the
// msgsnd()/msgrcv() syscalls in this file as upper bound for the message size
#define MAX_BYTES_PER_QUEUE		2048
87 
88 class XsiMessageQueue {
89 public:
90 	XsiMessageQueue(int flags)
91 		:
92 		fBytesInQueue(0)
93 	{
94 		mutex_init(&fLock, "XsiMessageQueue private mutex");
95 		fWaitingToReceive.Init(this, "XsiMessageQueue");
96 		fWaitingToSend.Init(this, "XsiMessageQueue");
97 
98 		SetIpcKey((key_t)-1);
99 		SetPermissions(flags);
100 		// Initialize all fields to zero
101 		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
102 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
103 		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
104 	}
105 
106 	// Implemented after sXsiMessageCount is declared
107 	~XsiMessageQueue();
108 
109 	status_t BlockAndUnlock(ConditionVariableEntry *queueEntry, MutexLocker *queueLocker)
110 	{
111 		// Unlock the queue before blocking
112 		queueLocker->Unlock();
113 		return queueEntry->Wait(B_CAN_INTERRUPT);
114 	}
115 
116 	void DoIpcSet(struct msqid_ds *result)
117 	{
118 		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
119 		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
120 		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
121 			| (result->msg_perm.mode & 0x01ff);
122 		fMessageQueue.msg_qbytes = result->msg_qbytes;
123 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
124 	}
125 
126 	void Dequeue(ConditionVariableEntry *queueEntry)
127 	{
128 		queueEntry->Wait(B_RELATIVE_TIMEOUT, 0);
129 	}
130 
131 	void Enqueue(ConditionVariableEntry *queueEntry, bool waitForMessage)
132 	{
133 		if (waitForMessage) {
134 			fWaitingToReceive.Add(queueEntry);
135 		} else {
136 			fWaitingToSend.Add(queueEntry);
137 		}
138 	}
139 
140 	struct msqid_ds &GetMessageQueue()
141 	{
142 		return fMessageQueue;
143 	}
144 
145 	bool HasPermission() const
146 	{
147 		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
148 			return true;
149 
150 		uid_t uid = geteuid();
151 		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
152 			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
153 			return true;
154 
155 		gid_t gid = getegid();
156 		if (gid == fMessageQueue.msg_perm.gid
157 			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
158 			return true;
159 
160 		return false;
161 	}
162 
163 	bool HasReadPermission() const
164 	{
165 		// TODO: fix this
166 		return HasPermission();
167 	}
168 
169 	int ID() const
170 	{
171 		return fID;
172 	}
173 
174 	// Implemented after sXsiMessageCount is declared
175 	bool Insert(queued_message *message);
176 
177 	key_t IpcKey() const
178 	{
179 		return fMessageQueue.msg_perm.key;
180 	}
181 
182 	mutex &Lock()
183 	{
184 		return fLock;
185 	}
186 
187 	msglen_t MaxBytes() const
188 	{
189 		return fMessageQueue.msg_qbytes;
190 	}
191 
192 	// Implemented after sXsiMessageCount is declared
193 	queued_message *Remove(long typeRequested);
194 
195 	uint32 SequenceNumber() const
196 	{
197 		return fSequenceNumber;
198 	}
199 
200 	// Implemented after sMessageQueueHashTable is declared
201 	void SetID();
202 
203 	void SetIpcKey(key_t key)
204 	{
205 		fMessageQueue.msg_perm.key = key;
206 	}
207 
208 	void SetPermissions(int flags)
209 	{
210 		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
211 		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
212 		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
213 	}
214 
215 	void WakeUpThread(bool waitForMessage)
216 	{
217 		if (waitForMessage) {
218 			// Wake up all waiting thread for a message
219 			// TODO: this can cause starvation for any
220 			// very-unlucky-and-slow thread
221 			fWaitingToReceive.NotifyAll();
222 		} else {
223 			// Wake up only one thread waiting to send
224 			fWaitingToSend.NotifyOne();
225 		}
226 	}
227 
228 	XsiMessageQueue*& Link()
229 	{
230 		return fLink;
231 	}
232 
233 private:
234 	msglen_t			fBytesInQueue;
235 	int					fID;
236 	mutex				fLock;
237 	MessageQueue		fMessage;
238 	struct msqid_ds		fMessageQueue;
239 	uint32				fSequenceNumber;
240 
241 	ConditionVariable	fWaitingToReceive;
242 	ConditionVariable	fWaitingToSend;
243 
244 	XsiMessageQueue*	fLink;
245 };
246 
247 
248 // Xsi message queue hash table
249 struct MessageQueueHashTableDefinition {
250 	typedef int					KeyType;
251 	typedef XsiMessageQueue		ValueType;
252 
253 	size_t HashKey (const int key) const
254 	{
255 		return (size_t)key;
256 	}
257 
258 	size_t Hash(XsiMessageQueue *variable) const
259 	{
260 		return (size_t)variable->ID();
261 	}
262 
263 	bool Compare(const int key, XsiMessageQueue *variable) const
264 	{
265 		return (int)key == (int)variable->ID();
266 	}
267 
268 	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
269 	{
270 		return variable->Link();
271 	}
272 };
273 
274 
275 // IPC class
276 class Ipc {
277 public:
278 	Ipc(key_t key)
279 		: fKey(key),
280 		fMessageQueueId(-1)
281 	{
282 	}
283 
284 	key_t Key() const
285 	{
286 		return fKey;
287 	}
288 
289 	int MessageQueueID() const
290 	{
291 		return fMessageQueueId;
292 	}
293 
294 	void SetMessageQueueID(XsiMessageQueue *messageQueue)
295 	{
296 		fMessageQueueId = messageQueue->ID();
297 	}
298 
299 	Ipc*& Link()
300 	{
301 		return fLink;
302 	}
303 
304 private:
305 	key_t				fKey;
306 	int					fMessageQueueId;
307 	Ipc*				fLink;
308 };
309 
310 
311 struct IpcHashTableDefinition {
312 	typedef key_t	KeyType;
313 	typedef Ipc		ValueType;
314 
315 	size_t HashKey (const key_t key) const
316 	{
317 		return (size_t)(key);
318 	}
319 
320 	size_t Hash(Ipc *variable) const
321 	{
322 		return (size_t)HashKey(variable->Key());
323 	}
324 
325 	bool Compare(const key_t key, Ipc *variable) const
326 	{
327 		return (key_t)key == (key_t)variable->Key();
328 	}
329 
330 	Ipc*& GetLink(Ipc *variable) const
331 	{
332 		return variable->Link();
333 	}
334 };
335 
336 } // namespace
337 
338 
// Arbitrary limits
// MAX_XSI_MESSAGE bounds sXsiMessageCount (messages queued over all queues),
// MAX_XSI_MESSAGE_QUEUE bounds sXsiMessageQueueCount (existing queues).
#define MAX_XSI_MESSAGE			4096
#define MAX_XSI_MESSAGE_QUEUE	1024
// Maps IPC keys to Ipc objects
static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
// Maps message queue IDs to XsiMessageQueue objects
static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;

// Both mutexes are initialized in xsi_msg_init(). _user_xsi_msgctl() acquires
// sIpcLock before sXsiMessageQueueLock.
static mutex sIpcLock;
static mutex sXsiMessageQueueLock;

// Advanced by XsiMessageQueue::SetID() for every queue that is given an ID
static uint32 sGlobalSequenceNumber = 1;
// Both counters are modified with atomic operations only
static int32 sXsiMessageCount = 0;
static int32 sXsiMessageQueueCount = 0;
351 
352 
353 //	#pragma mark -
354 
355 
356 XsiMessageQueue::~XsiMessageQueue()
357 {
358 	mutex_destroy(&fLock);
359 
360 	// Wake up any threads still waiting
361 	fWaitingToReceive.NotifyAll(EIDRM);
362 	fWaitingToSend.NotifyAll(EIDRM);
363 
364 	// Free up any remaining messages
365 	if (fMessageQueue.msg_qnum) {
366 		while (queued_message *message = fMessage.RemoveHead()) {
367 			atomic_add(&sXsiMessageCount, -1);
368 			delete message;
369 		}
370 	}
371 }
372 
373 
374 bool
375 XsiMessageQueue::Insert(queued_message *message)
376 {
377 	// The only situation that would make us (potentially) wait
378 	// is that we exceed with bytes or with the total number of messages
379 	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
380 		return true;
381 
382 	while (true) {
383 		int32 oldCount = atomic_get(&sXsiMessageCount);
384 		if (oldCount >= MAX_XSI_MESSAGE)
385 			return true;
386 		// If another thread updates the counter we keep
387 		// iterating
388 		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
389 			== oldCount)
390 			break;
391 	}
392 
393 	fMessage.Add(message);
394 	fMessageQueue.msg_qnum++;
395 	fMessageQueue.msg_lspid = getpid();
396 	fMessageQueue.msg_stime = real_time_clock();
397 	fBytesInQueue += message->length;
398 
399 	WakeUpThread(true /* WaitForMessage */);
400 	return false;
401 }
402 
403 
404 queued_message*
405 XsiMessageQueue::Remove(long typeRequested)
406 {
407 	queued_message *message = NULL;
408 	if (typeRequested < 0) {
409 		// Return first message of the lowest type
410 		// that is less than or equal to the absolute
411 		// value of type requested.
412 		MessageQueue::Iterator iterator = fMessage.GetIterator();
413 		while (iterator.HasNext()) {
414 			queued_message *current = iterator.Next();
415 			if (current->type <= -typeRequested) {
416 				message = iterator.Remove();
417 				break;
418 			}
419 		}
420 	} else if (typeRequested == 0) {
421 		// Return the first message on the queue
422 		message = fMessage.RemoveHead();
423 	} else {
424 		// Return the first message of type requested
425 		MessageQueue::Iterator iterator = fMessage.GetIterator();
426 		while (iterator.HasNext()) {
427 			queued_message *current = iterator.Next();
428 			if (current->type == typeRequested) {
429 				message = iterator.Remove();
430 				break;
431 			}
432 		}
433 	}
434 
435 	if (message == NULL)
436 		return NULL;
437 
438 	fMessageQueue.msg_qnum--;
439 	fMessageQueue.msg_lrpid = getpid();
440 	fMessageQueue.msg_rtime = real_time_clock();
441 	fBytesInQueue -= message->length;
442 	atomic_add(&sXsiMessageCount, -1);
443 
444 	WakeUpThread(false /* WaitForMessage */);
445 	return message;
446 }
447 
448 
449 void
450 XsiMessageQueue::SetID()
451 {
452 	fID = real_time_clock();
453 	// The lock is held before calling us
454 	while (true) {
455 		if (sMessageQueueHashTable.Lookup(fID) == NULL)
456 			break;
457 		fID++;
458 	}
459 	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
460 	fSequenceNumber = sGlobalSequenceNumber;
461 }
462 
463 
464 //	#pragma mark - Kernel exported API
465 
466 
467 void
468 xsi_msg_init()
469 {
470 	// Initialize hash tables
471 	status_t status = sIpcHashTable.Init();
472 	if (status != B_OK)
473 		panic("xsi_msg_init() failed to initialize ipc hash table\n");
474 	status =  sMessageQueueHashTable.Init();
475 	if (status != B_OK)
476 		panic("xsi_msg_init() failed to initialize message queue hash table\n");
477 
478 	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
479 	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
480 }
481 
482 
483 //	#pragma mark - Syscalls
484 
485 
486 int
487 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
488 {
489 	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
490 	MutexLocker ipcHashLocker(sIpcLock);
491 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
492 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
493 	if (messageQueue == NULL) {
494 		TRACE(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
495 		return EINVAL;
496 	}
497 	if (buffer != NULL && !IS_USER_ADDRESS(buffer)) {
498 		TRACE(("xsi_msgctl: buffer address is not valid\n"));
499 		return B_BAD_ADDRESS;
500 	}
501 
502 	// Lock the message queue itself and release both the ipc hash table lock
503 	// and the message queue hash table lock _only_ if the command it's not
504 	// IPC_RMID, this prevents undesidered situation from happening while
505 	// (hopefully) improving the concurrency.
506 	MutexLocker messageQueueLocker;
507 	if (command != IPC_RMID) {
508 		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
509 		messageQueueHashLocker.Unlock();
510 		ipcHashLocker.Unlock();
511 	} else
512 		// Since we are going to delete the message queue object
513 		// along with its mutex, we can't use a MutexLocker object,
514 		// as the mutex itself won't exist on function exit
515 		mutex_lock(&messageQueue->Lock());
516 
517 	switch (command) {
518 		case IPC_STAT: {
519 			if (!messageQueue->HasReadPermission()) {
520 				TRACE(("xsi_msgctl: calling process has not read "
521 					"permission on message queue %d, key %d\n", messageQueueID,
522 					(int)messageQueue->IpcKey()));
523 				return EACCES;
524 			}
525 			struct msqid_ds msg = messageQueue->GetMessageQueue();
526 			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
527 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
528 				return B_BAD_ADDRESS;
529 			}
530 			break;
531 		}
532 
533 		case IPC_SET: {
534 			if (!messageQueue->HasPermission()) {
535 				TRACE(("xsi_msgctl: calling process has not permission "
536 					"on message queue %d, key %d\n", messageQueueID,
537 					(int)messageQueue->IpcKey()));
538 				return EPERM;
539 			}
540 			struct msqid_ds msg;
541 			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
542 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
543 				return B_BAD_ADDRESS;
544 			}
545 			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
546 				TRACE(("xsi_msgctl: user does not have permission to "
547 					"increase the maximum number of bytes allowed on queue\n"));
548 				return EPERM;
549 			}
550 			if (msg.msg_qbytes == 0) {
551 				TRACE(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
552 				return EINVAL;
553 			}
554 
555 			messageQueue->DoIpcSet(&msg);
556 			break;
557 		}
558 
559 		case IPC_RMID: {
560 			// If this was the command, we are still holding the message
561 			// queue hash table lock along with the ipc one, but not the
562 			// message queue lock itself. This prevents other process
563 			// to try and acquire a destroyed mutex
564 			if (!messageQueue->HasPermission()) {
565 				TRACE(("xsi_msgctl: calling process has not permission "
566 					"on message queue %d, key %d\n", messageQueueID,
567 					(int)messageQueue->IpcKey()));
568 				return EPERM;
569 			}
570 			key_t key = messageQueue->IpcKey();
571 			Ipc *ipcKey = NULL;
572 			if (key != -1) {
573 				ipcKey = sIpcHashTable.Lookup(key);
574 				sIpcHashTable.Remove(ipcKey);
575 			}
576 			sMessageQueueHashTable.Remove(messageQueue);
577 			// Wake up of any threads waiting on this
578 			// queue happens in destructor
579 			if (key != -1)
580 				delete ipcKey;
581 			atomic_add(&sXsiMessageQueueCount, -1);
582 
583 			delete messageQueue;
584 			break;
585 		}
586 
587 		default:
588 			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
589 			return EINVAL;
590 	}
591 
592 	return B_OK;
593 }
594 
595 
596 int
597 _user_xsi_msgget(key_t key, int flags)
598 {
599 	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
600 	XsiMessageQueue *messageQueue = NULL;
601 	Ipc *ipcKey = NULL;
602 	// Default assumptions
603 	bool isPrivate = true;
604 	bool create = true;
605 
606 	if (key != IPC_PRIVATE) {
607 		isPrivate = false;
608 		// Check if key already exist, if it does it already has a message
609 		// queue associated with it
610 		ipcKey = sIpcHashTable.Lookup(key);
611 		if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) {
612 			if (!(flags & IPC_CREAT)) {
613 				TRACE(("xsi_msgget: key %d does not exist, but the "
614 					"caller did not ask for creation\n", (int)key));
615 				return ENOENT;
616 			}
617 			if (ipcKey == NULL) {
618 				ipcKey = new(std::nothrow) Ipc(key);
619 				if (ipcKey == NULL) {
620 					TRACE(("xsi_msgget: failed to create new Ipc object "
621 						"for key %d\n", (int)key));
622 					return ENOMEM;
623 				}
624 				sIpcHashTable.Insert(ipcKey);
625 			}
626 		} else {
627 			// The IPC key exist and it already has a message queue
628 			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
629 				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
630 				return EEXIST;
631 			}
632 			int messageQueueID = ipcKey->MessageQueueID();
633 
634 			MutexLocker _(sXsiMessageQueueLock);
635 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
636 			if (!messageQueue->HasPermission()) {
637 				TRACE(("xsi_msgget: calling process has not permission "
638 					"on message queue %d, key %d\n", messageQueue->ID(),
639 					(int)key));
640 				return EACCES;
641 			}
642 			create = false;
643 		}
644 	}
645 
646 	if (create) {
647 		// Create a new message queue for this key
648 		if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
649 			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
650 				"message queues\n"));
651 			return ENOSPC;
652 		}
653 
654 		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
655 		if (messageQueue == NULL) {
656 			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
657 				"message queue\n"));
658 			return ENOMEM;
659 		}
660 		atomic_add(&sXsiMessageQueueCount, 1);
661 
662 		MutexLocker _(sXsiMessageQueueLock);
663 		messageQueue->SetID();
664 		if (isPrivate)
665 			messageQueue->SetIpcKey((key_t)-1);
666 		else {
667 			messageQueue->SetIpcKey(key);
668 			ipcKey->SetMessageQueueID(messageQueue);
669 		}
670 		sMessageQueueHashTable.Insert(messageQueue);
671 	}
672 
673 	return messageQueue->ID();
674 }
675 
676 
677 ssize_t
678 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
679 	size_t messageSize, long messageType, int messageFlags)
680 {
681 	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
682 		messageQueueID, messageSize));
683 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
684 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
685 	if (messageQueue == NULL) {
686 		TRACE(("xsi_msgrcv: message queue id %d not valid\n",
687 			messageQueueID));
688 		return EINVAL;
689 	}
690 	MutexLocker messageQueueLocker(messageQueue->Lock());
691 	messageQueueHashLocker.Unlock();
692 
693 	if (messageSize > MAX_BYTES_PER_QUEUE) {
694 		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
695 		return EINVAL;
696 	}
697 	if (!messageQueue->HasPermission()) {
698 		TRACE(("xsi_msgrcv: calling process has not permission "
699 			"on message queue id %d, key %d\n", messageQueueID,
700 			(int)messageQueue->IpcKey()));
701 		return EACCES;
702 	}
703 	if (!IS_USER_ADDRESS(messagePointer)) {
704 		TRACE(("xsi_msgrcv: message address is not valid\n"));
705 		return B_BAD_ADDRESS;
706 	}
707 
708 	queued_message *message = NULL;
709 	while (true) {
710 		message = messageQueue->Remove(messageType);
711 
712 		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
713 			// We are going to sleep
714 			ConditionVariableEntry queueEntry;
715 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
716 
717 			uint32 sequenceNumber = messageQueue->SequenceNumber();
718 
719 			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
720 			status_t result
721 				= messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
722 			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread_get_current_thread_id()));
723 
724 			messageQueueHashLocker.Lock();
725 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
726 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
727 				&& sequenceNumber != messageQueue->SequenceNumber())) {
728 				TRACE(("xsi_msgrcv: message queue id %d (sequence = "
729 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
730 					sequenceNumber));
731 				return EIDRM;
732 			} else if (result == B_INTERRUPTED) {
733 				TRACE(("xsi_msgrcv: thread %d got interrupted while "
734 					"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
735 					messageQueueID));
736 				messageQueue->Dequeue(&queueEntry);
737 				return EINTR;
738 			} else {
739 				messageQueueLocker.Lock();
740 				messageQueueHashLocker.Unlock();
741 			}
742 		} else if (message == NULL) {
743 			// There is not message of type requested and
744 			// we can't wait
745 			return ENOMSG;
746 		} else {
747 			// Message received correctly (so far)
748 			if ((ssize_t)messageSize < message->length
749 				&& !(messageFlags & MSG_NOERROR)) {
750 				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
751 				// Put the message back inside. Since we hold the
752 				// queue message lock, not one else could have filled
753 				// up the queue meanwhile
754 				messageQueue->Insert(message);
755 				return E2BIG;
756 			}
757 
758 			ssize_t result
759 				= message->copy_to_user_buffer(messagePointer, messageSize);
760 			if (result < 0) {
761 				messageQueue->Insert(message);
762 				return B_BAD_ADDRESS;
763 			}
764 
765 			delete message;
766 			TRACE(("xsi_msgrcv: message received correctly\n"));
767 			return result;
768 		}
769 	}
770 
771 	return B_OK;
772 }
773 
774 
775 int
776 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
777 	size_t messageSize, int messageFlags)
778 {
779 	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
780 		messageQueueID, messageSize));
781 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
782 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
783 	if (messageQueue == NULL) {
784 		TRACE(("xsi_msgsnd: message queue id %d not valid\n",
785 			messageQueueID));
786 		return EINVAL;
787 	}
788 	MutexLocker messageQueueLocker(messageQueue->Lock());
789 	messageQueueHashLocker.Unlock();
790 
791 	if (messageSize > MAX_BYTES_PER_QUEUE) {
792 		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
793 		return EINVAL;
794 	}
795 	if (!messageQueue->HasPermission()) {
796 		TRACE(("xsi_msgsnd: calling process has not permission "
797 			"on message queue id %d, key %d\n", messageQueueID,
798 			(int)messageQueue->IpcKey()));
799 		return EACCES;
800 	}
801 	if (!IS_USER_ADDRESS(messagePointer)) {
802 		TRACE(("xsi_msgsnd: message address is not valid\n"));
803 		return B_BAD_ADDRESS;
804 	}
805 
806 	queued_message *message
807 		= new(std::nothrow) queued_message(messagePointer, messageSize);
808 	if (message == NULL || message->initOK != true) {
809 		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
810 		delete message;
811 		return ENOMEM;
812 	}
813 
814 	bool notSent = true;
815 	status_t result = B_OK;
816 	while (notSent) {
817 		bool goToSleep = messageQueue->Insert(message);
818 
819 		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
820 			// We are going to sleep
821 			ConditionVariableEntry queueEntry;
822 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
823 
824 			uint32 sequenceNumber = messageQueue->SequenceNumber();
825 
826 			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
827 			result = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
828 			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread_get_current_thread_id()));
829 
830 			messageQueueHashLocker.Lock();
831 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
832 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
833 				&& sequenceNumber != messageQueue->SequenceNumber())) {
834 				TRACE(("xsi_msgsnd: message queue id %d (sequence = "
835 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
836 					sequenceNumber));
837 				delete message;
838 				notSent = false;
839 				result = EIDRM;
840 			} else if (result == B_INTERRUPTED) {
841 				TRACE(("xsi_msgsnd: thread %d got interrupted while "
842 					"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
843 					messageQueueID));
844 				messageQueue->Dequeue(&queueEntry);
845 				delete message;
846 				notSent = false;
847 				result = EINTR;
848 			} else {
849 				messageQueueLocker.Lock();
850 				messageQueueHashLocker.Unlock();
851 			}
852 		} else if (goToSleep) {
853 			// We did not send the message and we can't wait
854 			delete message;
855 			notSent = false;
856 			result = EAGAIN;
857 		} else {
858 			// Message delivered correctly
859 			TRACE(("xsi_msgsnd: message sent correctly\n"));
860 			notSent = false;
861 		}
862 	}
863 
864 	return result;
865 }
866