xref: /haiku/src/system/kernel/posix/xsi_message_queue.cpp (revision d06cbe081b7ea043aea2012359744091de6d604d)
1 /*
2  * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7  */
8 
9 #include <posix/xsi_message_queue.h>
10 
11 #include <new>
12 
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15 
16 #include <OS.h>
17 
18 #include <kernel.h>
19 #include <syscall_restart.h>
20 
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25 
26 
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 #	define TRACE(x)			dprintf x
30 #	define TRACE_ERROR(x)	dprintf x
31 #else
32 #	define TRACE(x)			/* nothing */
33 #	define TRACE_ERROR(x)	dprintf x
34 #endif
35 
36 // Queue for holding blocked threads
37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
38 	queued_thread(Thread *_thread, int32 _message_length)
39 		:
40 		thread(_thread),
41 		message_length(_message_length),
42 		queued(false)
43 	{
44 	}
45 
46 	Thread	*thread;
47 	int32	message_length;
48 	bool	queued;
49 };
50 
51 typedef DoublyLinkedList<queued_thread> ThreadQueue;
52 
53 
54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
55 	queued_message(const void *_message, ssize_t _length)
56 		:
57 		initOK(false),
58 		length(_length)
59 	{
60 		message = (char *)malloc(sizeof(char) * _length);
61 		if (message == NULL)
62 			return;
63 
64 		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
65 			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
66 			_length) != B_OK) {
67 			free(message);
68 			return;
69 		}
70 		initOK = true;
71 	}
72 
73 	~queued_message()
74 	{
75 		if (initOK)
76 			free(message);
77 	}
78 
79 	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
80 	{
81 		if (_length > length)
82 			_length = length;
83 
84 		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
85 			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
86 			_length) != B_OK)
87 			return B_ERROR;
88 		return _length;
89 	}
90 
91 	bool		initOK;
92 	ssize_t		length;
93 	char		*message;
94 	long		type;
95 };
96 
97 typedef DoublyLinkedList<queued_message> MessageQueue;
98 
99 // Arbitrary limit
100 #define MAX_BYTES_PER_QUEUE		2048
101 
102 class XsiMessageQueue {
103 public:
104 	XsiMessageQueue(int flags)
105 		:
106 		fBytesInQueue(0),
107 		fThreadsWaitingToReceive(0),
108 		fThreadsWaitingToSend(0)
109 	{
110 		mutex_init(&fLock, "XsiMessageQueue private mutex");
111 		SetIpcKey((key_t)-1);
112 		SetPermissions(flags);
113 		// Initialize all fields to zero
114 		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
115 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
116 		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
117 	}
118 
119 	// Implemented after sXsiMessageCount is declared
120 	~XsiMessageQueue();
121 
122 	status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
123 	{
124 		thread_prepare_to_block(thread, B_CAN_INTERRUPT,
125 				THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
126 		// Unlock the queue before blocking
127 		queueLocker->Unlock();
128 
129 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
130 // interruption, we will still be queued. A WakeUpThread() at this point will
131 // call thread_unblock() and might thus screw with our trying to re-lock the
132 // mutex.
133 		return thread_block();
134 	}
135 
136 	void DoIpcSet(struct msqid_ds *result)
137 	{
138 		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
139 		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
140 		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
141 			| (result->msg_perm.mode & 0x01ff);
142 		fMessageQueue.msg_qbytes = result->msg_qbytes;
143 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
144 	}
145 
146 	void Deque(queued_thread *queueEntry, bool waitForMessage)
147 	{
148 		if (queueEntry->queued) {
149 			if (waitForMessage) {
150 				fWaitingToReceive.Remove(queueEntry);
151 				fThreadsWaitingToReceive--;
152 			} else {
153 				fWaitingToSend.Remove(queueEntry);
154 				fThreadsWaitingToSend--;
155 			}
156 		}
157 	}
158 
159 	void Enqueue(queued_thread *queueEntry, bool waitForMessage)
160 	{
161 		if (waitForMessage) {
162 			fWaitingToReceive.Add(queueEntry);
163 			fThreadsWaitingToReceive++;
164 		} else {
165 			fWaitingToSend.Add(queueEntry);
166 			fThreadsWaitingToSend++;
167 		}
168 		queueEntry->queued = true;
169 	}
170 
171 	struct msqid_ds &GetMessageQueue()
172 	{
173 		return fMessageQueue;
174 	}
175 
176 	bool HasPermission() const
177 	{
178 		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
179 			return true;
180 
181 		uid_t uid = geteuid();
182 		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
183 			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
184 			return true;
185 
186 		gid_t gid = getegid();
187 		if (gid == fMessageQueue.msg_perm.gid
188 			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
189 			return true;
190 
191 		return false;
192 	}
193 
194 	bool HasReadPermission() const
195 	{
196 		// TODO: fix this
197 		return HasPermission();
198 	}
199 
200 	int ID() const
201 	{
202 		return fID;
203 	}
204 
205 	// Implemented after sXsiMessageCount is declared
206 	bool Insert(queued_message *message);
207 
208 	key_t IpcKey() const
209 	{
210 		return fMessageQueue.msg_perm.key;
211 	}
212 
213 	mutex &Lock()
214 	{
215 		return fLock;
216 	}
217 
218 	msglen_t MaxBytes() const
219 	{
220 		return fMessageQueue.msg_qbytes;
221 	}
222 
223 	// Implemented after sXsiMessageCount is declared
224 	queued_message *Remove(long typeRequested);
225 
226 	uint32 SequenceNumber() const
227 	{
228 		return fSequenceNumber;
229 	}
230 
231 	// Implemented after sMessageQueueHashTable is declared
232 	void SetID();
233 
234 	void SetIpcKey(key_t key)
235 	{
236 		fMessageQueue.msg_perm.key = key;
237 	}
238 
239 	void SetPermissions(int flags)
240 	{
241 		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
242 		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
243 		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
244 	}
245 
246 	void WakeUpThread(bool waitForMessage)
247 	{
248 		if (waitForMessage) {
249 			// Wake up all waiting thread for a message
250 			// TODO: this can cause starvation for any
251 			// very-unlucky-and-slow thread
252 			while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
253 				entry->queued = false;
254 				fThreadsWaitingToReceive--;
255 				thread_unblock(entry->thread, 0);
256 			}
257 		} else {
258 			// Wake up only one thread waiting to send
259 			if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
260 				entry->queued = false;
261 				fThreadsWaitingToSend--;
262 				thread_unblock(entry->thread, 0);
263 			}
264 		}
265 	}
266 
267 	XsiMessageQueue*& Link()
268 	{
269 		return fLink;
270 	}
271 
272 private:
273 	msglen_t			fBytesInQueue;
274 	int					fID;
275 	mutex				fLock;
276 	MessageQueue		fMessage;
277 	struct msqid_ds		fMessageQueue;
278 	uint32				fSequenceNumber;
279 	uint32				fThreadsWaitingToReceive;
280 	uint32				fThreadsWaitingToSend;
281 
282 	ThreadQueue			fWaitingToReceive;
283 	ThreadQueue			fWaitingToSend;
284 
285 	XsiMessageQueue*	fLink;
286 };
287 
288 
289 // Xsi message queue hash table
290 struct MessageQueueHashTableDefinition {
291 	typedef int					KeyType;
292 	typedef XsiMessageQueue		ValueType;
293 
294 	size_t HashKey (const int key) const
295 	{
296 		return (size_t)key;
297 	}
298 
299 	size_t Hash(XsiMessageQueue *variable) const
300 	{
301 		return (size_t)variable->ID();
302 	}
303 
304 	bool Compare(const int key, XsiMessageQueue *variable) const
305 	{
306 		return (int)key == (int)variable->ID();
307 	}
308 
309 	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
310 	{
311 		return variable->Link();
312 	}
313 };
314 
315 
316 // IPC class
317 class Ipc {
318 public:
319 	Ipc(key_t key)
320 		: fKey(key),
321 		fMessageQueueId(-1)
322 	{
323 	}
324 
325 	key_t Key() const
326 	{
327 		return fKey;
328 	}
329 
330 	int MessageQueueID() const
331 	{
332 		return fMessageQueueId;
333 	}
334 
335 	void SetMessageQueueID(XsiMessageQueue *messageQueue)
336 	{
337 		fMessageQueueId = messageQueue->ID();
338 	}
339 
340 	Ipc*& Link()
341 	{
342 		return fLink;
343 	}
344 
345 private:
346 	key_t				fKey;
347 	int					fMessageQueueId;
348 	Ipc*				fLink;
349 };
350 
351 
352 struct IpcHashTableDefinition {
353 	typedef key_t	KeyType;
354 	typedef Ipc		ValueType;
355 
356 	size_t HashKey (const key_t key) const
357 	{
358 		return (size_t)(key);
359 	}
360 
361 	size_t Hash(Ipc *variable) const
362 	{
363 		return (size_t)HashKey(variable->Key());
364 	}
365 
366 	bool Compare(const key_t key, Ipc *variable) const
367 	{
368 		return (key_t)key == (key_t)variable->Key();
369 	}
370 
371 	Ipc*& GetLink(Ipc *variable) const
372 	{
373 		return variable->Link();
374 	}
375 };
376 
377 // Arbitrary limits
378 #define MAX_XSI_MESSAGE			4096
379 #define MAX_XSI_MESSAGE_QUEUE	1024
380 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
381 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
382 
383 static mutex sIpcLock;
384 static mutex sXsiMessageQueueLock;
385 
386 static uint32 sGlobalSequenceNumber = 1;
387 static int32 sXsiMessageCount = 0;
388 static int32 sXsiMessageQueueCount = 0;
389 
390 
391 //	#pragma mark -
392 
393 
394 XsiMessageQueue::~XsiMessageQueue()
395 {
396 	mutex_destroy(&fLock);
397 
398 	// Wake up any threads still waiting
399 	if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
400 		while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
401 			entry->queued = false;
402 			thread_unblock(entry->thread, EIDRM);
403 		}
404 		while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
405 			entry->queued = false;
406 			thread_unblock(entry->thread, EIDRM);
407 		}
408 	}
409 
410 	// Free up any remaining messages
411 	if (fMessageQueue.msg_qnum) {
412 		while (queued_message *message = fMessage.RemoveHead()) {
413 			atomic_add(&sXsiMessageCount, -1);
414 			delete message;
415 		}
416 	}
417 }
418 
419 
420 bool
421 XsiMessageQueue::Insert(queued_message *message)
422 {
423 	// The only situation that would make us (potentially) wait
424 	// is that we exceed with bytes or with the total number of messages
425 	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
426 		return true;
427 
428 	while (true) {
429 		int32 oldCount = atomic_get(&sXsiMessageCount);
430 		if (oldCount >= MAX_XSI_MESSAGE)
431 			return true;
432 		// If another thread updates the counter we keep
433 		// iterating
434 		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
435 			== oldCount)
436 			break;
437 	}
438 
439 	fMessage.Add(message);
440 	fMessageQueue.msg_qnum++;
441 	fMessageQueue.msg_lspid = getpid();
442 	fMessageQueue.msg_stime = real_time_clock();
443 	fBytesInQueue += message->length;
444 	if (fThreadsWaitingToReceive)
445 		WakeUpThread(true /* WaitForMessage */);
446 	return false;
447 }
448 
449 
450 queued_message*
451 XsiMessageQueue::Remove(long typeRequested)
452 {
453 	queued_message *message = NULL;
454 	if (typeRequested < 0) {
455 		// Return first message of the lowest type
456 		// that is less than or equal to the absolute
457 		// value of type requested.
458 		MessageQueue::Iterator iterator = fMessage.GetIterator();
459 		while (iterator.HasNext()) {
460 			queued_message *current = iterator.Next();
461 			if (current->type <= -typeRequested) {
462 				message = iterator.Remove();
463 				break;
464 			}
465 		}
466 	} else if (typeRequested == 0) {
467 		// Return the first message on the queue
468 		message = fMessage.RemoveHead();
469 	} else {
470 		// Return the first message of type requested
471 		MessageQueue::Iterator iterator = fMessage.GetIterator();
472 		while (iterator.HasNext()) {
473 			queued_message *current = iterator.Next();
474 			if (current->type == typeRequested) {
475 				message = iterator.Remove();
476 				break;
477 			}
478 		}
479 	}
480 
481 	if (message == NULL)
482 		return NULL;
483 
484 	fMessageQueue.msg_qnum--;
485 	fMessageQueue.msg_lrpid = getpid();
486 	fMessageQueue.msg_rtime = real_time_clock();
487 	fBytesInQueue -= message->length;
488 	atomic_add(&sXsiMessageCount, -1);
489 	if (fThreadsWaitingToSend)
490 		WakeUpThread(false /* WaitForMessage */);
491 	return message;
492 }
493 
494 
495 void
496 XsiMessageQueue::SetID()
497 {
498 	fID = real_time_clock();
499 	// The lock is held before calling us
500 	while (true) {
501 		if (sMessageQueueHashTable.Lookup(fID) == NULL)
502 			break;
503 		fID++;
504 	}
505 	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
506 	fSequenceNumber = sGlobalSequenceNumber;
507 }
508 
509 
510 //	#pragma mark - Kernel exported API
511 
512 
513 void
514 xsi_msg_init()
515 {
516 	// Initialize hash tables
517 	status_t status = sIpcHashTable.Init();
518 	if (status != B_OK)
519 		panic("xsi_msg_init() failed to initialize ipc hash table\n");
520 	status =  sMessageQueueHashTable.Init();
521 	if (status != B_OK)
522 		panic("xsi_msg_init() failed to initialize message queue hash table\n");
523 
524 	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
525 	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
526 }
527 
528 
529 //	#pragma mark - Syscalls
530 
531 
532 int
533 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
534 {
535 	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
536 	MutexLocker ipcHashLocker(sIpcLock);
537 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
538 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
539 	if (messageQueue == NULL) {
540 		TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
541 		return EINVAL;
542 	}
543 	if (!IS_USER_ADDRESS(buffer)) {
544 		TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
545 		return B_BAD_ADDRESS;
546 	}
547 
548 	// Lock the message queue itself and release both the ipc hash table lock
549 	// and the message queue hash table lock _only_ if the command it's not
550 	// IPC_RMID, this prevents undesidered situation from happening while
551 	// (hopefully) improving the concurrency.
552 	MutexLocker messageQueueLocker;
553 	if (command != IPC_RMID) {
554 		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
555 		messageQueueHashLocker.Unlock();
556 		ipcHashLocker.Unlock();
557 	} else
558 		// Since we are going to delete the message queue object
559 		// along with its mutex, we can't use a MutexLocker object,
560 		// as the mutex itself won't exist on function exit
561 		mutex_lock(&messageQueue->Lock());
562 
563 	switch (command) {
564 		case IPC_STAT: {
565 			if (!messageQueue->HasReadPermission()) {
566 				TRACE_ERROR(("xsi_msgctl: calling process has not read "
567 					"permission on message queue %d, key %d\n", messageQueueID,
568 					(int)messageQueue->IpcKey()));
569 				return EACCES;
570 			}
571 			struct msqid_ds msg = messageQueue->GetMessageQueue();
572 			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
573 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
574 				return B_BAD_ADDRESS;
575 			}
576 			break;
577 		}
578 
579 		case IPC_SET: {
580 			if (!messageQueue->HasPermission()) {
581 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
582 					"on message queue %d, key %d\n", messageQueueID,
583 					(int)messageQueue->IpcKey()));
584 				return EPERM;
585 			}
586 			struct msqid_ds msg;
587 			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
588 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
589 				return B_BAD_ADDRESS;
590 			}
591 			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
592 				TRACE_ERROR(("xsi_msgctl: user does not have permission to "
593 					"increase the maximum number of bytes allowed on queue\n"));
594 				return EPERM;
595 			}
596 			if (msg.msg_qbytes == 0) {
597 				TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
598 				return EINVAL;
599 			}
600 
601 			messageQueue->DoIpcSet(&msg);
602 			break;
603 		}
604 
605 		case IPC_RMID: {
606 			// If this was the command, we are still holding the message
607 			// queue hash table lock along with the ipc one, but not the
608 			// message queue lock itself. This prevents other process
609 			// to try and acquire a destroyed mutex
610 			if (!messageQueue->HasPermission()) {
611 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
612 					"on message queue %d, key %d\n", messageQueueID,
613 					(int)messageQueue->IpcKey()));
614 				return EPERM;
615 			}
616 			key_t key = messageQueue->IpcKey();
617 			Ipc *ipcKey = NULL;
618 			if (key != -1) {
619 				ipcKey = sIpcHashTable.Lookup(key);
620 				sIpcHashTable.Remove(ipcKey);
621 			}
622 			sMessageQueueHashTable.Remove(messageQueue);
623 			// Wake up of any threads waiting on this
624 			// queue happens in destructor
625 			if (key != -1)
626 				delete ipcKey;
627 			atomic_add(&sXsiMessageQueueCount, -1);
628 
629 			delete messageQueue;
630 			break;
631 		}
632 
633 		default:
634 			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
635 			return EINVAL;
636 	}
637 
638 	return B_OK;
639 }
640 
641 
642 int
643 _user_xsi_msgget(key_t key, int flags)
644 {
645 	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
646 	XsiMessageQueue *messageQueue = NULL;
647 	Ipc *ipcKey = NULL;
648 	// Default assumptions
649 	bool isPrivate = true;
650 	bool create = true;
651 
652 	if (key != IPC_PRIVATE) {
653 		isPrivate = false;
654 		// Check if key already exist, if it does it already has a message
655 		// queue associated with it
656 		ipcKey = sIpcHashTable.Lookup(key);
657 		if (ipcKey == NULL) {
658 			if (!(flags & IPC_CREAT)) {
659 				TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
660 					"caller did not ask for creation\n", (int)key));
661 				return ENOENT;
662 			}
663 			ipcKey = new(std::nothrow) Ipc(key);
664 			if (ipcKey == NULL) {
665 				TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
666 					"for key %d\n", (int)key));
667 				return ENOMEM;
668 			}
669 			sIpcHashTable.Insert(ipcKey);
670 		} else {
671 			// The IPC key exist and it already has a message queue
672 			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
673 				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
674 				return EEXIST;
675 			}
676 			int messageQueueID = ipcKey->MessageQueueID();
677 
678 			MutexLocker _(sXsiMessageQueueLock);
679 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
680 			if (!messageQueue->HasPermission()) {
681 				TRACE_ERROR(("xsi_msgget: calling process has not permission "
682 					"on message queue %d, key %d\n", messageQueue->ID(),
683 					(int)key));
684 				return EACCES;
685 			}
686 			create = false;
687 		}
688 	}
689 
690 	if (create) {
691 		// Create a new message queue for this key
692 		if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
693 			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
694 				"message queues\n"));
695 			return ENOSPC;
696 		}
697 
698 		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
699 		if (messageQueue == NULL) {
700 			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
701 				"message queue\n"));
702 			return ENOMEM;
703 		}
704 		atomic_add(&sXsiMessageQueueCount, 1);
705 
706 		MutexLocker _(sXsiMessageQueueLock);
707 		messageQueue->SetID();
708 		if (isPrivate)
709 			messageQueue->SetIpcKey((key_t)-1);
710 		else {
711 			messageQueue->SetIpcKey(key);
712 			ipcKey->SetMessageQueueID(messageQueue);
713 		}
714 		sMessageQueueHashTable.Insert(messageQueue);
715 	}
716 
717 	return messageQueue->ID();
718 }
719 
720 
721 ssize_t
722 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
723 	size_t messageSize, long messageType, int messageFlags)
724 {
725 	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
726 		messageQueueID, messageSize));
727 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
728 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
729 	if (messageQueue == NULL) {
730 		TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
731 			messageQueueID));
732 		return EINVAL;
733 	}
734 	MutexLocker messageQueueLocker(messageQueue->Lock());
735 	messageQueueHashLocker.Unlock();
736 
737 	if (messageSize > MAX_BYTES_PER_QUEUE) {
738 		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
739 		return EINVAL;
740 	}
741 	if (!messageQueue->HasPermission()) {
742 		TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
743 			"on message queue id %d, key %d\n", messageQueueID,
744 			(int)messageQueue->IpcKey()));
745 		return EACCES;
746 	}
747 	if (!IS_USER_ADDRESS(messagePointer)) {
748 		TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
749 		return B_BAD_ADDRESS;
750 	}
751 
752 	queued_message *message = NULL;
753 	while (true) {
754 		message = messageQueue->Remove(messageType);
755 
756 		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
757 			// We are going to sleep
758 			Thread *thread = thread_get_current_thread();
759 			queued_thread queueEntry(thread, messageSize);
760 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
761 
762 			uint32 sequenceNumber = messageQueue->SequenceNumber();
763 
764 			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
765 			status_t result
766 				= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
767 			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
768 
769 			messageQueueHashLocker.Lock();
770 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
771 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
772 				&& sequenceNumber != messageQueue->SequenceNumber())) {
773 				TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = "
774 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
775 					sequenceNumber));
776 				return EIDRM;
777 			} else if (result == B_INTERRUPTED) {
778 				TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
779 					"waiting on message queue %d\n",(int)thread->id,
780 					messageQueueID));
781 				messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
782 				return EINTR;
783 			} else {
784 				messageQueueLocker.Lock();
785 				messageQueueHashLocker.Unlock();
786 			}
787 		} else if (message == NULL) {
788 			// There is not message of type requested and
789 			// we can't wait
790 			return ENOMSG;
791 		} else {
792 			// Message received correctly (so far)
793 			if ((ssize_t)messageSize < message->length
794 				&& !(messageFlags & MSG_NOERROR)) {
795 				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
796 				// Put the message back inside. Since we hold the
797 				// queue message lock, not one else could have filled
798 				// up the queue meanwhile
799 				messageQueue->Insert(message);
800 				return E2BIG;
801 			}
802 
803 			ssize_t result
804 				= message->copy_to_user_buffer(messagePointer, messageSize);
805 			if (result < 0) {
806 				messageQueue->Insert(message);
807 				return B_BAD_ADDRESS;
808 			}
809 
810 			delete message;
811 			TRACE(("xsi_msgrcv: message received correctly\n"));
812 			return result;
813 		}
814 	}
815 
816 	return B_OK;
817 }
818 
819 
820 int
821 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
822 	size_t messageSize, int messageFlags)
823 {
824 	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
825 		messageQueueID, messageSize));
826 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
827 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
828 	if (messageQueue == NULL) {
829 		TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
830 			messageQueueID));
831 		return EINVAL;
832 	}
833 	MutexLocker messageQueueLocker(messageQueue->Lock());
834 	messageQueueHashLocker.Unlock();
835 
836 	if (messageSize > MAX_BYTES_PER_QUEUE) {
837 		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
838 		return EINVAL;
839 	}
840 	if (!messageQueue->HasPermission()) {
841 		TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
842 			"on message queue id %d, key %d\n", messageQueueID,
843 			(int)messageQueue->IpcKey()));
844 		return EACCES;
845 	}
846 	if (!IS_USER_ADDRESS(messagePointer)) {
847 		TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
848 		return B_BAD_ADDRESS;
849 	}
850 
851 	queued_message *message
852 		= new(std::nothrow) queued_message(messagePointer, messageSize);
853 	if (message == NULL || message->initOK != true) {
854 		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
855 		delete message;
856 		return ENOMEM;
857 	}
858 
859 	bool notSent = true;
860 	status_t result = B_OK;
861 	while (notSent) {
862 		bool goToSleep = messageQueue->Insert(message);
863 
864 		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
865 			// We are going to sleep
866 			Thread *thread = thread_get_current_thread();
867 			queued_thread queueEntry(thread, messageSize);
868 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
869 
870 			uint32 sequenceNumber = messageQueue->SequenceNumber();
871 
872 			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
873 			result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
874 			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
875 
876 			messageQueueHashLocker.Lock();
877 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
878 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
879 				&& sequenceNumber != messageQueue->SequenceNumber())) {
880 				TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = "
881 					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
882 					sequenceNumber));
883 				delete message;
884 				notSent = false;
885 				result = EIDRM;
886 			} else if (result == B_INTERRUPTED) {
887 				TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
888 					"waiting on message queue %d\n",(int)thread->id,
889 					messageQueueID));
890 				messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
891 				delete message;
892 				notSent = false;
893 				result = EINTR;
894 			} else {
895 				messageQueueLocker.Lock();
896 				messageQueueHashLocker.Unlock();
897 			}
898 		} else if (goToSleep) {
899 			// We did not send the message and we can't wait
900 			delete message;
901 			notSent = false;
902 			result = EAGAIN;
903 		} else {
904 			// Message delivered correctly
905 			TRACE(("xsi_msgsnd: message sent correctly\n"));
906 			notSent = false;
907 		}
908 	}
909 
910 	return result;
911 }
912