1 /*
2 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Paweł Dziepak, pdziepak@quarnos.org
7 */
8
9
10 #include "RPCCallbackServer.h"
11
12 #include "NFS4Defs.h"
13 #include "RPCCallback.h"
14 #include "RPCCallbackReply.h"
15 #include "RPCCallbackRequest.h"
16 #include "RPCServer.h"
17
18
19 using namespace RPC;
20
21
22 CallbackServer* gRPCCallbackServer = NULL;
23 CallbackServer* gRPCCallbackServer6 = NULL;
24
25
CallbackServer(int networkFamily)26 CallbackServer::CallbackServer(int networkFamily)
27 :
28 fConnectionList(NULL),
29 fListener(NULL),
30 fThreadRunning(false),
31 fCallbackArray(NULL),
32 fArraySize(0),
33 fFreeSlot(-1),
34 fNetworkFamily(networkFamily)
35 {
36 mutex_init(&fConnectionLock, NULL);
37 mutex_init(&fThreadLock, NULL);
38 rw_lock_init(&fArrayLock, NULL);
39 }
40
41
~CallbackServer()42 CallbackServer::~CallbackServer()
43 {
44 StopServer();
45
46 free(fCallbackArray);
47 rw_lock_destroy(&fArrayLock);
48 mutex_destroy(&fThreadLock);
49 mutex_destroy(&fConnectionLock);
50 }
51
52
53 CallbackServer*
Get(Server * server)54 CallbackServer::Get(Server* server)
55 {
56 ASSERT(server != NULL);
57
58 int family = server->ID().Family();
59 ASSERT(family == AF_INET || family == AF_INET6);
60
61 int idx;
62 switch (family) {
63 case AF_INET:
64 idx = 0;
65 break;
66 case AF_INET6:
67 idx = 1;
68 break;
69 default:
70 return NULL;
71 }
72
73 MutexLocker _(fServerCreationLock);
74 if (fServers[idx] == NULL)
75 fServers[idx] = new CallbackServer(family);
76 return fServers[idx];
77 }
78
79
80 void
ShutdownAll()81 CallbackServer::ShutdownAll()
82 {
83 MutexLocker _(fServerCreationLock);
84 for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++)
85 delete fServers[i];
86 memset(&fServers, 0, sizeof(fServers));
87 }
88
89
90 mutex CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
91 CallbackServer* CallbackServer::fServers[2] = { NULL, NULL };
92
93
94 status_t
RegisterCallback(Callback * callback)95 CallbackServer::RegisterCallback(Callback* callback)
96 {
97 ASSERT(callback != NULL);
98
99 status_t result = StartServer();
100 if (result != B_OK)
101 return result;
102
103 WriteLocker _(fArrayLock);
104 if (fFreeSlot == -1) {
105 uint32 newSize = max_c(fArraySize * 2, 4);
106 uint32 size = newSize * sizeof(CallbackSlot);
107 CallbackSlot* array = reinterpret_cast<CallbackSlot*>(malloc(size));
108 if (array == NULL)
109 return B_NO_MEMORY;
110
111 if (fCallbackArray != NULL)
112 memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot));
113
114 for (uint32 i = fArraySize; i < newSize; i++)
115 array[i].fNext = i + 1;
116
117 array[newSize - 1].fNext = -1;
118
119 fCallbackArray = array;
120 fFreeSlot = fArraySize;
121 fArraySize = newSize;
122 }
123
124 int32 id = fFreeSlot;
125 fFreeSlot = fCallbackArray[id].fNext;
126
127 fCallbackArray[id].fCallback = callback;
128 callback->SetID(id);
129 callback->SetCBServer(this);
130
131 return B_OK;
132 }
133
134
135 status_t
UnregisterCallback(Callback * callback)136 CallbackServer::UnregisterCallback(Callback* callback)
137 {
138 ASSERT(callback != NULL);
139 ASSERT(callback->CBServer() == this);
140
141 int32 id = callback->ID();
142
143 WriteLocker _(fArrayLock);
144 fCallbackArray[id].fNext = fFreeSlot;
145 fFreeSlot = id;
146
147 callback->SetCBServer(NULL);
148 return B_OK;
149 }
150
151
152 status_t
StartServer()153 CallbackServer::StartServer()
154 {
155 MutexLocker _(fThreadLock);
156 if (fThreadRunning)
157 return B_OK;
158
159 status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
160 if (result != B_OK)
161 return result;
162
163 fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher,
164 "NFSv4 Callback Listener", B_NORMAL_PRIORITY, this);
165 if (fThread < B_OK)
166 return fThread;
167
168 fThreadRunning = true;
169
170 result = resume_thread(fThread);
171 if (result != B_OK) {
172 kill_thread(fThread);
173 fThreadRunning = false;
174 return result;
175 }
176
177 return B_OK;
178 }
179
180
181 status_t
StopServer()182 CallbackServer::StopServer()
183 {
184 MutexLocker _(&fThreadLock);
185 if (!fThreadRunning)
186 return B_OK;
187
188 fListener->Disconnect();
189 status_t result;
190 wait_for_thread(fThread, &result);
191
192 MutexLocker locker(fConnectionLock);
193 while (fConnectionList != NULL) {
194 ConnectionEntry* entry = fConnectionList;
195 fConnectionList = entry->fNext;
196 entry->fConnection->Disconnect();
197
198 status_t result;
199 wait_for_thread(entry->fThread, &result);
200
201 delete entry->fConnection;
202 delete entry;
203 }
204
205 delete fListener;
206
207 fThreadRunning = false;
208 return B_OK;
209 }
210
211
212 status_t
NewConnection(Connection * connection)213 CallbackServer::NewConnection(Connection* connection)
214 {
215 ASSERT(connection != NULL);
216
217 ConnectionEntry* entry = new ConnectionEntry;
218 entry->fConnection = connection;
219 entry->fPrev = NULL;
220
221 MutexLocker locker(fConnectionLock);
222 entry->fNext = fConnectionList;
223 if (fConnectionList != NULL)
224 fConnectionList->fPrev = entry;
225 fConnectionList = entry;
226 locker.Unlock();
227
228 void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
229 if (arguments == NULL)
230 return B_NO_MEMORY;
231
232 arguments[0] = this;
233 arguments[1] = entry;
234
235 thread_id thread;
236 thread = spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher,
237 "NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments);
238 if (thread < B_OK) {
239 ReleaseConnection(entry);
240 free(arguments);
241 return thread;
242 }
243
244 entry->fThread = thread;
245
246 status_t result = resume_thread(thread);
247 if (result != B_OK) {
248 kill_thread(thread);
249 ReleaseConnection(entry);
250 free(arguments);
251 return result;
252 }
253
254 return B_OK;
255 }
256
257
258 status_t
ReleaseConnection(ConnectionEntry * entry)259 CallbackServer::ReleaseConnection(ConnectionEntry* entry)
260 {
261 ASSERT(entry != NULL);
262
263 MutexLocker _(fConnectionLock);
264 if (entry->fNext != NULL)
265 entry->fNext->fPrev = entry->fPrev;
266 if (entry->fPrev != NULL)
267 entry->fPrev->fNext = entry->fNext;
268 else
269 fConnectionList = entry->fNext;
270
271 delete entry->fConnection;
272 delete entry;
273 return B_OK;
274 }
275
276
277 status_t
ConnectionThreadLauncher(void * object)278 CallbackServer::ConnectionThreadLauncher(void* object)
279 {
280 ASSERT(object != NULL);
281
282 void** objects = reinterpret_cast<void**>(object);
283 CallbackServer* server = reinterpret_cast<CallbackServer*>(objects[0]);
284 ConnectionEntry* entry = reinterpret_cast<ConnectionEntry*>(objects[1]);
285 free(objects);
286
287 return server->ConnectionThread(entry);
288 }
289
290
291 status_t
ConnectionThread(ConnectionEntry * entry)292 CallbackServer::ConnectionThread(ConnectionEntry* entry)
293 {
294 ASSERT(entry != NULL);
295
296 Connection* connection = entry->fConnection;
297 CallbackReply* reply;
298
299 while (fThreadRunning) {
300 uint32 size;
301 void* buffer;
302 status_t result = connection->Receive(&buffer, &size);
303 if (result != B_OK) {
304 if (result != ECONNABORTED)
305 ReleaseConnection(entry);
306 return result;
307 }
308
309 CallbackRequest* request
310 = new(std::nothrow) CallbackRequest(buffer, size);
311 if (request == NULL) {
312 free(buffer);
313 continue;
314 } else if (request->Error() != B_OK) {
315 reply = CallbackReply::Create(request->XID(), request->RPCError());
316 if (reply != NULL) {
317 connection->Send(reply->Stream().Buffer(),
318 reply->Stream().Size());
319 delete reply;
320 }
321 delete request;
322 continue;
323 }
324
325 switch (request->Procedure()) {
326 case CallbackProcCompound:
327 GetCallback(request->ID())->EnqueueRequest(request, connection);
328 break;
329
330 case CallbackProcNull:
331 reply = CallbackReply::Create(request->XID());
332 if (reply != NULL) {
333 connection->Send(reply->Stream().Buffer(),
334 reply->Stream().Size());
335 delete reply;
336 }
337
338 default:
339 delete request;
340 }
341 }
342
343 return B_OK;
344 }
345
346
347 status_t
ListenerThreadLauncher(void * object)348 CallbackServer::ListenerThreadLauncher(void* object)
349 {
350 ASSERT(object != NULL);
351
352 CallbackServer* server = reinterpret_cast<CallbackServer*>(object);
353 return server->ListenerThread();
354 }
355
356
357 status_t
ListenerThread()358 CallbackServer::ListenerThread()
359 {
360 while (fThreadRunning) {
361 Connection* connection;
362
363 status_t result = fListener->AcceptConnection(&connection);
364 if (result != B_OK) {
365 fThreadRunning = false;
366 return result;
367 }
368 result = NewConnection(connection);
369 if (result != B_OK)
370 delete connection;
371 }
372
373 return B_OK;
374 }
375
376