xref: /haiku/src/add-ons/kernel/file_systems/nfs4/WorkQueue.cpp (revision 2f44d34e5e696fe1fde8f4a8ee2728664984db04)
1 /*
2  * Copyright 2012 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "WorkQueue.h"
11 
12 #include <io_requests.h>
13 
14 
// Pointer to the global work queue. It starts out as NULL here; presumably
// the instance is created and assigned by code outside this file (confirm
// against the module initialization code).
WorkQueue*		gWorkQueue		= NULL;
16 
17 
18 WorkQueue::WorkQueue()
19 	:
20 	fQueueSemaphore(create_sem(0, NULL)),
21 	fThreadCancel(create_sem(0, NULL))
22 {
23 	mutex_init(&fQueueLock, NULL);
24 
25 	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
26 		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
27 	if (fThread < B_OK) {
28 		fInitError = fThread;
29 		return;
30 	}
31 
32 	status_t result = resume_thread(fThread);
33 	if (result != B_OK) {
34 		kill_thread(fThread);
35 		fInitError = result;
36 		return;
37 	}
38 
39 	fInitError = B_OK;
40 }
41 
42 
43 WorkQueue::~WorkQueue()
44 {
45 	release_sem(fThreadCancel);
46 
47 	status_t result;
48 	wait_for_thread(fThread, &result);
49 
50 	mutex_destroy(&fQueueLock);
51 	delete_sem(fThreadCancel);
52 	delete_sem(fQueueSemaphore);
53 }
54 
55 
56 status_t
57 WorkQueue::EnqueueJob(JobType type, void* args)
58 {
59 	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
60 	if (entry == NULL)
61 		return B_NO_MEMORY;
62 
63 	entry->fType = type;
64 	entry->fArguments = args;
65 	MutexLocker locker(fQueueLock);
66 	fQueue.InsertAfter(fQueue.Tail(), entry);
67 	locker.Unlock();
68 
69 	release_sem(fQueueSemaphore);
70 	return B_OK;
71 }
72 
73 
74 status_t
75 WorkQueue::LaunchWorkingThread(void* object)
76 {
77 	ASSERT(object != NULL);
78 
79 	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
80 	return queue->WorkingThread();
81 }
82 
83 
84 status_t
85 WorkQueue::WorkingThread()
86 {
87 	while (true) {
88 		object_wait_info object[2];
89 		object[0].object = fThreadCancel;
90 		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
91 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
92 
93 		object[1].object = fQueueSemaphore;
94 		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
95 		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
96 
97 		status_t result = wait_for_objects(object, 2);
98 
99 		if (result < B_OK
100 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
101 			return result;
102 		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
103 			continue;
104 
105 		acquire_sem(fQueueSemaphore);
106 
107 		DequeueJob();
108 	}
109 
110 	return B_OK;
111 }
112 
113 
114 void
115 WorkQueue::DequeueJob()
116 {
117 	MutexLocker locker(fQueueLock);
118 	WorkQueueEntry* entry = fQueue.RemoveHead();
119 	locker.Unlock();
120 	ASSERT(entry != NULL);
121 
122 	void* args = entry->fArguments;
123 	switch (entry->fType) {
124 		case DelegationRecall:
125 			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
126 			break;
127 		case IORequest:
128 			JobIO(reinterpret_cast<IORequestArgs*>(args));
129 			break;
130 	}
131 
132 	delete entry;
133 }
134 
135 
136 void
137 WorkQueue::JobRecall(DelegationRecallArgs* args)
138 {
139 	ASSERT(args != NULL);
140 	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
141 }
142 
143 
144 void
145 WorkQueue::JobIO(IORequestArgs* args)
146 {
147 	ASSERT(args != NULL);
148 
149 	uint64 offset = io_request_offset(args->fRequest);
150 	uint64 length = io_request_length(args->fRequest);
151 
152 	char* buffer = reinterpret_cast<char*>(malloc(length));
153 	if (buffer == NULL) {
154 		notify_io_request(args->fRequest, B_NO_MEMORY);
155 		return;
156 	}
157 
158 	bool eof = false;
159 	uint64 size = 0;
160 	status_t result;
161 	if (io_request_is_write(args->fRequest)) {
162 		if (offset + length > args->fInode->MaxFileSize())
163 				length = args->fInode->MaxFileSize() - offset;
164 
165 		result = read_from_io_request(args->fRequest, buffer, length);
166 		do {
167 			size_t bytesWritten = length - size;
168 			result = args->fInode->WriteDirect(NULL, offset + size,
169 				buffer + size, &bytesWritten);
170 			size += bytesWritten;
171 		} while (size < length && result == B_OK);
172 	} else {
173 		do {
174 			size_t bytesRead = length - size;
175 			result = args->fInode->ReadDirect(NULL, offset + size, buffer,
176 				&bytesRead, &eof);
177 			if (result != B_OK)
178 				break;
179 
180 			result = write_to_io_request(args->fRequest, buffer, bytesRead);
181 			if (result != B_OK)
182 				break;
183 
184 			size += bytesRead;
185 		} while (size < length && result == B_OK && !eof);
186 
187 	}
188 	free(buffer);
189 
190 	notify_io_request(args->fRequest, result);
191 }
192 
193