xref: /haiku/src/system/kernel/posix/xsi_message_queue.cpp (revision 68ea01249e1e2088933cb12f9c28d4e5c5d1c9ef)
1 /*
2  * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7  */
8 
9 #include <posix/xsi_message_queue.h>
10 
11 #include <new>
12 
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15 
16 #include <OS.h>
17 
18 #include <kernel.h>
19 #include <syscall_restart.h>
20 
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25 
26 
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 #	define TRACE(x)			dprintf x
30 #	define TRACE_ERROR(x)	dprintf x
31 #else
32 #	define TRACE(x)			/* nothing */
33 #	define TRACE_ERROR(x)	dprintf x
34 #endif
35 
36 
37 namespace {
38 
39 // Queue for holding blocked threads
40 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
41 	queued_thread(Thread *_thread, int32 _message_length)
42 		:
43 		thread(_thread),
44 		message_length(_message_length),
45 		queued(false)
46 	{
47 	}
48 
49 	Thread	*thread;
50 	int32	message_length;
51 	bool	queued;
52 };
53 
54 typedef DoublyLinkedList<queued_thread> ThreadQueue;
55 
56 
57 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
58 	queued_message(const void *_message, ssize_t _length)
59 		:
60 		initOK(false),
61 		length(_length)
62 	{
63 		message = (char *)malloc(sizeof(char) * _length);
64 		if (message == NULL)
65 			return;
66 
67 		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
68 			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
69 			_length) != B_OK) {
70 			free(message);
71 			return;
72 		}
73 		initOK = true;
74 	}
75 
76 	~queued_message()
77 	{
78 		if (initOK)
79 			free(message);
80 	}
81 
82 	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
83 	{
84 		if (_length > length)
85 			_length = length;
86 
87 		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
88 			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
89 			_length) != B_OK)
90 			return B_ERROR;
91 		return _length;
92 	}
93 
94 	bool		initOK;
95 	ssize_t		length;
96 	char		*message;
97 	long		type;
98 };
99 
100 typedef DoublyLinkedList<queued_message> MessageQueue;
101 
102 // Arbitrary limit
103 #define MAX_BYTES_PER_QUEUE		2048
104 
105 class XsiMessageQueue {
106 public:
107 	XsiMessageQueue(int flags)
108 		:
109 		fBytesInQueue(0),
110 		fThreadsWaitingToReceive(0),
111 		fThreadsWaitingToSend(0)
112 	{
113 		mutex_init(&fLock, "XsiMessageQueue private mutex");
114 		SetIpcKey((key_t)-1);
115 		SetPermissions(flags);
116 		// Initialize all fields to zero
117 		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
118 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
119 		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
120 	}
121 
122 	// Implemented after sXsiMessageCount is declared
123 	~XsiMessageQueue();
124 
125 	status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
126 	{
127 		thread_prepare_to_block(thread, B_CAN_INTERRUPT,
128 				THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
129 		// Unlock the queue before blocking
130 		queueLocker->Unlock();
131 
132 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
133 // interruption, we will still be queued. A WakeUpThread() at this point will
134 // call thread_unblock() and might thus screw with our trying to re-lock the
135 // mutex.
136 		return thread_block();
137 	}
138 
139 	void DoIpcSet(struct msqid_ds *result)
140 	{
141 		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
142 		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
143 		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
144 			| (result->msg_perm.mode & 0x01ff);
145 		fMessageQueue.msg_qbytes = result->msg_qbytes;
146 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
147 	}
148 
149 	void Deque(queued_thread *queueEntry, bool waitForMessage)
150 	{
151 		if (queueEntry->queued) {
152 			if (waitForMessage) {
153 				fWaitingToReceive.Remove(queueEntry);
154 				fThreadsWaitingToReceive--;
155 			} else {
156 				fWaitingToSend.Remove(queueEntry);
157 				fThreadsWaitingToSend--;
158 			}
159 		}
160 	}
161 
162 	void Enqueue(queued_thread *queueEntry, bool waitForMessage)
163 	{
164 		if (waitForMessage) {
165 			fWaitingToReceive.Add(queueEntry);
166 			fThreadsWaitingToReceive++;
167 		} else {
168 			fWaitingToSend.Add(queueEntry);
169 			fThreadsWaitingToSend++;
170 		}
171 		queueEntry->queued = true;
172 	}
173 
174 	struct msqid_ds &GetMessageQueue()
175 	{
176 		return fMessageQueue;
177 	}
178 
179 	bool HasPermission() const
180 	{
181 		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
182 			return true;
183 
184 		uid_t uid = geteuid();
185 		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
186 			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
187 			return true;
188 
189 		gid_t gid = getegid();
190 		if (gid == fMessageQueue.msg_perm.gid
191 			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
192 			return true;
193 
194 		return false;
195 	}
196 
197 	bool HasReadPermission() const
198 	{
199 		// TODO: fix this
200 		return HasPermission();
201 	}
202 
203 	int ID() const
204 	{
205 		return fID;
206 	}
207 
208 	// Implemented after sXsiMessageCount is declared
209 	bool Insert(queued_message *message);
210 
211 	key_t IpcKey() const
212 	{
213 		return fMessageQueue.msg_perm.key;
214 	}
215 
216 	mutex &Lock()
217 	{
218 		return fLock;
219 	}
220 
221 	msglen_t MaxBytes() const
222 	{
223 		return fMessageQueue.msg_qbytes;
224 	}
225 
226 	// Implemented after sXsiMessageCount is declared
227 	queued_message *Remove(long typeRequested);
228 
229 	uint32 SequenceNumber() const
230 	{
231 		return fSequenceNumber;
232 	}
233 
234 	// Implemented after sMessageQueueHashTable is declared
235 	void SetID();
236 
237 	void SetIpcKey(key_t key)
238 	{
239 		fMessageQueue.msg_perm.key = key;
240 	}
241 
242 	void SetPermissions(int flags)
243 	{
244 		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
245 		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
246 		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
247 	}
248 
249 	void WakeUpThread(bool waitForMessage)
250 	{
251 		if (waitForMessage) {
252 			// Wake up all waiting thread for a message
253 			// TODO: this can cause starvation for any
254 			// very-unlucky-and-slow thread
255 			while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
256 				entry->queued = false;
257 				fThreadsWaitingToReceive--;
258 				thread_unblock(entry->thread, 0);
259 			}
260 		} else {
261 			// Wake up only one thread waiting to send
262 			if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
263 				entry->queued = false;
264 				fThreadsWaitingToSend--;
265 				thread_unblock(entry->thread, 0);
266 			}
267 		}
268 	}
269 
270 	XsiMessageQueue*& Link()
271 	{
272 		return fLink;
273 	}
274 
275 private:
276 	msglen_t			fBytesInQueue;
277 	int					fID;
278 	mutex				fLock;
279 	MessageQueue		fMessage;
280 	struct msqid_ds		fMessageQueue;
281 	uint32				fSequenceNumber;
282 	uint32				fThreadsWaitingToReceive;
283 	uint32				fThreadsWaitingToSend;
284 
285 	ThreadQueue			fWaitingToReceive;
286 	ThreadQueue			fWaitingToSend;
287 
288 	XsiMessageQueue*	fLink;
289 };
290 
291 
292 // Xsi message queue hash table
293 struct MessageQueueHashTableDefinition {
294 	typedef int					KeyType;
295 	typedef XsiMessageQueue		ValueType;
296 
297 	size_t HashKey (const int key) const
298 	{
299 		return (size_t)key;
300 	}
301 
302 	size_t Hash(XsiMessageQueue *variable) const
303 	{
304 		return (size_t)variable->ID();
305 	}
306 
307 	bool Compare(const int key, XsiMessageQueue *variable) const
308 	{
309 		return (int)key == (int)variable->ID();
310 	}
311 
312 	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
313 	{
314 		return variable->Link();
315 	}
316 };
317 
318 
319 // IPC class
320 class Ipc {
321 public:
322 	Ipc(key_t key)
323 		: fKey(key),
324 		fMessageQueueId(-1)
325 	{
326 	}
327 
328 	key_t Key() const
329 	{
330 		return fKey;
331 	}
332 
333 	int MessageQueueID() const
334 	{
335 		return fMessageQueueId;
336 	}
337 
338 	void SetMessageQueueID(XsiMessageQueue *messageQueue)
339 	{
340 		fMessageQueueId = messageQueue->ID();
341 	}
342 
343 	Ipc*& Link()
344 	{
345 		return fLink;
346 	}
347 
348 private:
349 	key_t				fKey;
350 	int					fMessageQueueId;
351 	Ipc*				fLink;
352 };
353 
354 
355 struct IpcHashTableDefinition {
356 	typedef key_t	KeyType;
357 	typedef Ipc		ValueType;
358 
359 	size_t HashKey (const key_t key) const
360 	{
361 		return (size_t)(key);
362 	}
363 
364 	size_t Hash(Ipc *variable) const
365 	{
366 		return (size_t)HashKey(variable->Key());
367 	}
368 
369 	bool Compare(const key_t key, Ipc *variable) const
370 	{
371 		return (key_t)key == (key_t)variable->Key();
372 	}
373 
374 	Ipc*& GetLink(Ipc *variable) const
375 	{
376 		return variable->Link();
377 	}
378 };
379 
380 } // namespace
381 
382 
383 // Arbitrary limits
384 #define MAX_XSI_MESSAGE			4096
385 #define MAX_XSI_MESSAGE_QUEUE	1024
386 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
387 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
388 
389 static mutex sIpcLock;
390 static mutex sXsiMessageQueueLock;
391 
392 static uint32 sGlobalSequenceNumber = 1;
393 static int32 sXsiMessageCount = 0;
394 static int32 sXsiMessageQueueCount = 0;
395 
396 
397 //	#pragma mark -
398 
399 
400 XsiMessageQueue::~XsiMessageQueue()
401 {
402 	mutex_destroy(&fLock);
403 
404 	// Wake up any threads still waiting
405 	if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
406 		while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
407 			entry->queued = false;
408 			thread_unblock(entry->thread, EIDRM);
409 		}
410 		while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
411 			entry->queued = false;
412 			thread_unblock(entry->thread, EIDRM);
413 		}
414 	}
415 
416 	// Free up any remaining messages
417 	if (fMessageQueue.msg_qnum) {
418 		while (queued_message *message = fMessage.RemoveHead()) {
419 			atomic_add(&sXsiMessageCount, -1);
420 			delete message;
421 		}
422 	}
423 }
424 
425 
426 bool
427 XsiMessageQueue::Insert(queued_message *message)
428 {
429 	// The only situation that would make us (potentially) wait
430 	// is that we exceed with bytes or with the total number of messages
431 	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
432 		return true;
433 
434 	while (true) {
435 		int32 oldCount = atomic_get(&sXsiMessageCount);
436 		if (oldCount >= MAX_XSI_MESSAGE)
437 			return true;
438 		// If another thread updates the counter we keep
439 		// iterating
440 		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
441 			== oldCount)
442 			break;
443 	}
444 
445 	fMessage.Add(message);
446 	fMessageQueue.msg_qnum++;
447 	fMessageQueue.msg_lspid = getpid();
448 	fMessageQueue.msg_stime = real_time_clock();
449 	fBytesInQueue += message->length;
450 	if (fThreadsWaitingToReceive)
451 		WakeUpThread(true /* WaitForMessage */);
452 	return false;
453 }
454 
455 
456 queued_message*
457 XsiMessageQueue::Remove(long typeRequested)
458 {
459 	queued_message *message = NULL;
460 	if (typeRequested < 0) {
461 		// Return first message of the lowest type
462 		// that is less than or equal to the absolute
463 		// value of type requested.
464 		MessageQueue::Iterator iterator = fMessage.GetIterator();
465 		while (iterator.HasNext()) {
466 			queued_message *current = iterator.Next();
467 			if (current->type <= -typeRequested) {
468 				message = iterator.Remove();
469 				break;
470 			}
471 		}
472 	} else if (typeRequested == 0) {
473 		// Return the first message on the queue
474 		message = fMessage.RemoveHead();
475 	} else {
476 		// Return the first message of type requested
477 		MessageQueue::Iterator iterator = fMessage.GetIterator();
478 		while (iterator.HasNext()) {
479 			queued_message *current = iterator.Next();
480 			if (current->type == typeRequested) {
481 				message = iterator.Remove();
482 				break;
483 			}
484 		}
485 	}
486 
487 	if (message == NULL)
488 		return NULL;
489 
490 	fMessageQueue.msg_qnum--;
491 	fMessageQueue.msg_lrpid = getpid();
492 	fMessageQueue.msg_rtime = real_time_clock();
493 	fBytesInQueue -= message->length;
494 	atomic_add(&sXsiMessageCount, -1);
495 	if (fThreadsWaitingToSend)
496 		WakeUpThread(false /* WaitForMessage */);
497 	return message;
498 }
499 
500 
501 void
502 XsiMessageQueue::SetID()
503 {
504 	fID = real_time_clock();
505 	// The lock is held before calling us
506 	while (true) {
507 		if (sMessageQueueHashTable.Lookup(fID) == NULL)
508 			break;
509 		fID++;
510 	}
511 	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
512 	fSequenceNumber = sGlobalSequenceNumber;
513 }
514 
515 
516 //	#pragma mark - Kernel exported API
517 
518 
519 void
520 xsi_msg_init()
521 {
522 	// Initialize hash tables
523 	status_t status = sIpcHashTable.Init();
524 	if (status != B_OK)
525 		panic("xsi_msg_init() failed to initialize ipc hash table\n");
526 	status =  sMessageQueueHashTable.Init();
527 	if (status != B_OK)
528 		panic("xsi_msg_init() failed to initialize message queue hash table\n");
529 
530 	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
531 	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
532 }
533 
534 
535 //	#pragma mark - Syscalls
536 
537 
538 int
539 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
540 {
541 	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
542 	MutexLocker ipcHashLocker(sIpcLock);
543 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
544 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
545 	if (messageQueue == NULL) {
546 		TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
547 		return EINVAL;
548 	}
549 	if (!IS_USER_ADDRESS(buffer)) {
550 		TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
551 		return B_BAD_ADDRESS;
552 	}
553 
554 	// Lock the message queue itself and release both the ipc hash table lock
555 	// and the message queue hash table lock _only_ if the command it's not
556 	// IPC_RMID, this prevents undesidered situation from happening while
557 	// (hopefully) improving the concurrency.
558 	MutexLocker messageQueueLocker;
559 	if (command != IPC_RMID) {
560 		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
561 		messageQueueHashLocker.Unlock();
562 		ipcHashLocker.Unlock();
563 	} else
564 		// Since we are going to delete the message queue object
565 		// along with its mutex, we can't use a MutexLocker object,
566 		// as the mutex itself won't exist on function exit
567 		mutex_lock(&messageQueue->Lock());
568 
569 	switch (command) {
570 		case IPC_STAT: {
571 			if (!messageQueue->HasReadPermission()) {
572 				TRACE_ERROR(("xsi_msgctl: calling process has not read "
573 					"permission on message queue %d, key %d\n", messageQueueID,
574 					(int)messageQueue->IpcKey()));
575 				return EACCES;
576 			}
577 			struct msqid_ds msg = messageQueue->GetMessageQueue();
578 			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
579 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
580 				return B_BAD_ADDRESS;
581 			}
582 			break;
583 		}
584 
585 		case IPC_SET: {
586 			if (!messageQueue->HasPermission()) {
587 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
588 					"on message queue %d, key %d\n", messageQueueID,
589 					(int)messageQueue->IpcKey()));
590 				return EPERM;
591 			}
592 			struct msqid_ds msg;
593 			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
594 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
595 				return B_BAD_ADDRESS;
596 			}
597 			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
598 				TRACE_ERROR(("xsi_msgctl: user does not have permission to "
599 					"increase the maximum number of bytes allowed on queue\n"));
600 				return EPERM;
601 			}
602 			if (msg.msg_qbytes == 0) {
603 				TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
604 				return EINVAL;
605 			}
606 
607 			messageQueue->DoIpcSet(&msg);
608 			break;
609 		}
610 
611 		case IPC_RMID: {
612 			// If this was the command, we are still holding the message
613 			// queue hash table lock along with the ipc one, but not the
614 			// message queue lock itself. This prevents other process
615 			// to try and acquire a destroyed mutex
616 			if (!messageQueue->HasPermission()) {
617 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
618 					"on message queue %d, key %d\n", messageQueueID,
619 					(int)messageQueue->IpcKey()));
620 				return EPERM;
621 			}
622 			key_t key = messageQueue->IpcKey();
623 			Ipc *ipcKey = NULL;
624 			if (key != -1) {
625 				ipcKey = sIpcHashTable.Lookup(key);
626 				sIpcHashTable.Remove(ipcKey);
627 			}
628 			sMessageQueueHashTable.Remove(messageQueue);
629 			// Wake up of any threads waiting on this
630 			// queue happens in destructor
631 			if (key != -1)
632 				delete ipcKey;
633 			atomic_add(&sXsiMessageQueueCount, -1);
634 
635 			delete messageQueue;
636 			break;
637 		}
638 
639 		default:
640 			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
641 			return EINVAL;
642 	}
643 
644 	return B_OK;
645 }
646 
647 
648 int
649 _user_xsi_msgget(key_t key, int flags)
650 {
651 	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
652 	XsiMessageQueue *messageQueue = NULL;
653 	Ipc *ipcKey = NULL;
654 	// Default assumptions
655 	bool isPrivate = true;
656 	bool create = true;
657 
658 	if (key != IPC_PRIVATE) {
659 		isPrivate = false;
660 		// Check if key already exist, if it does it already has a message
661 		// queue associated with it
662 		ipcKey = sIpcHashTable.Lookup(key);
663 		if (ipcKey == NULL) {
664 			if (!(flags & IPC_CREAT)) {
665 				TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
666 					"caller did not ask for creation\n", (int)key));
667 				return ENOENT;
668 			}
669 			ipcKey = new(std::nothrow) Ipc(key);
670 			if (ipcKey == NULL) {
671 				TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
672 					"for key %d\n", (int)key));
673 				return ENOMEM;
674 			}
675 			sIpcHashTable.Insert(ipcKey);
676 		} else {
677 			// The IPC key exist and it already has a message queue
678 			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
679 				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
680 				return EEXIST;
681 			}
682 			int messageQueueID = ipcKey->MessageQueueID();
683 
684 			MutexLocker _(sXsiMessageQueueLock);
685 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
686 			if (!messageQueue->HasPermission()) {
687 				TRACE_ERROR(("xsi_msgget: calling process has not permission "
688 					"on message queue %d, key %d\n", messageQueue->ID(),
689 					(int)key));
690 				return EACCES;
691 			}
692 			create = false;
693 		}
694 	}
695 
696 	if (create) {
697 		// Create a new message queue for this key
698 		if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
699 			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
700 				"message queues\n"));
701 			return ENOSPC;
702 		}
703 
704 		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
705 		if (messageQueue == NULL) {
706 			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
707 				"message queue\n"));
708 			return ENOMEM;
709 		}
710 		atomic_add(&sXsiMessageQueueCount, 1);
711 
712 		MutexLocker _(sXsiMessageQueueLock);
713 		messageQueue->SetID();
714 		if (isPrivate)
715 			messageQueue->SetIpcKey((key_t)-1);
716 		else {
717 			messageQueue->SetIpcKey(key);
718 			ipcKey->SetMessageQueueID(messageQueue);
719 		}
720 		sMessageQueueHashTable.Insert(messageQueue);
721 	}
722 
723 	return messageQueue->ID();
724 }
725 
726 
727 ssize_t
728 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
729 	size_t messageSize, long messageType, int messageFlags)
730 {
731 	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
732 		messageQueueID, messageSize));
733 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
734 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
735 	if (messageQueue == NULL) {
736 		TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
737 			messageQueueID));
738 		return EINVAL;
739 	}
740 	MutexLocker messageQueueLocker(messageQueue->Lock());
741 	messageQueueHashLocker.Unlock();
742 
743 	if (messageSize > MAX_BYTES_PER_QUEUE) {
744 		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
745 		return EINVAL;
746 	}
747 	if (!messageQueue->HasPermission()) {
748 		TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
749 			"on message queue id %d, key %d\n", messageQueueID,
750 			(int)messageQueue->IpcKey()));
751 		return EACCES;
752 	}
753 	if (!IS_USER_ADDRESS(messagePointer)) {
754 		TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
755 		return B_BAD_ADDRESS;
756 	}
757 
758 	queued_message *message = NULL;
759 	while (true) {
760 		message = messageQueue->Remove(messageType);
761 
762 		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
763 			// We are going to sleep
764 			Thread *thread = thread_get_current_thread();
765 			queued_thread queueEntry(thread, messageSize);
766 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
767 
768 			uint32 sequenceNumber = messageQueue->SequenceNumber();
769 
770 			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
771 			status_t result
772 				= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
773 			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
774 
775 			messageQueueHashLocker.Lock();
776 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
777 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
778 				&& sequenceNumber != messageQueue->SequenceNumber())) {
779 				TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = "
780 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
781 					sequenceNumber));
782 				return EIDRM;
783 			} else if (result == B_INTERRUPTED) {
784 				TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
785 					"waiting on message queue %d\n",(int)thread->id,
786 					messageQueueID));
787 				messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
788 				return EINTR;
789 			} else {
790 				messageQueueLocker.Lock();
791 				messageQueueHashLocker.Unlock();
792 			}
793 		} else if (message == NULL) {
794 			// There is not message of type requested and
795 			// we can't wait
796 			return ENOMSG;
797 		} else {
798 			// Message received correctly (so far)
799 			if ((ssize_t)messageSize < message->length
800 				&& !(messageFlags & MSG_NOERROR)) {
801 				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
802 				// Put the message back inside. Since we hold the
803 				// queue message lock, not one else could have filled
804 				// up the queue meanwhile
805 				messageQueue->Insert(message);
806 				return E2BIG;
807 			}
808 
809 			ssize_t result
810 				= message->copy_to_user_buffer(messagePointer, messageSize);
811 			if (result < 0) {
812 				messageQueue->Insert(message);
813 				return B_BAD_ADDRESS;
814 			}
815 
816 			delete message;
817 			TRACE(("xsi_msgrcv: message received correctly\n"));
818 			return result;
819 		}
820 	}
821 
822 	return B_OK;
823 }
824 
825 
826 int
827 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
828 	size_t messageSize, int messageFlags)
829 {
830 	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
831 		messageQueueID, messageSize));
832 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
833 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
834 	if (messageQueue == NULL) {
835 		TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
836 			messageQueueID));
837 		return EINVAL;
838 	}
839 	MutexLocker messageQueueLocker(messageQueue->Lock());
840 	messageQueueHashLocker.Unlock();
841 
842 	if (messageSize > MAX_BYTES_PER_QUEUE) {
843 		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
844 		return EINVAL;
845 	}
846 	if (!messageQueue->HasPermission()) {
847 		TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
848 			"on message queue id %d, key %d\n", messageQueueID,
849 			(int)messageQueue->IpcKey()));
850 		return EACCES;
851 	}
852 	if (!IS_USER_ADDRESS(messagePointer)) {
853 		TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
854 		return B_BAD_ADDRESS;
855 	}
856 
857 	queued_message *message
858 		= new(std::nothrow) queued_message(messagePointer, messageSize);
859 	if (message == NULL || message->initOK != true) {
860 		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
861 		delete message;
862 		return ENOMEM;
863 	}
864 
865 	bool notSent = true;
866 	status_t result = B_OK;
867 	while (notSent) {
868 		bool goToSleep = messageQueue->Insert(message);
869 
870 		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
871 			// We are going to sleep
872 			Thread *thread = thread_get_current_thread();
873 			queued_thread queueEntry(thread, messageSize);
874 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
875 
876 			uint32 sequenceNumber = messageQueue->SequenceNumber();
877 
878 			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
879 			result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
880 			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
881 
882 			messageQueueHashLocker.Lock();
883 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
884 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
885 				&& sequenceNumber != messageQueue->SequenceNumber())) {
886 				TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = "
887 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
888 					sequenceNumber));
889 				delete message;
890 				notSent = false;
891 				result = EIDRM;
892 			} else if (result == B_INTERRUPTED) {
893 				TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
894 					"waiting on message queue %d\n",(int)thread->id,
895 					messageQueueID));
896 				messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
897 				delete message;
898 				notSent = false;
899 				result = EINTR;
900 			} else {
901 				messageQueueLocker.Lock();
902 				messageQueueHashLocker.Unlock();
903 			}
904 		} else if (goToSleep) {
905 			// We did not send the message and we can't wait
906 			delete message;
907 			notSent = false;
908 			result = EAGAIN;
909 		} else {
910 			// Message delivered correctly
911 			TRACE(("xsi_msgsnd: message sent correctly\n"));
912 			notSent = false;
913 		}
914 	}
915 
916 	return result;
917 }
918