1 /* 2 * Copyright 2008-2011, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 37 namespace { 38 39 // Queue for holding blocked threads 40 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> { 41 queued_thread(Thread *_thread, int32 _message_length) 42 : 43 thread(_thread), 44 message_length(_message_length), 45 queued(false) 46 { 47 } 48 49 Thread *thread; 50 int32 message_length; 51 bool queued; 52 }; 53 54 typedef DoublyLinkedList<queued_thread> ThreadQueue; 55 56 57 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 58 queued_message(const void *_message, ssize_t _length) 59 : 60 initOK(false), 61 length(_length) 62 { 63 message = (char *)malloc(sizeof(char) * _length); 64 if (message == NULL) 65 return; 66 67 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 68 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 69 _length) != B_OK) { 70 free(message); 71 return; 72 } 73 initOK = true; 74 } 75 76 ~queued_message() 77 { 78 if (initOK) 79 free(message); 80 } 81 82 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 83 { 84 if (_length > length) 85 _length = length; 86 87 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 88 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 89 _length) != B_OK) 90 return B_ERROR; 91 return _length; 92 } 93 94 bool initOK; 95 ssize_t length; 96 char *message; 97 long type; 98 }; 99 100 typedef DoublyLinkedList<queued_message> MessageQueue; 101 102 // Arbitrary limit 103 #define MAX_BYTES_PER_QUEUE 2048 104 105 class XsiMessageQueue { 106 public: 107 XsiMessageQueue(int flags) 108 : 109 fBytesInQueue(0), 110 fThreadsWaitingToReceive(0), 111 fThreadsWaitingToSend(0) 112 { 113 mutex_init(&fLock, "XsiMessageQueue private mutex"); 114 SetIpcKey((key_t)-1); 115 SetPermissions(flags); 116 // Initialize all fields to zero 117 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 118 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 119 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 120 } 121 122 // Implemented after sXsiMessageCount is declared 123 ~XsiMessageQueue(); 124 125 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker) 126 { 127 thread_prepare_to_block(thread, B_CAN_INTERRUPT, 128 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); 129 // Unlock the queue before blocking 130 queueLocker->Unlock(); 131 132 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to 133 // interruption, we will still be queued. A WakeUpThread() at this point will 134 // call thread_unblock() and might thus screw with our trying to re-lock the 135 // mutex. 136 return thread_block(); 137 } 138 139 void DoIpcSet(struct msqid_ds *result) 140 { 141 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 142 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 143 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 144 | (result->msg_perm.mode & 0x01ff); 145 fMessageQueue.msg_qbytes = result->msg_qbytes; 146 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 147 } 148 149 void Deque(queued_thread *queueEntry, bool waitForMessage) 150 { 151 if (queueEntry->queued) { 152 if (waitForMessage) { 153 fWaitingToReceive.Remove(queueEntry); 154 fThreadsWaitingToReceive--; 155 } else { 156 fWaitingToSend.Remove(queueEntry); 157 fThreadsWaitingToSend--; 158 } 159 } 160 } 161 162 void Enqueue(queued_thread *queueEntry, bool waitForMessage) 163 { 164 if (waitForMessage) { 165 fWaitingToReceive.Add(queueEntry); 166 fThreadsWaitingToReceive++; 167 } else { 168 fWaitingToSend.Add(queueEntry); 169 fThreadsWaitingToSend++; 170 } 171 queueEntry->queued = true; 172 } 173 174 struct msqid_ds &GetMessageQueue() 175 { 176 return fMessageQueue; 177 } 178 179 bool HasPermission() const 180 { 181 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 182 return true; 183 184 uid_t uid = geteuid(); 185 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 186 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 187 return true; 188 189 gid_t gid = getegid(); 190 if (gid == fMessageQueue.msg_perm.gid 191 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 192 return true; 193 194 return false; 195 } 196 197 bool HasReadPermission() const 198 { 199 // TODO: fix this 200 return HasPermission(); 201 } 202 203 int ID() const 204 { 205 return fID; 206 } 207 208 // Implemented after sXsiMessageCount is declared 209 bool Insert(queued_message *message); 210 211 key_t IpcKey() const 212 { 213 return fMessageQueue.msg_perm.key; 214 } 215 216 mutex &Lock() 217 { 218 return fLock; 219 } 220 221 msglen_t MaxBytes() const 222 { 223 return fMessageQueue.msg_qbytes; 224 } 225 226 // Implemented after sXsiMessageCount is declared 227 queued_message *Remove(long typeRequested); 228 229 uint32 SequenceNumber() const 230 { 231 return fSequenceNumber; 232 } 233 234 // Implemented after sMessageQueueHashTable is declared 235 void SetID(); 236 237 void SetIpcKey(key_t key) 238 { 239 fMessageQueue.msg_perm.key = key; 240 } 241 242 void SetPermissions(int flags) 243 { 244 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 245 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 246 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 247 } 248 249 void WakeUpThread(bool waitForMessage) 250 { 251 if (waitForMessage) { 252 // Wake up all waiting thread for a message 253 // TODO: this can cause starvation for any 254 // very-unlucky-and-slow thread 255 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 256 entry->queued = false; 257 fThreadsWaitingToReceive--; 258 thread_unblock(entry->thread, 0); 259 } 260 } else { 261 // Wake up only one thread waiting to send 262 if (queued_thread *entry = fWaitingToSend.RemoveHead()) { 263 entry->queued = false; 264 fThreadsWaitingToSend--; 265 thread_unblock(entry->thread, 0); 266 } 267 } 268 } 269 270 XsiMessageQueue*& Link() 271 { 272 return fLink; 273 } 274 275 private: 276 msglen_t fBytesInQueue; 277 int fID; 278 mutex fLock; 279 MessageQueue fMessage; 280 struct msqid_ds fMessageQueue; 281 uint32 fSequenceNumber; 282 uint32 fThreadsWaitingToReceive; 283 uint32 fThreadsWaitingToSend; 284 285 ThreadQueue fWaitingToReceive; 286 ThreadQueue fWaitingToSend; 287 288 XsiMessageQueue* fLink; 289 }; 290 291 292 // Xsi message queue hash table 293 struct MessageQueueHashTableDefinition { 294 typedef int KeyType; 295 typedef XsiMessageQueue ValueType; 296 297 size_t HashKey (const int key) const 298 { 299 return (size_t)key; 300 } 301 302 size_t Hash(XsiMessageQueue *variable) const 303 { 304 return (size_t)variable->ID(); 305 } 306 307 bool Compare(const int key, XsiMessageQueue *variable) const 308 { 309 return (int)key == (int)variable->ID(); 310 } 311 312 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 313 { 314 return variable->Link(); 315 } 316 }; 317 318 319 // IPC class 320 class Ipc { 321 public: 322 Ipc(key_t key) 323 : fKey(key), 324 fMessageQueueId(-1) 325 { 326 } 327 328 key_t Key() const 329 { 330 return fKey; 331 } 332 333 int MessageQueueID() const 334 { 335 return fMessageQueueId; 336 } 337 338 void SetMessageQueueID(XsiMessageQueue *messageQueue) 339 { 340 fMessageQueueId = messageQueue->ID(); 341 } 342 343 Ipc*& Link() 344 { 345 return fLink; 346 } 347 348 private: 349 key_t fKey; 350 int fMessageQueueId; 351 Ipc* fLink; 352 }; 353 354 355 struct IpcHashTableDefinition { 356 typedef key_t KeyType; 357 typedef Ipc ValueType; 358 359 size_t HashKey (const key_t key) const 360 { 361 return (size_t)(key); 362 } 363 364 size_t Hash(Ipc *variable) const 365 { 366 return (size_t)HashKey(variable->Key()); 367 } 368 369 bool Compare(const key_t key, Ipc *variable) const 370 { 371 return (key_t)key == (key_t)variable->Key(); 372 } 373 374 Ipc*& GetLink(Ipc *variable) const 375 { 376 return variable->Link(); 377 } 378 }; 379 380 } // namespace 381 382 383 // Arbitrary limits 384 #define MAX_XSI_MESSAGE 4096 385 #define MAX_XSI_MESSAGE_QUEUE 1024 386 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 387 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 388 389 static mutex sIpcLock; 390 static mutex sXsiMessageQueueLock; 391 392 static uint32 sGlobalSequenceNumber = 1; 393 static int32 sXsiMessageCount = 0; 394 static int32 sXsiMessageQueueCount = 0; 395 396 397 // #pragma mark - 398 399 400 XsiMessageQueue::~XsiMessageQueue() 401 { 402 mutex_destroy(&fLock); 403 404 // Wake up any threads still waiting 405 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) { 406 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 407 entry->queued = false; 408 thread_unblock(entry->thread, EIDRM); 409 } 410 while (queued_thread *entry = fWaitingToSend.RemoveHead()) { 411 entry->queued = false; 412 thread_unblock(entry->thread, EIDRM); 413 } 414 } 415 416 // Free up any remaining messages 417 if (fMessageQueue.msg_qnum) { 418 while (queued_message *message = fMessage.RemoveHead()) { 419 atomic_add(&sXsiMessageCount, -1); 420 delete message; 421 } 422 } 423 } 424 425 426 bool 427 XsiMessageQueue::Insert(queued_message *message) 428 { 429 // The only situation that would make us (potentially) wait 430 // is that we exceed with bytes or with the total number of messages 431 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 432 return true; 433 434 while (true) { 435 int32 oldCount = atomic_get(&sXsiMessageCount); 436 if (oldCount >= MAX_XSI_MESSAGE) 437 return true; 438 // If another thread updates the counter we keep 439 // iterating 440 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 441 == oldCount) 442 break; 443 } 444 445 fMessage.Add(message); 446 fMessageQueue.msg_qnum++; 447 fMessageQueue.msg_lspid = getpid(); 448 fMessageQueue.msg_stime = real_time_clock(); 449 fBytesInQueue += message->length; 450 if (fThreadsWaitingToReceive) 451 WakeUpThread(true /* WaitForMessage */); 452 return false; 453 } 454 455 456 queued_message* 457 XsiMessageQueue::Remove(long typeRequested) 458 { 459 queued_message *message = NULL; 460 if (typeRequested < 0) { 461 // Return first message of the lowest type 462 // that is less than or equal to the absolute 463 // value of type requested. 464 MessageQueue::Iterator iterator = fMessage.GetIterator(); 465 while (iterator.HasNext()) { 466 queued_message *current = iterator.Next(); 467 if (current->type <= -typeRequested) { 468 message = iterator.Remove(); 469 break; 470 } 471 } 472 } else if (typeRequested == 0) { 473 // Return the first message on the queue 474 message = fMessage.RemoveHead(); 475 } else { 476 // Return the first message of type requested 477 MessageQueue::Iterator iterator = fMessage.GetIterator(); 478 while (iterator.HasNext()) { 479 queued_message *current = iterator.Next(); 480 if (current->type == typeRequested) { 481 message = iterator.Remove(); 482 break; 483 } 484 } 485 } 486 487 if (message == NULL) 488 return NULL; 489 490 fMessageQueue.msg_qnum--; 491 fMessageQueue.msg_lrpid = getpid(); 492 fMessageQueue.msg_rtime = real_time_clock(); 493 fBytesInQueue -= message->length; 494 atomic_add(&sXsiMessageCount, -1); 495 if (fThreadsWaitingToSend) 496 WakeUpThread(false /* WaitForMessage */); 497 return message; 498 } 499 500 501 void 502 XsiMessageQueue::SetID() 503 { 504 fID = real_time_clock(); 505 // The lock is held before calling us 506 while (true) { 507 if (sMessageQueueHashTable.Lookup(fID) == NULL) 508 break; 509 fID++; 510 } 511 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 512 fSequenceNumber = sGlobalSequenceNumber; 513 } 514 515 516 // #pragma mark - Kernel exported API 517 518 519 void 520 xsi_msg_init() 521 { 522 // Initialize hash tables 523 status_t status = sIpcHashTable.Init(); 524 if (status != B_OK) 525 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 526 status = sMessageQueueHashTable.Init(); 527 if (status != B_OK) 528 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 529 530 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 531 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 532 } 533 534 535 // #pragma mark - Syscalls 536 537 538 int 539 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 540 { 541 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 542 MutexLocker ipcHashLocker(sIpcLock); 543 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 544 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 545 if (messageQueue == NULL) { 546 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 547 return EINVAL; 548 } 549 if (!IS_USER_ADDRESS(buffer)) { 550 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n")); 551 return B_BAD_ADDRESS; 552 } 553 554 // Lock the message queue itself and release both the ipc hash table lock 555 // and the message queue hash table lock _only_ if the command it's not 556 // IPC_RMID, this prevents undesidered situation from happening while 557 // (hopefully) improving the concurrency. 558 MutexLocker messageQueueLocker; 559 if (command != IPC_RMID) { 560 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 561 messageQueueHashLocker.Unlock(); 562 ipcHashLocker.Unlock(); 563 } else 564 // Since we are going to delete the message queue object 565 // along with its mutex, we can't use a MutexLocker object, 566 // as the mutex itself won't exist on function exit 567 mutex_lock(&messageQueue->Lock()); 568 569 switch (command) { 570 case IPC_STAT: { 571 if (!messageQueue->HasReadPermission()) { 572 TRACE_ERROR(("xsi_msgctl: calling process has not read " 573 "permission on message queue %d, key %d\n", messageQueueID, 574 (int)messageQueue->IpcKey())); 575 return EACCES; 576 } 577 struct msqid_ds msg = messageQueue->GetMessageQueue(); 578 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 579 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 580 return B_BAD_ADDRESS; 581 } 582 break; 583 } 584 585 case IPC_SET: { 586 if (!messageQueue->HasPermission()) { 587 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 588 "on message queue %d, key %d\n", messageQueueID, 589 (int)messageQueue->IpcKey())); 590 return EPERM; 591 } 592 struct msqid_ds msg; 593 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 594 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 595 return B_BAD_ADDRESS; 596 } 597 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 598 TRACE_ERROR(("xsi_msgctl: user does not have permission to " 599 "increase the maximum number of bytes allowed on queue\n")); 600 return EPERM; 601 } 602 if (msg.msg_qbytes == 0) { 603 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 604 return EINVAL; 605 } 606 607 messageQueue->DoIpcSet(&msg); 608 break; 609 } 610 611 case IPC_RMID: { 612 // If this was the command, we are still holding the message 613 // queue hash table lock along with the ipc one, but not the 614 // message queue lock itself. This prevents other process 615 // to try and acquire a destroyed mutex 616 if (!messageQueue->HasPermission()) { 617 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 618 "on message queue %d, key %d\n", messageQueueID, 619 (int)messageQueue->IpcKey())); 620 return EPERM; 621 } 622 key_t key = messageQueue->IpcKey(); 623 Ipc *ipcKey = NULL; 624 if (key != -1) { 625 ipcKey = sIpcHashTable.Lookup(key); 626 sIpcHashTable.Remove(ipcKey); 627 } 628 sMessageQueueHashTable.Remove(messageQueue); 629 // Wake up of any threads waiting on this 630 // queue happens in destructor 631 if (key != -1) 632 delete ipcKey; 633 atomic_add(&sXsiMessageQueueCount, -1); 634 635 delete messageQueue; 636 break; 637 } 638 639 default: 640 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 641 return EINVAL; 642 } 643 644 return B_OK; 645 } 646 647 648 int 649 _user_xsi_msgget(key_t key, int flags) 650 { 651 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 652 XsiMessageQueue *messageQueue = NULL; 653 Ipc *ipcKey = NULL; 654 // Default assumptions 655 bool isPrivate = true; 656 bool create = true; 657 658 if (key != IPC_PRIVATE) { 659 isPrivate = false; 660 // Check if key already exist, if it does it already has a message 661 // queue associated with it 662 ipcKey = sIpcHashTable.Lookup(key); 663 if (ipcKey == NULL) { 664 if (!(flags & IPC_CREAT)) { 665 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the " 666 "caller did not ask for creation\n", (int)key)); 667 return ENOENT; 668 } 669 ipcKey = new(std::nothrow) Ipc(key); 670 if (ipcKey == NULL) { 671 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object " 672 "for key %d\n", (int)key)); 673 return ENOMEM; 674 } 675 sIpcHashTable.Insert(ipcKey); 676 } else { 677 // The IPC key exist and it already has a message queue 678 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 679 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 680 return EEXIST; 681 } 682 int messageQueueID = ipcKey->MessageQueueID(); 683 684 MutexLocker _(sXsiMessageQueueLock); 685 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 686 if (!messageQueue->HasPermission()) { 687 TRACE_ERROR(("xsi_msgget: calling process has not permission " 688 "on message queue %d, key %d\n", messageQueue->ID(), 689 (int)key)); 690 return EACCES; 691 } 692 create = false; 693 } 694 } 695 696 if (create) { 697 // Create a new message queue for this key 698 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) { 699 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 700 "message queues\n")); 701 return ENOSPC; 702 } 703 704 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 705 if (messageQueue == NULL) { 706 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 707 "message queue\n")); 708 return ENOMEM; 709 } 710 atomic_add(&sXsiMessageQueueCount, 1); 711 712 MutexLocker _(sXsiMessageQueueLock); 713 messageQueue->SetID(); 714 if (isPrivate) 715 messageQueue->SetIpcKey((key_t)-1); 716 else { 717 messageQueue->SetIpcKey(key); 718 ipcKey->SetMessageQueueID(messageQueue); 719 } 720 sMessageQueueHashTable.Insert(messageQueue); 721 } 722 723 return messageQueue->ID(); 724 } 725 726 727 ssize_t 728 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 729 size_t messageSize, long messageType, int messageFlags) 730 { 731 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 732 messageQueueID, messageSize)); 733 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 734 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 735 if (messageQueue == NULL) { 736 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", 737 messageQueueID)); 738 return EINVAL; 739 } 740 MutexLocker messageQueueLocker(messageQueue->Lock()); 741 messageQueueHashLocker.Unlock(); 742 743 if (messageSize > MAX_BYTES_PER_QUEUE) { 744 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 745 return EINVAL; 746 } 747 if (!messageQueue->HasPermission()) { 748 TRACE_ERROR(("xsi_msgrcv: calling process has not permission " 749 "on message queue id %d, key %d\n", messageQueueID, 750 (int)messageQueue->IpcKey())); 751 return EACCES; 752 } 753 if (!IS_USER_ADDRESS(messagePointer)) { 754 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); 755 return B_BAD_ADDRESS; 756 } 757 758 queued_message *message = NULL; 759 while (true) { 760 message = messageQueue->Remove(messageType); 761 762 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 763 // We are going to sleep 764 Thread *thread = thread_get_current_thread(); 765 queued_thread queueEntry(thread, messageSize); 766 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 767 768 uint32 sequenceNumber = messageQueue->SequenceNumber(); 769 770 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); 771 status_t result 772 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 773 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); 774 775 messageQueueHashLocker.Lock(); 776 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 777 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 778 && sequenceNumber != messageQueue->SequenceNumber())) { 779 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = " 780 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 781 sequenceNumber)); 782 return EIDRM; 783 } else if (result == B_INTERRUPTED) { 784 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " 785 "waiting on message queue %d\n",(int)thread->id, 786 messageQueueID)); 787 messageQueue->Deque(&queueEntry, /* waitForMessage */ true); 788 return EINTR; 789 } else { 790 messageQueueLocker.Lock(); 791 messageQueueHashLocker.Unlock(); 792 } 793 } else if (message == NULL) { 794 // There is not message of type requested and 795 // we can't wait 796 return ENOMSG; 797 } else { 798 // Message received correctly (so far) 799 if ((ssize_t)messageSize < message->length 800 && !(messageFlags & MSG_NOERROR)) { 801 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 802 // Put the message back inside. Since we hold the 803 // queue message lock, not one else could have filled 804 // up the queue meanwhile 805 messageQueue->Insert(message); 806 return E2BIG; 807 } 808 809 ssize_t result 810 = message->copy_to_user_buffer(messagePointer, messageSize); 811 if (result < 0) { 812 messageQueue->Insert(message); 813 return B_BAD_ADDRESS; 814 } 815 816 delete message; 817 TRACE(("xsi_msgrcv: message received correctly\n")); 818 return result; 819 } 820 } 821 822 return B_OK; 823 } 824 825 826 int 827 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 828 size_t messageSize, int messageFlags) 829 { 830 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 831 messageQueueID, messageSize)); 832 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 833 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 834 if (messageQueue == NULL) { 835 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n", 836 messageQueueID)); 837 return EINVAL; 838 } 839 MutexLocker messageQueueLocker(messageQueue->Lock()); 840 messageQueueHashLocker.Unlock(); 841 842 if (messageSize > MAX_BYTES_PER_QUEUE) { 843 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 844 return EINVAL; 845 } 846 if (!messageQueue->HasPermission()) { 847 TRACE_ERROR(("xsi_msgsnd: calling process has not permission " 848 "on message queue id %d, key %d\n", messageQueueID, 849 (int)messageQueue->IpcKey())); 850 return EACCES; 851 } 852 if (!IS_USER_ADDRESS(messagePointer)) { 853 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n")); 854 return B_BAD_ADDRESS; 855 } 856 857 queued_message *message 858 = new(std::nothrow) queued_message(messagePointer, messageSize); 859 if (message == NULL || message->initOK != true) { 860 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 861 delete message; 862 return ENOMEM; 863 } 864 865 bool notSent = true; 866 status_t result = B_OK; 867 while (notSent) { 868 bool goToSleep = messageQueue->Insert(message); 869 870 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 871 // We are going to sleep 872 Thread *thread = thread_get_current_thread(); 873 queued_thread queueEntry(thread, messageSize); 874 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 875 876 uint32 sequenceNumber = messageQueue->SequenceNumber(); 877 878 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); 879 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 880 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); 881 882 messageQueueHashLocker.Lock(); 883 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 884 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 885 && sequenceNumber != messageQueue->SequenceNumber())) { 886 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = " 887 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 888 sequenceNumber)); 889 delete message; 890 notSent = false; 891 result = EIDRM; 892 } else if (result == B_INTERRUPTED) { 893 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " 894 "waiting on message queue %d\n",(int)thread->id, 895 messageQueueID)); 896 messageQueue->Deque(&queueEntry, /* waitForMessage */ false); 897 delete message; 898 notSent = false; 899 result = EINTR; 900 } else { 901 messageQueueLocker.Lock(); 902 messageQueueHashLocker.Unlock(); 903 } 904 } else if (goToSleep) { 905 // We did not send the message and we can't wait 906 delete message; 907 notSent = false; 908 result = EAGAIN; 909 } else { 910 // Message delivered correctly 911 TRACE(("xsi_msgsnd: message sent correctly\n")); 912 notSent = false; 913 } 914 } 915 916 return result; 917 } 918