1 /*
2 * Copyright 2008-2023, Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Salvatore Benedetto <salvatore.benedetto@gmail.com>
7 */
8
9 #include <posix/xsi_message_queue.h>
10
11 #include <new>
12
13 #include <sys/ipc.h>
14 #include <sys/types.h>
15
16 #include <OS.h>
17
18 #include <kernel.h>
19 #include <syscall_restart.h>
20
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
25
26
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 # define TRACE(x) dprintf x
30 # define TRACE_ERROR(x) dprintf x
31 #else
32 # define TRACE(x) /* nothing */
33 # define TRACE_ERROR(x) dprintf x
34 #endif
35
36
37 namespace {
38
39
40 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
queued_message__anonea5043ca0111::queued_message41 queued_message(const void *_message, ssize_t _length)
42 :
43 initOK(false),
44 length(_length)
45 {
46 message = (char *)malloc(sizeof(char) * _length);
47 if (message == NULL)
48 return;
49
50 if (user_memcpy(&type, _message, sizeof(long)) != B_OK
51 || user_memcpy(message, (void *)((char *)_message + sizeof(long)),
52 _length) != B_OK) {
53 free(message);
54 return;
55 }
56 initOK = true;
57 }
58
~queued_message__anonea5043ca0111::queued_message59 ~queued_message()
60 {
61 if (initOK)
62 free(message);
63 }
64
copy_to_user_buffer__anonea5043ca0111::queued_message65 ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
66 {
67 if (_length > length)
68 _length = length;
69
70 if (user_memcpy(_message, &type, sizeof(long)) != B_OK
71 || user_memcpy((void *)((char *)_message + sizeof(long)), message,
72 _length) != B_OK)
73 return B_ERROR;
74 return _length;
75 }
76
77 bool initOK;
78 ssize_t length;
79 char *message;
80 long type;
81 };
82
83 typedef DoublyLinkedList<queued_message> MessageQueue;
84
85 // Arbitrary limit
86 #define MAX_BYTES_PER_QUEUE 2048
87
88 class XsiMessageQueue {
89 public:
XsiMessageQueue(int flags)90 XsiMessageQueue(int flags)
91 :
92 fBytesInQueue(0)
93 {
94 mutex_init(&fLock, "XsiMessageQueue private mutex");
95 fWaitingToReceive.Init(this, "XsiMessageQueue");
96 fWaitingToSend.Init(this, "XsiMessageQueue");
97
98 SetIpcKey((key_t)-1);
99 SetPermissions(flags);
100 // Initialize all fields to zero
101 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
102 fMessageQueue.msg_ctime = (time_t)real_time_clock();
103 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
104 }
105
106 // Implemented after sXsiMessageCount is declared
107 ~XsiMessageQueue();
108
BlockAndUnlock(ConditionVariableEntry * queueEntry,MutexLocker * queueLocker)109 status_t BlockAndUnlock(ConditionVariableEntry *queueEntry, MutexLocker *queueLocker)
110 {
111 // Unlock the queue before blocking
112 queueLocker->Unlock();
113 return queueEntry->Wait(B_CAN_INTERRUPT);
114 }
115
DoIpcSet(struct msqid_ds * result)116 void DoIpcSet(struct msqid_ds *result)
117 {
118 fMessageQueue.msg_perm.uid = result->msg_perm.uid;
119 fMessageQueue.msg_perm.gid = result->msg_perm.gid;
120 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
121 | (result->msg_perm.mode & 0x01ff);
122 fMessageQueue.msg_qbytes = result->msg_qbytes;
123 fMessageQueue.msg_ctime = (time_t)real_time_clock();
124 }
125
Dequeue(ConditionVariableEntry * queueEntry)126 void Dequeue(ConditionVariableEntry *queueEntry)
127 {
128 queueEntry->Wait(B_RELATIVE_TIMEOUT, 0);
129 }
130
Enqueue(ConditionVariableEntry * queueEntry,bool waitForMessage)131 void Enqueue(ConditionVariableEntry *queueEntry, bool waitForMessage)
132 {
133 if (waitForMessage) {
134 fWaitingToReceive.Add(queueEntry);
135 } else {
136 fWaitingToSend.Add(queueEntry);
137 }
138 }
139
GetMessageQueue()140 struct msqid_ds &GetMessageQueue()
141 {
142 return fMessageQueue;
143 }
144
HasPermission() const145 bool HasPermission() const
146 {
147 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
148 return true;
149
150 uid_t uid = geteuid();
151 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
152 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
153 return true;
154
155 gid_t gid = getegid();
156 if (gid == fMessageQueue.msg_perm.gid
157 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
158 return true;
159
160 return false;
161 }
162
HasReadPermission() const163 bool HasReadPermission() const
164 {
165 // TODO: fix this
166 return HasPermission();
167 }
168
ID() const169 int ID() const
170 {
171 return fID;
172 }
173
174 // Implemented after sXsiMessageCount is declared
175 bool Insert(queued_message *message);
176
IpcKey() const177 key_t IpcKey() const
178 {
179 return fMessageQueue.msg_perm.key;
180 }
181
Lock()182 mutex &Lock()
183 {
184 return fLock;
185 }
186
MaxBytes() const187 msglen_t MaxBytes() const
188 {
189 return fMessageQueue.msg_qbytes;
190 }
191
192 // Implemented after sXsiMessageCount is declared
193 queued_message *Remove(long typeRequested);
194
SequenceNumber() const195 uint32 SequenceNumber() const
196 {
197 return fSequenceNumber;
198 }
199
200 // Implemented after sMessageQueueHashTable is declared
201 void SetID();
202
SetIpcKey(key_t key)203 void SetIpcKey(key_t key)
204 {
205 fMessageQueue.msg_perm.key = key;
206 }
207
SetPermissions(int flags)208 void SetPermissions(int flags)
209 {
210 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
211 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
212 fMessageQueue.msg_perm.mode = (flags & 0x01ff);
213 }
214
WakeUpThread(bool waitForMessage)215 void WakeUpThread(bool waitForMessage)
216 {
217 if (waitForMessage) {
218 // Wake up all waiting thread for a message
219 // TODO: this can cause starvation for any
220 // very-unlucky-and-slow thread
221 fWaitingToReceive.NotifyAll();
222 } else {
223 // Wake up only one thread waiting to send
224 fWaitingToSend.NotifyOne();
225 }
226 }
227
Link()228 XsiMessageQueue*& Link()
229 {
230 return fLink;
231 }
232
233 private:
234 msglen_t fBytesInQueue;
235 int fID;
236 mutex fLock;
237 MessageQueue fMessage;
238 struct msqid_ds fMessageQueue;
239 uint32 fSequenceNumber;
240
241 ConditionVariable fWaitingToReceive;
242 ConditionVariable fWaitingToSend;
243
244 XsiMessageQueue* fLink;
245 };
246
247
248 // Xsi message queue hash table
249 struct MessageQueueHashTableDefinition {
250 typedef int KeyType;
251 typedef XsiMessageQueue ValueType;
252
HashKey__anonea5043ca0111::MessageQueueHashTableDefinition253 size_t HashKey (const int key) const
254 {
255 return (size_t)key;
256 }
257
Hash__anonea5043ca0111::MessageQueueHashTableDefinition258 size_t Hash(XsiMessageQueue *variable) const
259 {
260 return (size_t)variable->ID();
261 }
262
Compare__anonea5043ca0111::MessageQueueHashTableDefinition263 bool Compare(const int key, XsiMessageQueue *variable) const
264 {
265 return (int)key == (int)variable->ID();
266 }
267
GetLink__anonea5043ca0111::MessageQueueHashTableDefinition268 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
269 {
270 return variable->Link();
271 }
272 };
273
274
275 // IPC class
276 class Ipc {
277 public:
Ipc(key_t key)278 Ipc(key_t key)
279 : fKey(key),
280 fMessageQueueId(-1)
281 {
282 }
283
Key() const284 key_t Key() const
285 {
286 return fKey;
287 }
288
MessageQueueID() const289 int MessageQueueID() const
290 {
291 return fMessageQueueId;
292 }
293
SetMessageQueueID(XsiMessageQueue * messageQueue)294 void SetMessageQueueID(XsiMessageQueue *messageQueue)
295 {
296 fMessageQueueId = messageQueue->ID();
297 }
298
Link()299 Ipc*& Link()
300 {
301 return fLink;
302 }
303
304 private:
305 key_t fKey;
306 int fMessageQueueId;
307 Ipc* fLink;
308 };
309
310
311 struct IpcHashTableDefinition {
312 typedef key_t KeyType;
313 typedef Ipc ValueType;
314
HashKey__anonea5043ca0111::IpcHashTableDefinition315 size_t HashKey (const key_t key) const
316 {
317 return (size_t)(key);
318 }
319
Hash__anonea5043ca0111::IpcHashTableDefinition320 size_t Hash(Ipc *variable) const
321 {
322 return (size_t)HashKey(variable->Key());
323 }
324
Compare__anonea5043ca0111::IpcHashTableDefinition325 bool Compare(const key_t key, Ipc *variable) const
326 {
327 return (key_t)key == (key_t)variable->Key();
328 }
329
GetLink__anonea5043ca0111::IpcHashTableDefinition330 Ipc*& GetLink(Ipc *variable) const
331 {
332 return variable->Link();
333 }
334 };
335
336 } // namespace
337
338
339 // Arbitrary limits
340 #define MAX_XSI_MESSAGE 4096
341 #define MAX_XSI_MESSAGE_QUEUE 1024
342 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
343 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
344
345 static mutex sIpcLock;
346 static mutex sXsiMessageQueueLock;
347
348 static uint32 sGlobalSequenceNumber = 1;
349 static int32 sXsiMessageCount = 0;
350 static int32 sXsiMessageQueueCount = 0;
351
352
353 // #pragma mark -
354
355
~XsiMessageQueue()356 XsiMessageQueue::~XsiMessageQueue()
357 {
358 mutex_destroy(&fLock);
359
360 // Wake up any threads still waiting
361 fWaitingToReceive.NotifyAll(EIDRM);
362 fWaitingToSend.NotifyAll(EIDRM);
363
364 // Free up any remaining messages
365 if (fMessageQueue.msg_qnum) {
366 while (queued_message *message = fMessage.RemoveHead()) {
367 atomic_add(&sXsiMessageCount, -1);
368 delete message;
369 }
370 }
371 }
372
373
374 bool
Insert(queued_message * message)375 XsiMessageQueue::Insert(queued_message *message)
376 {
377 // The only situation that would make us (potentially) wait
378 // is that we exceed with bytes or with the total number of messages
379 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
380 return true;
381
382 while (true) {
383 int32 oldCount = atomic_get(&sXsiMessageCount);
384 if (oldCount >= MAX_XSI_MESSAGE)
385 return true;
386 // If another thread updates the counter we keep
387 // iterating
388 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
389 == oldCount)
390 break;
391 }
392
393 fMessage.Add(message);
394 fMessageQueue.msg_qnum++;
395 fMessageQueue.msg_lspid = getpid();
396 fMessageQueue.msg_stime = real_time_clock();
397 fBytesInQueue += message->length;
398
399 WakeUpThread(true /* WaitForMessage */);
400 return false;
401 }
402
403
404 queued_message*
Remove(long typeRequested)405 XsiMessageQueue::Remove(long typeRequested)
406 {
407 queued_message *message = NULL;
408 if (typeRequested < 0) {
409 // Return first message of the lowest type
410 // that is less than or equal to the absolute
411 // value of type requested.
412 MessageQueue::Iterator iterator = fMessage.GetIterator();
413 while (iterator.HasNext()) {
414 queued_message *current = iterator.Next();
415 if (current->type <= -typeRequested) {
416 message = iterator.Remove();
417 break;
418 }
419 }
420 } else if (typeRequested == 0) {
421 // Return the first message on the queue
422 message = fMessage.RemoveHead();
423 } else {
424 // Return the first message of type requested
425 MessageQueue::Iterator iterator = fMessage.GetIterator();
426 while (iterator.HasNext()) {
427 queued_message *current = iterator.Next();
428 if (current->type == typeRequested) {
429 message = iterator.Remove();
430 break;
431 }
432 }
433 }
434
435 if (message == NULL)
436 return NULL;
437
438 fMessageQueue.msg_qnum--;
439 fMessageQueue.msg_lrpid = getpid();
440 fMessageQueue.msg_rtime = real_time_clock();
441 fBytesInQueue -= message->length;
442 atomic_add(&sXsiMessageCount, -1);
443
444 WakeUpThread(false /* WaitForMessage */);
445 return message;
446 }
447
448
449 void
SetID()450 XsiMessageQueue::SetID()
451 {
452 fID = real_time_clock();
453 // The lock is held before calling us
454 while (true) {
455 if (sMessageQueueHashTable.Lookup(fID) == NULL)
456 break;
457 fID++;
458 }
459 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
460 fSequenceNumber = sGlobalSequenceNumber;
461 }
462
463
464 // #pragma mark - Kernel exported API
465
466
467 void
xsi_msg_init()468 xsi_msg_init()
469 {
470 // Initialize hash tables
471 status_t status = sIpcHashTable.Init();
472 if (status != B_OK)
473 panic("xsi_msg_init() failed to initialize ipc hash table\n");
474 status = sMessageQueueHashTable.Init();
475 if (status != B_OK)
476 panic("xsi_msg_init() failed to initialize message queue hash table\n");
477
478 mutex_init(&sIpcLock, "global POSIX message queue IPC table");
479 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
480 }
481
482
483 // #pragma mark - Syscalls
484
485
486 int
_user_xsi_msgctl(int messageQueueID,int command,struct msqid_ds * buffer)487 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
488 {
489 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
490 MutexLocker ipcHashLocker(sIpcLock);
491 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
492 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
493 if (messageQueue == NULL) {
494 TRACE(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
495 return EINVAL;
496 }
497 if (buffer != NULL && !IS_USER_ADDRESS(buffer)) {
498 TRACE(("xsi_msgctl: buffer address is not valid\n"));
499 return B_BAD_ADDRESS;
500 }
501
502 // Lock the message queue itself and release both the ipc hash table lock
503 // and the message queue hash table lock _only_ if the command it's not
504 // IPC_RMID, this prevents undesidered situation from happening while
505 // (hopefully) improving the concurrency.
506 MutexLocker messageQueueLocker;
507 if (command != IPC_RMID) {
508 messageQueueLocker.SetTo(&messageQueue->Lock(), false);
509 messageQueueHashLocker.Unlock();
510 ipcHashLocker.Unlock();
511 } else
512 // Since we are going to delete the message queue object
513 // along with its mutex, we can't use a MutexLocker object,
514 // as the mutex itself won't exist on function exit
515 mutex_lock(&messageQueue->Lock());
516
517 switch (command) {
518 case IPC_STAT: {
519 if (!messageQueue->HasReadPermission()) {
520 TRACE(("xsi_msgctl: calling process has not read "
521 "permission on message queue %d, key %d\n", messageQueueID,
522 (int)messageQueue->IpcKey()));
523 return EACCES;
524 }
525 struct msqid_ds msg = messageQueue->GetMessageQueue();
526 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
527 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
528 return B_BAD_ADDRESS;
529 }
530 break;
531 }
532
533 case IPC_SET: {
534 if (!messageQueue->HasPermission()) {
535 TRACE(("xsi_msgctl: calling process has not permission "
536 "on message queue %d, key %d\n", messageQueueID,
537 (int)messageQueue->IpcKey()));
538 return EPERM;
539 }
540 struct msqid_ds msg;
541 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
542 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
543 return B_BAD_ADDRESS;
544 }
545 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
546 TRACE(("xsi_msgctl: user does not have permission to "
547 "increase the maximum number of bytes allowed on queue\n"));
548 return EPERM;
549 }
550 if (msg.msg_qbytes == 0) {
551 TRACE(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
552 return EINVAL;
553 }
554
555 messageQueue->DoIpcSet(&msg);
556 break;
557 }
558
559 case IPC_RMID: {
560 // If this was the command, we are still holding the message
561 // queue hash table lock along with the ipc one, but not the
562 // message queue lock itself. This prevents other process
563 // to try and acquire a destroyed mutex
564 if (!messageQueue->HasPermission()) {
565 TRACE(("xsi_msgctl: calling process has not permission "
566 "on message queue %d, key %d\n", messageQueueID,
567 (int)messageQueue->IpcKey()));
568 return EPERM;
569 }
570 key_t key = messageQueue->IpcKey();
571 Ipc *ipcKey = NULL;
572 if (key != -1) {
573 ipcKey = sIpcHashTable.Lookup(key);
574 sIpcHashTable.Remove(ipcKey);
575 }
576 sMessageQueueHashTable.Remove(messageQueue);
577 // Wake up of any threads waiting on this
578 // queue happens in destructor
579 if (key != -1)
580 delete ipcKey;
581 atomic_add(&sXsiMessageQueueCount, -1);
582
583 delete messageQueue;
584 break;
585 }
586
587 default:
588 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
589 return EINVAL;
590 }
591
592 return B_OK;
593 }
594
595
596 int
_user_xsi_msgget(key_t key,int flags)597 _user_xsi_msgget(key_t key, int flags)
598 {
599 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
600 XsiMessageQueue *messageQueue = NULL;
601 Ipc *ipcKey = NULL;
602 // Default assumptions
603 bool isPrivate = true;
604 bool create = true;
605
606 if (key != IPC_PRIVATE) {
607 isPrivate = false;
608 // Check if key already exist, if it does it already has a message
609 // queue associated with it
610 ipcKey = sIpcHashTable.Lookup(key);
611 if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) {
612 if (!(flags & IPC_CREAT)) {
613 TRACE(("xsi_msgget: key %d does not exist, but the "
614 "caller did not ask for creation\n", (int)key));
615 return ENOENT;
616 }
617 if (ipcKey == NULL) {
618 ipcKey = new(std::nothrow) Ipc(key);
619 if (ipcKey == NULL) {
620 TRACE(("xsi_msgget: failed to create new Ipc object "
621 "for key %d\n", (int)key));
622 return ENOMEM;
623 }
624 sIpcHashTable.Insert(ipcKey);
625 }
626 } else {
627 // The IPC key exist and it already has a message queue
628 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
629 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
630 return EEXIST;
631 }
632 int messageQueueID = ipcKey->MessageQueueID();
633
634 MutexLocker _(sXsiMessageQueueLock);
635 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
636 if (!messageQueue->HasPermission()) {
637 TRACE(("xsi_msgget: calling process has not permission "
638 "on message queue %d, key %d\n", messageQueue->ID(),
639 (int)key));
640 return EACCES;
641 }
642 create = false;
643 }
644 }
645
646 if (create) {
647 // Create a new message queue for this key
648 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
649 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
650 "message queues\n"));
651 return ENOSPC;
652 }
653
654 messageQueue = new(std::nothrow) XsiMessageQueue(flags);
655 if (messageQueue == NULL) {
656 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
657 "message queue\n"));
658 return ENOMEM;
659 }
660 atomic_add(&sXsiMessageQueueCount, 1);
661
662 MutexLocker _(sXsiMessageQueueLock);
663 messageQueue->SetID();
664 if (isPrivate)
665 messageQueue->SetIpcKey((key_t)-1);
666 else {
667 messageQueue->SetIpcKey(key);
668 ipcKey->SetMessageQueueID(messageQueue);
669 }
670 sMessageQueueHashTable.Insert(messageQueue);
671 }
672
673 return messageQueue->ID();
674 }
675
676
677 ssize_t
_user_xsi_msgrcv(int messageQueueID,void * messagePointer,size_t messageSize,long messageType,int messageFlags)678 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
679 size_t messageSize, long messageType, int messageFlags)
680 {
681 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
682 messageQueueID, messageSize));
683 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
684 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
685 if (messageQueue == NULL) {
686 TRACE(("xsi_msgrcv: message queue id %d not valid\n",
687 messageQueueID));
688 return EINVAL;
689 }
690 MutexLocker messageQueueLocker(messageQueue->Lock());
691 messageQueueHashLocker.Unlock();
692
693 if (messageSize > MAX_BYTES_PER_QUEUE) {
694 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
695 return EINVAL;
696 }
697 if (!messageQueue->HasPermission()) {
698 TRACE(("xsi_msgrcv: calling process has not permission "
699 "on message queue id %d, key %d\n", messageQueueID,
700 (int)messageQueue->IpcKey()));
701 return EACCES;
702 }
703 if (!IS_USER_ADDRESS(messagePointer)) {
704 TRACE(("xsi_msgrcv: message address is not valid\n"));
705 return B_BAD_ADDRESS;
706 }
707
708 queued_message *message = NULL;
709 while (true) {
710 message = messageQueue->Remove(messageType);
711
712 if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
713 // We are going to sleep
714 ConditionVariableEntry queueEntry;
715 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
716
717 uint32 sequenceNumber = messageQueue->SequenceNumber();
718
719 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
720 status_t result
721 = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
722 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread_get_current_thread_id()));
723
724 messageQueueHashLocker.Lock();
725 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
726 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
727 && sequenceNumber != messageQueue->SequenceNumber())) {
728 TRACE(("xsi_msgrcv: message queue id %d (sequence = "
729 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
730 sequenceNumber));
731 return EIDRM;
732 } else if (result == B_INTERRUPTED) {
733 TRACE(("xsi_msgrcv: thread %d got interrupted while "
734 "waiting on message queue %d\n", (int)thread_get_current_thread_id(),
735 messageQueueID));
736 messageQueue->Dequeue(&queueEntry);
737 return EINTR;
738 } else {
739 messageQueueLocker.Lock();
740 messageQueueHashLocker.Unlock();
741 }
742 } else if (message == NULL) {
743 // There is not message of type requested and
744 // we can't wait
745 return ENOMSG;
746 } else {
747 // Message received correctly (so far)
748 if ((ssize_t)messageSize < message->length
749 && !(messageFlags & MSG_NOERROR)) {
750 TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
751 // Put the message back inside. Since we hold the
752 // queue message lock, not one else could have filled
753 // up the queue meanwhile
754 messageQueue->Insert(message);
755 return E2BIG;
756 }
757
758 ssize_t result
759 = message->copy_to_user_buffer(messagePointer, messageSize);
760 if (result < 0) {
761 messageQueue->Insert(message);
762 return B_BAD_ADDRESS;
763 }
764
765 delete message;
766 TRACE(("xsi_msgrcv: message received correctly\n"));
767 return result;
768 }
769 }
770
771 return B_OK;
772 }
773
774
775 int
_user_xsi_msgsnd(int messageQueueID,const void * messagePointer,size_t messageSize,int messageFlags)776 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
777 size_t messageSize, int messageFlags)
778 {
779 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
780 messageQueueID, messageSize));
781 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
782 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
783 if (messageQueue == NULL) {
784 TRACE(("xsi_msgsnd: message queue id %d not valid\n",
785 messageQueueID));
786 return EINVAL;
787 }
788 MutexLocker messageQueueLocker(messageQueue->Lock());
789 messageQueueHashLocker.Unlock();
790
791 if (messageSize > MAX_BYTES_PER_QUEUE) {
792 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
793 return EINVAL;
794 }
795 if (!messageQueue->HasPermission()) {
796 TRACE(("xsi_msgsnd: calling process has not permission "
797 "on message queue id %d, key %d\n", messageQueueID,
798 (int)messageQueue->IpcKey()));
799 return EACCES;
800 }
801 if (!IS_USER_ADDRESS(messagePointer)) {
802 TRACE(("xsi_msgsnd: message address is not valid\n"));
803 return B_BAD_ADDRESS;
804 }
805
806 queued_message *message
807 = new(std::nothrow) queued_message(messagePointer, messageSize);
808 if (message == NULL || message->initOK != true) {
809 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
810 delete message;
811 return ENOMEM;
812 }
813
814 bool notSent = true;
815 status_t result = B_OK;
816 while (notSent) {
817 bool goToSleep = messageQueue->Insert(message);
818
819 if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
820 // We are going to sleep
821 ConditionVariableEntry queueEntry;
822 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
823
824 uint32 sequenceNumber = messageQueue->SequenceNumber();
825
826 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
827 result = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
828 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread_get_current_thread_id()));
829
830 messageQueueHashLocker.Lock();
831 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
832 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
833 && sequenceNumber != messageQueue->SequenceNumber())) {
834 TRACE(("xsi_msgsnd: message queue id %d (sequence = "
835 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
836 sequenceNumber));
837 delete message;
838 notSent = false;
839 result = EIDRM;
840 } else if (result == B_INTERRUPTED) {
841 TRACE(("xsi_msgsnd: thread %d got interrupted while "
842 "waiting on message queue %d\n", (int)thread_get_current_thread_id(),
843 messageQueueID));
844 messageQueue->Dequeue(&queueEntry);
845 delete message;
846 notSent = false;
847 result = EINTR;
848 } else {
849 messageQueueLocker.Lock();
850 messageQueueHashLocker.Unlock();
851 }
852 } else if (goToSleep) {
853 // We did not send the message and we can't wait
854 delete message;
855 notSent = false;
856 result = EAGAIN;
857 } else {
858 // Message delivered correctly
859 TRACE(("xsi_msgsnd: message sent correctly\n"));
860 notSent = false;
861 }
862 }
863
864 return result;
865 }
866