xref: /haiku/src/add-ons/kernel/file_systems/nfs4/WorkQueue.cpp (revision 45cd64efd9124013b28ce9c232f4d9c42df6cd56)
1 /*
2  * Copyright 2012 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "WorkQueue.h"
11 
12 #include <io_requests.h>
13 
14 
// Global pointer to the NFSv4 work queue; it starts out as NULL.
// NOTE(review): presumably assigned and cleared by the file system's
// init/uninit code, which is not visible in this file -- confirm.
15 WorkQueue*		gWorkQueue		= NULL;
16 
17 
18 WorkQueue::WorkQueue()
19 	:
20 	fQueueSemaphore(create_sem(0, NULL)),
21 	fThreadCancel(create_sem(0, NULL))
22 {
23 	mutex_init(&fQueueLock, NULL);
24 
25 	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
26 		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
27 	if (fThread < B_OK) {
28 		fInitError = fThread;
29 		return;
30 	}
31 
32 	status_t result = resume_thread(fThread);
33 	if (result != B_OK) {
34 		kill_thread(fThread);
35 		fInitError = result;
36 		return;
37 	}
38 
39 	fInitError = B_OK;
40 }
41 
42 
43 WorkQueue::~WorkQueue()
44 {
45 	release_sem(fThreadCancel);
46 
47 	status_t result;
48 	wait_for_thread(fThread, &result);
49 
50 	mutex_destroy(&fQueueLock);
51 	delete_sem(fThreadCancel);
52 	delete_sem(fQueueSemaphore);
53 }
54 
55 
56 status_t
57 WorkQueue::EnqueueJob(JobType type, void* args)
58 {
59 	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
60 	if (entry == NULL)
61 		return B_NO_MEMORY;
62 
63 	entry->fType = type;
64 	entry->fArguments = args;
65 	MutexLocker locker(fQueueLock);
66 	fQueue.InsertAfter(fQueue.Tail(), entry);
67 	locker.Unlock();
68 
69 	release_sem(fQueueSemaphore);
70 	return B_OK;
71 }
72 
73 
74 status_t
75 WorkQueue::LaunchWorkingThread(void* object)
76 {
77 	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
78 	return queue->WorkingThread();
79 }
80 
81 
82 status_t
83 WorkQueue::WorkingThread()
84 {
85 	while (true) {
86 		object_wait_info object[2];
87 		object[0].object = fThreadCancel;
88 		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
89 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
90 
91 		object[1].object = fQueueSemaphore;
92 		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
93 		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
94 
95 		status_t result = wait_for_objects(object, 2);
96 
97 		if (result < B_OK ||
98 			(object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
99 			return result;
100 		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
101 			continue;
102 
103 		acquire_sem(fQueueSemaphore);
104 
105 		DequeueJob();
106 	}
107 
108 	return B_OK;
109 }
110 
111 
112 void
113 WorkQueue::DequeueJob()
114 {
115 	MutexLocker locker(fQueueLock);
116 
117 	WorkQueueEntry* entry = fQueue.RemoveHead();
118 
119 	void* args = entry->fArguments;
120 	switch (entry->fType) {
121 		case DelegationRecall:
122 			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
123 			break;
124 		case IORequest:
125 			JobIO(reinterpret_cast<IORequestArgs*>(args));
126 			break;
127 	}
128 
129 	delete entry;
130 }
131 
132 
133 void
134 WorkQueue::JobRecall(DelegationRecallArgs* args)
135 {
136 	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
137 }
138 
139 
140 void
141 WorkQueue::JobIO(IORequestArgs* args)
142 {
143 	uint64 offset = io_request_offset(args->fRequest);
144 	uint64 length = io_request_length(args->fRequest);
145 
146 	char* buffer = reinterpret_cast<char*>(malloc(length));
147 	if (buffer == NULL) {
148 		notify_io_request(args->fRequest, B_NO_MEMORY);
149 		return;
150 	}
151 
152 	bool eof = false;
153 	uint64 size = 0;
154 	status_t result;
155 	if (io_request_is_write(args->fRequest)) {
156 		if (offset + length > args->fInode->MaxFileSize())
157 				length = args->fInode->MaxFileSize() - offset;
158 
159 		result = read_from_io_request(args->fRequest, buffer, length);
160 		do {
161 			size_t bytesWritten = length - size;
162 			result = args->fInode->WriteDirect(NULL, offset + size,
163 				buffer + size, &bytesWritten);
164 			size += bytesWritten;
165 		} while (size < length && result == B_OK);
166 	} else {
167 		do {
168 			size_t bytesRead = length - size;
169 			result = args->fInode->ReadDirect(NULL, offset + size, buffer,
170 				&bytesRead, &eof);
171 			if (result != B_OK)
172 				break;
173 
174 			result = write_to_io_request(args->fRequest, buffer, bytesRead);
175 			if (result != B_OK)
176 				break;
177 
178 			size += bytesRead;
179 		} while (size < length && result == B_OK && !eof);
180 
181 	}
182 	free(buffer);
183 
184 	notify_io_request(args->fRequest, result);
185 }
186 
187