1 /* 2 * Copyright 2008-2011, Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Salvatore Benedetto <salvatore.benedetto@gmail.com> 7 */ 8 9 #include <posix/xsi_message_queue.h> 10 11 #include <new> 12 13 #include <sys/ipc.h> 14 #include <sys/types.h> 15 16 #include <OS.h> 17 18 #include <kernel.h> 19 #include <syscall_restart.h> 20 21 #include <util/atomic.h> 22 #include <util/AutoLock.h> 23 #include <util/DoublyLinkedList.h> 24 #include <util/OpenHashTable.h> 25 26 27 #define TRACE_XSI_MSG_QUEUE 28 #ifdef TRACE_XSI_MSG_QUEUE 29 # define TRACE(x) dprintf x 30 # define TRACE_ERROR(x) dprintf x 31 #else 32 # define TRACE(x) /* nothing */ 33 # define TRACE_ERROR(x) dprintf x 34 #endif 35 36 // Queue for holding blocked threads 37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> { 38 queued_thread(Thread *_thread, int32 _message_length) 39 : 40 thread(_thread), 41 message_length(_message_length), 42 queued(false) 43 { 44 } 45 46 Thread *thread; 47 int32 message_length; 48 bool queued; 49 }; 50 51 typedef DoublyLinkedList<queued_thread> ThreadQueue; 52 53 54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> { 55 queued_message(const void *_message, ssize_t _length) 56 : 57 initOK(false), 58 length(_length) 59 { 60 message = (char *)malloc(sizeof(char) * _length); 61 if (message == NULL) 62 return; 63 64 if (user_memcpy(&type, _message, sizeof(long)) != B_OK 65 || user_memcpy(message, (void *)((char *)_message + sizeof(long)), 66 _length) != B_OK) { 67 free(message); 68 return; 69 } 70 initOK = true; 71 } 72 73 ~queued_message() 74 { 75 if (initOK) 76 free(message); 77 } 78 79 ssize_t copy_to_user_buffer(void *_message, ssize_t _length) 80 { 81 if (_length > length) 82 _length = length; 83 84 if (user_memcpy(_message, &type, sizeof(long)) != B_OK 85 || user_memcpy((void *)((char *)_message + sizeof(long)), message, 86 _length) != B_OK) 87 return B_ERROR; 88 return _length; 89 } 90 91 bool initOK; 92 ssize_t length; 93 char *message; 94 long type; 95 }; 96 97 typedef DoublyLinkedList<queued_message> MessageQueue; 98 99 // Arbitrary limit 100 #define MAX_BYTES_PER_QUEUE 2048 101 102 class XsiMessageQueue { 103 public: 104 XsiMessageQueue(int flags) 105 : 106 fBytesInQueue(0), 107 fThreadsWaitingToReceive(0), 108 fThreadsWaitingToSend(0) 109 { 110 mutex_init(&fLock, "XsiMessageQueue private mutex"); 111 SetIpcKey((key_t)-1); 112 SetPermissions(flags); 113 // Initialize all fields to zero 114 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds)); 115 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 116 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; 117 } 118 119 // Implemented after sXsiMessageCount is declared 120 ~XsiMessageQueue(); 121 122 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker) 123 { 124 thread_prepare_to_block(thread, B_CAN_INTERRUPT, 125 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); 126 // Unlock the queue before blocking 127 queueLocker->Unlock(); 128 129 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to 130 // interruption, we will still be queued. A WakeUpThread() at this point will 131 // call thread_unblock() and might thus screw with our trying to re-lock the 132 // mutex. 133 return thread_block(); 134 } 135 136 void DoIpcSet(struct msqid_ds *result) 137 { 138 fMessageQueue.msg_perm.uid = result->msg_perm.uid; 139 fMessageQueue.msg_perm.gid = result->msg_perm.gid; 140 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff) 141 | (result->msg_perm.mode & 0x01ff); 142 fMessageQueue.msg_qbytes = result->msg_qbytes; 143 fMessageQueue.msg_ctime = (time_t)real_time_clock(); 144 } 145 146 void Deque(queued_thread *queueEntry, bool waitForMessage) 147 { 148 if (queueEntry->queued) { 149 if (waitForMessage) { 150 fWaitingToReceive.Remove(queueEntry); 151 fThreadsWaitingToReceive--; 152 } else { 153 fWaitingToSend.Remove(queueEntry); 154 fThreadsWaitingToSend--; 155 } 156 } 157 } 158 159 void Enqueue(queued_thread *queueEntry, bool waitForMessage) 160 { 161 if (waitForMessage) { 162 fWaitingToReceive.Add(queueEntry); 163 fThreadsWaitingToReceive++; 164 } else { 165 fWaitingToSend.Add(queueEntry); 166 fThreadsWaitingToSend++; 167 } 168 queueEntry->queued = true; 169 } 170 171 struct msqid_ds &GetMessageQueue() 172 { 173 return fMessageQueue; 174 } 175 176 bool HasPermission() const 177 { 178 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) 179 return true; 180 181 uid_t uid = geteuid(); 182 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid 183 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0)) 184 return true; 185 186 gid_t gid = getegid(); 187 if (gid == fMessageQueue.msg_perm.gid 188 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0) 189 return true; 190 191 return false; 192 } 193 194 bool HasReadPermission() const 195 { 196 // TODO: fix this 197 return HasPermission(); 198 } 199 200 int ID() const 201 { 202 return fID; 203 } 204 205 // Implemented after sXsiMessageCount is declared 206 bool Insert(queued_message *message); 207 208 key_t IpcKey() const 209 { 210 return fMessageQueue.msg_perm.key; 211 } 212 213 mutex &Lock() 214 { 215 return fLock; 216 } 217 218 msglen_t MaxBytes() const 219 { 220 return fMessageQueue.msg_qbytes; 221 } 222 223 // Implemented after sXsiMessageCount is declared 224 queued_message *Remove(long typeRequested); 225 226 uint32 SequenceNumber() const 227 { 228 return fSequenceNumber; 229 } 230 231 // Implemented after sMessageQueueHashTable is declared 232 void SetID(); 233 234 void SetIpcKey(key_t key) 235 { 236 fMessageQueue.msg_perm.key = key; 237 } 238 239 void SetPermissions(int flags) 240 { 241 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid(); 242 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid(); 243 fMessageQueue.msg_perm.mode = (flags & 0x01ff); 244 } 245 246 void WakeUpThread(bool waitForMessage) 247 { 248 if (waitForMessage) { 249 // Wake up all waiting thread for a message 250 // TODO: this can cause starvation for any 251 // very-unlucky-and-slow thread 252 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 253 entry->queued = false; 254 fThreadsWaitingToReceive--; 255 thread_unblock(entry->thread, 0); 256 } 257 } else { 258 // Wake up only one thread waiting to send 259 if (queued_thread *entry = fWaitingToSend.RemoveHead()) { 260 entry->queued = false; 261 fThreadsWaitingToSend--; 262 thread_unblock(entry->thread, 0); 263 } 264 } 265 } 266 267 XsiMessageQueue*& Link() 268 { 269 return fLink; 270 } 271 272 private: 273 msglen_t fBytesInQueue; 274 int fID; 275 mutex fLock; 276 MessageQueue fMessage; 277 struct msqid_ds fMessageQueue; 278 uint32 fSequenceNumber; 279 uint32 fThreadsWaitingToReceive; 280 uint32 fThreadsWaitingToSend; 281 282 ThreadQueue fWaitingToReceive; 283 ThreadQueue fWaitingToSend; 284 285 XsiMessageQueue* fLink; 286 }; 287 288 289 // Xsi message queue hash table 290 struct MessageQueueHashTableDefinition { 291 typedef int KeyType; 292 typedef XsiMessageQueue ValueType; 293 294 size_t HashKey (const int key) const 295 { 296 return (size_t)key; 297 } 298 299 size_t Hash(XsiMessageQueue *variable) const 300 { 301 return (size_t)variable->ID(); 302 } 303 304 bool Compare(const int key, XsiMessageQueue *variable) const 305 { 306 return (int)key == (int)variable->ID(); 307 } 308 309 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const 310 { 311 return variable->Link(); 312 } 313 }; 314 315 316 // IPC class 317 class Ipc { 318 public: 319 Ipc(key_t key) 320 : fKey(key), 321 fMessageQueueId(-1) 322 { 323 } 324 325 key_t Key() const 326 { 327 return fKey; 328 } 329 330 int MessageQueueID() const 331 { 332 return fMessageQueueId; 333 } 334 335 void SetMessageQueueID(XsiMessageQueue *messageQueue) 336 { 337 fMessageQueueId = messageQueue->ID(); 338 } 339 340 Ipc*& Link() 341 { 342 return fLink; 343 } 344 345 private: 346 key_t fKey; 347 int fMessageQueueId; 348 Ipc* fLink; 349 }; 350 351 352 struct IpcHashTableDefinition { 353 typedef key_t KeyType; 354 typedef Ipc ValueType; 355 356 size_t HashKey (const key_t key) const 357 { 358 return (size_t)(key); 359 } 360 361 size_t Hash(Ipc *variable) const 362 { 363 return (size_t)HashKey(variable->Key()); 364 } 365 366 bool Compare(const key_t key, Ipc *variable) const 367 { 368 return (key_t)key == (key_t)variable->Key(); 369 } 370 371 Ipc*& GetLink(Ipc *variable) const 372 { 373 return variable->Link(); 374 } 375 }; 376 377 // Arbitrary limits 378 #define MAX_XSI_MESSAGE 4096 379 #define MAX_XSI_MESSAGE_QUEUE 1024 380 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable; 381 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; 382 383 static mutex sIpcLock; 384 static mutex sXsiMessageQueueLock; 385 386 static uint32 sGlobalSequenceNumber = 1; 387 static int32 sXsiMessageCount = 0; 388 static int32 sXsiMessageQueueCount = 0; 389 390 391 // #pragma mark - 392 393 394 XsiMessageQueue::~XsiMessageQueue() 395 { 396 mutex_destroy(&fLock); 397 398 // Wake up any threads still waiting 399 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) { 400 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) { 401 entry->queued = false; 402 thread_unblock(entry->thread, EIDRM); 403 } 404 while (queued_thread *entry = fWaitingToSend.RemoveHead()) { 405 entry->queued = false; 406 thread_unblock(entry->thread, EIDRM); 407 } 408 } 409 410 // Free up any remaining messages 411 if (fMessageQueue.msg_qnum) { 412 while (queued_message *message = fMessage.RemoveHead()) { 413 atomic_add(&sXsiMessageCount, -1); 414 delete message; 415 } 416 } 417 } 418 419 420 bool 421 XsiMessageQueue::Insert(queued_message *message) 422 { 423 // The only situation that would make us (potentially) wait 424 // is that we exceed with bytes or with the total number of messages 425 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes) 426 return true; 427 428 while (true) { 429 int32 oldCount = atomic_get(&sXsiMessageCount); 430 if (oldCount >= MAX_XSI_MESSAGE) 431 return true; 432 // If another thread updates the counter we keep 433 // iterating 434 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount) 435 == oldCount) 436 break; 437 } 438 439 fMessage.Add(message); 440 fMessageQueue.msg_qnum++; 441 fMessageQueue.msg_lspid = getpid(); 442 fMessageQueue.msg_stime = real_time_clock(); 443 fBytesInQueue += message->length; 444 if (fThreadsWaitingToReceive) 445 WakeUpThread(true /* WaitForMessage */); 446 return false; 447 } 448 449 450 queued_message* 451 XsiMessageQueue::Remove(long typeRequested) 452 { 453 queued_message *message = NULL; 454 if (typeRequested < 0) { 455 // Return first message of the lowest type 456 // that is less than or equal to the absolute 457 // value of type requested. 458 MessageQueue::Iterator iterator = fMessage.GetIterator(); 459 while (iterator.HasNext()) { 460 queued_message *current = iterator.Next(); 461 if (current->type <= -typeRequested) { 462 message = iterator.Remove(); 463 break; 464 } 465 } 466 } else if (typeRequested == 0) { 467 // Return the first message on the queue 468 message = fMessage.RemoveHead(); 469 } else { 470 // Return the first message of type requested 471 MessageQueue::Iterator iterator = fMessage.GetIterator(); 472 while (iterator.HasNext()) { 473 queued_message *current = iterator.Next(); 474 if (current->type == typeRequested) { 475 message = iterator.Remove(); 476 break; 477 } 478 } 479 } 480 481 if (message == NULL) 482 return NULL; 483 484 fMessageQueue.msg_qnum--; 485 fMessageQueue.msg_lrpid = getpid(); 486 fMessageQueue.msg_rtime = real_time_clock(); 487 fBytesInQueue -= message->length; 488 atomic_add(&sXsiMessageCount, -1); 489 if (fThreadsWaitingToSend) 490 WakeUpThread(false /* WaitForMessage */); 491 return message; 492 } 493 494 495 void 496 XsiMessageQueue::SetID() 497 { 498 fID = real_time_clock(); 499 // The lock is held before calling us 500 while (true) { 501 if (sMessageQueueHashTable.Lookup(fID) == NULL) 502 break; 503 fID++; 504 } 505 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; 506 fSequenceNumber = sGlobalSequenceNumber; 507 } 508 509 510 // #pragma mark - Kernel exported API 511 512 513 void 514 xsi_msg_init() 515 { 516 // Initialize hash tables 517 status_t status = sIpcHashTable.Init(); 518 if (status != B_OK) 519 panic("xsi_msg_init() failed to initialize ipc hash table\n"); 520 status = sMessageQueueHashTable.Init(); 521 if (status != B_OK) 522 panic("xsi_msg_init() failed to initialize message queue hash table\n"); 523 524 mutex_init(&sIpcLock, "global POSIX message queue IPC table"); 525 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); 526 } 527 528 529 // #pragma mark - Syscalls 530 531 532 int 533 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) 534 { 535 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); 536 MutexLocker ipcHashLocker(sIpcLock); 537 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 538 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 539 if (messageQueue == NULL) { 540 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID)); 541 return EINVAL; 542 } 543 if (!IS_USER_ADDRESS(buffer)) { 544 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n")); 545 return B_BAD_ADDRESS; 546 } 547 548 // Lock the message queue itself and release both the ipc hash table lock 549 // and the message queue hash table lock _only_ if the command it's not 550 // IPC_RMID, this prevents undesidered situation from happening while 551 // (hopefully) improving the concurrency. 552 MutexLocker messageQueueLocker; 553 if (command != IPC_RMID) { 554 messageQueueLocker.SetTo(&messageQueue->Lock(), false); 555 messageQueueHashLocker.Unlock(); 556 ipcHashLocker.Unlock(); 557 } else 558 // Since we are going to delete the message queue object 559 // along with its mutex, we can't use a MutexLocker object, 560 // as the mutex itself won't exist on function exit 561 mutex_lock(&messageQueue->Lock()); 562 563 switch (command) { 564 case IPC_STAT: { 565 if (!messageQueue->HasReadPermission()) { 566 TRACE_ERROR(("xsi_msgctl: calling process has not read " 567 "permission on message queue %d, key %d\n", messageQueueID, 568 (int)messageQueue->IpcKey())); 569 return EACCES; 570 } 571 struct msqid_ds msg = messageQueue->GetMessageQueue(); 572 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) { 573 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 574 return B_BAD_ADDRESS; 575 } 576 break; 577 } 578 579 case IPC_SET: { 580 if (!messageQueue->HasPermission()) { 581 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 582 "on message queue %d, key %d\n", messageQueueID, 583 (int)messageQueue->IpcKey())); 584 return EPERM; 585 } 586 struct msqid_ds msg; 587 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) { 588 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n")); 589 return B_BAD_ADDRESS; 590 } 591 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) { 592 TRACE_ERROR(("xsi_msgctl: user does not have permission to " 593 "increase the maximum number of bytes allowed on queue\n")); 594 return EPERM; 595 } 596 if (msg.msg_qbytes == 0) { 597 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n")); 598 return EINVAL; 599 } 600 601 messageQueue->DoIpcSet(&msg); 602 break; 603 } 604 605 case IPC_RMID: { 606 // If this was the command, we are still holding the message 607 // queue hash table lock along with the ipc one, but not the 608 // message queue lock itself. This prevents other process 609 // to try and acquire a destroyed mutex 610 if (!messageQueue->HasPermission()) { 611 TRACE_ERROR(("xsi_msgctl: calling process has not permission " 612 "on message queue %d, key %d\n", messageQueueID, 613 (int)messageQueue->IpcKey())); 614 return EPERM; 615 } 616 key_t key = messageQueue->IpcKey(); 617 Ipc *ipcKey = NULL; 618 if (key != -1) { 619 ipcKey = sIpcHashTable.Lookup(key); 620 sIpcHashTable.Remove(ipcKey); 621 } 622 sMessageQueueHashTable.Remove(messageQueue); 623 // Wake up of any threads waiting on this 624 // queue happens in destructor 625 if (key != -1) 626 delete ipcKey; 627 atomic_add(&sXsiMessageQueueCount, -1); 628 629 delete messageQueue; 630 break; 631 } 632 633 default: 634 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command)); 635 return EINVAL; 636 } 637 638 return B_OK; 639 } 640 641 642 int 643 _user_xsi_msgget(key_t key, int flags) 644 { 645 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); 646 XsiMessageQueue *messageQueue = NULL; 647 Ipc *ipcKey = NULL; 648 // Default assumptions 649 bool isPrivate = true; 650 bool create = true; 651 652 if (key != IPC_PRIVATE) { 653 isPrivate = false; 654 // Check if key already exist, if it does it already has a message 655 // queue associated with it 656 ipcKey = sIpcHashTable.Lookup(key); 657 if (ipcKey == NULL) { 658 if (!(flags & IPC_CREAT)) { 659 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the " 660 "caller did not ask for creation\n", (int)key)); 661 return ENOENT; 662 } 663 ipcKey = new(std::nothrow) Ipc(key); 664 if (ipcKey == NULL) { 665 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object " 666 "for key %d\n", (int)key)); 667 return ENOMEM; 668 } 669 sIpcHashTable.Insert(ipcKey); 670 } else { 671 // The IPC key exist and it already has a message queue 672 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { 673 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key)); 674 return EEXIST; 675 } 676 int messageQueueID = ipcKey->MessageQueueID(); 677 678 MutexLocker _(sXsiMessageQueueLock); 679 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 680 if (!messageQueue->HasPermission()) { 681 TRACE_ERROR(("xsi_msgget: calling process has not permission " 682 "on message queue %d, key %d\n", messageQueue->ID(), 683 (int)key)); 684 return EACCES; 685 } 686 create = false; 687 } 688 } 689 690 if (create) { 691 // Create a new message queue for this key 692 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) { 693 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of " 694 "message queues\n")); 695 return ENOSPC; 696 } 697 698 messageQueue = new(std::nothrow) XsiMessageQueue(flags); 699 if (messageQueue == NULL) { 700 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi " 701 "message queue\n")); 702 return ENOMEM; 703 } 704 atomic_add(&sXsiMessageQueueCount, 1); 705 706 MutexLocker _(sXsiMessageQueueLock); 707 messageQueue->SetID(); 708 if (isPrivate) 709 messageQueue->SetIpcKey((key_t)-1); 710 else { 711 messageQueue->SetIpcKey(key); 712 ipcKey->SetMessageQueueID(messageQueue); 713 } 714 sMessageQueueHashTable.Insert(messageQueue); 715 } 716 717 return messageQueue->ID(); 718 } 719 720 721 ssize_t 722 _user_xsi_msgrcv(int messageQueueID, void *messagePointer, 723 size_t messageSize, long messageType, int messageFlags) 724 { 725 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", 726 messageQueueID, messageSize)); 727 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 728 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 729 if (messageQueue == NULL) { 730 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", 731 messageQueueID)); 732 return EINVAL; 733 } 734 MutexLocker messageQueueLocker(messageQueue->Lock()); 735 messageQueueHashLocker.Unlock(); 736 737 if (messageSize > MAX_BYTES_PER_QUEUE) { 738 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); 739 return EINVAL; 740 } 741 if (!messageQueue->HasPermission()) { 742 TRACE_ERROR(("xsi_msgrcv: calling process has not permission " 743 "on message queue id %d, key %d\n", messageQueueID, 744 (int)messageQueue->IpcKey())); 745 return EACCES; 746 } 747 if (!IS_USER_ADDRESS(messagePointer)) { 748 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); 749 return B_BAD_ADDRESS; 750 } 751 752 queued_message *message = NULL; 753 while (true) { 754 message = messageQueue->Remove(messageType); 755 756 if (message == NULL && !(messageFlags & IPC_NOWAIT)) { 757 // We are going to sleep 758 Thread *thread = thread_get_current_thread(); 759 queued_thread queueEntry(thread, messageSize); 760 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); 761 762 uint32 sequenceNumber = messageQueue->SequenceNumber(); 763 764 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); 765 status_t result 766 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 767 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); 768 769 messageQueueHashLocker.Lock(); 770 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 771 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 772 && sequenceNumber != messageQueue->SequenceNumber())) { 773 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = " 774 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 775 sequenceNumber)); 776 return EIDRM; 777 } else if (result == B_INTERRUPTED) { 778 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " 779 "waiting on message queue %d\n",(int)thread->id, 780 messageQueueID)); 781 messageQueue->Deque(&queueEntry, /* waitForMessage */ true); 782 return EINTR; 783 } else { 784 messageQueueLocker.Lock(); 785 messageQueueHashLocker.Unlock(); 786 } 787 } else if (message == NULL) { 788 // There is not message of type requested and 789 // we can't wait 790 return ENOMSG; 791 } else { 792 // Message received correctly (so far) 793 if ((ssize_t)messageSize < message->length 794 && !(messageFlags & MSG_NOERROR)) { 795 TRACE_ERROR(("xsi_msgrcv: message too big!\n")); 796 // Put the message back inside. Since we hold the 797 // queue message lock, not one else could have filled 798 // up the queue meanwhile 799 messageQueue->Insert(message); 800 return E2BIG; 801 } 802 803 ssize_t result 804 = message->copy_to_user_buffer(messagePointer, messageSize); 805 if (result < 0) { 806 messageQueue->Insert(message); 807 return B_BAD_ADDRESS; 808 } 809 810 delete message; 811 TRACE(("xsi_msgrcv: message received correctly\n")); 812 return result; 813 } 814 } 815 816 return B_OK; 817 } 818 819 820 int 821 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, 822 size_t messageSize, int messageFlags) 823 { 824 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", 825 messageQueueID, messageSize)); 826 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); 827 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 828 if (messageQueue == NULL) { 829 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n", 830 messageQueueID)); 831 return EINVAL; 832 } 833 MutexLocker messageQueueLocker(messageQueue->Lock()); 834 messageQueueHashLocker.Unlock(); 835 836 if (messageSize > MAX_BYTES_PER_QUEUE) { 837 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n")); 838 return EINVAL; 839 } 840 if (!messageQueue->HasPermission()) { 841 TRACE_ERROR(("xsi_msgsnd: calling process has not permission " 842 "on message queue id %d, key %d\n", messageQueueID, 843 (int)messageQueue->IpcKey())); 844 return EACCES; 845 } 846 if (!IS_USER_ADDRESS(messagePointer)) { 847 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n")); 848 return B_BAD_ADDRESS; 849 } 850 851 queued_message *message 852 = new(std::nothrow) queued_message(messagePointer, messageSize); 853 if (message == NULL || message->initOK != true) { 854 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); 855 delete message; 856 return ENOMEM; 857 } 858 859 bool notSent = true; 860 status_t result = B_OK; 861 while (notSent) { 862 bool goToSleep = messageQueue->Insert(message); 863 864 if (goToSleep && !(messageFlags & IPC_NOWAIT)) { 865 // We are going to sleep 866 Thread *thread = thread_get_current_thread(); 867 queued_thread queueEntry(thread, messageSize); 868 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); 869 870 uint32 sequenceNumber = messageQueue->SequenceNumber(); 871 872 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); 873 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); 874 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); 875 876 messageQueueHashLocker.Lock(); 877 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); 878 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL 879 && sequenceNumber != messageQueue->SequenceNumber())) { 880 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = " 881 "%" B_PRIu32 ") got destroyed\n", messageQueueID, 882 sequenceNumber)); 883 delete message; 884 notSent = false; 885 result = EIDRM; 886 } else if (result == B_INTERRUPTED) { 887 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " 888 "waiting on message queue %d\n",(int)thread->id, 889 messageQueueID)); 890 messageQueue->Deque(&queueEntry, /* waitForMessage */ false); 891 delete message; 892 notSent = false; 893 result = EINTR; 894 } else { 895 messageQueueLocker.Lock(); 896 messageQueueHashLocker.Unlock(); 897 } 898 } else if (goToSleep) { 899 // We did not send the message and we can't wait 900 delete message; 901 notSent = false; 902 result = EAGAIN; 903 } else { 904 // Message delivered correctly 905 TRACE(("xsi_msgsnd: message sent correctly\n")); 906 notSent = false; 907 } 908 } 909 910 return result; 911 } 912