xref: /haiku/src/system/kernel/posix/xsi_message_queue.cpp (revision 541ff51a6ef4c47f8ab105ba6ff895cdbba83aca)
1 /*
2  * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7  */
8 
9 #include <posix/xsi_message_queue.h>
10 
11 #include <new>
12 
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15 
16 #include <OS.h>
17 
18 #include <kernel.h>
19 #include <syscall_restart.h>
20 
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25 
26 
// Switch for verbose tracing: TRACE() only produces output while this is
// defined.
#define TRACE_XSI_MSG_QUEUE

// Errors are always reported through dprintf, with or without tracing.
#define TRACE_ERROR(x)	dprintf x

#if defined(TRACE_XSI_MSG_QUEUE)
#	define TRACE(x)		dprintf x
#else
#	define TRACE(x)		/* nothing */
#endif
35 
36 // Queue for holding blocked threads
37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
38 	queued_thread(Thread *_thread, int32 _message_length)
39 		:
40 		thread(_thread),
41 		message_length(_message_length),
42 		queued(false)
43 	{
44 	}
45 
46 	Thread	*thread;
47 	int32	message_length;
48 	bool	queued;
49 };
50 
51 typedef DoublyLinkedList<queued_thread> ThreadQueue;
52 
53 
54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
55 	queued_message(const void *_message, ssize_t _length)
56 		:
57 		initOK(false),
58 		length(_length)
59 	{
60 		message = (char *)malloc(sizeof(char) * _length);
61 		if (message == NULL)
62 			return;
63 
64 		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
65 			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
66 			_length) != B_OK) {
67 			free(message);
68 			return;
69 		}
70 		initOK = true;
71 	}
72 
73 	~queued_message()
74 	{
75 		if (initOK)
76 			free(message);
77 	}
78 
79 	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
80 	{
81 		if (_length > length)
82 			_length = length;
83 
84 		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
85 			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
86 			_length) != B_OK)
87 			return B_ERROR;
88 		return _length;
89 	}
90 
91 	bool		initOK;
92 	ssize_t		length;
93 	char		*message;
94 	long		type;
95 };
96 
97 typedef DoublyLinkedList<queued_message> MessageQueue;
98 
99 // Arbitrary limit
100 #define MAX_BYTES_PER_QUEUE		2048
101 
102 class XsiMessageQueue {
103 public:
104 	XsiMessageQueue(int flags)
105 		:
106 		fBytesInQueue(0),
107 		fThreadsWaitingToReceive(0),
108 		fThreadsWaitingToSend(0)
109 	{
110 		mutex_init(&fLock, "XsiMessageQueue private mutex");
111 		SetIpcKey((key_t)-1);
112 		SetPermissions(flags);
113 		// Initialize all fields to zero
114 		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
115 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
116 		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
117 	}
118 
119 	// Implemented after sXsiMessageCount is declared
120 	~XsiMessageQueue();
121 
122 	status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
123 	{
124 		thread_prepare_to_block(thread, B_CAN_INTERRUPT,
125 				THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
126 		// Unlock the queue before blocking
127 		queueLocker->Unlock();
128 
129 		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
130 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
131 // interruption, we will still be queued. A WakeUpThread() at this point will
132 // call thread_unblock() and might thus screw with our trying to re-lock the
133 // mutex.
134 		return thread_block_locked(thread);
135 	}
136 
137 	void DoIpcSet(struct msqid_ds *result)
138 	{
139 		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
140 		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
141 		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
142 			| (result->msg_perm.mode & 0x01ff);
143 		fMessageQueue.msg_qbytes = result->msg_qbytes;
144 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
145 	}
146 
147 	void Deque(queued_thread *queueEntry, bool waitForMessage)
148 	{
149 		if (queueEntry->queued) {
150 			if (waitForMessage) {
151 				fWaitingToReceive.Remove(queueEntry);
152 				fThreadsWaitingToReceive--;
153 			} else {
154 				fWaitingToSend.Remove(queueEntry);
155 				fThreadsWaitingToSend--;
156 			}
157 		}
158 	}
159 
160 	void Enqueue(queued_thread *queueEntry, bool waitForMessage)
161 	{
162 		if (waitForMessage) {
163 			fWaitingToReceive.Add(queueEntry);
164 			fThreadsWaitingToReceive++;
165 		} else {
166 			fWaitingToSend.Add(queueEntry);
167 			fThreadsWaitingToSend++;
168 		}
169 		queueEntry->queued = true;
170 	}
171 
172 	struct msqid_ds &GetMessageQueue()
173 	{
174 		return fMessageQueue;
175 	}
176 
177 	bool HasPermission() const
178 	{
179 		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
180 			return true;
181 
182 		uid_t uid = geteuid();
183 		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
184 			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
185 			return true;
186 
187 		gid_t gid = getegid();
188 		if (gid == fMessageQueue.msg_perm.gid
189 			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
190 			return true;
191 
192 		return false;
193 	}
194 
195 	bool HasReadPermission() const
196 	{
197 		// TODO: fix this
198 		return HasPermission();
199 	}
200 
201 	int ID() const
202 	{
203 		return fID;
204 	}
205 
206 	// Implemented after sXsiMessageCount is declared
207 	bool Insert(queued_message *message);
208 
209 	key_t IpcKey() const
210 	{
211 		return fMessageQueue.msg_perm.key;
212 	}
213 
214 	mutex &Lock()
215 	{
216 		return fLock;
217 	}
218 
219 	msglen_t MaxBytes() const
220 	{
221 		return fMessageQueue.msg_qbytes;
222 	}
223 
224 	// Implemented after sXsiMessageCount is declared
225 	queued_message *Remove(long typeRequested);
226 
227 	uint32 SequenceNumber() const
228 	{
229 		return fSequenceNumber;
230 	}
231 
232 	// Implemented after sMessageQueueHashTable is declared
233 	void SetID();
234 
235 	void SetIpcKey(key_t key)
236 	{
237 		fMessageQueue.msg_perm.key = key;
238 	}
239 
240 	void SetPermissions(int flags)
241 	{
242 		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
243 		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
244 		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
245 	}
246 
247 	void WakeUpThread(bool waitForMessage)
248 	{
249 		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
250 
251 		if (waitForMessage) {
252 			// Wake up all waiting thread for a message
253 			// TODO: this can cause starvation for any
254 			// very-unlucky-and-slow thread
255 			while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
256 				entry->queued = false;
257 				fThreadsWaitingToReceive--;
258 				thread_unblock_locked(entry->thread, 0);
259 			}
260 		} else {
261 			// Wake up only one thread waiting to send
262 			if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
263 				entry->queued = false;
264 				fThreadsWaitingToSend--;
265 				thread_unblock_locked(entry->thread, 0);
266 			}
267 		}
268 	}
269 
270 	XsiMessageQueue*& Link()
271 	{
272 		return fLink;
273 	}
274 
275 private:
276 	msglen_t			fBytesInQueue;
277 	int					fID;
278 	mutex				fLock;
279 	MessageQueue		fMessage;
280 	struct msqid_ds		fMessageQueue;
281 	uint32				fSequenceNumber;
282 	uint32				fThreadsWaitingToReceive;
283 	uint32				fThreadsWaitingToSend;
284 
285 	ThreadQueue			fWaitingToReceive;
286 	ThreadQueue			fWaitingToSend;
287 
288 	XsiMessageQueue*	fLink;
289 };
290 
291 
292 // Xsi message queue hash table
293 struct MessageQueueHashTableDefinition {
294 	typedef int					KeyType;
295 	typedef XsiMessageQueue		ValueType;
296 
297 	size_t HashKey (const int key) const
298 	{
299 		return (size_t)key;
300 	}
301 
302 	size_t Hash(XsiMessageQueue *variable) const
303 	{
304 		return (size_t)variable->ID();
305 	}
306 
307 	bool Compare(const int key, XsiMessageQueue *variable) const
308 	{
309 		return (int)key == (int)variable->ID();
310 	}
311 
312 	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
313 	{
314 		return variable->Link();
315 	}
316 };
317 
318 
319 // IPC class
320 class Ipc {
321 public:
322 	Ipc(key_t key)
323 		: fKey(key),
324 		fMessageQueueId(-1)
325 	{
326 	}
327 
328 	key_t Key() const
329 	{
330 		return fKey;
331 	}
332 
333 	int MessageQueueID() const
334 	{
335 		return fMessageQueueId;
336 	}
337 
338 	void SetMessageQueueID(XsiMessageQueue *messageQueue)
339 	{
340 		fMessageQueueId = messageQueue->ID();
341 	}
342 
343 	Ipc*& Link()
344 	{
345 		return fLink;
346 	}
347 
348 private:
349 	key_t				fKey;
350 	int					fMessageQueueId;
351 	Ipc*				fLink;
352 };
353 
354 
355 struct IpcHashTableDefinition {
356 	typedef key_t	KeyType;
357 	typedef Ipc		ValueType;
358 
359 	size_t HashKey (const key_t key) const
360 	{
361 		return (size_t)(key);
362 	}
363 
364 	size_t Hash(Ipc *variable) const
365 	{
366 		return (size_t)HashKey(variable->Key());
367 	}
368 
369 	bool Compare(const key_t key, Ipc *variable) const
370 	{
371 		return (key_t)key == (key_t)variable->Key();
372 	}
373 
374 	Ipc*& GetLink(Ipc *variable) const
375 	{
376 		return variable->Link();
377 	}
378 };
379 
380 // Arbitrary limits
381 #define MAX_XSI_MESSAGE			4096
382 #define MAX_XSI_MESSAGE_QUEUE	1024
383 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
384 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
385 
386 static mutex sIpcLock;
387 static mutex sXsiMessageQueueLock;
388 
389 static uint32 sGlobalSequenceNumber = 1;
390 static vint32 sXsiMessageCount = 0;
391 static vint32 sXsiMessageQueueCount = 0;
392 
393 
394 //	#pragma mark -
395 
396 
397 XsiMessageQueue::~XsiMessageQueue()
398 {
399 	mutex_destroy(&fLock);
400 
401 	// Wake up any threads still waiting
402 	if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
403 		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
404 
405 		while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
406 			entry->queued = false;
407 			thread_unblock_locked(entry->thread, EIDRM);
408 		}
409 		while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
410 			entry->queued = false;
411 			thread_unblock_locked(entry->thread, EIDRM);
412 		}
413 	}
414 
415 	// Free up any remaining messages
416 	if (fMessageQueue.msg_qnum) {
417 		while (queued_message *message = fMessage.RemoveHead()) {
418 			atomic_add(&sXsiMessageCount, -1);
419 			delete message;
420 		}
421 	}
422 }
423 
424 
425 bool
426 XsiMessageQueue::Insert(queued_message *message)
427 {
428 	// The only situation that would make us (potentially) wait
429 	// is that we exceed with bytes or with the total number of messages
430 	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
431 		return true;
432 
433 	while (true) {
434 		int32 oldCount = atomic_get(&sXsiMessageCount);
435 		if (oldCount >= MAX_XSI_MESSAGE)
436 			return true;
437 		// If another thread updates the counter we keep
438 		// iterating
439 		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
440 			== oldCount)
441 			break;
442 	}
443 
444 	fMessage.Add(message);
445 	fMessageQueue.msg_qnum++;
446 	fMessageQueue.msg_lspid = getpid();
447 	fMessageQueue.msg_stime = real_time_clock();
448 	fBytesInQueue += message->length;
449 	if (fThreadsWaitingToReceive)
450 		WakeUpThread(true /* WaitForMessage */);
451 	return false;
452 }
453 
454 
455 queued_message*
456 XsiMessageQueue::Remove(long typeRequested)
457 {
458 	queued_message *message = NULL;
459 	if (typeRequested < 0) {
460 		// Return first message of the lowest type
461 		// that is less than or equal to the absolute
462 		// value of type requested.
463 		MessageQueue::Iterator iterator = fMessage.GetIterator();
464 		while (iterator.HasNext()) {
465 			queued_message *current = iterator.Next();
466 			if (current->type <= -typeRequested) {
467 				message = iterator.Remove();
468 				break;
469 			}
470 		}
471 	} else if (typeRequested == 0) {
472 		// Return the first message on the queue
473 		message = fMessage.RemoveHead();
474 	} else {
475 		// Return the first message of type requested
476 		MessageQueue::Iterator iterator = fMessage.GetIterator();
477 		while (iterator.HasNext()) {
478 			queued_message *current = iterator.Next();
479 			if (current->type == typeRequested) {
480 				message = iterator.Remove();
481 				break;
482 			}
483 		}
484 	}
485 
486 	if (message == NULL)
487 		return NULL;
488 
489 	fMessageQueue.msg_qnum--;
490 	fMessageQueue.msg_lrpid = getpid();
491 	fMessageQueue.msg_rtime = real_time_clock();
492 	fBytesInQueue -= message->length;
493 	atomic_add(&sXsiMessageCount, -1);
494 	if (fThreadsWaitingToSend)
495 		WakeUpThread(false /* WaitForMessage */);
496 	return message;
497 }
498 
499 
500 void
501 XsiMessageQueue::SetID()
502 {
503 	fID = real_time_clock();
504 	// The lock is held before calling us
505 	while (true) {
506 		if (sMessageQueueHashTable.Lookup(fID) == NULL)
507 			break;
508 		fID++;
509 	}
510 	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
511 	fSequenceNumber = sGlobalSequenceNumber;
512 }
513 
514 
515 //	#pragma mark - Kernel exported API
516 
517 
518 void
519 xsi_msg_init()
520 {
521 	// Initialize hash tables
522 	status_t status = sIpcHashTable.Init();
523 	if (status != B_OK)
524 		panic("xsi_msg_init() failed to initialize ipc hash table\n");
525 	status =  sMessageQueueHashTable.Init();
526 	if (status != B_OK)
527 		panic("xsi_msg_init() failed to initialize message queue hash table\n");
528 
529 	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
530 	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
531 }
532 
533 
534 //	#pragma mark - Syscalls
535 
536 
537 int
538 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
539 {
540 	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
541 	MutexLocker ipcHashLocker(sIpcLock);
542 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
543 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
544 	if (messageQueue == NULL) {
545 		TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
546 		return EINVAL;
547 	}
548 	if (!IS_USER_ADDRESS(buffer)) {
549 		TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
550 		return B_BAD_ADDRESS;
551 	}
552 
553 	// Lock the message queue itself and release both the ipc hash table lock
554 	// and the message queue hash table lock _only_ if the command it's not
555 	// IPC_RMID, this prevents undesidered situation from happening while
556 	// (hopefully) improving the concurrency.
557 	MutexLocker messageQueueLocker;
558 	if (command != IPC_RMID) {
559 		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
560 		messageQueueHashLocker.Unlock();
561 		ipcHashLocker.Unlock();
562 	} else
563 		// Since we are going to delete the message queue object
564 		// along with its mutex, we can't use a MutexLocker object,
565 		// as the mutex itself won't exist on function exit
566 		mutex_lock(&messageQueue->Lock());
567 
568 	switch (command) {
569 		case IPC_STAT: {
570 			if (!messageQueue->HasReadPermission()) {
571 				TRACE_ERROR(("xsi_msgctl: calling process has not read "
572 					"permission on message queue %d, key %d\n", messageQueueID,
573 					(int)messageQueue->IpcKey()));
574 				return EACCES;
575 			}
576 			struct msqid_ds msg = messageQueue->GetMessageQueue();
577 			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
578 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
579 				return B_BAD_ADDRESS;
580 			}
581 			break;
582 		}
583 
584 		case IPC_SET: {
585 			if (!messageQueue->HasPermission()) {
586 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
587 					"on message queue %d, key %d\n", messageQueueID,
588 					(int)messageQueue->IpcKey()));
589 				return EPERM;
590 			}
591 			struct msqid_ds msg;
592 			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
593 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
594 				return B_BAD_ADDRESS;
595 			}
596 			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
597 				TRACE_ERROR(("xsi_msgctl: user does not have permission to "
598 					"increase the maximum number of bytes allowed on queue\n"));
599 				return EPERM;
600 			}
601 			if (msg.msg_qbytes == 0) {
602 				TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
603 				return EINVAL;
604 			}
605 
606 			messageQueue->DoIpcSet(&msg);
607 			break;
608 		}
609 
610 		case IPC_RMID: {
611 			// If this was the command, we are still holding the message
612 			// queue hash table lock along with the ipc one, but not the
613 			// message queue lock itself. This prevents other process
614 			// to try and acquire a destroyed mutex
615 			if (!messageQueue->HasPermission()) {
616 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
617 					"on message queue %d, key %d\n", messageQueueID,
618 					(int)messageQueue->IpcKey()));
619 				return EPERM;
620 			}
621 			key_t key = messageQueue->IpcKey();
622 			Ipc *ipcKey = NULL;
623 			if (key != -1) {
624 				ipcKey = sIpcHashTable.Lookup(key);
625 				sIpcHashTable.Remove(ipcKey);
626 			}
627 			sMessageQueueHashTable.Remove(messageQueue);
628 			// Wake up of any threads waiting on this
629 			// queue happens in destructor
630 			if (key != -1)
631 				delete ipcKey;
632 			atomic_add(&sXsiMessageQueueCount, -1);
633 
634 			delete messageQueue;
635 			break;
636 		}
637 
638 		default:
639 			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
640 			return EINVAL;
641 	}
642 
643 	return B_OK;
644 }
645 
646 
647 int
648 _user_xsi_msgget(key_t key, int flags)
649 {
650 	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
651 	XsiMessageQueue *messageQueue = NULL;
652 	Ipc *ipcKey = NULL;
653 	// Default assumptions
654 	bool isPrivate = true;
655 	bool create = true;
656 
657 	if (key != IPC_PRIVATE) {
658 		isPrivate = false;
659 		// Check if key already exist, if it does it already has a message
660 		// queue associated with it
661 		ipcKey = sIpcHashTable.Lookup(key);
662 		if (ipcKey == NULL) {
663 			if (!(flags & IPC_CREAT)) {
664 				TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
665 					"caller did not ask for creation\n", (int)key));
666 				return ENOENT;
667 			}
668 			ipcKey = new(std::nothrow) Ipc(key);
669 			if (ipcKey == NULL) {
670 				TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
671 					"for key %d\n", (int)key));
672 				return ENOMEM;
673 			}
674 			sIpcHashTable.Insert(ipcKey);
675 		} else {
676 			// The IPC key exist and it already has a message queue
677 			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
678 				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
679 				return EEXIST;
680 			}
681 			int messageQueueID = ipcKey->MessageQueueID();
682 
683 			MutexLocker _(sXsiMessageQueueLock);
684 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
685 			if (!messageQueue->HasPermission()) {
686 				TRACE_ERROR(("xsi_msgget: calling process has not permission "
687 					"on message queue %d, key %d\n", messageQueue->ID(),
688 					(int)key));
689 				return EACCES;
690 			}
691 			create = false;
692 		}
693 	}
694 
695 	if (create) {
696 		// Create a new message queue for this key
697 		if (sXsiMessageQueueCount >= MAX_XSI_MESSAGE_QUEUE) {
698 			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
699 				"message queues\n"));
700 			return ENOSPC;
701 		}
702 
703 		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
704 		if (messageQueue == NULL) {
705 			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
706 				"message queue\n"));
707 			return ENOMEM;
708 		}
709 		atomic_add(&sXsiMessageQueueCount, 1);
710 
711 		MutexLocker _(sXsiMessageQueueLock);
712 		messageQueue->SetID();
713 		if (isPrivate)
714 			messageQueue->SetIpcKey((key_t)-1);
715 		else {
716 			messageQueue->SetIpcKey(key);
717 			ipcKey->SetMessageQueueID(messageQueue);
718 		}
719 		sMessageQueueHashTable.Insert(messageQueue);
720 	}
721 
722 	return messageQueue->ID();
723 }
724 
725 
726 ssize_t
727 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
728 	size_t messageSize, long messageType, int messageFlags)
729 {
730 	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
731 		messageQueueID, messageSize));
732 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
733 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
734 	if (messageQueue == NULL) {
735 		TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
736 			messageQueueID));
737 		return EINVAL;
738 	}
739 	MutexLocker messageQueueLocker(messageQueue->Lock());
740 	messageQueueHashLocker.Unlock();
741 
742 	if (messageSize > MAX_BYTES_PER_QUEUE) {
743 		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
744 		return EINVAL;
745 	}
746 	if (!messageQueue->HasPermission()) {
747 		TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
748 			"on message queue id %d, key %d\n", messageQueueID,
749 			(int)messageQueue->IpcKey()));
750 		return EACCES;
751 	}
752 	if (!IS_USER_ADDRESS(messagePointer)) {
753 		TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
754 		return B_BAD_ADDRESS;
755 	}
756 
757 	queued_message *message = NULL;
758 	while (true) {
759 		message = messageQueue->Remove(messageType);
760 
761 		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
762 			// We are going to sleep
763 			Thread *thread = thread_get_current_thread();
764 			queued_thread queueEntry(thread, messageSize);
765 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
766 
767 			uint32 sequenceNumber = messageQueue->SequenceNumber();
768 
769 			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
770 			status_t result
771 				= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
772 			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
773 
774 			messageQueueHashLocker.Lock();
775 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
776 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
777 				&& sequenceNumber != messageQueue->SequenceNumber())) {
778 				TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = %ld) "
779 					"got destroyed\n", messageQueueID, sequenceNumber));
780 				return EIDRM;
781 			} else if (result == B_INTERRUPTED) {
782 				TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
783 					"waiting on message queue %d\n",(int)thread->id,
784 					messageQueueID));
785 				messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
786 				return EINTR;
787 			} else {
788 				messageQueueLocker.Lock();
789 				messageQueueHashLocker.Unlock();
790 			}
791 		} else if (message == NULL) {
792 			// There is not message of type requested and
793 			// we can't wait
794 			return ENOMSG;
795 		} else {
796 			// Message received correctly (so far)
797 			if ((ssize_t)messageSize < message->length
798 				&& !(messageFlags & MSG_NOERROR)) {
799 				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
800 				// Put the message back inside. Since we hold the
801 				// queue message lock, not one else could have filled
802 				// up the queue meanwhile
803 				messageQueue->Insert(message);
804 				return E2BIG;
805 			}
806 
807 			ssize_t result
808 				= message->copy_to_user_buffer(messagePointer, messageSize);
809 			if (result < 0) {
810 				messageQueue->Insert(message);
811 				return B_BAD_ADDRESS;
812 			}
813 
814 			delete message;
815 			TRACE(("xsi_msgrcv: message received correctly\n"));
816 			return result;
817 		}
818 	}
819 
820 	return B_OK;
821 }
822 
823 
824 int
825 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
826 	size_t messageSize, int messageFlags)
827 {
828 	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
829 		messageQueueID, messageSize));
830 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
831 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
832 	if (messageQueue == NULL) {
833 		TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
834 			messageQueueID));
835 		return EINVAL;
836 	}
837 	MutexLocker messageQueueLocker(messageQueue->Lock());
838 	messageQueueHashLocker.Unlock();
839 
840 	if (messageSize > MAX_BYTES_PER_QUEUE) {
841 		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
842 		return EINVAL;
843 	}
844 	if (!messageQueue->HasPermission()) {
845 		TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
846 			"on message queue id %d, key %d\n", messageQueueID,
847 			(int)messageQueue->IpcKey()));
848 		return EACCES;
849 	}
850 	if (!IS_USER_ADDRESS(messagePointer)) {
851 		TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
852 		return B_BAD_ADDRESS;
853 	}
854 
855 	queued_message *message
856 		= new(std::nothrow) queued_message(messagePointer, messageSize);
857 	if (message == NULL || message->initOK != true) {
858 		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
859 		delete message;
860 		return ENOMEM;
861 	}
862 
863 	bool notSent = true;
864 	status_t result = B_OK;
865 	while (notSent) {
866 		bool goToSleep = messageQueue->Insert(message);
867 
868 		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
869 			// We are going to sleep
870 			Thread *thread = thread_get_current_thread();
871 			queued_thread queueEntry(thread, messageSize);
872 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
873 
874 			uint32 sequenceNumber = messageQueue->SequenceNumber();
875 
876 			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
877 			result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
878 			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
879 
880 			messageQueueHashLocker.Lock();
881 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
882 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
883 				&& sequenceNumber != messageQueue->SequenceNumber())) {
884 				TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = %ld) "
885 					"got destroyed\n", messageQueueID, sequenceNumber));
886 				delete message;
887 				notSent = false;
888 				result = EIDRM;
889 			} else if (result == B_INTERRUPTED) {
890 				TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
891 					"waiting on message queue %d\n",(int)thread->id,
892 					messageQueueID));
893 				messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
894 				delete message;
895 				notSent = false;
896 				result = EINTR;
897 			} else {
898 				messageQueueLocker.Lock();
899 				messageQueueHashLocker.Unlock();
900 			}
901 		} else if (goToSleep) {
902 			// We did not send the message and we can't wait
903 			delete message;
904 			notSent = false;
905 			result = EAGAIN;
906 		} else {
907 			// Message delivered correctly
908 			TRACE(("xsi_msgsnd: message sent correctly\n"));
909 			notSent = false;
910 		}
911 	}
912 
913 	return result;
914 }
915