xref: /haiku/src/add-ons/kernel/file_systems/nfs4/RPCCallbackServer.cpp (revision 040a81419dda83d1014e9dc94936a4cb3f027303)
1 /*
2  * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "RPCCallbackServer.h"
11 
12 #include "NFS4Defs.h"
13 #include "RPCCallback.h"
14 #include "RPCCallbackReply.h"
15 #include "RPCCallbackRequest.h"
16 #include "RPCServer.h"
17 
18 
19 using namespace RPC;
20 
21 
22 CallbackServer* gRPCCallbackServer		= NULL;
23 CallbackServer* gRPCCallbackServer6		= NULL;
24 
25 
26 CallbackServer::CallbackServer(int networkFamily)
27 	:
28 	fConnectionList(NULL),
29 	fListener(NULL),
30 	fThreadRunning(false),
31 	fCallbackArray(NULL),
32 	fArraySize(0),
33 	fFreeSlot(-1),
34 	fNetworkFamily(networkFamily)
35 {
36 	mutex_init(&fConnectionLock, NULL);
37 	mutex_init(&fThreadLock, NULL);
38 	rw_lock_init(&fArrayLock, NULL);
39 }
40 
41 
42 CallbackServer::~CallbackServer()
43 {
44 	StopServer();
45 
46 	free(fCallbackArray);
47 	rw_lock_destroy(&fArrayLock);
48 	mutex_destroy(&fThreadLock);
49 	mutex_destroy(&fConnectionLock);
50 }
51 
52 
53 CallbackServer*
54 CallbackServer::Get(Server* server)
55 {
56 	ASSERT(server != NULL);
57 
58 	int family = server->ID().Family();
59 	ASSERT(family == AF_INET || family == AF_INET6);
60 
61 	int idx;
62 	switch (family) {
63 		case AF_INET:
64 			idx = 0;
65 			break;
66 		case AF_INET6:
67 			idx = 1;
68 			break;
69 		default:
70 			return NULL;
71 	}
72 
73 	MutexLocker _(fServerCreationLock);
74 	if (fServers[idx] == NULL)
75 		fServers[idx] = new CallbackServer(family);
76 	return fServers[idx];
77 }
78 
79 
80 void
81 CallbackServer::ShutdownAll()
82 {
83 	MutexLocker _(fServerCreationLock);
84 	for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++)
85 		delete fServers[i];
86 	memset(&fServers, 0, sizeof(fServers));
87 }
88 
89 
90 mutex			CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
91 CallbackServer*	CallbackServer::fServers[2] = { NULL, NULL };
92 
93 
94 status_t
95 CallbackServer::RegisterCallback(Callback* callback)
96 {
97 	ASSERT(callback != NULL);
98 
99 	status_t result = StartServer();
100 	if (result != B_OK)
101 		return result;
102 
103 	WriteLocker _(fArrayLock);
104 	if (fFreeSlot == -1) {
105 		uint32 newSize = max_c(fArraySize * 2, 4);
106 		uint32 size = newSize * sizeof(CallbackSlot);
107 		CallbackSlot* array	= reinterpret_cast<CallbackSlot*>(malloc(size));
108 		if (array == NULL)
109 			return B_NO_MEMORY;
110 
111 		if (fCallbackArray != NULL)
112 			memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot));
113 
114 		for (uint32 i = fArraySize; i < newSize; i++)
115 			array[i].fNext = i + 1;
116 
117 		array[newSize - 1].fNext = -1;
118 
119 		fCallbackArray = array;
120 		fFreeSlot = fArraySize;
121 		fArraySize = newSize;
122 	}
123 
124 	int32 id = fFreeSlot;
125 	fFreeSlot = fCallbackArray[id].fNext;
126 
127 	fCallbackArray[id].fCallback = callback;
128 	callback->SetID(id);
129 	callback->SetCBServer(this);
130 
131 	return B_OK;
132 }
133 
134 
135 status_t
136 CallbackServer::UnregisterCallback(Callback* callback)
137 {
138 	ASSERT(callback != NULL);
139 	ASSERT(callback->CBServer() == this);
140 
141 	int32 id = callback->ID();
142 
143 	WriteLocker _(fArrayLock);
144 	fCallbackArray[id].fNext = fFreeSlot;
145 	fFreeSlot = id;
146 
147 	callback->SetCBServer(NULL);
148 	return B_OK;
149 }
150 
151 
152 status_t
153 CallbackServer::StartServer()
154 {
155 	MutexLocker _(fThreadLock);
156 	if (fThreadRunning)
157 		return B_OK;
158 
159 	status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
160 	if (result != B_OK)
161 		return result;
162 
163 	fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher,
164 		"NFSv4 Callback Listener", B_NORMAL_PRIORITY, this);
165 	if (fThread < B_OK)
166 		return fThread;
167 
168 	fThreadRunning = true;
169 
170 	result = resume_thread(fThread);
171 	if (result != B_OK) {
172 		kill_thread(fThread);
173 		fThreadRunning = false;
174 		return result;
175 	}
176 
177 	return B_OK;
178 }
179 
180 
181 status_t
182 CallbackServer::StopServer()
183 {
184 	MutexLocker _(&fThreadLock);
185 	if (!fThreadRunning)
186 		return B_OK;
187 
188 	fListener->Disconnect();
189 	status_t result;
190 	wait_for_thread(fThread, &result);
191 
192 	MutexLocker locker(fConnectionLock);
193 	while (fConnectionList != NULL) {
194 		ConnectionEntry* entry = fConnectionList;
195 		fConnectionList = entry->fNext;
196 		entry->fConnection->Disconnect();
197 
198 		status_t result;
199 		wait_for_thread(entry->fThread, &result);
200 
201 		delete entry->fConnection;
202 		delete entry;
203 	}
204 
205 	delete fListener;
206 
207 	fThreadRunning = false;
208 	return B_OK;
209 }
210 
211 
212 status_t
213 CallbackServer::NewConnection(Connection* connection)
214 {
215 	ASSERT(connection != NULL);
216 
217 	ConnectionEntry* entry = new ConnectionEntry;
218 	entry->fConnection = connection;
219 	entry->fPrev = NULL;
220 
221 	MutexLocker locker(fConnectionLock);
222 	entry->fNext = fConnectionList;
223 	if (fConnectionList != NULL)
224 		fConnectionList->fPrev = entry;
225 	fConnectionList = entry;
226 	locker.Unlock();
227 
228 	void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
229 	if (arguments == NULL)
230 		return B_NO_MEMORY;
231 
232 	arguments[0] = this;
233 	arguments[1] = entry;
234 
235 	thread_id thread;
236 	thread = spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher,
237 		"NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments);
238 	if (thread < B_OK) {
239 		ReleaseConnection(entry);
240 		free(arguments);
241 		return thread;
242 	}
243 
244 	entry->fThread = thread;
245 
246 	status_t result = resume_thread(thread);
247 	if (result != B_OK) {
248 		kill_thread(thread);
249 		ReleaseConnection(entry);
250 		free(arguments);
251 		return result;
252 	}
253 
254 	return B_OK;
255 }
256 
257 
258 status_t
259 CallbackServer::ReleaseConnection(ConnectionEntry* entry)
260 {
261 	ASSERT(entry != NULL);
262 
263 	MutexLocker _(fConnectionLock);
264 	if (entry->fNext != NULL)
265 		entry->fNext->fPrev = entry->fPrev;
266 	if (entry->fPrev != NULL)
267 		entry->fPrev->fNext = entry->fNext;
268 	else
269 		fConnectionList = entry->fNext;
270 
271 	delete entry->fConnection;
272 	delete entry;
273 	return B_OK;
274 }
275 
276 
277 status_t
278 CallbackServer::ConnectionThreadLauncher(void* object)
279 {
280 	ASSERT(object != NULL);
281 
282 	void** objects = reinterpret_cast<void**>(object);
283 	CallbackServer* server = reinterpret_cast<CallbackServer*>(objects[0]);
284 	ConnectionEntry* entry = reinterpret_cast<ConnectionEntry*>(objects[1]);
285 	free(objects);
286 
287 	return server->ConnectionThread(entry);
288 }
289 
290 
291 status_t
292 CallbackServer::ConnectionThread(ConnectionEntry* entry)
293 {
294 	ASSERT(entry != NULL);
295 
296 	Connection* connection = entry->fConnection;
297 	CallbackReply* reply;
298 
299 	while (fThreadRunning) {
300 		uint32 size;
301 		void* buffer;
302 		status_t result = connection->Receive(&buffer, &size);
303 		if (result != B_OK) {
304 			if (result != ECONNABORTED)
305 				ReleaseConnection(entry);
306 			return result;
307 		}
308 
309 		CallbackRequest* request
310 			= new(std::nothrow) CallbackRequest(buffer, size);
311 		if (request == NULL) {
312 			free(buffer);
313 			continue;
314 		} else if (request->Error() != B_OK) {
315 			reply = CallbackReply::Create(request->XID(), request->RPCError());
316 			if (reply != NULL) {
317 				connection->Send(reply->Stream().Buffer(),
318 					reply->Stream().Size());
319 				delete reply;
320 			}
321 			delete request;
322 			continue;
323 		}
324 
325 		switch (request->Procedure()) {
326 			case CallbackProcCompound:
327 				GetCallback(request->ID())->EnqueueRequest(request, connection);
328 				break;
329 
330 			case CallbackProcNull:
331 				reply = CallbackReply::Create(request->XID());
332 				if (reply != NULL) {
333 					connection->Send(reply->Stream().Buffer(),
334 						reply->Stream().Size());
335 					delete reply;
336 				}
337 
338 			default:
339 				delete request;
340 		}
341 	}
342 
343 	return B_OK;
344 }
345 
346 
347 status_t
348 CallbackServer::ListenerThreadLauncher(void* object)
349 {
350 	ASSERT(object != NULL);
351 
352 	CallbackServer* server = reinterpret_cast<CallbackServer*>(object);
353 	return server->ListenerThread();
354 }
355 
356 
357 status_t
358 CallbackServer::ListenerThread()
359 {
360 	while (fThreadRunning) {
361 		Connection* connection;
362 
363 		status_t result = fListener->AcceptConnection(&connection);
364 		if (result != B_OK) {
365 			fThreadRunning = false;
366 			return result;
367 		}
368 		result = NewConnection(connection);
369 		if (result != B_OK)
370 			delete connection;
371 	}
372 
373 	return B_OK;
374 }
375 
376