1 /* 2 * Copyright 2012 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "WorkQueue.h" 11 12 #include <io_requests.h> 13 14 15 WorkQueue* gWorkQueue = NULL; 16 17 18 WorkQueue::WorkQueue() 19 : 20 fQueueSemaphore(create_sem(0, NULL)), 21 fThreadCancel(create_sem(0, NULL)) 22 { 23 mutex_init(&fQueueLock, NULL); 24 25 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread, 26 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this); 27 if (fThread < B_OK) { 28 fInitError = fThread; 29 return; 30 } 31 32 status_t result = resume_thread(fThread); 33 if (result != B_OK) { 34 kill_thread(fThread); 35 fInitError = result; 36 return; 37 } 38 39 fInitError = B_OK; 40 } 41 42 43 WorkQueue::~WorkQueue() 44 { 45 release_sem(fThreadCancel); 46 47 status_t result; 48 wait_for_thread(fThread, &result); 49 50 mutex_destroy(&fQueueLock); 51 delete_sem(fThreadCancel); 52 delete_sem(fQueueSemaphore); 53 } 54 55 56 status_t 57 WorkQueue::EnqueueJob(JobType type, void* args) 58 { 59 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry; 60 if (entry == NULL) 61 return B_NO_MEMORY; 62 63 entry->fType = type; 64 entry->fArguments = args; 65 MutexLocker locker(fQueueLock); 66 fQueue.InsertAfter(fQueue.Tail(), entry); 67 locker.Unlock(); 68 69 release_sem(fQueueSemaphore); 70 return B_OK; 71 } 72 73 74 status_t 75 WorkQueue::LaunchWorkingThread(void* object) 76 { 77 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object); 78 return queue->WorkingThread(); 79 } 80 81 82 status_t 83 WorkQueue::WorkingThread() 84 { 85 while (true) { 86 object_wait_info object[2]; 87 object[0].object = fThreadCancel; 88 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 89 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 90 91 object[1].object = fQueueSemaphore; 92 object[1].type = B_OBJECT_TYPE_SEMAPHORE; 93 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE; 94 95 status_t result = wait_for_objects(object, 2); 96 97 if (result < B_OK || 98 (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 99 return result; 100 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0) 101 continue; 102 103 acquire_sem(fQueueSemaphore); 104 105 DequeueJob(); 106 } 107 108 return B_OK; 109 } 110 111 112 void 113 WorkQueue::DequeueJob() 114 { 115 MutexLocker locker(fQueueLock); 116 117 WorkQueueEntry* entry = fQueue.RemoveHead(); 118 119 void* args = entry->fArguments; 120 switch (entry->fType) { 121 case DelegationRecall: 122 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args)); 123 break; 124 case IORequest: 125 JobIO(reinterpret_cast<IORequestArgs*>(args)); 126 break; 127 } 128 129 delete entry; 130 } 131 132 133 void 134 WorkQueue::JobRecall(DelegationRecallArgs* args) 135 { 136 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate); 137 } 138 139 140 void 141 WorkQueue::JobIO(IORequestArgs* args) 142 { 143 uint64 offset = io_request_offset(args->fRequest); 144 uint64 length = io_request_length(args->fRequest); 145 146 char* buffer = reinterpret_cast<char*>(malloc(length)); 147 if (buffer == NULL) { 148 notify_io_request(args->fRequest, B_NO_MEMORY); 149 return; 150 } 151 152 bool eof = false; 153 uint64 size = 0; 154 status_t result; 155 if (io_request_is_write(args->fRequest)) { 156 if (offset + length > args->fInode->MaxFileSize()) 157 length = args->fInode->MaxFileSize() - offset; 158 159 result = read_from_io_request(args->fRequest, buffer, length); 160 do { 161 size_t bytesWritten = length - size; 162 result = args->fInode->WriteDirect(NULL, offset + size, 163 buffer + size, &bytesWritten); 164 size += bytesWritten; 165 } while (size < length && result == B_OK); 166 } else { 167 do { 168 size_t bytesRead = length - size; 169 result = args->fInode->ReadDirect(NULL, offset + size, buffer, 170 &bytesRead, &eof); 171 if (result != B_OK) 172 break; 173 174 result = write_to_io_request(args->fRequest, buffer, bytesRead); 175 if (result != B_OK) 176 break; 177 178 size += bytesRead; 179 } while (size < length && result == B_OK && !eof); 180 181 } 182 free(buffer); 183 184 notify_io_request(args->fRequest, result); 185 } 186 187