1 /* 2 * Copyright 2012 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "WorkQueue.h" 11 12 #include <io_requests.h> 13 14 15 #define MAX_BUFFER_SIZE (1024 * 1024) 16 17 WorkQueue* gWorkQueue = NULL; 18 19 20 WorkQueue::WorkQueue() 21 : 22 fQueueSemaphore(create_sem(0, NULL)), 23 fThreadCancel(create_sem(0, NULL)) 24 { 25 mutex_init(&fQueueLock, NULL); 26 27 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread, 28 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this); 29 if (fThread < B_OK) { 30 fInitError = fThread; 31 return; 32 } 33 34 status_t result = resume_thread(fThread); 35 if (result != B_OK) { 36 kill_thread(fThread); 37 fInitError = result; 38 return; 39 } 40 41 fInitError = B_OK; 42 } 43 44 45 WorkQueue::~WorkQueue() 46 { 47 release_sem(fThreadCancel); 48 49 status_t result; 50 wait_for_thread(fThread, &result); 51 52 mutex_destroy(&fQueueLock); 53 delete_sem(fThreadCancel); 54 delete_sem(fQueueSemaphore); 55 } 56 57 58 status_t 59 WorkQueue::EnqueueJob(JobType type, void* args) 60 { 61 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry; 62 if (entry == NULL) 63 return B_NO_MEMORY; 64 65 entry->fType = type; 66 entry->fArguments = args; 67 if (type == IORequest) 68 reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp(); 69 70 MutexLocker locker(fQueueLock); 71 fQueue.InsertAfter(fQueue.Tail(), entry); 72 locker.Unlock(); 73 74 release_sem(fQueueSemaphore); 75 return B_OK; 76 } 77 78 79 status_t 80 WorkQueue::LaunchWorkingThread(void* object) 81 { 82 ASSERT(object != NULL); 83 84 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object); 85 return queue->WorkingThread(); 86 } 87 88 89 status_t 90 WorkQueue::WorkingThread() 91 { 92 while (true) { 93 object_wait_info object[2]; 94 object[0].object = fThreadCancel; 95 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 96 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 97 98 object[1].object = fQueueSemaphore; 99 object[1].type = B_OBJECT_TYPE_SEMAPHORE; 100 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE; 101 102 status_t result = wait_for_objects(object, 2); 103 104 if (result < B_OK 105 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 106 return result; 107 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0) 108 continue; 109 110 acquire_sem(fQueueSemaphore); 111 112 DequeueJob(); 113 } 114 115 return B_OK; 116 } 117 118 119 void 120 WorkQueue::DequeueJob() 121 { 122 MutexLocker locker(fQueueLock); 123 WorkQueueEntry* entry = fQueue.RemoveHead(); 124 locker.Unlock(); 125 ASSERT(entry != NULL); 126 127 void* args = entry->fArguments; 128 switch (entry->fType) { 129 case DelegationRecall: 130 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args)); 131 break; 132 case IORequest: 133 JobIO(reinterpret_cast<IORequestArgs*>(args)); 134 break; 135 } 136 137 delete entry; 138 } 139 140 141 void 142 WorkQueue::JobRecall(DelegationRecallArgs* args) 143 { 144 ASSERT(args != NULL); 145 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate); 146 } 147 148 149 void 150 WorkQueue::JobIO(IORequestArgs* args) 151 { 152 ASSERT(args != NULL); 153 154 uint64 offset = io_request_offset(args->fRequest); 155 uint64 length = io_request_length(args->fRequest); 156 157 size_t bufferLength = min_c(MAX_BUFFER_SIZE, length); 158 char* buffer = reinterpret_cast<char*>(malloc(bufferLength)); 159 if (buffer == NULL) { 160 notify_io_request(args->fRequest, B_NO_MEMORY); 161 args->fInode->EndAIOOp(); 162 return; 163 } 164 165 status_t result; 166 if (io_request_is_write(args->fRequest)) { 167 if (offset + length > args->fInode->MaxFileSize()) 168 length = args->fInode->MaxFileSize() - offset; 169 170 uint64 position = 0; 171 do { 172 size_t size = 0; 173 size_t thisBufferLength = min_c(bufferLength, length - position); 174 175 result = read_from_io_request(args->fRequest, buffer, 176 thisBufferLength); 177 178 while (size < thisBufferLength && result == B_OK) { 179 size_t bytesWritten = thisBufferLength - size; 180 result = args->fInode->WriteDirect(NULL, 181 offset + position + size, buffer + size, &bytesWritten); 182 size += bytesWritten; 183 } 184 185 position += thisBufferLength; 186 } while (position < length && result == B_OK); 187 } else { 188 bool eof = false; 189 uint64 position = 0; 190 do { 191 size_t size = 0; 192 size_t thisBufferLength = min_c(bufferLength, length - position); 193 194 do { 195 size_t bytesRead = thisBufferLength - size; 196 result = args->fInode->ReadDirect(NULL, 197 offset + position + size, buffer + size, &bytesRead, &eof); 198 if (result != B_OK) 199 break; 200 201 result = write_to_io_request(args->fRequest, buffer + size, 202 bytesRead); 203 if (result != B_OK) 204 break; 205 206 size += bytesRead; 207 } while (size < length && result == B_OK && !eof); 208 209 position += thisBufferLength; 210 } while (position < length && result == B_OK && !eof); 211 } 212 213 free(buffer); 214 215 notify_io_request(args->fRequest, result); 216 args->fInode->EndAIOOp(); 217 } 218 219