1 /* 2 * Copyright 2008-2011, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 // Queue for holding blocked threads 37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> { 38 queued_thread(Thread *_thread, int32 _message_length) 39 : 40 thread(_thread), 41 message_length(_message_length), 42 queued(false) 43 { 44 } 45 46 Thread *thread; 47 int32 message_length; 48 bool queued; 49 }; 50 51 typedef DoublyLinkedList<queued_thread> ThreadQueue; 52 53 54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 55 queued_message(const void *_message, ssize_t _length) 56 : 57 initOK(false), 58 length(_length) 59 { 60 message = (char *)malloc(sizeof(char) * _length); 61 if (message == NULL) 62 return; 63 64 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 65 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 66 _length) != B_OK) { 67 free(message); 68 return; 69 } 70 initOK = true; 71 } 72 73 ~queued_message() 74 { 75 if (initOK) 76 free(message); 77 } 78 79 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 80 { 81 if (_length > length) 82 _length = length; 83 84 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 85 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 86 _length) != B_OK) 87 return B_ERROR; 88 return _length; 89 } 90 91 bool initOK; 92 ssize_t length; 93 char *message; 94 long type; 95 }; 96 97 typedef DoublyLinkedList<queued_message> MessageQueue; 98 99 // Arbitrary limit 100 #define MAX_BYTES_PER_QUEUE 2048 101 102 class XsiMessageQueue { 103 public: 104 XsiMessageQueue(int flags) 105 : 106 fBytesInQueue(0), 107 fThreadsWaitingToReceive(0), 108 fThreadsWaitingToSend(0) 109 { 110 mutex_init(&fLock, "XsiMessageQueue private mutex"); 111 SetIpcKey((key_t)-1); 112 SetPermissions(flags); 113 // Initialize all fields to zero 114 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 115 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 116 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 117 } 118 119 // Implemented after sXsiMessageCount is declared 120 ~XsiMessageQueue(); 121 122 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker) 123 { 124 thread_prepare_to_block(thread, B_CAN_INTERRUPT, 125 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); 126 // Unlock the queue before blocking 127 queueLocker->Unlock(); 128 129 InterruptsSpinLocker _(gThreadSpinlock); 130 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to 131 // interruption, we will still be queued. A WakeUpThread() at this point will 132 // call thread_unblock() and might thus screw with our trying to re-lock the 133 // mutex. 134 return thread_block_locked(thread); 135 } 136 137 void DoIpcSet(struct msqid_ds *result) 138 { 139 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 140 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 141 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 142 | (result->msg_perm.mode & 0x01ff); 143 fMessageQueue.msg_qbytes = result->msg_qbytes; 144 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 145 } 146 147 void Deque(queued_thread *queueEntry, bool waitForMessage) 148 { 149 if (queueEntry->queued) { 150 if (waitForMessage) { 151 fWaitingToReceive.Remove(queueEntry); 152 fThreadsWaitingToReceive--; 153 } else { 154 fWaitingToSend.Remove(queueEntry); 155 fThreadsWaitingToSend--; 156 } 157 } 158 } 159 160 void Enqueue(queued_thread *queueEntry, bool waitForMessage) 161 { 162 if (waitForMessage) { 163 fWaitingToReceive.Add(queueEntry); 164 fThreadsWaitingToReceive++; 165 } else { 166 fWaitingToSend.Add(queueEntry); 167 fThreadsWaitingToSend++; 168 } 169 queueEntry->queued = true; 170 } 171 172 struct msqid_ds &GetMessageQueue() 173 { 174 return fMessageQueue; 175 } 176 177 bool HasPermission() const 178 { 179 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 180 return true; 181 182 uid_t uid = geteuid(); 183 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 184 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 185 return true; 186 187 gid_t gid = getegid(); 188 if (gid == fMessageQueue.msg_perm.gid 189 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 190 return true; 191 192 return false; 193 } 194 195 bool HasReadPermission() const 196 { 197 // TODO: fix this 198 return HasPermission(); 199 } 200 201 int ID() const 202 { 203 return fID; 204 } 205 206 // Implemented after sXsiMessageCount is declared 207 bool Insert(queued_message *message); 208 209 key_t IpcKey() const 210 { 211 return fMessageQueue.msg_perm.key; 212 } 213 214 mutex &Lock() 215 { 216 return fLock; 217 } 218 219 msglen_t MaxBytes() const 220 { 221 return fMessageQueue.msg_qbytes; 222 } 223 224 // Implemented after sXsiMessageCount is declared 225 queued_message *Remove(long typeRequested); 226 227 uint32 SequenceNumber() const 228 { 229 return fSequenceNumber; 230 } 231 232 // Implemented after sMessageQueueHashTable is declared 233 void SetID(); 234 235 void SetIpcKey(key_t key) 236 { 237 fMessageQueue.msg_perm.key = key; 238 } 239 240 void SetPermissions(int flags) 241 { 242 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 243 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 244 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 245 } 246 247 void WakeUpThread(bool waitForMessage) 248 { 249 InterruptsSpinLocker _(gThreadSpinlock); 250 if (waitForMessage) { 251 // Wake up all waiting thread for a message 252 // TODO: this can cause starvation for any 253 // very-unlucky-and-slow thread 254 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 255 entry->queued = false; 256 fThreadsWaitingToReceive--; 257 thread_unblock_locked(entry->thread, 0); 258 } 259 } else { 260 // Wake up only one thread waiting to send 261 if (queued_thread *entry = fWaitingToSend.RemoveHead()) { 262 entry->queued = false; 263 fThreadsWaitingToSend--; 264 thread_unblock_locked(entry->thread, 0); 265 } 266 } 267 } 268 269 XsiMessageQueue*& Link() 270 { 271 return fLink; 272 } 273 274 private: 275 msglen_t fBytesInQueue; 276 int fID; 277 mutex fLock; 278 MessageQueue fMessage; 279 struct msqid_ds fMessageQueue; 280 uint32 fSequenceNumber; 281 uint32 fThreadsWaitingToReceive; 282 uint32 fThreadsWaitingToSend; 283 284 ThreadQueue fWaitingToReceive; 285 ThreadQueue fWaitingToSend; 286 287 XsiMessageQueue* fLink; 288 }; 289 290 291 // Xsi message queue hash table 292 struct MessageQueueHashTableDefinition { 293 typedef int KeyType; 294 typedef XsiMessageQueue ValueType; 295 296 size_t HashKey (const int key) const 297 { 298 return (size_t)key; 299 } 300 301 size_t Hash(XsiMessageQueue *variable) const 302 { 303 return (size_t)variable->ID(); 304 } 305 306 bool Compare(const int key, XsiMessageQueue *variable) const 307 { 308 return (int)key == (int)variable->ID(); 309 } 310 311 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 312 { 313 return variable->Link(); 314 } 315 }; 316 317 318 // IPC class 319 class Ipc { 320 public: 321 Ipc(key_t key) 322 : fKey(key), 323 fMessageQueueId(-1) 324 { 325 } 326 327 key_t Key() const 328 { 329 return fKey; 330 } 331 332 int MessageQueueID() const 333 { 334 return fMessageQueueId; 335 } 336 337 void SetMessageQueueID(XsiMessageQueue *messageQueue) 338 { 339 fMessageQueueId = messageQueue->ID(); 340 } 341 342 Ipc*& Link() 343 { 344 return fLink; 345 } 346 347 private: 348 key_t fKey; 349 int fMessageQueueId; 350 Ipc* fLink; 351 }; 352 353 354 struct IpcHashTableDefinition { 355 typedef key_t KeyType; 356 typedef Ipc ValueType; 357 358 size_t HashKey (const key_t key) const 359 { 360 return (size_t)(key); 361 } 362 363 size_t Hash(Ipc *variable) const 364 { 365 return (size_t)HashKey(variable->Key()); 366 } 367 368 bool Compare(const key_t key, Ipc *variable) const 369 { 370 return (key_t)key == (key_t)variable->Key(); 371 } 372 373 Ipc*& GetLink(Ipc *variable) const 374 { 375 return variable->Link(); 376 } 377 }; 378 379 // Arbitrary limits 380 #define MAX_XSI_MESSAGE 4096 381 #define MAX_XSI_MESSAGE_QUEUE 1024 382 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 383 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 384 385 static mutex sIpcLock; 386 static mutex sXsiMessageQueueLock; 387 388 static uint32 sGlobalSequenceNumber = 1; 389 static vint32 sXsiMessageCount = 0; 390 static vint32 sXsiMessageQueueCount = 0; 391 392 393 // #pragma mark - 394 395 396 XsiMessageQueue::~XsiMessageQueue() 397 { 398 mutex_destroy(&fLock); 399 400 // Wake up any threads still waiting 401 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) { 402 InterruptsSpinLocker _(gThreadSpinlock); 403 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 404 entry->queued = false; 405 thread_unblock_locked(entry->thread, EIDRM); 406 } 407 while (queued_thread *entry = fWaitingToSend.RemoveHead()) { 408 entry->queued = false; 409 thread_unblock_locked(entry->thread, EIDRM); 410 } 411 } 412 413 // Free up any remaining messages 414 if (fMessageQueue.msg_qnum) { 415 while (queued_message *message = fMessage.RemoveHead()) { 416 atomic_add(&sXsiMessageCount, -1); 417 delete message; 418 } 419 } 420 } 421 422 423 bool 424 XsiMessageQueue::Insert(queued_message *message) 425 { 426 // The only situation that would make us (potentially) wait 427 // is that we exceed with bytes or with the total number of messages 428 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 429 return true; 430 431 while (true) { 432 int32 oldCount = atomic_get(&sXsiMessageCount); 433 if (oldCount >= MAX_XSI_MESSAGE) 434 return true; 435 // If another thread updates the counter we keep 436 // iterating 437 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 438 == oldCount) 439 break; 440 } 441 442 fMessage.Add(message); 443 fMessageQueue.msg_qnum++; 444 fMessageQueue.msg_lspid = getpid(); 445 fMessageQueue.msg_stime = real_time_clock(); 446 fBytesInQueue += message->length; 447 if (fThreadsWaitingToReceive) 448 WakeUpThread(true /* WaitForMessage */); 449 return false; 450 } 451 452 453 queued_message* 454 XsiMessageQueue::Remove(long typeRequested) 455 { 456 queued_message *message = NULL; 457 if (typeRequested < 0) { 458 // Return first message of the lowest type 459 // that is less than or equal to the absolute 460 // value of type requested. 461 MessageQueue::Iterator iterator = fMessage.GetIterator(); 462 while (iterator.HasNext()) { 463 queued_message *current = iterator.Next(); 464 if (current->type <= -typeRequested) { 465 message = iterator.Remove(); 466 break; 467 } 468 } 469 } else if (typeRequested == 0) { 470 // Return the first message on the queue 471 message = fMessage.RemoveHead(); 472 } else { 473 // Return the first message of type requested 474 MessageQueue::Iterator iterator = fMessage.GetIterator(); 475 while (iterator.HasNext()) { 476 queued_message *current = iterator.Next(); 477 if (current->type == typeRequested) { 478 message = iterator.Remove(); 479 break; 480 } 481 } 482 } 483 484 if (message == NULL) 485 return NULL; 486 487 fMessageQueue.msg_qnum--; 488 fMessageQueue.msg_lrpid = getpid(); 489 fMessageQueue.msg_rtime = real_time_clock(); 490 fBytesInQueue -= message->length; 491 atomic_add(&sXsiMessageCount, -1); 492 if (fThreadsWaitingToSend) 493 WakeUpThread(false /* WaitForMessage */); 494 return message; 495 } 496 497 498 void 499 XsiMessageQueue::SetID() 500 { 501 fID = real_time_clock(); 502 // The lock is held before calling us 503 while (true) { 504 if (sMessageQueueHashTable.Lookup(fID) == NULL) 505 break; 506 fID++; 507 } 508 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 509 fSequenceNumber = sGlobalSequenceNumber; 510 } 511 512 513 // #pragma mark - Kernel exported API 514 515 516 void 517 xsi_msg_init() 518 { 519 // Initialize hash tables 520 status_t status = sIpcHashTable.Init(); 521 if (status != B_OK) 522 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 523 status = sMessageQueueHashTable.Init(); 524 if (status != B_OK) 525 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 526 527 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 528 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 529 } 530 531 532 // #pragma mark - Syscalls 533 534 535 int 536 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 537 { 538 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 539 MutexLocker ipcHashLocker(sIpcLock); 540 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 541 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 542 if (messageQueue == NULL) { 543 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 544 return EINVAL; 545 } 546 if (!IS_USER_ADDRESS(buffer)) { 547 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n")); 548 return B_BAD_ADDRESS; 549 } 550 551 // Lock the message queue itself and release both the ipc hash table lock 552 // and the message queue hash table lock _only_ if the command it's not 553 // IPC_RMID, this prevents undesidered situation from happening while 554 // (hopefully) improving the concurrency. 555 MutexLocker messageQueueLocker; 556 if (command != IPC_RMID) { 557 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 558 messageQueueHashLocker.Unlock(); 559 ipcHashLocker.Unlock(); 560 } else 561 // Since we are going to delete the message queue object 562 // along with its mutex, we can't use a MutexLocker object, 563 // as the mutex itself won't exist on function exit 564 mutex_lock(&messageQueue->Lock()); 565 566 switch (command) { 567 case IPC_STAT: { 568 if (!messageQueue->HasReadPermission()) { 569 TRACE_ERROR(("xsi_msgctl: calling process has not read " 570 "permission on message queue %d, key %d\n", messageQueueID, 571 (int)messageQueue->IpcKey())); 572 return EACCES; 573 } 574 struct msqid_ds msg = messageQueue->GetMessageQueue(); 575 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 576 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 577 return B_BAD_ADDRESS; 578 } 579 break; 580 } 581 582 case IPC_SET: { 583 if (!messageQueue->HasPermission()) { 584 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 585 "on message queue %d, key %d\n", messageQueueID, 586 (int)messageQueue->IpcKey())); 587 return EPERM; 588 } 589 struct msqid_ds msg; 590 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 591 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 592 return B_BAD_ADDRESS; 593 } 594 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 595 TRACE_ERROR(("xsi_msgctl: user does not have permission to " 596 "increase the maximum number of bytes allowed on queue\n")); 597 return EPERM; 598 } 599 if (msg.msg_qbytes == 0) { 600 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 601 return EINVAL; 602 } 603 604 messageQueue->DoIpcSet(&msg); 605 break; 606 } 607 608 case IPC_RMID: { 609 // If this was the command, we are still holding the message 610 // queue hash table lock along with the ipc one, but not the 611 // message queue lock itself. This prevents other process 612 // to try and acquire a destroyed mutex 613 if (!messageQueue->HasPermission()) { 614 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 615 "on message queue %d, key %d\n", messageQueueID, 616 (int)messageQueue->IpcKey())); 617 return EPERM; 618 } 619 key_t key = messageQueue->IpcKey(); 620 Ipc *ipcKey = NULL; 621 if (key != -1) { 622 ipcKey = sIpcHashTable.Lookup(key); 623 sIpcHashTable.Remove(ipcKey); 624 } 625 sMessageQueueHashTable.Remove(messageQueue); 626 // Wake up of any threads waiting on this 627 // queue happens in destructor 628 if (key != -1) 629 delete ipcKey; 630 atomic_add(&sXsiMessageQueueCount, -1); 631 632 delete messageQueue; 633 break; 634 } 635 636 default: 637 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 638 return EINVAL; 639 } 640 641 return B_OK; 642 } 643 644 645 int 646 _user_xsi_msgget(key_t key, int flags) 647 { 648 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 649 XsiMessageQueue *messageQueue = NULL; 650 Ipc *ipcKey = NULL; 651 // Default assumptions 652 bool isPrivate = true; 653 bool create = true; 654 655 if (key != IPC_PRIVATE) { 656 isPrivate = false; 657 // Check if key already exist, if it does it already has a message 658 // queue associated with it 659 ipcKey = sIpcHashTable.Lookup(key); 660 if (ipcKey == NULL) { 661 if (!(flags & IPC_CREAT)) { 662 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the " 663 "caller did not ask for creation\n", (int)key)); 664 return ENOENT; 665 } 666 ipcKey = new(std::nothrow) Ipc(key); 667 if (ipcKey == NULL) { 668 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object " 669 "for key %d\n", (int)key)); 670 return ENOMEM; 671 } 672 sIpcHashTable.Insert(ipcKey); 673 } else { 674 // The IPC key exist and it already has a message queue 675 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 676 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 677 return EEXIST; 678 } 679 int messageQueueID = ipcKey->MessageQueueID(); 680 681 MutexLocker _(sXsiMessageQueueLock); 682 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 683 if (!messageQueue->HasPermission()) { 684 TRACE_ERROR(("xsi_msgget: calling process has not permission " 685 "on message queue %d, key %d\n", messageQueue->ID(), 686 (int)key)); 687 return EACCES; 688 } 689 create = false; 690 } 691 } 692 693 if (create) { 694 // Create a new message queue for this key 695 if (sXsiMessageQueueCount >= MAX_XSI_MESSAGE_QUEUE) { 696 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 697 "message queues\n")); 698 return ENOSPC; 699 } 700 701 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 702 if (messageQueue == NULL) { 703 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 704 "message queue\n")); 705 return ENOMEM; 706 } 707 atomic_add(&sXsiMessageQueueCount, 1); 708 709 MutexLocker _(sXsiMessageQueueLock); 710 messageQueue->SetID(); 711 if (isPrivate) 712 messageQueue->SetIpcKey((key_t)-1); 713 else { 714 messageQueue->SetIpcKey(key); 715 ipcKey->SetMessageQueueID(messageQueue); 716 } 717 sMessageQueueHashTable.Insert(messageQueue); 718 } 719 720 return messageQueue->ID(); 721 } 722 723 724 ssize_t 725 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 726 size_t messageSize, long messageType, int messageFlags) 727 { 728 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 729 messageQueueID, messageSize)); 730 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 731 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 732 if (messageQueue == NULL) { 733 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", 734 messageQueueID)); 735 return EINVAL; 736 } 737 MutexLocker messageQueueLocker(messageQueue->Lock()); 738 messageQueueHashLocker.Unlock(); 739 740 if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) { 741 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 742 return EINVAL; 743 } 744 if (!messageQueue->HasPermission()) { 745 TRACE_ERROR(("xsi_msgrcv: calling process has not permission " 746 "on message queue id %d, key %d\n", messageQueueID, 747 (int)messageQueue->IpcKey())); 748 return EACCES; 749 } 750 if (!IS_USER_ADDRESS(messagePointer)) { 751 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); 752 return B_BAD_ADDRESS; 753 } 754 755 queued_message *message = NULL; 756 while (true) { 757 message = messageQueue->Remove(messageType); 758 759 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 760 // We are going to sleep 761 Thread *thread = thread_get_current_thread(); 762 queued_thread queueEntry(thread, messageSize); 763 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 764 765 uint32 sequenceNumber = messageQueue->SequenceNumber(); 766 767 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); 768 status_t result 769 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 770 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); 771 772 messageQueueHashLocker.Lock(); 773 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 774 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 775 && sequenceNumber != messageQueue->SequenceNumber())) { 776 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = %ld) " 777 "got destroyed\n", messageQueueID, sequenceNumber)); 778 return EIDRM; 779 } else if (result == B_INTERRUPTED) { 780 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " 781 "waiting on message queue %d\n",(int)thread->id, 782 messageQueueID)); 783 messageQueue->Deque(&queueEntry, /* waitForMessage */ true); 784 return EINTR; 785 } else { 786 messageQueueLocker.Lock(); 787 messageQueueHashLocker.Unlock(); 788 } 789 } else if (message == NULL) { 790 // There is not message of type requested and 791 // we can't wait 792 return ENOMSG; 793 } else { 794 // Message received correctly (so far) 795 if ((ssize_t)messageSize < message->length 796 && !(messageFlags & MSG_NOERROR)) { 797 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 798 // Put the message back inside. Since we hold the 799 // queue message lock, not one else could have filled 800 // up the queue meanwhile 801 messageQueue->Insert(message); 802 return E2BIG; 803 } 804 805 ssize_t result 806 = message->copy_to_user_buffer(messagePointer, messageSize); 807 if (result < 0) { 808 messageQueue->Insert(message); 809 return B_BAD_ADDRESS; 810 } 811 812 delete message; 813 TRACE(("xsi_msgrcv: message received correctly\n")); 814 return result; 815 } 816 } 817 818 return B_OK; 819 } 820 821 822 int 823 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 824 size_t messageSize, int messageFlags) 825 { 826 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 827 messageQueueID, messageSize)); 828 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 829 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 830 if (messageQueue == NULL) { 831 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n", 832 messageQueueID)); 833 return EINVAL; 834 } 835 MutexLocker messageQueueLocker(messageQueue->Lock()); 836 messageQueueHashLocker.Unlock(); 837 838 if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) { 839 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 840 return EINVAL; 841 } 842 if (!messageQueue->HasPermission()) { 843 TRACE_ERROR(("xsi_msgsnd: calling process has not permission " 844 "on message queue id %d, key %d\n", messageQueueID, 845 (int)messageQueue->IpcKey())); 846 return EACCES; 847 } 848 if (!IS_USER_ADDRESS(messagePointer)) { 849 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n")); 850 return B_BAD_ADDRESS; 851 } 852 853 queued_message *message 854 = new(std::nothrow) queued_message(messagePointer, messageSize); 855 if (message == NULL || message->initOK != true) { 856 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 857 delete message; 858 return ENOMEM; 859 } 860 861 bool notSent = true; 862 status_t result = B_OK; 863 while (notSent) { 864 bool goToSleep = messageQueue->Insert(message); 865 866 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 867 // We are going to sleep 868 Thread *thread = thread_get_current_thread(); 869 queued_thread queueEntry(thread, messageSize); 870 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 871 872 uint32 sequenceNumber = messageQueue->SequenceNumber(); 873 874 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); 875 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 876 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); 877 878 messageQueueHashLocker.Lock(); 879 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 880 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 881 && sequenceNumber != messageQueue->SequenceNumber())) { 882 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = %ld) " 883 "got destroyed\n", messageQueueID, sequenceNumber)); 884 delete message; 885 notSent = false; 886 result = EIDRM; 887 } else if (result == B_INTERRUPTED) { 888 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " 889 "waiting on message queue %d\n",(int)thread->id, 890 messageQueueID)); 891 messageQueue->Deque(&queueEntry, /* waitForMessage */ false); 892 delete message; 893 notSent = false; 894 result = EINTR; 895 } else { 896 messageQueueLocker.Lock(); 897 messageQueueHashLocker.Unlock(); 898 } 899 } else if (goToSleep) { 900 // We did not send the message and we can't wait 901 delete message; 902 notSent = false; 903 result = EAGAIN; 904 } else { 905 // Message delivered correctly 906 TRACE(("xsi_msgsnd: message sent correctly\n")); 907 notSent = false; 908 } 909 } 910 911 return result; 912 } 913