1 /* 2 * Copyright 2009-2011, Axel Dörfler, axeld@pinc-software.de. 3 * Copyright 2016, Rene Gollent, rene@gollent.com. 4 */ 5 6 7 #include <SocketMessenger.h> 8 9 #include <Message.h> 10 #include <MessageQueue.h> 11 #include <Messenger.h> 12 13 #include <AutoDeleter.h> 14 #include <AutoLocker.h> 15 #include <HashMap.h> 16 17 18 static const char* kReplySenderIDField = "socket_messenger:sender_reply_id"; 19 static const char* kReplyReceiverIDField = "socket_messenger:reply_id"; 20 21 22 // #pragma mark - BSocketMessenger::Private 23 24 25 struct BSocketMessenger::Private { 26 typedef SynchronizedHashMap<HashKey64<int64>, 27 BMessage> ReplyMessageMap; 28 29 Private(); 30 virtual ~Private(); 31 32 void ClearMessages(); 33 34 sem_id fMessageWaiters; 35 thread_id fReplyReader; 36 ReplyMessageMap fReceivedReplies; 37 BMessageQueue fReceivedMessages; 38 int64 fReplyIDCounter; 39 }; 40 41 42 BSocketMessenger::Private::Private() 43 : 44 fMessageWaiters(-1), 45 fReplyReader(-1), 46 fReceivedReplies(), 47 fReceivedMessages(), 48 fReplyIDCounter(0) 49 { 50 } 51 52 53 BSocketMessenger::Private::~Private() 54 { 55 if (fMessageWaiters > 0) 56 delete_sem(fMessageWaiters); 57 if (fReplyReader > 0) 58 wait_for_thread(fReplyReader, NULL); 59 60 ClearMessages(); 61 } 62 63 64 void 65 BSocketMessenger::Private::ClearMessages() 66 { 67 fReceivedReplies.Clear(); 68 AutoLocker<BMessageQueue> queueLocker(fReceivedMessages); 69 while (!fReceivedMessages.IsEmpty()) 70 delete fReceivedMessages.NextMessage(); 71 } 72 73 74 // #pragma mark - BSocketMessenger 75 76 77 BSocketMessenger::BSocketMessenger() 78 : 79 fPrivateData(NULL), 80 fSocket(), 81 fInitStatus(B_NO_INIT) 82 { 83 _Init(); 84 } 85 86 87 BSocketMessenger::BSocketMessenger(const BNetworkAddress& address, 88 bigtime_t timeout) 89 : 90 fPrivateData(NULL), 91 fSocket(), 92 fInitStatus(B_NO_INIT) 93 { 94 _Init(); 95 SetTo(address, timeout); 96 } 97 98 99 BSocketMessenger::BSocketMessenger(const BSocket& socket) 100 : 101 fPrivateData(NULL), 102 fSocket(socket), 103 fInitStatus(B_NO_INIT) 104 { 105 _Init(); 106 if (fPrivateData == NULL) 107 return; 108 109 fInitStatus = socket.InitCheck(); 110 if (fInitStatus != B_OK) 111 return; 112 113 fPrivateData->fReplyReader = spawn_thread(&_MessageReader, 114 "Message Reader", B_NORMAL_PRIORITY, this); 115 if (fPrivateData->fReplyReader < 0) 116 fInitStatus = fPrivateData->fReplyReader; 117 if (fInitStatus != B_OK) { 118 exit_thread(fPrivateData->fReplyReader); 119 fPrivateData->fReplyReader = -1; 120 return; 121 } 122 123 fInitStatus = resume_thread(fPrivateData->fReplyReader); 124 } 125 126 127 BSocketMessenger::~BSocketMessenger() 128 { 129 Unset(); 130 131 delete fPrivateData; 132 } 133 134 135 void 136 BSocketMessenger::Unset() 137 { 138 if (fPrivateData == NULL) 139 return; 140 141 fSocket.Disconnect(); 142 wait_for_thread(fPrivateData->fReplyReader, NULL); 143 fPrivateData->fReplyReader = -1; 144 fPrivateData->ClearMessages(); 145 146 release_sem_etc(fPrivateData->fMessageWaiters, 1, B_RELEASE_ALL); 147 148 fInitStatus = B_NO_INIT; 149 } 150 151 152 status_t 153 BSocketMessenger::SetTo(const BNetworkAddress& address, bigtime_t timeout) 154 { 155 Unset(); 156 157 if (fPrivateData == NULL) 158 return B_NO_MEMORY; 159 160 fPrivateData->fReplyReader = spawn_thread(&_MessageReader, 161 "Message Reader", B_NORMAL_PRIORITY, this); 162 if (fPrivateData->fReplyReader < 0) 163 return fPrivateData->fReplyReader; 164 status_t error = fSocket.Connect(address, timeout); 165 if (error != B_OK) { 166 Unset(); 167 return error; 168 } 169 170 return fInitStatus = resume_thread(fPrivateData->fReplyReader); 171 } 172 173 174 status_t 175 BSocketMessenger::SetTo(const BSocketMessenger& target, bigtime_t timeout) 176 { 177 return SetTo(target.Address(), timeout); 178 } 179 180 181 status_t 182 BSocketMessenger::SendMessage(const BMessage& message) 183 { 184 return _SendMessage(message); 185 } 186 187 188 status_t 189 BSocketMessenger::SendMessage(const BMessage& message, BMessage& _reply, 190 bigtime_t timeout) 191 { 192 int64 replyID = atomic_add64(&fPrivateData->fReplyIDCounter, 1); 193 BMessage temp(message); 194 temp.AddInt64(kReplySenderIDField, replyID); 195 status_t error = _SendMessage(temp); 196 if (error != B_OK) 197 return error; 198 199 return _ReadReply(replyID, _reply, timeout); 200 } 201 202 203 status_t 204 BSocketMessenger::SendMessage(const BMessage& message, 205 BMessenger& replyTarget, bigtime_t timeout) 206 { 207 BMessage reply; 208 status_t error = SendMessage(message, reply, timeout); 209 if (error != B_OK) 210 return error; 211 212 return replyTarget.SendMessage(&reply); 213 } 214 215 216 status_t 217 BSocketMessenger::SendReply(const BMessage& message, const BMessage& reply) 218 { 219 int64 replyID; 220 if (message.FindInt64(kReplySenderIDField, &replyID) != B_OK) 221 return B_NOT_ALLOWED; 222 223 BMessage replyMessage(reply); 224 replyMessage.AddInt64(kReplyReceiverIDField, replyID); 225 return SendMessage(replyMessage); 226 } 227 228 229 status_t 230 BSocketMessenger::ReceiveMessage(BMessage& _message, bigtime_t timeout) 231 { 232 status_t error = B_OK; 233 AutoLocker<BMessageQueue> queueLocker(fPrivateData->fReceivedMessages); 234 for (;;) { 235 if (!fPrivateData->fReceivedMessages.IsEmpty()) { 236 BMessage* nextMessage 237 = fPrivateData->fReceivedMessages.NextMessage(); 238 _message = *nextMessage; 239 delete nextMessage; 240 break; 241 } 242 243 queueLocker.Unlock(); 244 error = _WaitForMessage(timeout); 245 if (error != B_OK) 246 break; 247 if (!fSocket.IsConnected()) { 248 error = B_CANCELED; 249 break; 250 } 251 queueLocker.Lock(); 252 } 253 254 return error; 255 } 256 257 258 void 259 BSocketMessenger::_Init() 260 { 261 if (fPrivateData != NULL) 262 return; 263 264 BSocketMessenger::Private* data 265 = new(std::nothrow) BSocketMessenger::Private; 266 267 if (data == NULL) { 268 fInitStatus = B_NO_MEMORY; 269 return; 270 } 271 272 data->fMessageWaiters = create_sem(0, "message waiters"); 273 if (data->fMessageWaiters < 0) { 274 fInitStatus = data->fMessageWaiters; 275 delete data; 276 return; 277 } 278 279 fPrivateData = data; 280 } 281 282 283 status_t 284 BSocketMessenger::_WaitForMessage(bigtime_t timeout) 285 { 286 for (;;) { 287 status_t error = acquire_sem_etc(fPrivateData->fMessageWaiters, 1, 288 B_RELATIVE_TIMEOUT, timeout); 289 if (error == B_INTERRUPTED) { 290 if (timeout != B_INFINITE_TIMEOUT) 291 timeout -= system_time(); 292 continue; 293 } 294 if (error != B_OK) 295 return error; 296 break; 297 } 298 299 return B_OK; 300 } 301 302 303 status_t 304 BSocketMessenger::_SendMessage(const BMessage& message) 305 { 306 ssize_t flatSize = message.FlattenedSize(); 307 ssize_t totalSize = flatSize + sizeof(ssize_t); 308 309 char* buffer = new(std::nothrow) char[totalSize]; 310 if (buffer == NULL) 311 return B_NO_MEMORY; 312 313 ArrayDeleter<char> bufferDeleter(buffer); 314 *(ssize_t*)buffer = flatSize; 315 char* messageBuffer = buffer + sizeof(ssize_t); 316 status_t error = message.Flatten(messageBuffer, flatSize); 317 if (error != B_OK) 318 return error; 319 320 ssize_t size = fSocket.Write(buffer, totalSize); 321 if (size < 0) 322 return size; 323 324 return B_OK; 325 } 326 327 328 status_t 329 BSocketMessenger::_ReadMessage(BMessage& _message) 330 { 331 status_t error = fSocket.WaitForReadable(B_INFINITE_TIMEOUT); 332 if (error != B_OK) 333 return error; 334 335 ssize_t size = 0; 336 ssize_t readSize = fSocket.Read(&size, sizeof(ssize_t)); 337 if (readSize < 0) 338 return readSize; 339 340 if (readSize != sizeof(ssize_t)) 341 return B_BAD_VALUE; 342 343 if (size <= 0) 344 return B_MISMATCHED_VALUES; 345 346 char* buffer = new(std::nothrow) char[size]; 347 if (buffer == NULL) 348 return B_NO_MEMORY; 349 350 ArrayDeleter<char> bufferDeleter(buffer); 351 readSize = fSocket.Read(buffer, size); 352 if (readSize < 0) 353 return readSize; 354 355 if (readSize != size) 356 return B_MISMATCHED_VALUES; 357 358 return _message.Unflatten(buffer); 359 } 360 361 362 status_t 363 BSocketMessenger::_ReadReply(const int64 replyID, BMessage& reply, 364 bigtime_t timeout) 365 { 366 status_t error = B_OK; 367 for (;;) { 368 if (fPrivateData->fReceivedReplies.ContainsKey(replyID)) { 369 reply = fPrivateData->fReceivedReplies.Remove(replyID); 370 break; 371 } 372 373 error = _WaitForMessage(timeout); 374 if (error != B_OK) 375 break; 376 if (!fSocket.IsConnected()) { 377 error = B_CANCELED; 378 break; 379 } 380 } 381 382 return error; 383 } 384 385 386 status_t 387 BSocketMessenger::_MessageReader(void* arg) 388 { 389 BSocketMessenger* messenger = (BSocketMessenger*)arg; 390 BSocketMessenger::Private* data = messenger->fPrivateData; 391 status_t error = B_OK; 392 393 for (;;) { 394 BMessage message; 395 error = messenger->_ReadMessage(message); 396 if (error != B_OK) 397 break; 398 399 int64 replyID; 400 if (message.FindInt64(kReplyReceiverIDField, &replyID) == B_OK) { 401 error = data->fReceivedReplies.Put(replyID, message); 402 if (error != B_OK) 403 break; 404 } else { 405 BMessage* queueMessage = new(std::nothrow) BMessage(message); 406 if (queueMessage == NULL) { 407 error = B_NO_MEMORY; 408 break; 409 } 410 411 AutoLocker<BMessageQueue> queueLocker( 412 data->fReceivedMessages); 413 data->fReceivedMessages.AddMessage(queueMessage); 414 } 415 416 417 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL); 418 } 419 420 // if we exit our message loop, ensure everybody wakes up and knows 421 // we're no longer receiving messages. 422 messenger->fSocket.Disconnect(); 423 release_sem_etc(data->fMessageWaiters, 1, B_RELEASE_ALL); 424 return error; 425 } 426