1 /* 2 * Copyright 2012 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "WorkQueue.h" 11 12 #include <io_requests.h> 13 14 15 WorkQueue* gWorkQueue = NULL; 16 17 18 WorkQueue::WorkQueue() 19 : 20 fQueueSemaphore(create_sem(0, NULL)), 21 fThreadCancel(create_sem(0, NULL)) 22 { 23 mutex_init(&fQueueLock, NULL); 24 25 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread, 26 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this); 27 if (fThread < B_OK) { 28 fInitError = fThread; 29 return; 30 } 31 32 status_t result = resume_thread(fThread); 33 if (result != B_OK) { 34 kill_thread(fThread); 35 fInitError = result; 36 return; 37 } 38 39 fInitError = B_OK; 40 } 41 42 43 WorkQueue::~WorkQueue() 44 { 45 release_sem(fThreadCancel); 46 47 status_t result; 48 wait_for_thread(fThread, &result); 49 50 mutex_destroy(&fQueueLock); 51 delete_sem(fThreadCancel); 52 delete_sem(fQueueSemaphore); 53 } 54 55 56 status_t 57 WorkQueue::EnqueueJob(JobType type, void* args) 58 { 59 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry; 60 if (entry == NULL) 61 return B_NO_MEMORY; 62 63 entry->fType = type; 64 entry->fArguments = args; 65 MutexLocker locker(fQueueLock); 66 fQueue.InsertAfter(fQueue.Tail(), entry); 67 locker.Unlock(); 68 69 release_sem(fQueueSemaphore); 70 return B_OK; 71 } 72 73 74 status_t 75 WorkQueue::LaunchWorkingThread(void* object) 76 { 77 ASSERT(object != NULL); 78 79 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object); 80 return queue->WorkingThread(); 81 } 82 83 84 status_t 85 WorkQueue::WorkingThread() 86 { 87 while (true) { 88 object_wait_info object[2]; 89 object[0].object = fThreadCancel; 90 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 91 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 92 93 object[1].object = fQueueSemaphore; 94 object[1].type = B_OBJECT_TYPE_SEMAPHORE; 95 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE; 96 97 status_t result = wait_for_objects(object, 2); 98 99 if (result < B_OK 100 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 101 return result; 102 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0) 103 continue; 104 105 acquire_sem(fQueueSemaphore); 106 107 DequeueJob(); 108 } 109 110 return B_OK; 111 } 112 113 114 void 115 WorkQueue::DequeueJob() 116 { 117 MutexLocker locker(fQueueLock); 118 WorkQueueEntry* entry = fQueue.RemoveHead(); 119 locker.Unlock(); 120 ASSERT(entry != NULL); 121 122 void* args = entry->fArguments; 123 switch (entry->fType) { 124 case DelegationRecall: 125 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args)); 126 break; 127 case IORequest: 128 JobIO(reinterpret_cast<IORequestArgs*>(args)); 129 break; 130 } 131 132 delete entry; 133 } 134 135 136 void 137 WorkQueue::JobRecall(DelegationRecallArgs* args) 138 { 139 ASSERT(args != NULL); 140 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate); 141 } 142 143 144 void 145 WorkQueue::JobIO(IORequestArgs* args) 146 { 147 ASSERT(args != NULL); 148 149 uint64 offset = io_request_offset(args->fRequest); 150 uint64 length = io_request_length(args->fRequest); 151 152 char* buffer = reinterpret_cast<char*>(malloc(length)); 153 if (buffer == NULL) { 154 notify_io_request(args->fRequest, B_NO_MEMORY); 155 return; 156 } 157 158 bool eof = false; 159 uint64 size = 0; 160 status_t result; 161 if (io_request_is_write(args->fRequest)) { 162 if (offset + length > args->fInode->MaxFileSize()) 163 length = args->fInode->MaxFileSize() - offset; 164 165 result = read_from_io_request(args->fRequest, buffer, length); 166 do { 167 size_t bytesWritten = length - size; 168 result = args->fInode->WriteDirect(NULL, offset + size, 169 buffer + size, &bytesWritten); 170 size += bytesWritten; 171 } while (size < length && result == B_OK); 172 } else { 173 do { 174 size_t bytesRead = length - size; 175 result = args->fInode->ReadDirect(NULL, offset + size, buffer, 176 &bytesRead, &eof); 177 if (result != B_OK) 178 break; 179 180 result = write_to_io_request(args->fRequest, buffer, bytesRead); 181 if (result != B_OK) 182 break; 183 184 size += bytesRead; 185 } while (size < length && result == B_OK && !eof); 186 187 } 188 free(buffer); 189 190 notify_io_request(args->fRequest, result); 191 } 192 193