1 /* 2 * Copyright 2012 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "WorkQueue.h" 11 12 #include <io_requests.h> 13 14 15 WorkQueue* gWorkQueue = NULL; 16 17 18 WorkQueue::WorkQueue() 19 : 20 fQueueSemaphore(create_sem(0, NULL)), 21 fThreadCancel(create_sem(0, NULL)) 22 { 23 mutex_init(&fQueueLock, NULL); 24 25 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread, 26 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this); 27 if (fThread < B_OK) { 28 fInitError = fThread; 29 return; 30 } 31 32 status_t result = resume_thread(fThread); 33 if (result != B_OK) { 34 kill_thread(fThread); 35 fInitError = result; 36 return; 37 } 38 39 fInitError = B_OK; 40 } 41 42 43 WorkQueue::~WorkQueue() 44 { 45 release_sem(fThreadCancel); 46 47 status_t result; 48 wait_for_thread(fThread, &result); 49 50 mutex_destroy(&fQueueLock); 51 delete_sem(fThreadCancel); 52 delete_sem(fQueueSemaphore); 53 } 54 55 56 status_t 57 WorkQueue::EnqueueJob(JobType type, void* args) 58 { 59 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry; 60 if (entry == NULL) 61 return B_NO_MEMORY; 62 63 entry->fType = type; 64 entry->fArguments = args; 65 if (type == IORequest) 66 reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp(); 67 68 MutexLocker locker(fQueueLock); 69 fQueue.InsertAfter(fQueue.Tail(), entry); 70 locker.Unlock(); 71 72 release_sem(fQueueSemaphore); 73 return B_OK; 74 } 75 76 77 status_t 78 WorkQueue::LaunchWorkingThread(void* object) 79 { 80 ASSERT(object != NULL); 81 82 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object); 83 return queue->WorkingThread(); 84 } 85 86 87 status_t 88 WorkQueue::WorkingThread() 89 { 90 while (true) { 91 object_wait_info object[2]; 92 object[0].object = fThreadCancel; 93 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 94 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 95 96 object[1].object = fQueueSemaphore; 97 object[1].type = B_OBJECT_TYPE_SEMAPHORE; 98 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE; 99 100 status_t result = wait_for_objects(object, 2); 101 102 if (result < B_OK 103 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 104 return result; 105 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0) 106 continue; 107 108 acquire_sem(fQueueSemaphore); 109 110 DequeueJob(); 111 } 112 113 return B_OK; 114 } 115 116 117 void 118 WorkQueue::DequeueJob() 119 { 120 MutexLocker locker(fQueueLock); 121 WorkQueueEntry* entry = fQueue.RemoveHead(); 122 locker.Unlock(); 123 ASSERT(entry != NULL); 124 125 void* args = entry->fArguments; 126 switch (entry->fType) { 127 case DelegationRecall: 128 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args)); 129 break; 130 case IORequest: 131 JobIO(reinterpret_cast<IORequestArgs*>(args)); 132 break; 133 } 134 135 delete entry; 136 } 137 138 139 void 140 WorkQueue::JobRecall(DelegationRecallArgs* args) 141 { 142 ASSERT(args != NULL); 143 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate); 144 } 145 146 147 void 148 WorkQueue::JobIO(IORequestArgs* args) 149 { 150 ASSERT(args != NULL); 151 152 uint64 offset = io_request_offset(args->fRequest); 153 uint64 length = io_request_length(args->fRequest); 154 155 char* buffer = reinterpret_cast<char*>(malloc(length)); 156 if (buffer == NULL) { 157 notify_io_request(args->fRequest, B_NO_MEMORY); 158 args->fInode->EndAIOOp(); 159 return; 160 } 161 162 bool eof = false; 163 uint64 size = 0; 164 status_t result; 165 if (io_request_is_write(args->fRequest)) { 166 if (offset + length > args->fInode->MaxFileSize()) 167 length = args->fInode->MaxFileSize() - offset; 168 169 result = read_from_io_request(args->fRequest, buffer, length); 170 do { 171 size_t bytesWritten = length - size; 172 result = args->fInode->WriteDirect(NULL, offset + size, 173 buffer + size, &bytesWritten); 174 size += bytesWritten; 175 } while (size < length && result == B_OK); 176 } else { 177 do { 178 size_t bytesRead = length - size; 179 result = args->fInode->ReadDirect(NULL, offset + size, buffer, 180 &bytesRead, &eof); 181 if (result != B_OK) 182 break; 183 184 result = write_to_io_request(args->fRequest, buffer, bytesRead); 185 if (result != B_OK) 186 break; 187 188 size += bytesRead; 189 } while (size < length && result == B_OK && !eof); 190 191 } 192 free(buffer); 193 194 notify_io_request(args->fRequest, result); 195 args->fInode->EndAIOOp(); 196 } 197 198