xref: /haiku/src/system/kernel/posix/xsi_message_queue.cpp (revision b46615c55ad2c8fe6de54412055a0713da3d610a)
1 /*
2  * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7  */
8 
9 #include <posix/xsi_message_queue.h>
10 
11 #include <new>
12 
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15 
16 #include <OS.h>
17 
18 #include <kernel.h>
19 #include <syscall_restart.h>
20 
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25 
26 
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 #	define TRACE(x)			dprintf x
30 #	define TRACE_ERROR(x)	dprintf x
31 #else
32 #	define TRACE(x)			/* nothing */
33 #	define TRACE_ERROR(x)	dprintf x
34 #endif
35 
36 // Queue for holding blocked threads
37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
38 	queued_thread(Thread *_thread, int32 _message_length)
39 		:
40 		thread(_thread),
41 		message_length(_message_length),
42 		queued(false)
43 	{
44 	}
45 
46 	Thread	*thread;
47 	int32	message_length;
48 	bool	queued;
49 };
50 
51 typedef DoublyLinkedList<queued_thread> ThreadQueue;
52 
53 
54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
55 	queued_message(const void *_message, ssize_t _length)
56 		:
57 		initOK(false),
58 		length(_length)
59 	{
60 		message = (char *)malloc(sizeof(char) * _length);
61 		if (message == NULL)
62 			return;
63 
64 		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
65 			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
66 			_length) != B_OK) {
67 			free(message);
68 			return;
69 		}
70 		initOK = true;
71 	}
72 
73 	~queued_message()
74 	{
75 		if (initOK)
76 			free(message);
77 	}
78 
79 	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
80 	{
81 		if (_length > length)
82 			_length = length;
83 
84 		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
85 			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
86 			_length) != B_OK)
87 			return B_ERROR;
88 		return _length;
89 	}
90 
91 	bool		initOK;
92 	ssize_t		length;
93 	char		*message;
94 	long		type;
95 };
96 
97 typedef DoublyLinkedList<queued_message> MessageQueue;
98 
99 // Arbitrary limit
100 #define MAX_BYTES_PER_QUEUE		2048
101 
102 class XsiMessageQueue {
103 public:
104 	XsiMessageQueue(int flags)
105 		:
106 		fBytesInQueue(0),
107 		fThreadsWaitingToReceive(0),
108 		fThreadsWaitingToSend(0)
109 	{
110 		mutex_init(&fLock, "XsiMessageQueue private mutex");
111 		SetIpcKey((key_t)-1);
112 		SetPermissions(flags);
113 		// Initialize all fields to zero
114 		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
115 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
116 		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
117 	}
118 
119 	// Implemented after sXsiMessageCount is declared
120 	~XsiMessageQueue();
121 
122 	status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
123 	{
124 		thread_prepare_to_block(thread, B_CAN_INTERRUPT,
125 				THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
126 		// Unlock the queue before blocking
127 		queueLocker->Unlock();
128 
129 		InterruptsSpinLocker _(gThreadSpinlock);
130 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
131 // interruption, we will still be queued. A WakeUpThread() at this point will
132 // call thread_unblock() and might thus screw with our trying to re-lock the
133 // mutex.
134 		return thread_block_locked(thread);
135 	}
136 
137 	void DoIpcSet(struct msqid_ds *result)
138 	{
139 		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
140 		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
141 		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
142 			| (result->msg_perm.mode & 0x01ff);
143 		fMessageQueue.msg_qbytes = result->msg_qbytes;
144 		fMessageQueue.msg_ctime = (time_t)real_time_clock();
145 	}
146 
147 	void Deque(queued_thread *queueEntry, bool waitForMessage)
148 	{
149 		if (queueEntry->queued) {
150 			if (waitForMessage) {
151 				fWaitingToReceive.Remove(queueEntry);
152 				fThreadsWaitingToReceive--;
153 			} else {
154 				fWaitingToSend.Remove(queueEntry);
155 				fThreadsWaitingToSend--;
156 			}
157 		}
158 	}
159 
160 	void Enqueue(queued_thread *queueEntry, bool waitForMessage)
161 	{
162 		if (waitForMessage) {
163 			fWaitingToReceive.Add(queueEntry);
164 			fThreadsWaitingToReceive++;
165 		} else {
166 			fWaitingToSend.Add(queueEntry);
167 			fThreadsWaitingToSend++;
168 		}
169 		queueEntry->queued = true;
170 	}
171 
172 	struct msqid_ds &GetMessageQueue()
173 	{
174 		return fMessageQueue;
175 	}
176 
177 	bool HasPermission() const
178 	{
179 		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
180 			return true;
181 
182 		uid_t uid = geteuid();
183 		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
184 			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
185 			return true;
186 
187 		gid_t gid = getegid();
188 		if (gid == fMessageQueue.msg_perm.gid
189 			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
190 			return true;
191 
192 		return false;
193 	}
194 
195 	bool HasReadPermission() const
196 	{
197 		// TODO: fix this
198 		return HasPermission();
199 	}
200 
201 	int ID() const
202 	{
203 		return fID;
204 	}
205 
206 	// Implemented after sXsiMessageCount is declared
207 	bool Insert(queued_message *message);
208 
209 	key_t IpcKey() const
210 	{
211 		return fMessageQueue.msg_perm.key;
212 	}
213 
214 	mutex &Lock()
215 	{
216 		return fLock;
217 	}
218 
219 	msglen_t MaxBytes() const
220 	{
221 		return fMessageQueue.msg_qbytes;
222 	}
223 
224 	// Implemented after sXsiMessageCount is declared
225 	queued_message *Remove(long typeRequested);
226 
227 	uint32 SequenceNumber() const
228 	{
229 		return fSequenceNumber;
230 	}
231 
232 	// Implemented after sMessageQueueHashTable is declared
233 	void SetID();
234 
235 	void SetIpcKey(key_t key)
236 	{
237 		fMessageQueue.msg_perm.key = key;
238 	}
239 
240 	void SetPermissions(int flags)
241 	{
242 		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
243 		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
244 		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
245 	}
246 
247 	void WakeUpThread(bool waitForMessage)
248 	{
249 		InterruptsSpinLocker _(gThreadSpinlock);
250 		if (waitForMessage) {
251 			// Wake up all waiting thread for a message
252 			// TODO: this can cause starvation for any
253 			// very-unlucky-and-slow thread
254 			while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
255 				entry->queued = false;
256 				fThreadsWaitingToReceive--;
257 				thread_unblock_locked(entry->thread, 0);
258 			}
259 		} else {
260 			// Wake up only one thread waiting to send
261 			if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
262 				entry->queued = false;
263 				fThreadsWaitingToSend--;
264 				thread_unblock_locked(entry->thread, 0);
265 			}
266 		}
267 	}
268 
269 	XsiMessageQueue*& Link()
270 	{
271 		return fLink;
272 	}
273 
274 private:
275 	msglen_t			fBytesInQueue;
276 	int					fID;
277 	mutex				fLock;
278 	MessageQueue		fMessage;
279 	struct msqid_ds		fMessageQueue;
280 	uint32				fSequenceNumber;
281 	uint32				fThreadsWaitingToReceive;
282 	uint32				fThreadsWaitingToSend;
283 
284 	ThreadQueue			fWaitingToReceive;
285 	ThreadQueue			fWaitingToSend;
286 
287 	XsiMessageQueue*	fLink;
288 };
289 
290 
291 // Xsi message queue hash table
292 struct MessageQueueHashTableDefinition {
293 	typedef int					KeyType;
294 	typedef XsiMessageQueue		ValueType;
295 
296 	size_t HashKey (const int key) const
297 	{
298 		return (size_t)key;
299 	}
300 
301 	size_t Hash(XsiMessageQueue *variable) const
302 	{
303 		return (size_t)variable->ID();
304 	}
305 
306 	bool Compare(const int key, XsiMessageQueue *variable) const
307 	{
308 		return (int)key == (int)variable->ID();
309 	}
310 
311 	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
312 	{
313 		return variable->Link();
314 	}
315 };
316 
317 
318 // IPC class
319 class Ipc {
320 public:
321 	Ipc(key_t key)
322 		: fKey(key),
323 		fMessageQueueId(-1)
324 	{
325 	}
326 
327 	key_t Key() const
328 	{
329 		return fKey;
330 	}
331 
332 	int MessageQueueID() const
333 	{
334 		return fMessageQueueId;
335 	}
336 
337 	void SetMessageQueueID(XsiMessageQueue *messageQueue)
338 	{
339 		fMessageQueueId = messageQueue->ID();
340 	}
341 
342 	Ipc*& Link()
343 	{
344 		return fLink;
345 	}
346 
347 private:
348 	key_t				fKey;
349 	int					fMessageQueueId;
350 	Ipc*				fLink;
351 };
352 
353 
354 struct IpcHashTableDefinition {
355 	typedef key_t	KeyType;
356 	typedef Ipc		ValueType;
357 
358 	size_t HashKey (const key_t key) const
359 	{
360 		return (size_t)(key);
361 	}
362 
363 	size_t Hash(Ipc *variable) const
364 	{
365 		return (size_t)HashKey(variable->Key());
366 	}
367 
368 	bool Compare(const key_t key, Ipc *variable) const
369 	{
370 		return (key_t)key == (key_t)variable->Key();
371 	}
372 
373 	Ipc*& GetLink(Ipc *variable) const
374 	{
375 		return variable->Link();
376 	}
377 };
378 
379 // Arbitrary limits
380 #define MAX_XSI_MESSAGE			4096
381 #define MAX_XSI_MESSAGE_QUEUE	1024
382 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
383 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
384 
385 static mutex sIpcLock;
386 static mutex sXsiMessageQueueLock;
387 
388 static uint32 sGlobalSequenceNumber = 1;
389 static vint32 sXsiMessageCount = 0;
390 static vint32 sXsiMessageQueueCount = 0;
391 
392 
393 //	#pragma mark -
394 
395 
396 XsiMessageQueue::~XsiMessageQueue()
397 {
398 	mutex_destroy(&fLock);
399 
400 	// Wake up any threads still waiting
401 	if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
402 		InterruptsSpinLocker _(gThreadSpinlock);
403 		while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
404 			entry->queued = false;
405 			thread_unblock_locked(entry->thread, EIDRM);
406 		}
407 		while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
408 			entry->queued = false;
409 			thread_unblock_locked(entry->thread, EIDRM);
410 		}
411 	}
412 
413 	// Free up any remaining messages
414 	if (fMessageQueue.msg_qnum) {
415 		while (queued_message *message = fMessage.RemoveHead()) {
416 			atomic_add(&sXsiMessageCount, -1);
417 			delete message;
418 		}
419 	}
420 }
421 
422 
423 bool
424 XsiMessageQueue::Insert(queued_message *message)
425 {
426 	// The only situation that would make us (potentially) wait
427 	// is that we exceed with bytes or with the total number of messages
428 	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
429 		return true;
430 
431 	while (true) {
432 		int32 oldCount = atomic_get(&sXsiMessageCount);
433 		if (oldCount >= MAX_XSI_MESSAGE)
434 			return true;
435 		// If another thread updates the counter we keep
436 		// iterating
437 		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
438 			== oldCount)
439 			break;
440 	}
441 
442 	fMessage.Add(message);
443 	fMessageQueue.msg_qnum++;
444 	fMessageQueue.msg_lspid = getpid();
445 	fMessageQueue.msg_stime = real_time_clock();
446 	fBytesInQueue += message->length;
447 	if (fThreadsWaitingToReceive)
448 		WakeUpThread(true /* WaitForMessage */);
449 	return false;
450 }
451 
452 
453 queued_message*
454 XsiMessageQueue::Remove(long typeRequested)
455 {
456 	queued_message *message = NULL;
457 	if (typeRequested < 0) {
458 		// Return first message of the lowest type
459 		// that is less than or equal to the absolute
460 		// value of type requested.
461 		MessageQueue::Iterator iterator = fMessage.GetIterator();
462 		while (iterator.HasNext()) {
463 			queued_message *current = iterator.Next();
464 			if (current->type <= -typeRequested) {
465 				message = iterator.Remove();
466 				break;
467 			}
468 		}
469 	} else if (typeRequested == 0) {
470 		// Return the first message on the queue
471 		message = fMessage.RemoveHead();
472 	} else {
473 		// Return the first message of type requested
474 		MessageQueue::Iterator iterator = fMessage.GetIterator();
475 		while (iterator.HasNext()) {
476 			queued_message *current = iterator.Next();
477 			if (current->type == typeRequested) {
478 				message = iterator.Remove();
479 				break;
480 			}
481 		}
482 	}
483 
484 	if (message == NULL)
485 		return NULL;
486 
487 	fMessageQueue.msg_qnum--;
488 	fMessageQueue.msg_lrpid = getpid();
489 	fMessageQueue.msg_rtime = real_time_clock();
490 	fBytesInQueue -= message->length;
491 	atomic_add(&sXsiMessageCount, -1);
492 	if (fThreadsWaitingToSend)
493 		WakeUpThread(false /* WaitForMessage */);
494 	return message;
495 }
496 
497 
498 void
499 XsiMessageQueue::SetID()
500 {
501 	fID = real_time_clock();
502 	// The lock is held before calling us
503 	while (true) {
504 		if (sMessageQueueHashTable.Lookup(fID) == NULL)
505 			break;
506 		fID++;
507 	}
508 	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
509 	fSequenceNumber = sGlobalSequenceNumber;
510 }
511 
512 
513 //	#pragma mark - Kernel exported API
514 
515 
516 void
517 xsi_msg_init()
518 {
519 	// Initialize hash tables
520 	status_t status = sIpcHashTable.Init();
521 	if (status != B_OK)
522 		panic("xsi_msg_init() failed to initialize ipc hash table\n");
523 	status =  sMessageQueueHashTable.Init();
524 	if (status != B_OK)
525 		panic("xsi_msg_init() failed to initialize message queue hash table\n");
526 
527 	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
528 	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
529 }
530 
531 
532 //	#pragma mark - Syscalls
533 
534 
535 int
536 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
537 {
538 	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
539 	MutexLocker ipcHashLocker(sIpcLock);
540 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
541 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
542 	if (messageQueue == NULL) {
543 		TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
544 		return EINVAL;
545 	}
546 	if (!IS_USER_ADDRESS(buffer)) {
547 		TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
548 		return B_BAD_ADDRESS;
549 	}
550 
551 	// Lock the message queue itself and release both the ipc hash table lock
552 	// and the message queue hash table lock _only_ if the command it's not
553 	// IPC_RMID, this prevents undesidered situation from happening while
554 	// (hopefully) improving the concurrency.
555 	MutexLocker messageQueueLocker;
556 	if (command != IPC_RMID) {
557 		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
558 		messageQueueHashLocker.Unlock();
559 		ipcHashLocker.Unlock();
560 	} else
561 		// Since we are going to delete the message queue object
562 		// along with its mutex, we can't use a MutexLocker object,
563 		// as the mutex itself won't exist on function exit
564 		mutex_lock(&messageQueue->Lock());
565 
566 	switch (command) {
567 		case IPC_STAT: {
568 			if (!messageQueue->HasReadPermission()) {
569 				TRACE_ERROR(("xsi_msgctl: calling process has not read "
570 					"permission on message queue %d, key %d\n", messageQueueID,
571 					(int)messageQueue->IpcKey()));
572 				return EACCES;
573 			}
574 			struct msqid_ds msg = messageQueue->GetMessageQueue();
575 			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
576 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
577 				return B_BAD_ADDRESS;
578 			}
579 			break;
580 		}
581 
582 		case IPC_SET: {
583 			if (!messageQueue->HasPermission()) {
584 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
585 					"on message queue %d, key %d\n", messageQueueID,
586 					(int)messageQueue->IpcKey()));
587 				return EPERM;
588 			}
589 			struct msqid_ds msg;
590 			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
591 				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
592 				return B_BAD_ADDRESS;
593 			}
594 			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
595 				TRACE_ERROR(("xsi_msgctl: user does not have permission to "
596 					"increase the maximum number of bytes allowed on queue\n"));
597 				return EPERM;
598 			}
599 			if (msg.msg_qbytes == 0) {
600 				TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
601 				return EINVAL;
602 			}
603 
604 			messageQueue->DoIpcSet(&msg);
605 			break;
606 		}
607 
608 		case IPC_RMID: {
609 			// If this was the command, we are still holding the message
610 			// queue hash table lock along with the ipc one, but not the
611 			// message queue lock itself. This prevents other process
612 			// to try and acquire a destroyed mutex
613 			if (!messageQueue->HasPermission()) {
614 				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
615 					"on message queue %d, key %d\n", messageQueueID,
616 					(int)messageQueue->IpcKey()));
617 				return EPERM;
618 			}
619 			key_t key = messageQueue->IpcKey();
620 			Ipc *ipcKey = NULL;
621 			if (key != -1) {
622 				ipcKey = sIpcHashTable.Lookup(key);
623 				sIpcHashTable.Remove(ipcKey);
624 			}
625 			sMessageQueueHashTable.Remove(messageQueue);
626 			// Wake up of any threads waiting on this
627 			// queue happens in destructor
628 			if (key != -1)
629 				delete ipcKey;
630 			atomic_add(&sXsiMessageQueueCount, -1);
631 
632 			delete messageQueue;
633 			break;
634 		}
635 
636 		default:
637 			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
638 			return EINVAL;
639 	}
640 
641 	return B_OK;
642 }
643 
644 
645 int
646 _user_xsi_msgget(key_t key, int flags)
647 {
648 	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
649 	XsiMessageQueue *messageQueue = NULL;
650 	Ipc *ipcKey = NULL;
651 	// Default assumptions
652 	bool isPrivate = true;
653 	bool create = true;
654 
655 	if (key != IPC_PRIVATE) {
656 		isPrivate = false;
657 		// Check if key already exist, if it does it already has a message
658 		// queue associated with it
659 		ipcKey = sIpcHashTable.Lookup(key);
660 		if (ipcKey == NULL) {
661 			if (!(flags & IPC_CREAT)) {
662 				TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
663 					"caller did not ask for creation\n", (int)key));
664 				return ENOENT;
665 			}
666 			ipcKey = new(std::nothrow) Ipc(key);
667 			if (ipcKey == NULL) {
668 				TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
669 					"for key %d\n", (int)key));
670 				return ENOMEM;
671 			}
672 			sIpcHashTable.Insert(ipcKey);
673 		} else {
674 			// The IPC key exist and it already has a message queue
675 			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
676 				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
677 				return EEXIST;
678 			}
679 			int messageQueueID = ipcKey->MessageQueueID();
680 
681 			MutexLocker _(sXsiMessageQueueLock);
682 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
683 			if (!messageQueue->HasPermission()) {
684 				TRACE_ERROR(("xsi_msgget: calling process has not permission "
685 					"on message queue %d, key %d\n", messageQueue->ID(),
686 					(int)key));
687 				return EACCES;
688 			}
689 			create = false;
690 		}
691 	}
692 
693 	if (create) {
694 		// Create a new message queue for this key
695 		if (sXsiMessageQueueCount >= MAX_XSI_MESSAGE_QUEUE) {
696 			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
697 				"message queues\n"));
698 			return ENOSPC;
699 		}
700 
701 		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
702 		if (messageQueue == NULL) {
703 			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
704 				"message queue\n"));
705 			return ENOMEM;
706 		}
707 		atomic_add(&sXsiMessageQueueCount, 1);
708 
709 		MutexLocker _(sXsiMessageQueueLock);
710 		messageQueue->SetID();
711 		if (isPrivate)
712 			messageQueue->SetIpcKey((key_t)-1);
713 		else {
714 			messageQueue->SetIpcKey(key);
715 			ipcKey->SetMessageQueueID(messageQueue);
716 		}
717 		sMessageQueueHashTable.Insert(messageQueue);
718 	}
719 
720 	return messageQueue->ID();
721 }
722 
723 
724 ssize_t
725 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
726 	size_t messageSize, long messageType, int messageFlags)
727 {
728 	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
729 		messageQueueID, messageSize));
730 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
731 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
732 	if (messageQueue == NULL) {
733 		TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
734 			messageQueueID));
735 		return EINVAL;
736 	}
737 	MutexLocker messageQueueLocker(messageQueue->Lock());
738 	messageQueueHashLocker.Unlock();
739 
740 	if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) {
741 		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
742 		return EINVAL;
743 	}
744 	if (!messageQueue->HasPermission()) {
745 		TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
746 			"on message queue id %d, key %d\n", messageQueueID,
747 			(int)messageQueue->IpcKey()));
748 		return EACCES;
749 	}
750 	if (!IS_USER_ADDRESS(messagePointer)) {
751 		TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
752 		return B_BAD_ADDRESS;
753 	}
754 
755 	queued_message *message = NULL;
756 	while (true) {
757 		message = messageQueue->Remove(messageType);
758 
759 		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
760 			// We are going to sleep
761 			Thread *thread = thread_get_current_thread();
762 			queued_thread queueEntry(thread, messageSize);
763 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
764 
765 			uint32 sequenceNumber = messageQueue->SequenceNumber();
766 
767 			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
768 			status_t result
769 				= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
770 			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
771 
772 			messageQueueHashLocker.Lock();
773 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
774 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
775 				&& sequenceNumber != messageQueue->SequenceNumber())) {
776 				TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = %ld) "
777 					"got destroyed\n", messageQueueID, sequenceNumber));
778 				return EIDRM;
779 			} else if (result == B_INTERRUPTED) {
780 				TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
781 					"waiting on message queue %d\n",(int)thread->id,
782 					messageQueueID));
783 				messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
784 				return EINTR;
785 			} else {
786 				messageQueueLocker.Lock();
787 				messageQueueHashLocker.Unlock();
788 			}
789 		} else if (message == NULL) {
790 			// There is not message of type requested and
791 			// we can't wait
792 			return ENOMSG;
793 		} else {
794 			// Message received correctly (so far)
795 			if ((ssize_t)messageSize < message->length
796 				&& !(messageFlags & MSG_NOERROR)) {
797 				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
798 				// Put the message back inside. Since we hold the
799 				// queue message lock, not one else could have filled
800 				// up the queue meanwhile
801 				messageQueue->Insert(message);
802 				return E2BIG;
803 			}
804 
805 			ssize_t result
806 				= message->copy_to_user_buffer(messagePointer, messageSize);
807 			if (result < 0) {
808 				messageQueue->Insert(message);
809 				return B_BAD_ADDRESS;
810 			}
811 
812 			delete message;
813 			TRACE(("xsi_msgrcv: message received correctly\n"));
814 			return result;
815 		}
816 	}
817 
818 	return B_OK;
819 }
820 
821 
822 int
823 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
824 	size_t messageSize, int messageFlags)
825 {
826 	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
827 		messageQueueID, messageSize));
828 	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
829 	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
830 	if (messageQueue == NULL) {
831 		TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
832 			messageQueueID));
833 		return EINVAL;
834 	}
835 	MutexLocker messageQueueLocker(messageQueue->Lock());
836 	messageQueueHashLocker.Unlock();
837 
838 	if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) {
839 		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
840 		return EINVAL;
841 	}
842 	if (!messageQueue->HasPermission()) {
843 		TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
844 			"on message queue id %d, key %d\n", messageQueueID,
845 			(int)messageQueue->IpcKey()));
846 		return EACCES;
847 	}
848 	if (!IS_USER_ADDRESS(messagePointer)) {
849 		TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
850 		return B_BAD_ADDRESS;
851 	}
852 
853 	queued_message *message
854 		= new(std::nothrow) queued_message(messagePointer, messageSize);
855 	if (message == NULL || message->initOK != true) {
856 		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
857 		delete message;
858 		return ENOMEM;
859 	}
860 
861 	bool notSent = true;
862 	status_t result = B_OK;
863 	while (notSent) {
864 		bool goToSleep = messageQueue->Insert(message);
865 
866 		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
867 			// We are going to sleep
868 			Thread *thread = thread_get_current_thread();
869 			queued_thread queueEntry(thread, messageSize);
870 			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
871 
872 			uint32 sequenceNumber = messageQueue->SequenceNumber();
873 
874 			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
875 			result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
876 			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
877 
878 			messageQueueHashLocker.Lock();
879 			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
880 			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
881 				&& sequenceNumber != messageQueue->SequenceNumber())) {
882 				TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = %ld) "
883 					"got destroyed\n", messageQueueID, sequenceNumber));
884 				delete message;
885 				notSent = false;
886 				result = EIDRM;
887 			} else if (result == B_INTERRUPTED) {
888 				TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
889 					"waiting on message queue %d\n",(int)thread->id,
890 					messageQueueID));
891 				messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
892 				delete message;
893 				notSent = false;
894 				result = EINTR;
895 			} else {
896 				messageQueueLocker.Lock();
897 				messageQueueHashLocker.Unlock();
898 			}
899 		} else if (goToSleep) {
900 			// We did not send the message and we can't wait
901 			delete message;
902 			notSent = false;
903 			result = EAGAIN;
904 		} else {
905 			// Message delivered correctly
906 			TRACE(("xsi_msgsnd: message sent correctly\n"));
907 			notSent = false;
908 		}
909 	}
910 
911 	return result;
912 }
913