1 /* 2 * Copyright 2008-2011, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 // Queue for holding blocked threads 37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> { 38 queued_thread(Thread *_thread, int32 _message_length) 39 : 40 thread(_thread), 41 message_length(_message_length), 42 queued(false) 43 { 44 } 45 46 Thread *thread; 47 int32 message_length; 48 bool queued; 49 }; 50 51 typedef DoublyLinkedList<queued_thread> ThreadQueue; 52 53 54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 55 queued_message(const void *_message, ssize_t _length) 56 : 57 initOK(false), 58 length(_length) 59 { 60 message = (char *)malloc(sizeof(char) * _length); 61 if (message == NULL) 62 return; 63 64 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 65 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 66 _length) != B_OK) { 67 free(message); 68 return; 69 } 70 initOK = true; 71 } 72 73 ~queued_message() 74 { 75 if (initOK) 76 free(message); 77 } 78 79 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 80 { 81 if (_length > length) 82 _length = length; 83 84 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 85 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 86 _length) != B_OK) 87 return B_ERROR; 88 return _length; 89 } 90 91 bool initOK; 92 ssize_t length; 93 char *message; 94 long type; 95 }; 96 97 typedef DoublyLinkedList<queued_message> MessageQueue; 98 99 // Arbitrary limit 100 #define MAX_BYTES_PER_QUEUE 2048 101 102 class XsiMessageQueue { 103 public: 104 XsiMessageQueue(int flags) 105 : 106 fBytesInQueue(0), 107 fThreadsWaitingToReceive(0), 108 fThreadsWaitingToSend(0) 109 { 110 mutex_init(&fLock, "XsiMessageQueue private mutex"); 111 SetIpcKey((key_t)-1); 112 SetPermissions(flags); 113 // Initialize all fields to zero 114 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 115 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 116 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 117 } 118 119 // Implemented after sXsiMessageCount is declared 120 ~XsiMessageQueue(); 121 122 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker) 123 { 124 thread_prepare_to_block(thread, B_CAN_INTERRUPT, 125 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); 126 // Unlock the queue before blocking 127 queueLocker->Unlock(); 128 129 InterruptsSpinLocker schedulerLocker(gSchedulerLock); 130 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to 131 // interruption, we will still be queued. A WakeUpThread() at this point will 132 // call thread_unblock() and might thus screw with our trying to re-lock the 133 // mutex. 134 return thread_block_locked(thread); 135 } 136 137 void DoIpcSet(struct msqid_ds *result) 138 { 139 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 140 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 141 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 142 | (result->msg_perm.mode & 0x01ff); 143 fMessageQueue.msg_qbytes = result->msg_qbytes; 144 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 145 } 146 147 void Deque(queued_thread *queueEntry, bool waitForMessage) 148 { 149 if (queueEntry->queued) { 150 if (waitForMessage) { 151 fWaitingToReceive.Remove(queueEntry); 152 fThreadsWaitingToReceive--; 153 } else { 154 fWaitingToSend.Remove(queueEntry); 155 fThreadsWaitingToSend--; 156 } 157 } 158 } 159 160 void Enqueue(queued_thread *queueEntry, bool waitForMessage) 161 { 162 if (waitForMessage) { 163 fWaitingToReceive.Add(queueEntry); 164 fThreadsWaitingToReceive++; 165 } else { 166 fWaitingToSend.Add(queueEntry); 167 fThreadsWaitingToSend++; 168 } 169 queueEntry->queued = true; 170 } 171 172 struct msqid_ds &GetMessageQueue() 173 { 174 return fMessageQueue; 175 } 176 177 bool HasPermission() const 178 { 179 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 180 return true; 181 182 uid_t uid = geteuid(); 183 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 184 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 185 return true; 186 187 gid_t gid = getegid(); 188 if (gid == fMessageQueue.msg_perm.gid 189 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 190 return true; 191 192 return false; 193 } 194 195 bool HasReadPermission() const 196 { 197 // TODO: fix this 198 return HasPermission(); 199 } 200 201 int ID() const 202 { 203 return fID; 204 } 205 206 // Implemented after sXsiMessageCount is declared 207 bool Insert(queued_message *message); 208 209 key_t IpcKey() const 210 { 211 return fMessageQueue.msg_perm.key; 212 } 213 214 mutex &Lock() 215 { 216 return fLock; 217 } 218 219 msglen_t MaxBytes() const 220 { 221 return fMessageQueue.msg_qbytes; 222 } 223 224 // Implemented after sXsiMessageCount is declared 225 queued_message *Remove(long typeRequested); 226 227 uint32 SequenceNumber() const 228 { 229 return fSequenceNumber; 230 } 231 232 // Implemented after sMessageQueueHashTable is declared 233 void SetID(); 234 235 void SetIpcKey(key_t key) 236 { 237 fMessageQueue.msg_perm.key = key; 238 } 239 240 void SetPermissions(int flags) 241 { 242 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 243 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 244 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 245 } 246 247 void WakeUpThread(bool waitForMessage) 248 { 249 InterruptsSpinLocker schedulerLocker(gSchedulerLock); 250 251 if (waitForMessage) { 252 // Wake up all waiting thread for a message 253 // TODO: this can cause starvation for any 254 // very-unlucky-and-slow thread 255 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 256 entry->queued = false; 257 fThreadsWaitingToReceive--; 258 thread_unblock_locked(entry->thread, 0); 259 } 260 } else { 261 // Wake up only one thread waiting to send 262 if (queued_thread *entry = fWaitingToSend.RemoveHead()) { 263 entry->queued = false; 264 fThreadsWaitingToSend--; 265 thread_unblock_locked(entry->thread, 0); 266 } 267 } 268 } 269 270 XsiMessageQueue*& Link() 271 { 272 return fLink; 273 } 274 275 private: 276 msglen_t fBytesInQueue; 277 int fID; 278 mutex fLock; 279 MessageQueue fMessage; 280 struct msqid_ds fMessageQueue; 281 uint32 fSequenceNumber; 282 uint32 fThreadsWaitingToReceive; 283 uint32 fThreadsWaitingToSend; 284 285 ThreadQueue fWaitingToReceive; 286 ThreadQueue fWaitingToSend; 287 288 XsiMessageQueue* fLink; 289 }; 290 291 292 // Xsi message queue hash table 293 struct MessageQueueHashTableDefinition { 294 typedef int KeyType; 295 typedef XsiMessageQueue ValueType; 296 297 size_t HashKey (const int key) const 298 { 299 return (size_t)key; 300 } 301 302 size_t Hash(XsiMessageQueue *variable) const 303 { 304 return (size_t)variable->ID(); 305 } 306 307 bool Compare(const int key, XsiMessageQueue *variable) const 308 { 309 return (int)key == (int)variable->ID(); 310 } 311 312 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 313 { 314 return variable->Link(); 315 } 316 }; 317 318 319 // IPC class 320 class Ipc { 321 public: 322 Ipc(key_t key) 323 : fKey(key), 324 fMessageQueueId(-1) 325 { 326 } 327 328 key_t Key() const 329 { 330 return fKey; 331 } 332 333 int MessageQueueID() const 334 { 335 return fMessageQueueId; 336 } 337 338 void SetMessageQueueID(XsiMessageQueue *messageQueue) 339 { 340 fMessageQueueId = messageQueue->ID(); 341 } 342 343 Ipc*& Link() 344 { 345 return fLink; 346 } 347 348 private: 349 key_t fKey; 350 int fMessageQueueId; 351 Ipc* fLink; 352 }; 353 354 355 struct IpcHashTableDefinition { 356 typedef key_t KeyType; 357 typedef Ipc ValueType; 358 359 size_t HashKey (const key_t key) const 360 { 361 return (size_t)(key); 362 } 363 364 size_t Hash(Ipc *variable) const 365 { 366 return (size_t)HashKey(variable->Key()); 367 } 368 369 bool Compare(const key_t key, Ipc *variable) const 370 { 371 return (key_t)key == (key_t)variable->Key(); 372 } 373 374 Ipc*& GetLink(Ipc *variable) const 375 { 376 return variable->Link(); 377 } 378 }; 379 380 // Arbitrary limits 381 #define MAX_XSI_MESSAGE 4096 382 #define MAX_XSI_MESSAGE_QUEUE 1024 383 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 384 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 385 386 static mutex sIpcLock; 387 static mutex sXsiMessageQueueLock; 388 389 static uint32 sGlobalSequenceNumber = 1; 390 static vint32 sXsiMessageCount = 0; 391 static vint32 sXsiMessageQueueCount = 0; 392 393 394 // #pragma mark - 395 396 397 XsiMessageQueue::~XsiMessageQueue() 398 { 399 mutex_destroy(&fLock); 400 401 // Wake up any threads still waiting 402 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) { 403 InterruptsSpinLocker schedulerLocker(gSchedulerLock); 404 405 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 406 entry->queued = false; 407 thread_unblock_locked(entry->thread, EIDRM); 408 } 409 while (queued_thread *entry = fWaitingToSend.RemoveHead()) { 410 entry->queued = false; 411 thread_unblock_locked(entry->thread, EIDRM); 412 } 413 } 414 415 // Free up any remaining messages 416 if (fMessageQueue.msg_qnum) { 417 while (queued_message *message = fMessage.RemoveHead()) { 418 atomic_add(&sXsiMessageCount, -1); 419 delete message; 420 } 421 } 422 } 423 424 425 bool 426 XsiMessageQueue::Insert(queued_message *message) 427 { 428 // The only situation that would make us (potentially) wait 429 // is that we exceed with bytes or with the total number of messages 430 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 431 return true; 432 433 while (true) { 434 int32 oldCount = atomic_get(&sXsiMessageCount); 435 if (oldCount >= MAX_XSI_MESSAGE) 436 return true; 437 // If another thread updates the counter we keep 438 // iterating 439 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 440 == oldCount) 441 break; 442 } 443 444 fMessage.Add(message); 445 fMessageQueue.msg_qnum++; 446 fMessageQueue.msg_lspid = getpid(); 447 fMessageQueue.msg_stime = real_time_clock(); 448 fBytesInQueue += message->length; 449 if (fThreadsWaitingToReceive) 450 WakeUpThread(true /* WaitForMessage */); 451 return false; 452 } 453 454 455 queued_message* 456 XsiMessageQueue::Remove(long typeRequested) 457 { 458 queued_message *message = NULL; 459 if (typeRequested < 0) { 460 // Return first message of the lowest type 461 // that is less than or equal to the absolute 462 // value of type requested. 463 MessageQueue::Iterator iterator = fMessage.GetIterator(); 464 while (iterator.HasNext()) { 465 queued_message *current = iterator.Next(); 466 if (current->type <= -typeRequested) { 467 message = iterator.Remove(); 468 break; 469 } 470 } 471 } else if (typeRequested == 0) { 472 // Return the first message on the queue 473 message = fMessage.RemoveHead(); 474 } else { 475 // Return the first message of type requested 476 MessageQueue::Iterator iterator = fMessage.GetIterator(); 477 while (iterator.HasNext()) { 478 queued_message *current = iterator.Next(); 479 if (current->type == typeRequested) { 480 message = iterator.Remove(); 481 break; 482 } 483 } 484 } 485 486 if (message == NULL) 487 return NULL; 488 489 fMessageQueue.msg_qnum--; 490 fMessageQueue.msg_lrpid = getpid(); 491 fMessageQueue.msg_rtime = real_time_clock(); 492 fBytesInQueue -= message->length; 493 atomic_add(&sXsiMessageCount, -1); 494 if (fThreadsWaitingToSend) 495 WakeUpThread(false /* WaitForMessage */); 496 return message; 497 } 498 499 500 void 501 XsiMessageQueue::SetID() 502 { 503 fID = real_time_clock(); 504 // The lock is held before calling us 505 while (true) { 506 if (sMessageQueueHashTable.Lookup(fID) == NULL) 507 break; 508 fID++; 509 } 510 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 511 fSequenceNumber = sGlobalSequenceNumber; 512 } 513 514 515 // #pragma mark - Kernel exported API 516 517 518 void 519 xsi_msg_init() 520 { 521 // Initialize hash tables 522 status_t status = sIpcHashTable.Init(); 523 if (status != B_OK) 524 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 525 status = sMessageQueueHashTable.Init(); 526 if (status != B_OK) 527 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 528 529 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 530 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 531 } 532 533 534 // #pragma mark - Syscalls 535 536 537 int 538 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 539 { 540 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 541 MutexLocker ipcHashLocker(sIpcLock); 542 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 543 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 544 if (messageQueue == NULL) { 545 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 546 return EINVAL; 547 } 548 if (!IS_USER_ADDRESS(buffer)) { 549 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n")); 550 return B_BAD_ADDRESS; 551 } 552 553 // Lock the message queue itself and release both the ipc hash table lock 554 // and the message queue hash table lock _only_ if the command it's not 555 // IPC_RMID, this prevents undesidered situation from happening while 556 // (hopefully) improving the concurrency. 557 MutexLocker messageQueueLocker; 558 if (command != IPC_RMID) { 559 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 560 messageQueueHashLocker.Unlock(); 561 ipcHashLocker.Unlock(); 562 } else 563 // Since we are going to delete the message queue object 564 // along with its mutex, we can't use a MutexLocker object, 565 // as the mutex itself won't exist on function exit 566 mutex_lock(&messageQueue->Lock()); 567 568 switch (command) { 569 case IPC_STAT: { 570 if (!messageQueue->HasReadPermission()) { 571 TRACE_ERROR(("xsi_msgctl: calling process has not read " 572 "permission on message queue %d, key %d\n", messageQueueID, 573 (int)messageQueue->IpcKey())); 574 return EACCES; 575 } 576 struct msqid_ds msg = messageQueue->GetMessageQueue(); 577 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 578 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 579 return B_BAD_ADDRESS; 580 } 581 break; 582 } 583 584 case IPC_SET: { 585 if (!messageQueue->HasPermission()) { 586 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 587 "on message queue %d, key %d\n", messageQueueID, 588 (int)messageQueue->IpcKey())); 589 return EPERM; 590 } 591 struct msqid_ds msg; 592 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 593 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 594 return B_BAD_ADDRESS; 595 } 596 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 597 TRACE_ERROR(("xsi_msgctl: user does not have permission to " 598 "increase the maximum number of bytes allowed on queue\n")); 599 return EPERM; 600 } 601 if (msg.msg_qbytes == 0) { 602 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 603 return EINVAL; 604 } 605 606 messageQueue->DoIpcSet(&msg); 607 break; 608 } 609 610 case IPC_RMID: { 611 // If this was the command, we are still holding the message 612 // queue hash table lock along with the ipc one, but not the 613 // message queue lock itself. This prevents other process 614 // to try and acquire a destroyed mutex 615 if (!messageQueue->HasPermission()) { 616 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 617 "on message queue %d, key %d\n", messageQueueID, 618 (int)messageQueue->IpcKey())); 619 return EPERM; 620 } 621 key_t key = messageQueue->IpcKey(); 622 Ipc *ipcKey = NULL; 623 if (key != -1) { 624 ipcKey = sIpcHashTable.Lookup(key); 625 sIpcHashTable.Remove(ipcKey); 626 } 627 sMessageQueueHashTable.Remove(messageQueue); 628 // Wake up of any threads waiting on this 629 // queue happens in destructor 630 if (key != -1) 631 delete ipcKey; 632 atomic_add(&sXsiMessageQueueCount, -1); 633 634 delete messageQueue; 635 break; 636 } 637 638 default: 639 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 640 return EINVAL; 641 } 642 643 return B_OK; 644 } 645 646 647 int 648 _user_xsi_msgget(key_t key, int flags) 649 { 650 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 651 XsiMessageQueue *messageQueue = NULL; 652 Ipc *ipcKey = NULL; 653 // Default assumptions 654 bool isPrivate = true; 655 bool create = true; 656 657 if (key != IPC_PRIVATE) { 658 isPrivate = false; 659 // Check if key already exist, if it does it already has a message 660 // queue associated with it 661 ipcKey = sIpcHashTable.Lookup(key); 662 if (ipcKey == NULL) { 663 if (!(flags & IPC_CREAT)) { 664 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the " 665 "caller did not ask for creation\n", (int)key)); 666 return ENOENT; 667 } 668 ipcKey = new(std::nothrow) Ipc(key); 669 if (ipcKey == NULL) { 670 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object " 671 "for key %d\n", (int)key)); 672 return ENOMEM; 673 } 674 sIpcHashTable.Insert(ipcKey); 675 } else { 676 // The IPC key exist and it already has a message queue 677 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 678 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 679 return EEXIST; 680 } 681 int messageQueueID = ipcKey->MessageQueueID(); 682 683 MutexLocker _(sXsiMessageQueueLock); 684 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 685 if (!messageQueue->HasPermission()) { 686 TRACE_ERROR(("xsi_msgget: calling process has not permission " 687 "on message queue %d, key %d\n", messageQueue->ID(), 688 (int)key)); 689 return EACCES; 690 } 691 create = false; 692 } 693 } 694 695 if (create) { 696 // Create a new message queue for this key 697 if (sXsiMessageQueueCount >= MAX_XSI_MESSAGE_QUEUE) { 698 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 699 "message queues\n")); 700 return ENOSPC; 701 } 702 703 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 704 if (messageQueue == NULL) { 705 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 706 "message queue\n")); 707 return ENOMEM; 708 } 709 atomic_add(&sXsiMessageQueueCount, 1); 710 711 MutexLocker _(sXsiMessageQueueLock); 712 messageQueue->SetID(); 713 if (isPrivate) 714 messageQueue->SetIpcKey((key_t)-1); 715 else { 716 messageQueue->SetIpcKey(key); 717 ipcKey->SetMessageQueueID(messageQueue); 718 } 719 sMessageQueueHashTable.Insert(messageQueue); 720 } 721 722 return messageQueue->ID(); 723 } 724 725 726 ssize_t 727 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 728 size_t messageSize, long messageType, int messageFlags) 729 { 730 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 731 messageQueueID, messageSize)); 732 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 733 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 734 if (messageQueue == NULL) { 735 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", 736 messageQueueID)); 737 return EINVAL; 738 } 739 MutexLocker messageQueueLocker(messageQueue->Lock()); 740 messageQueueHashLocker.Unlock(); 741 742 if (messageSize > MAX_BYTES_PER_QUEUE) { 743 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 744 return EINVAL; 745 } 746 if (!messageQueue->HasPermission()) { 747 TRACE_ERROR(("xsi_msgrcv: calling process has not permission " 748 "on message queue id %d, key %d\n", messageQueueID, 749 (int)messageQueue->IpcKey())); 750 return EACCES; 751 } 752 if (!IS_USER_ADDRESS(messagePointer)) { 753 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); 754 return B_BAD_ADDRESS; 755 } 756 757 queued_message *message = NULL; 758 while (true) { 759 message = messageQueue->Remove(messageType); 760 761 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 762 // We are going to sleep 763 Thread *thread = thread_get_current_thread(); 764 queued_thread queueEntry(thread, messageSize); 765 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 766 767 uint32 sequenceNumber = messageQueue->SequenceNumber(); 768 769 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); 770 status_t result 771 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 772 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); 773 774 messageQueueHashLocker.Lock(); 775 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 776 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 777 && sequenceNumber != messageQueue->SequenceNumber())) { 778 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = " 779 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 780 sequenceNumber)); 781 return EIDRM; 782 } else if (result == B_INTERRUPTED) { 783 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " 784 "waiting on message queue %d\n",(int)thread->id, 785 messageQueueID)); 786 messageQueue->Deque(&queueEntry, /* waitForMessage */ true); 787 return EINTR; 788 } else { 789 messageQueueLocker.Lock(); 790 messageQueueHashLocker.Unlock(); 791 } 792 } else if (message == NULL) { 793 // There is not message of type requested and 794 // we can't wait 795 return ENOMSG; 796 } else { 797 // Message received correctly (so far) 798 if ((ssize_t)messageSize < message->length 799 && !(messageFlags & MSG_NOERROR)) { 800 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 801 // Put the message back inside. Since we hold the 802 // queue message lock, not one else could have filled 803 // up the queue meanwhile 804 messageQueue->Insert(message); 805 return E2BIG; 806 } 807 808 ssize_t result 809 = message->copy_to_user_buffer(messagePointer, messageSize); 810 if (result < 0) { 811 messageQueue->Insert(message); 812 return B_BAD_ADDRESS; 813 } 814 815 delete message; 816 TRACE(("xsi_msgrcv: message received correctly\n")); 817 return result; 818 } 819 } 820 821 return B_OK; 822 } 823 824 825 int 826 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 827 size_t messageSize, int messageFlags) 828 { 829 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 830 messageQueueID, messageSize)); 831 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 832 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 833 if (messageQueue == NULL) { 834 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n", 835 messageQueueID)); 836 return EINVAL; 837 } 838 MutexLocker messageQueueLocker(messageQueue->Lock()); 839 messageQueueHashLocker.Unlock(); 840 841 if (messageSize > MAX_BYTES_PER_QUEUE) { 842 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 843 return EINVAL; 844 } 845 if (!messageQueue->HasPermission()) { 846 TRACE_ERROR(("xsi_msgsnd: calling process has not permission " 847 "on message queue id %d, key %d\n", messageQueueID, 848 (int)messageQueue->IpcKey())); 849 return EACCES; 850 } 851 if (!IS_USER_ADDRESS(messagePointer)) { 852 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n")); 853 return B_BAD_ADDRESS; 854 } 855 856 queued_message *message 857 = new(std::nothrow) queued_message(messagePointer, messageSize); 858 if (message == NULL || message->initOK != true) { 859 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 860 delete message; 861 return ENOMEM; 862 } 863 864 bool notSent = true; 865 status_t result = B_OK; 866 while (notSent) { 867 bool goToSleep = messageQueue->Insert(message); 868 869 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 870 // We are going to sleep 871 Thread *thread = thread_get_current_thread(); 872 queued_thread queueEntry(thread, messageSize); 873 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 874 875 uint32 sequenceNumber = messageQueue->SequenceNumber(); 876 877 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); 878 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 879 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); 880 881 messageQueueHashLocker.Lock(); 882 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 883 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 884 && sequenceNumber != messageQueue->SequenceNumber())) { 885 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = " 886 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 887 sequenceNumber)); 888 delete message; 889 notSent = false; 890 result = EIDRM; 891 } else if (result == B_INTERRUPTED) { 892 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " 893 "waiting on message queue %d\n",(int)thread->id, 894 messageQueueID)); 895 messageQueue->Deque(&queueEntry, /* waitForMessage */ false); 896 delete message; 897 notSent = false; 898 result = EINTR; 899 } else { 900 messageQueueLocker.Lock(); 901 messageQueueHashLocker.Unlock(); 902 } 903 } else if (goToSleep) { 904 // We did not send the message and we can't wait 905 delete message; 906 notSent = false; 907 result = EAGAIN; 908 } else { 909 // Message delivered correctly 910 TRACE(("xsi_msgsnd: message sent correctly\n")); 911 notSent = false; 912 } 913 } 914 915 return result; 916 } 917