1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 #include "UnixFifo.h" 7 8 #include <new> 9 10 #include <AutoDeleter.h> 11 12 #include <net_stack.h> 13 #include <util/ring_buffer.h> 14 15 #include "unix.h" 16 17 18 #define UNIX_FIFO_DEBUG_LEVEL 0 19 #define UNIX_DEBUG_LEVEL UNIX_FIFO_DEBUG_LEVEL 20 #include "UnixDebug.h" 21 22 23 // #pragma mark - UnixRequest 24 25 26 UnixRequest::UnixRequest(const iovec* vecs, size_t count, 27 ancillary_data_container* ancillaryData, 28 struct sockaddr_storage* address) 29 : 30 fVecs(vecs), 31 fVecCount(count), 32 fAncillaryData(ancillaryData), 33 fTotalSize(0), 34 fBytesTransferred(0), 35 fVecIndex(0), 36 fVecOffset(0), 37 fAddress(address) 38 { 39 for (size_t i = 0; i < fVecCount; i++) 40 fTotalSize += fVecs[i].iov_len; 41 } 42 43 44 void 45 UnixRequest::AddBytesTransferred(size_t size) 46 { 47 fBytesTransferred += size; 48 49 // also adjust the current iovec index/offset 50 while (fVecIndex < fVecCount 51 && fVecs[fVecIndex].iov_len - fVecOffset <= size) { 52 size -= fVecs[fVecIndex].iov_len - fVecOffset; 53 fVecIndex++; 54 fVecOffset = 0; 55 } 56 57 if (fVecIndex < fVecCount) 58 fVecOffset += size; 59 } 60 61 62 bool 63 UnixRequest::GetCurrentChunk(void*& data, size_t& size) 64 { 65 while (fVecIndex < fVecCount 66 && fVecOffset >= fVecs[fVecIndex].iov_len) { 67 fVecIndex++; 68 fVecOffset = 0; 69 } 70 if (fVecIndex >= fVecCount) 71 return false; 72 73 data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset; 74 size = fVecs[fVecIndex].iov_len - fVecOffset; 75 return true; 76 } 77 78 79 void 80 UnixRequest::SetAncillaryData(ancillary_data_container* data) 81 { 82 fAncillaryData = data; 83 } 84 85 86 void 87 UnixRequest::AddAncillaryData(ancillary_data_container* data) 88 { 89 if (fAncillaryData != NULL) { 90 gStackModule->move_ancillary_data(data, fAncillaryData); 91 gStackModule->delete_ancillary_data_container(data); 92 } else 93 fAncillaryData = data; 94 } 95 96 97 // #pragma mark - UnixBufferQueue 98 99 100 UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type) 101 : 102 fBuffer(NULL), 103 fCapacity(capacity), 104 fType(type) 105 { 106 } 107 108 109 UnixBufferQueue::~UnixBufferQueue() 110 { 111 while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) { 112 gStackModule->delete_ancillary_data_container(entry->data); 113 delete entry; 114 } 115 116 delete_ring_buffer(fBuffer); 117 } 118 119 120 status_t 121 UnixBufferQueue::Init() 122 { 123 fBuffer = create_ring_buffer(fCapacity); 124 if (fBuffer == NULL) 125 return B_NO_MEMORY; 126 return B_OK; 127 } 128 129 130 size_t 131 UnixBufferQueue::Readable() const 132 { 133 return ring_buffer_readable(fBuffer); 134 } 135 136 137 size_t 138 UnixBufferQueue::Writable() const 139 { 140 return ring_buffer_writable(fBuffer); 141 } 142 143 144 status_t 145 UnixBufferQueue::Read(UnixRequest& request) 146 { 147 bool user = gStackModule->is_syscall(); 148 149 size_t readable = Readable(); 150 void* data; 151 size_t size; 152 153 DatagramEntry* datagramEntry = NULL; 154 if (fType == UnixFifoType::Datagram) { 155 datagramEntry = fDatagrams.Head(); 156 if (datagramEntry == NULL) 157 return B_ERROR; 158 159 if (datagramEntry->size > readable) 160 TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, " 161 "but only %lu bytes are readable\n", datagramEntry->size, readable); 162 else 163 readable = datagramEntry->size; 164 } 165 166 while (readable > 0 && request.GetCurrentChunk(data, size)) { 167 if (size > readable) 168 size = readable; 169 170 ssize_t bytesRead; 171 if (user) 172 bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size); 173 else 174 bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size); 175 176 if (bytesRead < 0) 177 return bytesRead; 178 if (bytesRead == 0) 179 return B_ERROR; 180 181 // Adjust ancillary data entry offsets, respectively attach the ones 182 // that belong to the read data to the request. 183 if (AncillaryDataEntry* entry = fAncillaryData.Head()) { 184 size_t offsetDelta = bytesRead; 185 while (entry != NULL && offsetDelta > entry->offset) { 186 // entry data have been read -- add ancillary data to request 187 fAncillaryData.RemoveHead(); 188 offsetDelta -= entry->offset; 189 request.AddAncillaryData(entry->data); 190 delete entry; 191 192 entry = fAncillaryData.Head(); 193 } 194 195 if (entry != NULL) 196 entry->offset -= offsetDelta; 197 } 198 199 request.AddBytesTransferred(bytesRead); 200 readable -= bytesRead; 201 } 202 203 if (fType == UnixFifoType::Datagram) { 204 fDatagrams.RemoveHead(); 205 206 memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address)); 207 delete datagramEntry; 208 209 if (readable > 0) { 210 ring_buffer_flush(fBuffer, readable); 211 if (AncillaryDataEntry* entry = fAncillaryData.Head()) { 212 size_t offsetDelta = readable; 213 while (entry != NULL && offsetDelta > entry->offset) { 214 fAncillaryData.RemoveHead(); 215 offsetDelta -= entry->offset; 216 delete entry; 217 218 entry = fAncillaryData.Head(); 219 } 220 221 if (entry != NULL) 222 entry->offset -= offsetDelta; 223 } 224 } 225 } 226 227 return B_OK; 228 } 229 230 231 status_t 232 UnixBufferQueue::Write(UnixRequest& request) 233 { 234 bool user = gStackModule->is_syscall(); 235 236 size_t writable = Writable(); 237 void* data; 238 size_t size; 239 240 DatagramEntry* datagramEntry = NULL; 241 ObjectDeleter<DatagramEntry> datagramEntryDeleter; 242 if (fType == UnixFifoType::Datagram) { 243 datagramEntry = new(std::nothrow) DatagramEntry; 244 if (datagramEntry == NULL) 245 return B_NO_MEMORY; 246 247 datagramEntryDeleter.SetTo(datagramEntry); 248 memcpy(&datagramEntry->address, request.Address(), 249 sizeof(datagramEntry->address)); 250 datagramEntry->size = request.TotalSize(); 251 252 // This should have been handled in UnixFifo 253 if (writable < datagramEntry->size) { 254 TRACE("UnixBufferQueue::Write(): not enough space for" 255 "datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable); 256 return B_ERROR; 257 } 258 } 259 260 // If the request has ancillary data create an entry first. 261 AncillaryDataEntry* ancillaryEntry = NULL; 262 ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter; 263 if (writable > 0 && request.AncillaryData() != NULL) { 264 ancillaryEntry = new(std::nothrow) AncillaryDataEntry; 265 if (ancillaryEntry == NULL) 266 return B_NO_MEMORY; 267 268 ancillaryEntryDeleter.SetTo(ancillaryEntry); 269 ancillaryEntry->data = request.AncillaryData(); 270 ancillaryEntry->offset = Readable(); 271 272 // The offsets are relative to the previous entry. 273 AncillaryDataList::Iterator it = fAncillaryData.GetIterator(); 274 while (AncillaryDataEntry* entry = it.Next()) 275 ancillaryEntry->offset -= entry->offset; 276 // TODO: This is inefficient when the list is long. Rather also 277 // store and maintain the absolute offset of the last queued entry. 278 } 279 280 // write as much as we can 281 while (writable > 0 && request.GetCurrentChunk(data, size)) { 282 if (size > writable) 283 size = writable; 284 285 ssize_t bytesWritten; 286 if (user) 287 bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size); 288 else 289 bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size); 290 291 if (bytesWritten < 0) 292 return bytesWritten; 293 if (bytesWritten == 0) 294 return B_ERROR; 295 296 if (ancillaryEntry != NULL) { 297 fAncillaryData.Add(ancillaryEntry); 298 ancillaryEntryDeleter.Detach(); 299 request.SetAncillaryData(NULL); 300 ancillaryEntry = NULL; 301 } 302 303 request.AddBytesTransferred(bytesWritten); 304 writable -= bytesWritten; 305 } 306 307 if (fType == UnixFifoType::Datagram) { 308 fDatagrams.Add(datagramEntry); 309 datagramEntryDeleter.Detach(); 310 } 311 312 return B_OK; 313 } 314 315 316 status_t 317 UnixBufferQueue::SetCapacity(size_t capacity) 318 { 319 if (capacity <= fCapacity) 320 return B_OK; 321 322 ring_buffer* newBuffer = create_ring_buffer(capacity); 323 if (newBuffer == NULL) 324 return B_NO_MEMORY; 325 326 ring_buffer_move(newBuffer, ring_buffer_readable(fBuffer), fBuffer); 327 delete_ring_buffer(fBuffer); 328 329 fBuffer = newBuffer; 330 fCapacity = capacity; 331 332 return B_OK; 333 } 334 335 336 // #pragma mark - 337 338 339 UnixFifo::UnixFifo(size_t capacity, UnixFifoType type) 340 : 341 fBuffer(capacity, type), 342 fReaders(), 343 fWriters(), 344 fReadRequested(0), 345 fWriteRequested(0), 346 fShutdown(0) 347 348 { 349 fReadCondition.Init(this, "unix fifo read"); 350 fWriteCondition.Init(this, "unix fifo write"); 351 mutex_init(&fLock, "unix fifo"); 352 } 353 354 355 UnixFifo::~UnixFifo() 356 { 357 mutex_destroy(&fLock); 358 } 359 360 361 status_t 362 UnixFifo::Init() 363 { 364 return fBuffer.Init(); 365 } 366 367 368 void 369 UnixFifo::Shutdown(uint32 shutdown) 370 { 371 TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n", 372 find_thread(NULL), this, shutdown); 373 374 fShutdown |= shutdown; 375 376 if (shutdown != 0) { 377 // Shutting down either end also effects the other, so notify both. 378 fReadCondition.NotifyAll(); 379 fWriteCondition.NotifyAll(); 380 } 381 } 382 383 384 ssize_t 385 UnixFifo::Read(const iovec* vecs, size_t vecCount, 386 ancillary_data_container** _ancillaryData, 387 struct sockaddr_storage* address, bigtime_t timeout) 388 { 389 TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n", 390 find_thread(NULL), this, vecs, vecCount, timeout); 391 392 if (IsReadShutdown() && fBuffer.Readable() == 0) 393 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 394 395 UnixRequest request(vecs, vecCount, NULL, address); 396 fReaders.Add(&request); 397 fReadRequested += request.TotalSize(); 398 399 status_t error = _Read(request, timeout); 400 401 bool firstInQueue = fReaders.Head() == &request; 402 fReaders.Remove(&request); 403 fReadRequested -= request.TotalSize(); 404 405 if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0 406 && !IsReadShutdown()) { 407 // There's more to read, other readers, and we were first in the queue. 408 // So we need to notify the others. 409 fReadCondition.NotifyAll(); 410 } 411 412 if (request.BytesTransferred() > 0 && !fWriters.IsEmpty() 413 && !IsWriteShutdown()) { 414 // We read something and there are writers. Notify them 415 fWriteCondition.NotifyAll(); 416 } 417 418 *_ancillaryData = request.AncillaryData(); 419 420 if (request.BytesTransferred() > 0) { 421 if (request.BytesTransferred() > SSIZE_MAX) 422 RETURN_ERROR(SSIZE_MAX); 423 RETURN_ERROR((ssize_t)request.BytesTransferred()); 424 } 425 426 RETURN_ERROR(error); 427 } 428 429 430 ssize_t 431 UnixFifo::Write(const iovec* vecs, size_t vecCount, 432 ancillary_data_container* ancillaryData, 433 const struct sockaddr_storage* address, bigtime_t timeout) 434 { 435 TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME 436 ")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData, 437 timeout); 438 439 if (IsWriteShutdown()) 440 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 441 442 if (IsReadShutdown()) 443 RETURN_ERROR(EPIPE); 444 445 UnixRequest request(vecs, vecCount, ancillaryData, 446 (struct sockaddr_storage*)address); 447 fWriters.Add(&request); 448 fWriteRequested += request.TotalSize(); 449 450 status_t error = _Write(request, timeout); 451 452 bool firstInQueue = fWriters.Head() == &request; 453 fWriters.Remove(&request); 454 fWriteRequested -= request.TotalSize(); 455 456 if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0 457 && !IsWriteShutdown()) { 458 // There's more space for writing, other writers, and we were first in 459 // the queue. So we need to notify the others. 460 fWriteCondition.NotifyAll(); 461 } 462 463 if (request.BytesTransferred() > 0 && !fReaders.IsEmpty() 464 && !IsReadShutdown()) { 465 // We've written something and there are readers. Notify them. 466 fReadCondition.NotifyAll(); 467 } 468 469 if (request.BytesTransferred() > 0) { 470 if (request.BytesTransferred() > SSIZE_MAX) 471 RETURN_ERROR(SSIZE_MAX); 472 RETURN_ERROR((ssize_t)request.BytesTransferred()); 473 } 474 475 RETURN_ERROR(error); 476 } 477 478 479 size_t 480 UnixFifo::Readable() const 481 { 482 size_t readable = fBuffer.Readable(); 483 return (off_t)readable > fReadRequested ? readable - fReadRequested : 0; 484 } 485 486 487 size_t 488 UnixFifo::Writable() const 489 { 490 size_t writable = fBuffer.Writable(); 491 return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0; 492 } 493 494 495 status_t 496 UnixFifo::SetBufferCapacity(size_t capacity) 497 { 498 // check against allowed minimal/maximal value 499 if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY) 500 capacity = UNIX_FIFO_MAXIMAL_CAPACITY; 501 else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY) 502 capacity = UNIX_FIFO_MINIMAL_CAPACITY; 503 504 size_t oldCapacity = fBuffer.Capacity(); 505 if (capacity == oldCapacity) 506 return B_OK; 507 508 // set capacity 509 status_t error = fBuffer.SetCapacity(capacity); 510 if (error != B_OK) 511 return error; 512 513 // wake up waiting writers, if the capacity increased 514 if (!fWriters.IsEmpty() && !IsWriteShutdown()) 515 fWriteCondition.NotifyAll(); 516 517 return B_OK; 518 } 519 520 521 status_t 522 UnixFifo::_Read(UnixRequest& request, bigtime_t timeout) 523 { 524 // wait for the request to reach the front of the queue 525 if (fReaders.Head() != &request && timeout == 0) 526 RETURN_ERROR(B_WOULD_BLOCK); 527 528 while (fReaders.Head() != &request 529 && !(IsReadShutdown() && fBuffer.Readable() == 0)) { 530 ConditionVariableEntry entry; 531 fReadCondition.Add(&entry); 532 533 mutex_unlock(&fLock); 534 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 535 timeout); 536 mutex_lock(&fLock); 537 538 if (error != B_OK) 539 RETURN_ERROR(error); 540 } 541 542 if (fBuffer.Readable() == 0) { 543 if (IsReadShutdown()) 544 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 545 546 if (IsWriteShutdown()) 547 RETURN_ERROR(0); 548 549 if (timeout == 0) 550 RETURN_ERROR(B_WOULD_BLOCK); 551 } 552 553 // wait for any data to become available 554 // TODO: Support low water marks! 555 while (fBuffer.Readable() == 0 556 && !IsReadShutdown() && !IsWriteShutdown()) { 557 ConditionVariableEntry entry; 558 fReadCondition.Add(&entry); 559 560 mutex_unlock(&fLock); 561 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 562 timeout); 563 mutex_lock(&fLock); 564 565 if (error != B_OK) 566 RETURN_ERROR(error); 567 } 568 569 if (fBuffer.Readable() == 0) { 570 if (IsReadShutdown()) 571 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 572 if (IsWriteShutdown()) 573 RETURN_ERROR(0); 574 } 575 576 RETURN_ERROR(fBuffer.Read(request)); 577 } 578 579 580 status_t 581 UnixFifo::_Write(UnixRequest& request, bigtime_t timeout) 582 { 583 if (timeout == 0) 584 RETURN_ERROR(_WriteNonBlocking(request)); 585 586 // wait for the request to reach the front of the queue 587 while (fWriters.Head() != &request && !IsWriteShutdown()) { 588 ConditionVariableEntry entry; 589 fWriteCondition.Add(&entry); 590 591 mutex_unlock(&fLock); 592 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 593 timeout); 594 mutex_lock(&fLock); 595 596 if (error != B_OK) 597 RETURN_ERROR(error); 598 } 599 600 if (IsWriteShutdown()) 601 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 602 603 if (IsReadShutdown()) 604 RETURN_ERROR(EPIPE); 605 606 if (request.TotalSize() == 0) 607 return 0; 608 609 status_t error = B_OK; 610 611 while (error == B_OK && request.BytesRemaining() > 0) { 612 // wait for any space to become available 613 while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request) 614 && !IsWriteShutdown() && !IsReadShutdown()) { 615 ConditionVariableEntry entry; 616 fWriteCondition.Add(&entry); 617 618 mutex_unlock(&fLock); 619 error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 620 mutex_lock(&fLock); 621 622 if (error != B_OK) 623 RETURN_ERROR(error); 624 } 625 626 if (IsWriteShutdown()) 627 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 628 629 if (IsReadShutdown()) 630 RETURN_ERROR(EPIPE); 631 632 // write as much as we can 633 error = fBuffer.Write(request); 634 635 if (error == B_OK) { 636 // TODO: Whenever we've successfully written a part, we should reset the 637 // timeout! 638 } 639 } 640 641 RETURN_ERROR(error); 642 } 643 644 645 status_t 646 UnixFifo::_WriteNonBlocking(UnixRequest& request) 647 { 648 // We need to be first in queue and space should be available right now, 649 // otherwise we need to fail. 650 if (fWriters.Head() != &request || fBuffer.Writable() < _MinimumWritableSize(request)) 651 RETURN_ERROR(B_WOULD_BLOCK); 652 653 if (request.TotalSize() == 0) 654 return 0; 655 656 // Write as much as we can. 657 RETURN_ERROR(fBuffer.Write(request)); 658 } 659 660 661 size_t 662 UnixFifo::_MinimumWritableSize(const UnixRequest& request) const 663 { 664 switch (fType) { 665 case UnixFifoType::Datagram: 666 return request.TotalSize(); 667 case UnixFifoType::Stream: 668 default: 669 return 1; 670 } 671 } 672