1 /* 2 * Copyright 2012-2013 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "RPCCallbackServer.h" 11 12 #include "NFS4Defs.h" 13 #include "RPCCallback.h" 14 #include "RPCCallbackReply.h" 15 #include "RPCCallbackRequest.h" 16 #include "RPCServer.h" 17 18 19 using namespace RPC; 20 21 22 CallbackServer* gRPCCallbackServer = NULL; 23 CallbackServer* gRPCCallbackServer6 = NULL; 24 25 26 CallbackServer::CallbackServer(int networkFamily) 27 : 28 fConnectionList(NULL), 29 fListener(NULL), 30 fThreadRunning(false), 31 fCallbackArray(NULL), 32 fArraySize(0), 33 fFreeSlot(-1), 34 fNetworkFamily(networkFamily) 35 { 36 mutex_init(&fConnectionLock, NULL); 37 mutex_init(&fThreadLock, NULL); 38 rw_lock_init(&fArrayLock, NULL); 39 } 40 41 42 CallbackServer::~CallbackServer() 43 { 44 StopServer(); 45 46 free(fCallbackArray); 47 rw_lock_destroy(&fArrayLock); 48 mutex_destroy(&fThreadLock); 49 mutex_destroy(&fConnectionLock); 50 } 51 52 53 CallbackServer* 54 CallbackServer::Get(Server* server) 55 { 56 ASSERT(server != NULL); 57 58 int family = server->ID().Family(); 59 ASSERT(family == AF_INET || family == AF_INET6); 60 61 int idx; 62 switch (family) { 63 case AF_INET: 64 idx = 0; 65 break; 66 case AF_INET6: 67 idx = 1; 68 break; 69 default: 70 return NULL; 71 } 72 73 MutexLocker _(fServerCreationLock); 74 if (fServers[idx] == NULL) 75 fServers[idx] = new CallbackServer(family); 76 return fServers[idx]; 77 } 78 79 80 void 81 CallbackServer::ShutdownAll() 82 { 83 MutexLocker _(fServerCreationLock); 84 for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++) 85 delete fServers[i]; 86 memset(&fServers, 0, sizeof(fServers)); 87 } 88 89 90 mutex CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL); 91 CallbackServer* CallbackServer::fServers[2] = { NULL, NULL }; 92 93 94 status_t 95 CallbackServer::RegisterCallback(Callback* callback) 96 { 97 ASSERT(callback != NULL); 98 99 status_t result = StartServer(); 100 if (result != B_OK) 101 return result; 102 103 WriteLocker _(fArrayLock); 104 if (fFreeSlot == -1) { 105 uint32 newSize = max_c(fArraySize * 2, 4); 106 uint32 size = newSize * sizeof(CallbackSlot); 107 CallbackSlot* array = reinterpret_cast<CallbackSlot*>(malloc(size)); 108 if (array == NULL) 109 return B_NO_MEMORY; 110 111 if (fCallbackArray != NULL) 112 memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot)); 113 114 for (uint32 i = fArraySize; i < newSize; i++) 115 array[i].fNext = i + 1; 116 117 array[newSize - 1].fNext = -1; 118 119 fCallbackArray = array; 120 fFreeSlot = fArraySize; 121 fArraySize = newSize; 122 } 123 124 int32 id = fFreeSlot; 125 fFreeSlot = fCallbackArray[id].fNext; 126 127 fCallbackArray[id].fCallback = callback; 128 callback->SetID(id); 129 callback->SetCBServer(this); 130 131 return B_OK; 132 } 133 134 135 status_t 136 CallbackServer::UnregisterCallback(Callback* callback) 137 { 138 ASSERT(callback != NULL); 139 ASSERT(callback->CBServer() == this); 140 141 int32 id = callback->ID(); 142 143 WriteLocker _(fArrayLock); 144 fCallbackArray[id].fNext = fFreeSlot; 145 fFreeSlot = id; 146 147 callback->SetCBServer(NULL); 148 return B_OK; 149 } 150 151 152 status_t 153 CallbackServer::StartServer() 154 { 155 MutexLocker _(fThreadLock); 156 if (fThreadRunning) 157 return B_OK; 158 159 status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily); 160 if (result != B_OK) 161 return result; 162 163 fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher, 164 "NFSv4 Callback Listener", B_NORMAL_PRIORITY, this); 165 if (fThread < B_OK) 166 return fThread; 167 168 fThreadRunning = true; 169 170 result = resume_thread(fThread); 171 if (result != B_OK) { 172 kill_thread(fThread); 173 fThreadRunning = false; 174 return result; 175 } 176 177 return B_OK; 178 } 179 180 181 status_t 182 CallbackServer::StopServer() 183 { 184 MutexLocker _(&fThreadLock); 185 if (!fThreadRunning) 186 return B_OK; 187 188 fListener->Disconnect(); 189 status_t result; 190 wait_for_thread(fThread, &result); 191 192 MutexLocker locker(fConnectionLock); 193 while (fConnectionList != NULL) { 194 ConnectionEntry* entry = fConnectionList; 195 fConnectionList = entry->fNext; 196 entry->fConnection->Disconnect(); 197 198 status_t result; 199 wait_for_thread(entry->fThread, &result); 200 201 delete entry->fConnection; 202 delete entry; 203 } 204 205 delete fListener; 206 207 fThreadRunning = false; 208 return B_OK; 209 } 210 211 212 status_t 213 CallbackServer::NewConnection(Connection* connection) 214 { 215 ASSERT(connection != NULL); 216 217 ConnectionEntry* entry = new ConnectionEntry; 218 entry->fConnection = connection; 219 entry->fPrev = NULL; 220 221 MutexLocker locker(fConnectionLock); 222 entry->fNext = fConnectionList; 223 if (fConnectionList != NULL) 224 fConnectionList->fPrev = entry; 225 fConnectionList = entry; 226 locker.Unlock(); 227 228 void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2)); 229 if (arguments == NULL) 230 return B_NO_MEMORY; 231 232 arguments[0] = this; 233 arguments[1] = entry; 234 235 thread_id thread; 236 thread = spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher, 237 "NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments); 238 if (thread < B_OK) { 239 ReleaseConnection(entry); 240 free(arguments); 241 return thread; 242 } 243 244 entry->fThread = thread; 245 246 status_t result = resume_thread(thread); 247 if (result != B_OK) { 248 kill_thread(thread); 249 ReleaseConnection(entry); 250 free(arguments); 251 return result; 252 } 253 254 return B_OK; 255 } 256 257 258 status_t 259 CallbackServer::ReleaseConnection(ConnectionEntry* entry) 260 { 261 ASSERT(entry != NULL); 262 263 MutexLocker _(fConnectionLock); 264 if (entry->fNext != NULL) 265 entry->fNext->fPrev = entry->fPrev; 266 if (entry->fPrev != NULL) 267 entry->fPrev->fNext = entry->fNext; 268 else 269 fConnectionList = entry->fNext; 270 271 delete entry->fConnection; 272 delete entry; 273 return B_OK; 274 } 275 276 277 status_t 278 CallbackServer::ConnectionThreadLauncher(void* object) 279 { 280 ASSERT(object != NULL); 281 282 void** objects = reinterpret_cast<void**>(object); 283 CallbackServer* server = reinterpret_cast<CallbackServer*>(objects[0]); 284 ConnectionEntry* entry = reinterpret_cast<ConnectionEntry*>(objects[1]); 285 free(objects); 286 287 return server->ConnectionThread(entry); 288 } 289 290 291 status_t 292 CallbackServer::ConnectionThread(ConnectionEntry* entry) 293 { 294 ASSERT(entry != NULL); 295 296 Connection* connection = entry->fConnection; 297 CallbackReply* reply; 298 299 while (fThreadRunning) { 300 uint32 size; 301 void* buffer; 302 status_t result = connection->Receive(&buffer, &size); 303 if (result != B_OK) { 304 if (result != ECONNABORTED) 305 ReleaseConnection(entry); 306 return result; 307 } 308 309 CallbackRequest* request 310 = new(std::nothrow) CallbackRequest(buffer, size); 311 if (request == NULL) { 312 free(buffer); 313 continue; 314 } else if (request->Error() != B_OK) { 315 reply = CallbackReply::Create(request->XID(), request->RPCError()); 316 if (reply != NULL) { 317 connection->Send(reply->Stream().Buffer(), 318 reply->Stream().Size()); 319 delete reply; 320 } 321 delete request; 322 continue; 323 } 324 325 switch (request->Procedure()) { 326 case CallbackProcCompound: 327 GetCallback(request->ID())->EnqueueRequest(request, connection); 328 break; 329 330 case CallbackProcNull: 331 reply = CallbackReply::Create(request->XID()); 332 if (reply != NULL) { 333 connection->Send(reply->Stream().Buffer(), 334 reply->Stream().Size()); 335 delete reply; 336 } 337 338 default: 339 delete request; 340 } 341 } 342 343 return B_OK; 344 } 345 346 347 status_t 348 CallbackServer::ListenerThreadLauncher(void* object) 349 { 350 ASSERT(object != NULL); 351 352 CallbackServer* server = reinterpret_cast<CallbackServer*>(object); 353 return server->ListenerThread(); 354 } 355 356 357 status_t 358 CallbackServer::ListenerThread() 359 { 360 while (fThreadRunning) { 361 Connection* connection; 362 363 status_t result = fListener->AcceptConnection(&connection); 364 if (result != B_OK) { 365 fThreadRunning = false; 366 return result; 367 } 368 result = NewConnection(connection); 369 if (result != B_OK) 370 delete connection; 371 } 372 373 return B_OK; 374 } 375 376