xref: /haiku/src/add-ons/kernel/file_systems/nfs4/WorkQueue.cpp (revision e81a954787e50e56a7f06f72705b7859b6ab06d1)
1 /*
2  * Copyright 2012 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "WorkQueue.h"
11 
12 #include <io_requests.h>
13 
14 
15 #define	MAX_BUFFER_SIZE			(1024 * 1024)
16 
17 WorkQueue*		gWorkQueue		= NULL;
18 
19 
20 WorkQueue::WorkQueue()
21 	:
22 	fQueueSemaphore(create_sem(0, NULL)),
23 	fThreadCancel(create_sem(0, NULL))
24 {
25 	mutex_init(&fQueueLock, NULL);
26 
27 	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
28 		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
29 	if (fThread < B_OK) {
30 		fInitError = fThread;
31 		return;
32 	}
33 
34 	status_t result = resume_thread(fThread);
35 	if (result != B_OK) {
36 		kill_thread(fThread);
37 		fInitError = result;
38 		return;
39 	}
40 
41 	fInitError = B_OK;
42 }
43 
44 
45 WorkQueue::~WorkQueue()
46 {
47 	release_sem(fThreadCancel);
48 
49 	status_t result;
50 	wait_for_thread(fThread, &result);
51 
52 	mutex_destroy(&fQueueLock);
53 	delete_sem(fThreadCancel);
54 	delete_sem(fQueueSemaphore);
55 }
56 
57 
58 status_t
59 WorkQueue::EnqueueJob(JobType type, void* args)
60 {
61 	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
62 	if (entry == NULL)
63 		return B_NO_MEMORY;
64 
65 	entry->fType = type;
66 	entry->fArguments = args;
67 	if (type == IORequest)
68 		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
69 
70 	MutexLocker locker(fQueueLock);
71 	fQueue.InsertAfter(fQueue.Tail(), entry);
72 	locker.Unlock();
73 
74 	release_sem(fQueueSemaphore);
75 	return B_OK;
76 }
77 
78 
79 status_t
80 WorkQueue::LaunchWorkingThread(void* object)
81 {
82 	ASSERT(object != NULL);
83 
84 	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
85 	return queue->WorkingThread();
86 }
87 
88 
89 status_t
90 WorkQueue::WorkingThread()
91 {
92 	while (true) {
93 		object_wait_info object[2];
94 		object[0].object = fThreadCancel;
95 		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
96 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
97 
98 		object[1].object = fQueueSemaphore;
99 		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
100 		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
101 
102 		status_t result = wait_for_objects(object, 2);
103 
104 		if (result < B_OK
105 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
106 			return result;
107 		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
108 			continue;
109 
110 		acquire_sem(fQueueSemaphore);
111 
112 		DequeueJob();
113 	}
114 
115 	return B_OK;
116 }
117 
118 
119 void
120 WorkQueue::DequeueJob()
121 {
122 	MutexLocker locker(fQueueLock);
123 	WorkQueueEntry* entry = fQueue.RemoveHead();
124 	locker.Unlock();
125 	ASSERT(entry != NULL);
126 
127 	void* args = entry->fArguments;
128 	switch (entry->fType) {
129 		case DelegationRecall:
130 			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
131 			break;
132 		case IORequest:
133 			JobIO(reinterpret_cast<IORequestArgs*>(args));
134 			break;
135 	}
136 
137 	delete entry;
138 }
139 
140 
141 void
142 WorkQueue::JobRecall(DelegationRecallArgs* args)
143 {
144 	ASSERT(args != NULL);
145 	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
146 }
147 
148 
149 void
150 WorkQueue::JobIO(IORequestArgs* args)
151 {
152 	ASSERT(args != NULL);
153 
154 	uint64 offset = io_request_offset(args->fRequest);
155 	uint64 length = io_request_length(args->fRequest);
156 
157 	size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
158 	char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
159 	if (buffer == NULL) {
160 		notify_io_request(args->fRequest, B_NO_MEMORY);
161 		args->fInode->EndAIOOp();
162 		return;
163 	}
164 
165 	status_t result;
166 	if (io_request_is_write(args->fRequest)) {
167 		if (offset + length > args->fInode->MaxFileSize())
168 				length = args->fInode->MaxFileSize() - offset;
169 
170 		uint64 position = 0;
171 		do {
172 			size_t size = 0;
173 			size_t thisBufferLength = min_c(bufferLength, length - position);
174 
175 			result = read_from_io_request(args->fRequest, buffer,
176 				thisBufferLength);
177 
178 			while (size < thisBufferLength && result == B_OK) {
179 				size_t bytesWritten = thisBufferLength - size;
180 				result = args->fInode->WriteDirect(NULL,
181 					offset + position + size, buffer + size, &bytesWritten);
182 				size += bytesWritten;
183 			}
184 
185 			position += thisBufferLength;
186 		} while (position < length && result == B_OK);
187 	} else {
188 		bool eof = false;
189 		uint64 position = 0;
190 		do {
191 			size_t size = 0;
192 			size_t thisBufferLength = min_c(bufferLength, length - position);
193 
194 			do {
195 				size_t bytesRead = thisBufferLength - size;
196 				result = args->fInode->ReadDirect(NULL,
197 					offset + position + size, buffer + size, &bytesRead, &eof);
198 				if (result != B_OK)
199 					break;
200 
201 				result = write_to_io_request(args->fRequest, buffer + size,
202 					bytesRead);
203 				if (result != B_OK)
204 					break;
205 
206 				size += bytesRead;
207 			} while (size < length && result == B_OK && !eof);
208 
209 			position += thisBufferLength;
210 		} while (position < length && result == B_OK && !eof);
211 	}
212 
213 	free(buffer);
214 
215 	notify_io_request(args->fRequest, result);
216 	args->fInode->EndAIOOp();
217 }
218 
219