1 /*
2 * Copyright 2012 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Paweł Dziepak, pdziepak@quarnos.org
7 */
8
9
10 #include "WorkQueue.h"
11
12 #include <io_requests.h>
13
14
15 #define MAX_BUFFER_SIZE (1024 * 1024)
16
17 WorkQueue* gWorkQueue = NULL;
18
19
WorkQueue()20 WorkQueue::WorkQueue()
21 :
22 fQueueSemaphore(create_sem(0, NULL)),
23 fThreadCancel(create_sem(0, NULL))
24 {
25 mutex_init(&fQueueLock, NULL);
26
27 fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
28 "NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
29 if (fThread < B_OK) {
30 fInitError = fThread;
31 return;
32 }
33
34 status_t result = resume_thread(fThread);
35 if (result != B_OK) {
36 kill_thread(fThread);
37 fInitError = result;
38 return;
39 }
40
41 fInitError = B_OK;
42 }
43
44
~WorkQueue()45 WorkQueue::~WorkQueue()
46 {
47 release_sem(fThreadCancel);
48
49 status_t result;
50 wait_for_thread(fThread, &result);
51
52 mutex_destroy(&fQueueLock);
53 delete_sem(fThreadCancel);
54 delete_sem(fQueueSemaphore);
55 }
56
57
58 status_t
EnqueueJob(JobType type,void * args)59 WorkQueue::EnqueueJob(JobType type, void* args)
60 {
61 WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
62 if (entry == NULL)
63 return B_NO_MEMORY;
64
65 entry->fType = type;
66 entry->fArguments = args;
67 if (type == IORequest)
68 reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
69
70 MutexLocker locker(fQueueLock);
71 fQueue.InsertAfter(fQueue.Tail(), entry);
72 locker.Unlock();
73
74 release_sem(fQueueSemaphore);
75 return B_OK;
76 }
77
78
79 status_t
LaunchWorkingThread(void * object)80 WorkQueue::LaunchWorkingThread(void* object)
81 {
82 ASSERT(object != NULL);
83
84 WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
85 return queue->WorkingThread();
86 }
87
88
89 status_t
WorkingThread()90 WorkQueue::WorkingThread()
91 {
92 while (true) {
93 object_wait_info object[2];
94 object[0].object = fThreadCancel;
95 object[0].type = B_OBJECT_TYPE_SEMAPHORE;
96 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
97
98 object[1].object = fQueueSemaphore;
99 object[1].type = B_OBJECT_TYPE_SEMAPHORE;
100 object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
101
102 status_t result = wait_for_objects(object, 2);
103
104 if (result < B_OK
105 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
106 return result;
107 } else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
108 continue;
109
110 acquire_sem(fQueueSemaphore);
111
112 DequeueJob();
113 }
114
115 return B_OK;
116 }
117
118
119 void
DequeueJob()120 WorkQueue::DequeueJob()
121 {
122 MutexLocker locker(fQueueLock);
123 WorkQueueEntry* entry = fQueue.RemoveHead();
124 locker.Unlock();
125 ASSERT(entry != NULL);
126
127 void* args = entry->fArguments;
128 switch (entry->fType) {
129 case DelegationRecall:
130 JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
131 break;
132 case IORequest:
133 JobIO(reinterpret_cast<IORequestArgs*>(args));
134 break;
135 }
136
137 delete entry;
138 }
139
140
141 void
JobRecall(DelegationRecallArgs * args)142 WorkQueue::JobRecall(DelegationRecallArgs* args)
143 {
144 ASSERT(args != NULL);
145 args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
146 }
147
148
149 void
JobIO(IORequestArgs * args)150 WorkQueue::JobIO(IORequestArgs* args)
151 {
152 ASSERT(args != NULL);
153
154 uint64 offset = io_request_offset(args->fRequest);
155 uint64 length = io_request_length(args->fRequest);
156
157 size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
158 char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
159 if (buffer == NULL) {
160 notify_io_request(args->fRequest, B_NO_MEMORY);
161 args->fInode->EndAIOOp();
162 return;
163 }
164
165 status_t result;
166 if (io_request_is_write(args->fRequest)) {
167 if (offset + length > args->fInode->MaxFileSize())
168 length = args->fInode->MaxFileSize() - offset;
169
170 uint64 position = 0;
171 do {
172 size_t size = 0;
173 size_t thisBufferLength = min_c(bufferLength, length - position);
174
175 result = read_from_io_request(args->fRequest, buffer,
176 thisBufferLength);
177
178 while (size < thisBufferLength && result == B_OK) {
179 size_t bytesWritten = thisBufferLength - size;
180 result = args->fInode->WriteDirect(NULL,
181 offset + position + size, buffer + size, &bytesWritten);
182 size += bytesWritten;
183 }
184
185 position += thisBufferLength;
186 } while (position < length && result == B_OK);
187 } else {
188 bool eof = false;
189 uint64 position = 0;
190 do {
191 size_t size = 0;
192 size_t thisBufferLength = min_c(bufferLength, length - position);
193
194 do {
195 size_t bytesRead = thisBufferLength - size;
196 result = args->fInode->ReadDirect(NULL,
197 offset + position + size, buffer + size, &bytesRead, &eof);
198 if (result != B_OK)
199 break;
200
201 result = write_to_io_request(args->fRequest, buffer + size,
202 bytesRead);
203 if (result != B_OK)
204 break;
205
206 size += bytesRead;
207 } while (size < length && result == B_OK && !eof);
208
209 position += thisBufferLength;
210 } while (position < length && result == B_OK && !eof);
211 }
212
213 free(buffer);
214
215 notify_io_request(args->fRequest, result);
216 args->fInode->EndAIOOp();
217 }
218
219