1 /* 2 * Copyright 2008-2011, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 37 namespace { 38 39 // Queue for holding blocked threads 40 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> { 41 queued_thread(Thread *_thread, int32 _message_length) 42 : 43 thread(_thread), 44 message_length(_message_length), 45 queued(false) 46 { 47 } 48 49 Thread *thread; 50 int32 message_length; 51 bool queued; 52 }; 53 54 typedef DoublyLinkedList<queued_thread> ThreadQueue; 55 56 57 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 58 queued_message(const void *_message, ssize_t _length) 59 : 60 initOK(false), 61 length(_length) 62 { 63 message = (char *)malloc(sizeof(char) * _length); 64 if (message == NULL) 65 return; 66 67 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 68 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 69 _length) != B_OK) { 70 free(message); 71 return; 72 } 73 initOK = true; 74 } 75 76 ~queued_message() 77 { 78 if (initOK) 79 free(message); 80 } 81 82 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 83 { 84 if (_length > length) 85 _length = length; 86 87 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 88 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 89 _length) != B_OK) 90 return B_ERROR; 91 return _length; 92 } 93 94 bool initOK; 95 ssize_t length; 96 char *message; 97 long type; 98 }; 99 100 typedef DoublyLinkedList<queued_message> MessageQueue; 101 102 // Arbitrary limit 103 #define MAX_BYTES_PER_QUEUE 2048 104 105 class XsiMessageQueue { 106 public: 107 XsiMessageQueue(int flags) 108 : 109 fBytesInQueue(0), 110 fThreadsWaitingToReceive(0), 111 fThreadsWaitingToSend(0) 112 { 113 mutex_init(&fLock, "XsiMessageQueue private mutex"); 114 SetIpcKey((key_t)-1); 115 SetPermissions(flags); 116 // Initialize all fields to zero 117 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 118 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 119 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 120 } 121 122 // Implemented after sXsiMessageCount is declared 123 ~XsiMessageQueue(); 124 125 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker) 126 { 127 thread_prepare_to_block(thread, B_CAN_INTERRUPT, 128 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); 129 // Unlock the queue before blocking 130 queueLocker->Unlock(); 131 132 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to 133 // interruption, we will still be queued. A WakeUpThread() at this point will 134 // call thread_unblock() and might thus screw with our trying to re-lock the 135 // mutex. 136 return thread_block(); 137 } 138 139 void DoIpcSet(struct msqid_ds *result) 140 { 141 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 142 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 143 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 144 | (result->msg_perm.mode & 0x01ff); 145 fMessageQueue.msg_qbytes = result->msg_qbytes; 146 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 147 } 148 149 void Deque(queued_thread *queueEntry, bool waitForMessage) 150 { 151 if (queueEntry->queued) { 152 if (waitForMessage) { 153 fWaitingToReceive.Remove(queueEntry); 154 fThreadsWaitingToReceive--; 155 } else { 156 fWaitingToSend.Remove(queueEntry); 157 fThreadsWaitingToSend--; 158 } 159 } 160 } 161 162 void Enqueue(queued_thread *queueEntry, bool waitForMessage) 163 { 164 if (waitForMessage) { 165 fWaitingToReceive.Add(queueEntry); 166 fThreadsWaitingToReceive++; 167 } else { 168 fWaitingToSend.Add(queueEntry); 169 fThreadsWaitingToSend++; 170 } 171 queueEntry->queued = true; 172 } 173 174 struct msqid_ds &GetMessageQueue() 175 { 176 return fMessageQueue; 177 } 178 179 bool HasPermission() const 180 { 181 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 182 return true; 183 184 uid_t uid = geteuid(); 185 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 186 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 187 return true; 188 189 gid_t gid = getegid(); 190 if (gid == fMessageQueue.msg_perm.gid 191 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 192 return true; 193 194 return false; 195 } 196 197 bool HasReadPermission() const 198 { 199 // TODO: fix this 200 return HasPermission(); 201 } 202 203 int ID() const 204 { 205 return fID; 206 } 207 208 // Implemented after sXsiMessageCount is declared 209 bool Insert(queued_message *message); 210 211 key_t IpcKey() const 212 { 213 return fMessageQueue.msg_perm.key; 214 } 215 216 mutex &Lock() 217 { 218 return fLock; 219 } 220 221 msglen_t MaxBytes() const 222 { 223 return fMessageQueue.msg_qbytes; 224 } 225 226 // Implemented after sXsiMessageCount is declared 227 queued_message *Remove(long typeRequested); 228 229 uint32 SequenceNumber() const 230 { 231 return fSequenceNumber; 232 } 233 234 // Implemented after sMessageQueueHashTable is declared 235 void SetID(); 236 237 void SetIpcKey(key_t key) 238 { 239 fMessageQueue.msg_perm.key = key; 240 } 241 242 void SetPermissions(int flags) 243 { 244 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 245 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 246 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 247 } 248 249 void WakeUpThread(bool waitForMessage) 250 { 251 if (waitForMessage) { 252 // Wake up all waiting thread for a message 253 // TODO: this can cause starvation for any 254 // very-unlucky-and-slow thread 255 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 256 entry->queued = false; 257 fThreadsWaitingToReceive--; 258 thread_unblock(entry->thread, 0); 259 } 260 } else { 261 // Wake up only one thread waiting to send 262 if (queued_thread *entry = fWaitingToSend.RemoveHead()) { 263 entry->queued = false; 264 fThreadsWaitingToSend--; 265 thread_unblock(entry->thread, 0); 266 } 267 } 268 } 269 270 XsiMessageQueue*& Link() 271 { 272 return fLink; 273 } 274 275 private: 276 msglen_t fBytesInQueue; 277 int fID; 278 mutex fLock; 279 MessageQueue fMessage; 280 struct msqid_ds fMessageQueue; 281 uint32 fSequenceNumber; 282 uint32 fThreadsWaitingToReceive; 283 uint32 fThreadsWaitingToSend; 284 285 ThreadQueue fWaitingToReceive; 286 ThreadQueue fWaitingToSend; 287 288 XsiMessageQueue* fLink; 289 }; 290 291 292 // Xsi message queue hash table 293 struct MessageQueueHashTableDefinition { 294 typedef int KeyType; 295 typedef XsiMessageQueue ValueType; 296 297 size_t HashKey (const int key) const 298 { 299 return (size_t)key; 300 } 301 302 size_t Hash(XsiMessageQueue *variable) const 303 { 304 return (size_t)variable->ID(); 305 } 306 307 bool Compare(const int key, XsiMessageQueue *variable) const 308 { 309 return (int)key == (int)variable->ID(); 310 } 311 312 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 313 { 314 return variable->Link(); 315 } 316 }; 317 318 319 // IPC class 320 class Ipc { 321 public: 322 Ipc(key_t key) 323 : fKey(key), 324 fMessageQueueId(-1) 325 { 326 } 327 328 key_t Key() const 329 { 330 return fKey; 331 } 332 333 int MessageQueueID() const 334 { 335 return fMessageQueueId; 336 } 337 338 void SetMessageQueueID(XsiMessageQueue *messageQueue) 339 { 340 fMessageQueueId = messageQueue->ID(); 341 } 342 343 Ipc*& Link() 344 { 345 return fLink; 346 } 347 348 private: 349 key_t fKey; 350 int fMessageQueueId; 351 Ipc* fLink; 352 }; 353 354 355 struct IpcHashTableDefinition { 356 typedef key_t KeyType; 357 typedef Ipc ValueType; 358 359 size_t HashKey (const key_t key) const 360 { 361 return (size_t)(key); 362 } 363 364 size_t Hash(Ipc *variable) const 365 { 366 return (size_t)HashKey(variable->Key()); 367 } 368 369 bool Compare(const key_t key, Ipc *variable) const 370 { 371 return (key_t)key == (key_t)variable->Key(); 372 } 373 374 Ipc*& GetLink(Ipc *variable) const 375 { 376 return variable->Link(); 377 } 378 }; 379 380 } // namespace 381 382 383 // Arbitrary limits 384 #define MAX_XSI_MESSAGE 4096 385 #define MAX_XSI_MESSAGE_QUEUE 1024 386 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 387 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 388 389 static mutex sIpcLock; 390 static mutex sXsiMessageQueueLock; 391 392 static uint32 sGlobalSequenceNumber = 1; 393 static int32 sXsiMessageCount = 0; 394 static int32 sXsiMessageQueueCount = 0; 395 396 397 // #pragma mark - 398 399 400 XsiMessageQueue::~XsiMessageQueue() 401 { 402 mutex_destroy(&fLock); 403 404 // Wake up any threads still waiting 405 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) { 406 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 407 entry->queued = false; 408 thread_unblock(entry->thread, EIDRM); 409 } 410 while (queued_thread *entry = fWaitingToSend.RemoveHead()) { 411 entry->queued = false; 412 thread_unblock(entry->thread, EIDRM); 413 } 414 } 415 416 // Free up any remaining messages 417 if (fMessageQueue.msg_qnum) { 418 while (queued_message *message = fMessage.RemoveHead()) { 419 atomic_add(&sXsiMessageCount, -1); 420 delete message; 421 } 422 } 423 } 424 425 426 bool 427 XsiMessageQueue::Insert(queued_message *message) 428 { 429 // The only situation that would make us (potentially) wait 430 // is that we exceed with bytes or with the total number of messages 431 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 432 return true; 433 434 while (true) { 435 int32 oldCount = atomic_get(&sXsiMessageCount); 436 if (oldCount >= MAX_XSI_MESSAGE) 437 return true; 438 // If another thread updates the counter we keep 439 // iterating 440 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 441 == oldCount) 442 break; 443 } 444 445 fMessage.Add(message); 446 fMessageQueue.msg_qnum++; 447 fMessageQueue.msg_lspid = getpid(); 448 fMessageQueue.msg_stime = real_time_clock(); 449 fBytesInQueue += message->length; 450 if (fThreadsWaitingToReceive) 451 WakeUpThread(true /* WaitForMessage */); 452 return false; 453 } 454 455 456 queued_message* 457 XsiMessageQueue::Remove(long typeRequested) 458 { 459 queued_message *message = NULL; 460 if (typeRequested < 0) { 461 // Return first message of the lowest type 462 // that is less than or equal to the absolute 463 // value of type requested. 464 MessageQueue::Iterator iterator = fMessage.GetIterator(); 465 while (iterator.HasNext()) { 466 queued_message *current = iterator.Next(); 467 if (current->type <= -typeRequested) { 468 message = iterator.Remove(); 469 break; 470 } 471 } 472 } else if (typeRequested == 0) { 473 // Return the first message on the queue 474 message = fMessage.RemoveHead(); 475 } else { 476 // Return the first message of type requested 477 MessageQueue::Iterator iterator = fMessage.GetIterator(); 478 while (iterator.HasNext()) { 479 queued_message *current = iterator.Next(); 480 if (current->type == typeRequested) { 481 message = iterator.Remove(); 482 break; 483 } 484 } 485 } 486 487 if (message == NULL) 488 return NULL; 489 490 fMessageQueue.msg_qnum--; 491 fMessageQueue.msg_lrpid = getpid(); 492 fMessageQueue.msg_rtime = real_time_clock(); 493 fBytesInQueue -= message->length; 494 atomic_add(&sXsiMessageCount, -1); 495 if (fThreadsWaitingToSend) 496 WakeUpThread(false /* WaitForMessage */); 497 return message; 498 } 499 500 501 void 502 XsiMessageQueue::SetID() 503 { 504 fID = real_time_clock(); 505 // The lock is held before calling us 506 while (true) { 507 if (sMessageQueueHashTable.Lookup(fID) == NULL) 508 break; 509 fID++; 510 } 511 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 512 fSequenceNumber = sGlobalSequenceNumber; 513 } 514 515 516 // #pragma mark - Kernel exported API 517 518 519 void 520 xsi_msg_init() 521 { 522 // Initialize hash tables 523 status_t status = sIpcHashTable.Init(); 524 if (status != B_OK) 525 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 526 status = sMessageQueueHashTable.Init(); 527 if (status != B_OK) 528 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 529 530 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 531 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 532 } 533 534 535 // #pragma mark - Syscalls 536 537 538 int 539 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 540 { 541 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 542 MutexLocker ipcHashLocker(sIpcLock); 543 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 544 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 545 if (messageQueue == NULL) { 546 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 547 return EINVAL; 548 } 549 if (!IS_USER_ADDRESS(buffer)) { 550 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n")); 551 return B_BAD_ADDRESS; 552 } 553 554 // Lock the message queue itself and release both the ipc hash table lock 555 // and the message queue hash table lock _only_ if the command it's not 556 // IPC_RMID, this prevents undesidered situation from happening while 557 // (hopefully) improving the concurrency. 558 MutexLocker messageQueueLocker; 559 if (command != IPC_RMID) { 560 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 561 messageQueueHashLocker.Unlock(); 562 ipcHashLocker.Unlock(); 563 } else 564 // Since we are going to delete the message queue object 565 // along with its mutex, we can't use a MutexLocker object, 566 // as the mutex itself won't exist on function exit 567 mutex_lock(&messageQueue->Lock()); 568 569 switch (command) { 570 case IPC_STAT: { 571 if (!messageQueue->HasReadPermission()) { 572 TRACE_ERROR(("xsi_msgctl: calling process has not read " 573 "permission on message queue %d, key %d\n", messageQueueID, 574 (int)messageQueue->IpcKey())); 575 return EACCES; 576 } 577 struct msqid_ds msg = messageQueue->GetMessageQueue(); 578 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 579 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 580 return B_BAD_ADDRESS; 581 } 582 break; 583 } 584 585 case IPC_SET: { 586 if (!messageQueue->HasPermission()) { 587 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 588 "on message queue %d, key %d\n", messageQueueID, 589 (int)messageQueue->IpcKey())); 590 return EPERM; 591 } 592 struct msqid_ds msg; 593 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 594 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 595 return B_BAD_ADDRESS; 596 } 597 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 598 TRACE_ERROR(("xsi_msgctl: user does not have permission to " 599 "increase the maximum number of bytes allowed on queue\n")); 600 return EPERM; 601 } 602 if (msg.msg_qbytes == 0) { 603 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 604 return EINVAL; 605 } 606 607 messageQueue->DoIpcSet(&msg); 608 break; 609 } 610 611 case IPC_RMID: { 612 // If this was the command, we are still holding the message 613 // queue hash table lock along with the ipc one, but not the 614 // message queue lock itself. This prevents other process 615 // to try and acquire a destroyed mutex 616 if (!messageQueue->HasPermission()) { 617 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 618 "on message queue %d, key %d\n", messageQueueID, 619 (int)messageQueue->IpcKey())); 620 return EPERM; 621 } 622 key_t key = messageQueue->IpcKey(); 623 Ipc *ipcKey = NULL; 624 if (key != -1) { 625 ipcKey = sIpcHashTable.Lookup(key); 626 sIpcHashTable.Remove(ipcKey); 627 } 628 sMessageQueueHashTable.Remove(messageQueue); 629 // Wake up of any threads waiting on this 630 // queue happens in destructor 631 if (key != -1) 632 delete ipcKey; 633 atomic_add(&sXsiMessageQueueCount, -1); 634 635 delete messageQueue; 636 break; 637 } 638 639 default: 640 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 641 return EINVAL; 642 } 643 644 return B_OK; 645 } 646 647 648 int 649 _user_xsi_msgget(key_t key, int flags) 650 { 651 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 652 XsiMessageQueue *messageQueue = NULL; 653 Ipc *ipcKey = NULL; 654 // Default assumptions 655 bool isPrivate = true; 656 bool create = true; 657 658 if (key != IPC_PRIVATE) { 659 isPrivate = false; 660 // Check if key already exist, if it does it already has a message 661 // queue associated with it 662 ipcKey = sIpcHashTable.Lookup(key); 663 if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) { 664 if (!(flags & IPC_CREAT)) { 665 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the " 666 "caller did not ask for creation\n", (int)key)); 667 return ENOENT; 668 } 669 if (ipcKey == NULL) { 670 ipcKey = new(std::nothrow) Ipc(key); 671 if (ipcKey == NULL) { 672 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object " 673 "for key %d\n", (int)key)); 674 return ENOMEM; 675 } 676 sIpcHashTable.Insert(ipcKey); 677 } 678 } else { 679 // The IPC key exist and it already has a message queue 680 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 681 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 682 return EEXIST; 683 } 684 int messageQueueID = ipcKey->MessageQueueID(); 685 686 MutexLocker _(sXsiMessageQueueLock); 687 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 688 if (!messageQueue->HasPermission()) { 689 TRACE_ERROR(("xsi_msgget: calling process has not permission " 690 "on message queue %d, key %d\n", messageQueue->ID(), 691 (int)key)); 692 return EACCES; 693 } 694 create = false; 695 } 696 } 697 698 if (create) { 699 // Create a new message queue for this key 700 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) { 701 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 702 "message queues\n")); 703 return ENOSPC; 704 } 705 706 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 707 if (messageQueue == NULL) { 708 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 709 "message queue\n")); 710 return ENOMEM; 711 } 712 atomic_add(&sXsiMessageQueueCount, 1); 713 714 MutexLocker _(sXsiMessageQueueLock); 715 messageQueue->SetID(); 716 if (isPrivate) 717 messageQueue->SetIpcKey((key_t)-1); 718 else { 719 messageQueue->SetIpcKey(key); 720 ipcKey->SetMessageQueueID(messageQueue); 721 } 722 sMessageQueueHashTable.Insert(messageQueue); 723 } 724 725 return messageQueue->ID(); 726 } 727 728 729 ssize_t 730 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 731 size_t messageSize, long messageType, int messageFlags) 732 { 733 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 734 messageQueueID, messageSize)); 735 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 736 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 737 if (messageQueue == NULL) { 738 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", 739 messageQueueID)); 740 return EINVAL; 741 } 742 MutexLocker messageQueueLocker(messageQueue->Lock()); 743 messageQueueHashLocker.Unlock(); 744 745 if (messageSize > MAX_BYTES_PER_QUEUE) { 746 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 747 return EINVAL; 748 } 749 if (!messageQueue->HasPermission()) { 750 TRACE_ERROR(("xsi_msgrcv: calling process has not permission " 751 "on message queue id %d, key %d\n", messageQueueID, 752 (int)messageQueue->IpcKey())); 753 return EACCES; 754 } 755 if (!IS_USER_ADDRESS(messagePointer)) { 756 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); 757 return B_BAD_ADDRESS; 758 } 759 760 queued_message *message = NULL; 761 while (true) { 762 message = messageQueue->Remove(messageType); 763 764 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 765 // We are going to sleep 766 Thread *thread = thread_get_current_thread(); 767 queued_thread queueEntry(thread, messageSize); 768 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 769 770 uint32 sequenceNumber = messageQueue->SequenceNumber(); 771 772 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); 773 status_t result 774 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 775 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); 776 777 messageQueueHashLocker.Lock(); 778 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 779 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 780 && sequenceNumber != messageQueue->SequenceNumber())) { 781 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = " 782 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 783 sequenceNumber)); 784 return EIDRM; 785 } else if (result == B_INTERRUPTED) { 786 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " 787 "waiting on message queue %d\n",(int)thread->id, 788 messageQueueID)); 789 messageQueue->Deque(&queueEntry, /* waitForMessage */ true); 790 return EINTR; 791 } else { 792 messageQueueLocker.Lock(); 793 messageQueueHashLocker.Unlock(); 794 } 795 } else if (message == NULL) { 796 // There is not message of type requested and 797 // we can't wait 798 return ENOMSG; 799 } else { 800 // Message received correctly (so far) 801 if ((ssize_t)messageSize < message->length 802 && !(messageFlags & MSG_NOERROR)) { 803 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 804 // Put the message back inside. Since we hold the 805 // queue message lock, not one else could have filled 806 // up the queue meanwhile 807 messageQueue->Insert(message); 808 return E2BIG; 809 } 810 811 ssize_t result 812 = message->copy_to_user_buffer(messagePointer, messageSize); 813 if (result < 0) { 814 messageQueue->Insert(message); 815 return B_BAD_ADDRESS; 816 } 817 818 delete message; 819 TRACE(("xsi_msgrcv: message received correctly\n")); 820 return result; 821 } 822 } 823 824 return B_OK; 825 } 826 827 828 int 829 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 830 size_t messageSize, int messageFlags) 831 { 832 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 833 messageQueueID, messageSize)); 834 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 835 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 836 if (messageQueue == NULL) { 837 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n", 838 messageQueueID)); 839 return EINVAL; 840 } 841 MutexLocker messageQueueLocker(messageQueue->Lock()); 842 messageQueueHashLocker.Unlock(); 843 844 if (messageSize > MAX_BYTES_PER_QUEUE) { 845 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 846 return EINVAL; 847 } 848 if (!messageQueue->HasPermission()) { 849 TRACE_ERROR(("xsi_msgsnd: calling process has not permission " 850 "on message queue id %d, key %d\n", messageQueueID, 851 (int)messageQueue->IpcKey())); 852 return EACCES; 853 } 854 if (!IS_USER_ADDRESS(messagePointer)) { 855 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n")); 856 return B_BAD_ADDRESS; 857 } 858 859 queued_message *message 860 = new(std::nothrow) queued_message(messagePointer, messageSize); 861 if (message == NULL || message->initOK != true) { 862 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 863 delete message; 864 return ENOMEM; 865 } 866 867 bool notSent = true; 868 status_t result = B_OK; 869 while (notSent) { 870 bool goToSleep = messageQueue->Insert(message); 871 872 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 873 // We are going to sleep 874 Thread *thread = thread_get_current_thread(); 875 queued_thread queueEntry(thread, messageSize); 876 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 877 878 uint32 sequenceNumber = messageQueue->SequenceNumber(); 879 880 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); 881 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 882 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); 883 884 messageQueueHashLocker.Lock(); 885 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 886 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 887 && sequenceNumber != messageQueue->SequenceNumber())) { 888 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = " 889 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 890 sequenceNumber)); 891 delete message; 892 notSent = false; 893 result = EIDRM; 894 } else if (result == B_INTERRUPTED) { 895 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " 896 "waiting on message queue %d\n",(int)thread->id, 897 messageQueueID)); 898 messageQueue->Deque(&queueEntry, /* waitForMessage */ false); 899 delete message; 900 notSent = false; 901 result = EINTR; 902 } else { 903 messageQueueLocker.Lock(); 904 messageQueueHashLocker.Unlock(); 905 } 906 } else if (goToSleep) { 907 // We did not send the message and we can't wait 908 delete message; 909 notSent = false; 910 result = EAGAIN; 911 } else { 912 // Message delivered correctly 913 TRACE(("xsi_msgsnd: message sent correctly\n")); 914 notSent = false; 915 } 916 } 917 918 return result; 919 } 920