xref: /haiku/src/add-ons/kernel/file_systems/nfs4/WorkQueue.cpp (revision 8b72ce26511afc3cc9984174a20f6786f0f93363)
1 /*
2  * Copyright 2012 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "WorkQueue.h"
11 
12 #include <io_requests.h>
13 
14 
// Global pointer to the NFSv4 work queue. It starts out NULL and nothing in
// this file assigns it; presumably the add-on's init code creates the queue
// and stores it here -- verify against the caller.
WorkQueue*		gWorkQueue		= NULL;
16 
17 
18 WorkQueue::WorkQueue()
19 	:
20 	fQueueSemaphore(create_sem(0, NULL)),
21 	fThreadCancel(create_sem(0, NULL))
22 {
23 	mutex_init(&fQueueLock, NULL);
24 
25 	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
26 		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
27 	if (fThread < B_OK) {
28 		fInitError = fThread;
29 		return;
30 	}
31 
32 	status_t result = resume_thread(fThread);
33 	if (result != B_OK) {
34 		kill_thread(fThread);
35 		fInitError = result;
36 		return;
37 	}
38 
39 	fInitError = B_OK;
40 }
41 
42 
43 WorkQueue::~WorkQueue()
44 {
45 	release_sem(fThreadCancel);
46 
47 	status_t result;
48 	wait_for_thread(fThread, &result);
49 
50 	mutex_destroy(&fQueueLock);
51 	delete_sem(fThreadCancel);
52 	delete_sem(fQueueSemaphore);
53 }
54 
55 
56 status_t
57 WorkQueue::EnqueueJob(JobType type, void* args)
58 {
59 	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
60 	if (entry == NULL)
61 		return B_NO_MEMORY;
62 
63 	entry->fType = type;
64 	entry->fArguments = args;
65 	if (type == IORequest)
66 		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
67 
68 	MutexLocker locker(fQueueLock);
69 	fQueue.InsertAfter(fQueue.Tail(), entry);
70 	locker.Unlock();
71 
72 	release_sem(fQueueSemaphore);
73 	return B_OK;
74 }
75 
76 
77 status_t
78 WorkQueue::LaunchWorkingThread(void* object)
79 {
80 	ASSERT(object != NULL);
81 
82 	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
83 	return queue->WorkingThread();
84 }
85 
86 
87 status_t
88 WorkQueue::WorkingThread()
89 {
90 	while (true) {
91 		object_wait_info object[2];
92 		object[0].object = fThreadCancel;
93 		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
94 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
95 
96 		object[1].object = fQueueSemaphore;
97 		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
98 		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
99 
100 		status_t result = wait_for_objects(object, 2);
101 
102 		if (result < B_OK
103 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
104 			return result;
105 		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
106 			continue;
107 
108 		acquire_sem(fQueueSemaphore);
109 
110 		DequeueJob();
111 	}
112 
113 	return B_OK;
114 }
115 
116 
117 void
118 WorkQueue::DequeueJob()
119 {
120 	MutexLocker locker(fQueueLock);
121 	WorkQueueEntry* entry = fQueue.RemoveHead();
122 	locker.Unlock();
123 	ASSERT(entry != NULL);
124 
125 	void* args = entry->fArguments;
126 	switch (entry->fType) {
127 		case DelegationRecall:
128 			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
129 			break;
130 		case IORequest:
131 			JobIO(reinterpret_cast<IORequestArgs*>(args));
132 			break;
133 	}
134 
135 	delete entry;
136 }
137 
138 
139 void
140 WorkQueue::JobRecall(DelegationRecallArgs* args)
141 {
142 	ASSERT(args != NULL);
143 	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
144 }
145 
146 
147 void
148 WorkQueue::JobIO(IORequestArgs* args)
149 {
150 	ASSERT(args != NULL);
151 
152 	uint64 offset = io_request_offset(args->fRequest);
153 	uint64 length = io_request_length(args->fRequest);
154 
155 	char* buffer = reinterpret_cast<char*>(malloc(length));
156 	if (buffer == NULL) {
157 		notify_io_request(args->fRequest, B_NO_MEMORY);
158 		args->fInode->EndAIOOp();
159 		return;
160 	}
161 
162 	bool eof = false;
163 	uint64 size = 0;
164 	status_t result;
165 	if (io_request_is_write(args->fRequest)) {
166 		if (offset + length > args->fInode->MaxFileSize())
167 				length = args->fInode->MaxFileSize() - offset;
168 
169 		result = read_from_io_request(args->fRequest, buffer, length);
170 		do {
171 			size_t bytesWritten = length - size;
172 			result = args->fInode->WriteDirect(NULL, offset + size,
173 				buffer + size, &bytesWritten);
174 			size += bytesWritten;
175 		} while (size < length && result == B_OK);
176 	} else {
177 		do {
178 			size_t bytesRead = length - size;
179 			result = args->fInode->ReadDirect(NULL, offset + size, buffer,
180 				&bytesRead, &eof);
181 			if (result != B_OK)
182 				break;
183 
184 			result = write_to_io_request(args->fRequest, buffer, bytesRead);
185 			if (result != B_OK)
186 				break;
187 
188 			size += bytesRead;
189 		} while (size < length && result == B_OK && !eof);
190 
191 	}
192 	free(buffer);
193 
194 	notify_io_request(args->fRequest, result);
195 	args->fInode->EndAIOOp();
196 }
197 
198