1 /* 2 * Copyright 2008-2023, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 37 namespace { 38 39 40 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 41 queued_message(const void *_message, ssize_t _length) 42 : 43 initOK(false), 44 length(_length) 45 { 46 message = (char *)malloc(sizeof(char) * _length); 47 if (message == NULL) 48 return; 49 50 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 51 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 52 _length) != B_OK) { 53 free(message); 54 return; 55 } 56 initOK = true; 57 } 58 59 ~queued_message() 60 { 61 if (initOK) 62 free(message); 63 } 64 65 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 66 { 67 if (_length > length) 68 _length = length; 69 70 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 71 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 72 _length) != B_OK) 73 return B_ERROR; 74 return _length; 75 } 76 77 bool initOK; 78 ssize_t length; 79 char *message; 80 long type; 81 }; 82 83 typedef DoublyLinkedList<queued_message> MessageQueue; 84 85 // Arbitrary limit 86 #define MAX_BYTES_PER_QUEUE 2048 87 88 class XsiMessageQueue { 89 public: 90 XsiMessageQueue(int flags) 91 : 92 fBytesInQueue(0) 93 { 94 mutex_init(&fLock, "XsiMessageQueue private mutex"); 95 fWaitingToReceive.Init(this, "XsiMessageQueue"); 96 fWaitingToSend.Init(this, "XsiMessageQueue"); 97 98 SetIpcKey((key_t)-1); 99 SetPermissions(flags); 100 // Initialize all fields to zero 101 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 102 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 103 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 104 } 105 106 // Implemented after sXsiMessageCount is declared 107 ~XsiMessageQueue(); 108 109 status_t BlockAndUnlock(ConditionVariableEntry *queueEntry, MutexLocker *queueLocker) 110 { 111 // Unlock the queue before blocking 112 queueLocker->Unlock(); 113 return queueEntry->Wait(B_CAN_INTERRUPT); 114 } 115 116 void DoIpcSet(struct msqid_ds *result) 117 { 118 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 119 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 120 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 121 | (result->msg_perm.mode & 0x01ff); 122 fMessageQueue.msg_qbytes = result->msg_qbytes; 123 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 124 } 125 126 void Dequeue(ConditionVariableEntry *queueEntry, bool waitForMessage) 127 { 128 queueEntry->Wait(B_RELATIVE_TIMEOUT, 0); 129 } 130 131 void Enqueue(ConditionVariableEntry *queueEntry, bool waitForMessage) 132 { 133 if (waitForMessage) { 134 fWaitingToReceive.Add(queueEntry); 135 } else { 136 fWaitingToSend.Add(queueEntry); 137 } 138 } 139 140 struct msqid_ds &GetMessageQueue() 141 { 142 return fMessageQueue; 143 } 144 145 bool HasPermission() const 146 { 147 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 148 return true; 149 150 uid_t uid = geteuid(); 151 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 152 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 153 return true; 154 155 gid_t gid = getegid(); 156 if (gid == fMessageQueue.msg_perm.gid 157 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 158 return true; 159 160 return false; 161 } 162 163 bool HasReadPermission() const 164 { 165 // TODO: fix this 166 return HasPermission(); 167 } 168 169 int ID() const 170 { 171 return fID; 172 } 173 174 // Implemented after sXsiMessageCount is declared 175 bool Insert(queued_message *message); 176 177 key_t IpcKey() const 178 { 179 return fMessageQueue.msg_perm.key; 180 } 181 182 mutex &Lock() 183 { 184 return fLock; 185 } 186 187 msglen_t MaxBytes() const 188 { 189 return fMessageQueue.msg_qbytes; 190 } 191 192 // Implemented after sXsiMessageCount is declared 193 queued_message *Remove(long typeRequested); 194 195 uint32 SequenceNumber() const 196 { 197 return fSequenceNumber; 198 } 199 200 // Implemented after sMessageQueueHashTable is declared 201 void SetID(); 202 203 void SetIpcKey(key_t key) 204 { 205 fMessageQueue.msg_perm.key = key; 206 } 207 208 void SetPermissions(int flags) 209 { 210 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 211 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 212 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 213 } 214 215 void WakeUpThread(bool waitForMessage) 216 { 217 if (waitForMessage) { 218 // Wake up all waiting thread for a message 219 // TODO: this can cause starvation for any 220 // very-unlucky-and-slow thread 221 fWaitingToReceive.NotifyAll(); 222 } else { 223 // Wake up only one thread waiting to send 224 fWaitingToSend.NotifyOne(); 225 } 226 } 227 228 XsiMessageQueue*& Link() 229 { 230 return fLink; 231 } 232 233 private: 234 msglen_t fBytesInQueue; 235 int fID; 236 mutex fLock; 237 MessageQueue fMessage; 238 struct msqid_ds fMessageQueue; 239 uint32 fSequenceNumber; 240 241 ConditionVariable fWaitingToReceive; 242 ConditionVariable fWaitingToSend; 243 244 XsiMessageQueue* fLink; 245 }; 246 247 248 // Xsi message queue hash table 249 struct MessageQueueHashTableDefinition { 250 typedef int KeyType; 251 typedef XsiMessageQueue ValueType; 252 253 size_t HashKey (const int key) const 254 { 255 return (size_t)key; 256 } 257 258 size_t Hash(XsiMessageQueue *variable) const 259 { 260 return (size_t)variable->ID(); 261 } 262 263 bool Compare(const int key, XsiMessageQueue *variable) const 264 { 265 return (int)key == (int)variable->ID(); 266 } 267 268 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 269 { 270 return variable->Link(); 271 } 272 }; 273 274 275 // IPC class 276 class Ipc { 277 public: 278 Ipc(key_t key) 279 : fKey(key), 280 fMessageQueueId(-1) 281 { 282 } 283 284 key_t Key() const 285 { 286 return fKey; 287 } 288 289 int MessageQueueID() const 290 { 291 return fMessageQueueId; 292 } 293 294 void SetMessageQueueID(XsiMessageQueue *messageQueue) 295 { 296 fMessageQueueId = messageQueue->ID(); 297 } 298 299 Ipc*& Link() 300 { 301 return fLink; 302 } 303 304 private: 305 key_t fKey; 306 int fMessageQueueId; 307 Ipc* fLink; 308 }; 309 310 311 struct IpcHashTableDefinition { 312 typedef key_t KeyType; 313 typedef Ipc ValueType; 314 315 size_t HashKey (const key_t key) const 316 { 317 return (size_t)(key); 318 } 319 320 size_t Hash(Ipc *variable) const 321 { 322 return (size_t)HashKey(variable->Key()); 323 } 324 325 bool Compare(const key_t key, Ipc *variable) const 326 { 327 return (key_t)key == (key_t)variable->Key(); 328 } 329 330 Ipc*& GetLink(Ipc *variable) const 331 { 332 return variable->Link(); 333 } 334 }; 335 336 } // namespace 337 338 339 // Arbitrary limits 340 #define MAX_XSI_MESSAGE 4096 341 #define MAX_XSI_MESSAGE_QUEUE 1024 342 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 343 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 344 345 static mutex sIpcLock; 346 static mutex sXsiMessageQueueLock; 347 348 static uint32 sGlobalSequenceNumber = 1; 349 static int32 sXsiMessageCount = 0; 350 static int32 sXsiMessageQueueCount = 0; 351 352 353 // #pragma mark - 354 355 356 XsiMessageQueue::~XsiMessageQueue() 357 { 358 mutex_destroy(&fLock); 359 360 // Wake up any threads still waiting 361 fWaitingToReceive.NotifyAll(EIDRM); 362 fWaitingToSend.NotifyAll(EIDRM); 363 364 // Free up any remaining messages 365 if (fMessageQueue.msg_qnum) { 366 while (queued_message *message = fMessage.RemoveHead()) { 367 atomic_add(&sXsiMessageCount, -1); 368 delete message; 369 } 370 } 371 } 372 373 374 bool 375 XsiMessageQueue::Insert(queued_message *message) 376 { 377 // The only situation that would make us (potentially) wait 378 // is that we exceed with bytes or with the total number of messages 379 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 380 return true; 381 382 while (true) { 383 int32 oldCount = atomic_get(&sXsiMessageCount); 384 if (oldCount >= MAX_XSI_MESSAGE) 385 return true; 386 // If another thread updates the counter we keep 387 // iterating 388 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 389 == oldCount) 390 break; 391 } 392 393 fMessage.Add(message); 394 fMessageQueue.msg_qnum++; 395 fMessageQueue.msg_lspid = getpid(); 396 fMessageQueue.msg_stime = real_time_clock(); 397 fBytesInQueue += message->length; 398 399 WakeUpThread(true /* WaitForMessage */); 400 return false; 401 } 402 403 404 queued_message* 405 XsiMessageQueue::Remove(long typeRequested) 406 { 407 queued_message *message = NULL; 408 if (typeRequested < 0) { 409 // Return first message of the lowest type 410 // that is less than or equal to the absolute 411 // value of type requested. 412 MessageQueue::Iterator iterator = fMessage.GetIterator(); 413 while (iterator.HasNext()) { 414 queued_message *current = iterator.Next(); 415 if (current->type <= -typeRequested) { 416 message = iterator.Remove(); 417 break; 418 } 419 } 420 } else if (typeRequested == 0) { 421 // Return the first message on the queue 422 message = fMessage.RemoveHead(); 423 } else { 424 // Return the first message of type requested 425 MessageQueue::Iterator iterator = fMessage.GetIterator(); 426 while (iterator.HasNext()) { 427 queued_message *current = iterator.Next(); 428 if (current->type == typeRequested) { 429 message = iterator.Remove(); 430 break; 431 } 432 } 433 } 434 435 if (message == NULL) 436 return NULL; 437 438 fMessageQueue.msg_qnum--; 439 fMessageQueue.msg_lrpid = getpid(); 440 fMessageQueue.msg_rtime = real_time_clock(); 441 fBytesInQueue -= message->length; 442 atomic_add(&sXsiMessageCount, -1); 443 444 WakeUpThread(false /* WaitForMessage */); 445 return message; 446 } 447 448 449 void 450 XsiMessageQueue::SetID() 451 { 452 fID = real_time_clock(); 453 // The lock is held before calling us 454 while (true) { 455 if (sMessageQueueHashTable.Lookup(fID) == NULL) 456 break; 457 fID++; 458 } 459 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 460 fSequenceNumber = sGlobalSequenceNumber; 461 } 462 463 464 // #pragma mark - Kernel exported API 465 466 467 void 468 xsi_msg_init() 469 { 470 // Initialize hash tables 471 status_t status = sIpcHashTable.Init(); 472 if (status != B_OK) 473 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 474 status = sMessageQueueHashTable.Init(); 475 if (status != B_OK) 476 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 477 478 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 479 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 480 } 481 482 483 // #pragma mark - Syscalls 484 485 486 int 487 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 488 { 489 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 490 MutexLocker ipcHashLocker(sIpcLock); 491 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 492 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 493 if (messageQueue == NULL) { 494 TRACE(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 495 return EINVAL; 496 } 497 if (buffer != NULL && !IS_USER_ADDRESS(buffer)) { 498 TRACE(("xsi_msgctl: buffer address is not valid\n")); 499 return B_BAD_ADDRESS; 500 } 501 502 // Lock the message queue itself and release both the ipc hash table lock 503 // and the message queue hash table lock _only_ if the command it's not 504 // IPC_RMID, this prevents undesidered situation from happening while 505 // (hopefully) improving the concurrency. 506 MutexLocker messageQueueLocker; 507 if (command != IPC_RMID) { 508 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 509 messageQueueHashLocker.Unlock(); 510 ipcHashLocker.Unlock(); 511 } else 512 // Since we are going to delete the message queue object 513 // along with its mutex, we can't use a MutexLocker object, 514 // as the mutex itself won't exist on function exit 515 mutex_lock(&messageQueue->Lock()); 516 517 switch (command) { 518 case IPC_STAT: { 519 if (!messageQueue->HasReadPermission()) { 520 TRACE(("xsi_msgctl: calling process has not read " 521 "permission on message queue %d, key %d\n", messageQueueID, 522 (int)messageQueue->IpcKey())); 523 return EACCES; 524 } 525 struct msqid_ds msg = messageQueue->GetMessageQueue(); 526 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 527 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 528 return B_BAD_ADDRESS; 529 } 530 break; 531 } 532 533 case IPC_SET: { 534 if (!messageQueue->HasPermission()) { 535 TRACE(("xsi_msgctl: calling process has not permission " 536 "on message queue %d, key %d\n", messageQueueID, 537 (int)messageQueue->IpcKey())); 538 return EPERM; 539 } 540 struct msqid_ds msg; 541 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 542 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 543 return B_BAD_ADDRESS; 544 } 545 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 546 TRACE(("xsi_msgctl: user does not have permission to " 547 "increase the maximum number of bytes allowed on queue\n")); 548 return EPERM; 549 } 550 if (msg.msg_qbytes == 0) { 551 TRACE(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 552 return EINVAL; 553 } 554 555 messageQueue->DoIpcSet(&msg); 556 break; 557 } 558 559 case IPC_RMID: { 560 // If this was the command, we are still holding the message 561 // queue hash table lock along with the ipc one, but not the 562 // message queue lock itself. This prevents other process 563 // to try and acquire a destroyed mutex 564 if (!messageQueue->HasPermission()) { 565 TRACE(("xsi_msgctl: calling process has not permission " 566 "on message queue %d, key %d\n", messageQueueID, 567 (int)messageQueue->IpcKey())); 568 return EPERM; 569 } 570 key_t key = messageQueue->IpcKey(); 571 Ipc *ipcKey = NULL; 572 if (key != -1) { 573 ipcKey = sIpcHashTable.Lookup(key); 574 sIpcHashTable.Remove(ipcKey); 575 } 576 sMessageQueueHashTable.Remove(messageQueue); 577 // Wake up of any threads waiting on this 578 // queue happens in destructor 579 if (key != -1) 580 delete ipcKey; 581 atomic_add(&sXsiMessageQueueCount, -1); 582 583 delete messageQueue; 584 break; 585 } 586 587 default: 588 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 589 return EINVAL; 590 } 591 592 return B_OK; 593 } 594 595 596 int 597 _user_xsi_msgget(key_t key, int flags) 598 { 599 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 600 XsiMessageQueue *messageQueue = NULL; 601 Ipc *ipcKey = NULL; 602 // Default assumptions 603 bool isPrivate = true; 604 bool create = true; 605 606 if (key != IPC_PRIVATE) { 607 isPrivate = false; 608 // Check if key already exist, if it does it already has a message 609 // queue associated with it 610 ipcKey = sIpcHashTable.Lookup(key); 611 if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) { 612 if (!(flags & IPC_CREAT)) { 613 TRACE(("xsi_msgget: key %d does not exist, but the " 614 "caller did not ask for creation\n", (int)key)); 615 return ENOENT; 616 } 617 if (ipcKey == NULL) { 618 ipcKey = new(std::nothrow) Ipc(key); 619 if (ipcKey == NULL) { 620 TRACE(("xsi_msgget: failed to create new Ipc object " 621 "for key %d\n", (int)key)); 622 return ENOMEM; 623 } 624 sIpcHashTable.Insert(ipcKey); 625 } 626 } else { 627 // The IPC key exist and it already has a message queue 628 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 629 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 630 return EEXIST; 631 } 632 int messageQueueID = ipcKey->MessageQueueID(); 633 634 MutexLocker _(sXsiMessageQueueLock); 635 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 636 if (!messageQueue->HasPermission()) { 637 TRACE(("xsi_msgget: calling process has not permission " 638 "on message queue %d, key %d\n", messageQueue->ID(), 639 (int)key)); 640 return EACCES; 641 } 642 create = false; 643 } 644 } 645 646 if (create) { 647 // Create a new message queue for this key 648 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) { 649 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 650 "message queues\n")); 651 return ENOSPC; 652 } 653 654 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 655 if (messageQueue == NULL) { 656 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 657 "message queue\n")); 658 return ENOMEM; 659 } 660 atomic_add(&sXsiMessageQueueCount, 1); 661 662 MutexLocker _(sXsiMessageQueueLock); 663 messageQueue->SetID(); 664 if (isPrivate) 665 messageQueue->SetIpcKey((key_t)-1); 666 else { 667 messageQueue->SetIpcKey(key); 668 ipcKey->SetMessageQueueID(messageQueue); 669 } 670 sMessageQueueHashTable.Insert(messageQueue); 671 } 672 673 return messageQueue->ID(); 674 } 675 676 677 ssize_t 678 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 679 size_t messageSize, long messageType, int messageFlags) 680 { 681 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 682 messageQueueID, messageSize)); 683 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 684 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 685 if (messageQueue == NULL) { 686 TRACE(("xsi_msgrcv: message queue id %d not valid\n", 687 messageQueueID)); 688 return EINVAL; 689 } 690 MutexLocker messageQueueLocker(messageQueue->Lock()); 691 messageQueueHashLocker.Unlock(); 692 693 if (messageSize > MAX_BYTES_PER_QUEUE) { 694 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 695 return EINVAL; 696 } 697 if (!messageQueue->HasPermission()) { 698 TRACE(("xsi_msgrcv: calling process has not permission " 699 "on message queue id %d, key %d\n", messageQueueID, 700 (int)messageQueue->IpcKey())); 701 return EACCES; 702 } 703 if (!IS_USER_ADDRESS(messagePointer)) { 704 TRACE(("xsi_msgrcv: message address is not valid\n")); 705 return B_BAD_ADDRESS; 706 } 707 708 queued_message *message = NULL; 709 while (true) { 710 message = messageQueue->Remove(messageType); 711 712 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 713 // We are going to sleep 714 ConditionVariableEntry queueEntry; 715 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 716 717 uint32 sequenceNumber = messageQueue->SequenceNumber(); 718 719 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread_get_current_thread_id())); 720 status_t result 721 = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker); 722 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread_get_current_thread_id())); 723 724 messageQueueHashLocker.Lock(); 725 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 726 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 727 && sequenceNumber != messageQueue->SequenceNumber())) { 728 TRACE(("xsi_msgrcv: message queue id %d (sequence = " 729 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 730 sequenceNumber)); 731 return EIDRM; 732 } else if (result == B_INTERRUPTED) { 733 TRACE(("xsi_msgrcv: thread %d got interrupted while " 734 "waiting on message queue %d\n", (int)thread_get_current_thread_id(), 735 messageQueueID)); 736 messageQueue->Dequeue(&queueEntry, /* waitForMessage */ true); 737 return EINTR; 738 } else { 739 messageQueueLocker.Lock(); 740 messageQueueHashLocker.Unlock(); 741 } 742 } else if (message == NULL) { 743 // There is not message of type requested and 744 // we can't wait 745 return ENOMSG; 746 } else { 747 // Message received correctly (so far) 748 if ((ssize_t)messageSize < message->length 749 && !(messageFlags & MSG_NOERROR)) { 750 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 751 // Put the message back inside. Since we hold the 752 // queue message lock, not one else could have filled 753 // up the queue meanwhile 754 messageQueue->Insert(message); 755 return E2BIG; 756 } 757 758 ssize_t result 759 = message->copy_to_user_buffer(messagePointer, messageSize); 760 if (result < 0) { 761 messageQueue->Insert(message); 762 return B_BAD_ADDRESS; 763 } 764 765 delete message; 766 TRACE(("xsi_msgrcv: message received correctly\n")); 767 return result; 768 } 769 } 770 771 return B_OK; 772 } 773 774 775 int 776 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 777 size_t messageSize, int messageFlags) 778 { 779 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 780 messageQueueID, messageSize)); 781 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 782 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 783 if (messageQueue == NULL) { 784 TRACE(("xsi_msgsnd: message queue id %d not valid\n", 785 messageQueueID)); 786 return EINVAL; 787 } 788 MutexLocker messageQueueLocker(messageQueue->Lock()); 789 messageQueueHashLocker.Unlock(); 790 791 if (messageSize > MAX_BYTES_PER_QUEUE) { 792 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 793 return EINVAL; 794 } 795 if (!messageQueue->HasPermission()) { 796 TRACE(("xsi_msgsnd: calling process has not permission " 797 "on message queue id %d, key %d\n", messageQueueID, 798 (int)messageQueue->IpcKey())); 799 return EACCES; 800 } 801 if (!IS_USER_ADDRESS(messagePointer)) { 802 TRACE(("xsi_msgsnd: message address is not valid\n")); 803 return B_BAD_ADDRESS; 804 } 805 806 queued_message *message 807 = new(std::nothrow) queued_message(messagePointer, messageSize); 808 if (message == NULL || message->initOK != true) { 809 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 810 delete message; 811 return ENOMEM; 812 } 813 814 bool notSent = true; 815 status_t result = B_OK; 816 while (notSent) { 817 bool goToSleep = messageQueue->Insert(message); 818 819 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 820 // We are going to sleep 821 ConditionVariableEntry queueEntry; 822 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 823 824 uint32 sequenceNumber = messageQueue->SequenceNumber(); 825 826 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread_get_current_thread_id())); 827 result = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker); 828 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread_get_current_thread_id())); 829 830 messageQueueHashLocker.Lock(); 831 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 832 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 833 && sequenceNumber != messageQueue->SequenceNumber())) { 834 TRACE(("xsi_msgsnd: message queue id %d (sequence = " 835 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 836 sequenceNumber)); 837 delete message; 838 notSent = false; 839 result = EIDRM; 840 } else if (result == B_INTERRUPTED) { 841 TRACE(("xsi_msgsnd: thread %d got interrupted while " 842 "waiting on message queue %d\n", (int)thread_get_current_thread_id(), 843 messageQueueID)); 844 messageQueue->Dequeue(&queueEntry, /* waitForMessage */ false); 845 delete message; 846 notSent = false; 847 result = EINTR; 848 } else { 849 messageQueueLocker.Lock(); 850 messageQueueHashLocker.Unlock(); 851 } 852 } else if (goToSleep) { 853 // We did not send the message and we can't wait 854 delete message; 855 notSent = false; 856 result = EAGAIN; 857 } else { 858 // Message delivered correctly 859 TRACE(("xsi_msgsnd: message sent correctly\n")); 860 notSent = false; 861 } 862 } 863 864 return result; 865 } 866