1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 #include "UnixFifo.h" 7 8 #include <new> 9 10 #include <AutoDeleter.h> 11 12 #include <net_stack.h> 13 #include <util/ring_buffer.h> 14 15 #include "unix.h" 16 17 18 #define UNIX_FIFO_DEBUG_LEVEL 0 19 #define UNIX_DEBUG_LEVEL UNIX_FIFO_DEBUG_LEVEL 20 #include "UnixDebug.h" 21 22 23 // #pragma mark - UnixRequest 24 25 26 UnixRequest::UnixRequest(const iovec* vecs, size_t count, 27 ancillary_data_container* ancillaryData, 28 struct sockaddr_storage* address) 29 : 30 fVecs(vecs), 31 fVecCount(count), 32 fAncillaryData(ancillaryData), 33 fTotalSize(0), 34 fBytesTransferred(0), 35 fVecIndex(0), 36 fVecOffset(0), 37 fAddress(address) 38 { 39 for (size_t i = 0; i < fVecCount; i++) 40 fTotalSize += fVecs[i].iov_len; 41 } 42 43 44 void 45 UnixRequest::AddBytesTransferred(size_t size) 46 { 47 fBytesTransferred += size; 48 49 // also adjust the current iovec index/offset 50 while (fVecIndex < fVecCount 51 && fVecs[fVecIndex].iov_len - fVecOffset <= size) { 52 size -= fVecs[fVecIndex].iov_len - fVecOffset; 53 fVecIndex++; 54 fVecOffset = 0; 55 } 56 57 if (fVecIndex < fVecCount) 58 fVecOffset += size; 59 } 60 61 62 bool 63 UnixRequest::GetCurrentChunk(void*& data, size_t& size) 64 { 65 while (fVecIndex < fVecCount 66 && fVecOffset >= fVecs[fVecIndex].iov_len) { 67 fVecIndex++; 68 fVecOffset = 0; 69 } 70 if (fVecIndex >= fVecCount) 71 return false; 72 73 data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset; 74 size = fVecs[fVecIndex].iov_len - fVecOffset; 75 return true; 76 } 77 78 79 void 80 UnixRequest::UnsetAncillaryData() 81 { 82 fAncillaryData = NULL; 83 } 84 85 86 void 87 UnixRequest::AddAncillaryData(ancillary_data_container* data) 88 { 89 if (fAncillaryData != NULL) { 90 gStackModule->move_ancillary_data(data, fAncillaryData); 91 gStackModule->delete_ancillary_data_container(data); 92 } else 93 fAncillaryData = data; 94 } 95 96 97 // #pragma mark - UnixBufferQueue 98 99 100 UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type) 101 : 102 fBuffer(NULL), 103 fCapacity(capacity), 104 fType(type) 105 { 106 } 107 108 109 UnixBufferQueue::~UnixBufferQueue() 110 { 111 while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) { 112 gStackModule->delete_ancillary_data_container(entry->data); 113 delete entry; 114 } 115 116 delete_ring_buffer(fBuffer); 117 } 118 119 120 status_t 121 UnixBufferQueue::Init() 122 { 123 fBuffer = create_ring_buffer(fCapacity); 124 if (fBuffer == NULL) 125 return B_NO_MEMORY; 126 return B_OK; 127 } 128 129 130 size_t 131 UnixBufferQueue::Readable() const 132 { 133 return ring_buffer_readable(fBuffer); 134 } 135 136 137 size_t 138 UnixBufferQueue::Writable() const 139 { 140 return ring_buffer_writable(fBuffer); 141 } 142 143 144 status_t 145 UnixBufferQueue::Read(UnixRequest& request) 146 { 147 bool user = gStackModule->is_syscall(); 148 149 size_t readable = Readable(); 150 void* data; 151 size_t size; 152 153 DatagramEntry* datagramEntry = NULL; 154 if (fType == UnixFifoType::Datagram) { 155 datagramEntry = fDatagrams.Head(); 156 if (datagramEntry == NULL) 157 return B_ERROR; 158 159 if (datagramEntry->size > readable) 160 TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, " 161 "but only %lu bytes are readable\n", datagramEntry->size, readable); 162 else 163 readable = datagramEntry->size; 164 } 165 166 while (readable > 0 && request.GetCurrentChunk(data, size)) { 167 if (size > readable) 168 size = readable; 169 170 ssize_t bytesRead; 171 if (user) 172 bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size); 173 else 174 bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size); 175 176 if (bytesRead < 0) 177 return bytesRead; 178 if (bytesRead == 0) 179 return B_ERROR; 180 181 // Adjust ancillary data entry offsets, respectively attach the ones 182 // that belong to the read data to the request. 183 if (AncillaryDataEntry* entry = fAncillaryData.Head()) { 184 size_t offsetDelta = bytesRead; 185 while (entry != NULL && offsetDelta > entry->offset) { 186 // entry data have been read -- add ancillary data to request 187 fAncillaryData.RemoveHead(); 188 offsetDelta -= entry->offset; 189 request.AddAncillaryData(entry->data); 190 delete entry; 191 192 entry = fAncillaryData.Head(); 193 } 194 195 if (entry != NULL) 196 entry->offset -= offsetDelta; 197 } 198 199 request.AddBytesTransferred(bytesRead); 200 readable -= bytesRead; 201 } 202 203 if (fType == UnixFifoType::Datagram) { 204 fDatagrams.RemoveHead(); 205 206 memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address)); 207 delete datagramEntry; 208 209 if (readable > 0) { 210 ring_buffer_flush(fBuffer, readable); 211 if (AncillaryDataEntry* entry = fAncillaryData.Head()) { 212 size_t offsetDelta = readable; 213 while (entry != NULL && offsetDelta > entry->offset) { 214 fAncillaryData.RemoveHead(); 215 offsetDelta -= entry->offset; 216 gStackModule->delete_ancillary_data_container(entry->data); 217 delete entry; 218 219 entry = fAncillaryData.Head(); 220 } 221 222 if (entry != NULL) 223 entry->offset -= offsetDelta; 224 } 225 } 226 } 227 228 return B_OK; 229 } 230 231 232 status_t 233 UnixBufferQueue::Write(UnixRequest& request) 234 { 235 bool user = gStackModule->is_syscall(); 236 237 size_t writable = Writable(); 238 void* data; 239 size_t size; 240 241 DatagramEntry* datagramEntry = NULL; 242 ObjectDeleter<DatagramEntry> datagramEntryDeleter; 243 if (fType == UnixFifoType::Datagram) { 244 datagramEntry = new(std::nothrow) DatagramEntry; 245 if (datagramEntry == NULL) 246 return B_NO_MEMORY; 247 248 datagramEntryDeleter.SetTo(datagramEntry); 249 memcpy(&datagramEntry->address, request.Address(), 250 sizeof(datagramEntry->address)); 251 datagramEntry->size = request.TotalSize(); 252 253 // This should have been handled in UnixFifo 254 if (writable < datagramEntry->size) { 255 TRACE("UnixBufferQueue::Write(): not enough space for" 256 "datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable); 257 return B_ERROR; 258 } 259 } 260 261 // If the request has ancillary data create an entry first. 262 AncillaryDataEntry* ancillaryEntry = NULL; 263 ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter; 264 if (writable > 0 && request.AncillaryData() != NULL) { 265 ancillaryEntry = new(std::nothrow) AncillaryDataEntry; 266 if (ancillaryEntry == NULL) 267 return B_NO_MEMORY; 268 269 ancillaryEntryDeleter.SetTo(ancillaryEntry); 270 ancillaryEntry->data = request.AncillaryData(); 271 ancillaryEntry->offset = Readable(); 272 273 // The offsets are relative to the previous entry. 274 AncillaryDataList::Iterator it = fAncillaryData.GetIterator(); 275 while (AncillaryDataEntry* entry = it.Next()) 276 ancillaryEntry->offset -= entry->offset; 277 // TODO: This is inefficient when the list is long. Rather also 278 // store and maintain the absolute offset of the last queued entry. 279 } 280 281 // write as much as we can 282 while (writable > 0 && request.GetCurrentChunk(data, size)) { 283 if (size > writable) 284 size = writable; 285 286 ssize_t bytesWritten; 287 if (user) 288 bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size); 289 else 290 bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size); 291 292 if (bytesWritten < 0) 293 return bytesWritten; 294 if (bytesWritten == 0) 295 return B_ERROR; 296 297 if (ancillaryEntry != NULL) { 298 fAncillaryData.Add(ancillaryEntry); 299 ancillaryEntryDeleter.Detach(); 300 request.UnsetAncillaryData(); 301 ancillaryEntry = NULL; 302 } 303 304 request.AddBytesTransferred(bytesWritten); 305 writable -= bytesWritten; 306 } 307 308 if (fType == UnixFifoType::Datagram) { 309 fDatagrams.Add(datagramEntry); 310 datagramEntryDeleter.Detach(); 311 } 312 313 return B_OK; 314 } 315 316 317 status_t 318 UnixBufferQueue::SetCapacity(size_t capacity) 319 { 320 if (capacity <= fCapacity) 321 return B_OK; 322 323 ring_buffer* newBuffer = create_ring_buffer(capacity); 324 if (newBuffer == NULL) 325 return B_NO_MEMORY; 326 327 ring_buffer_move(newBuffer, ring_buffer_readable(fBuffer), fBuffer); 328 delete_ring_buffer(fBuffer); 329 330 fBuffer = newBuffer; 331 fCapacity = capacity; 332 333 return B_OK; 334 } 335 336 337 // #pragma mark - 338 339 340 UnixFifo::UnixFifo(size_t capacity, UnixFifoType type) 341 : 342 fBuffer(capacity, type), 343 fReaders(), 344 fWriters(), 345 fReadRequested(0), 346 fWriteRequested(0), 347 fShutdown(0) 348 349 { 350 fReadCondition.Init(this, "unix fifo read"); 351 fWriteCondition.Init(this, "unix fifo write"); 352 mutex_init(&fLock, "unix fifo"); 353 } 354 355 356 UnixFifo::~UnixFifo() 357 { 358 mutex_destroy(&fLock); 359 } 360 361 362 status_t 363 UnixFifo::Init() 364 { 365 return fBuffer.Init(); 366 } 367 368 369 void 370 UnixFifo::Shutdown(uint32 shutdown) 371 { 372 TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n", 373 find_thread(NULL), this, shutdown); 374 375 fShutdown |= shutdown; 376 377 if (shutdown != 0) { 378 // Shutting down either end also effects the other, so notify both. 379 fReadCondition.NotifyAll(); 380 fWriteCondition.NotifyAll(); 381 } 382 } 383 384 385 ssize_t 386 UnixFifo::Read(const iovec* vecs, size_t vecCount, 387 ancillary_data_container** _ancillaryData, 388 struct sockaddr_storage* address, bigtime_t timeout) 389 { 390 TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n", 391 find_thread(NULL), this, vecs, vecCount, timeout); 392 393 if (IsReadShutdown() && fBuffer.Readable() == 0) 394 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 395 396 UnixRequest request(vecs, vecCount, NULL, address); 397 fReaders.Add(&request); 398 fReadRequested += request.TotalSize(); 399 400 status_t error = _Read(request, timeout); 401 402 bool firstInQueue = fReaders.Head() == &request; 403 fReaders.Remove(&request); 404 fReadRequested -= request.TotalSize(); 405 406 if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0 407 && !IsReadShutdown()) { 408 // There's more to read, other readers, and we were first in the queue. 409 // So we need to notify the others. 410 fReadCondition.NotifyAll(); 411 } 412 413 if (request.BytesTransferred() > 0 && !fWriters.IsEmpty() 414 && !IsWriteShutdown()) { 415 // We read something and there are writers. Notify them 416 fWriteCondition.NotifyAll(); 417 } 418 419 *_ancillaryData = request.AncillaryData(); 420 421 if (request.BytesTransferred() > 0) { 422 if (request.BytesTransferred() > SSIZE_MAX) 423 RETURN_ERROR(SSIZE_MAX); 424 RETURN_ERROR((ssize_t)request.BytesTransferred()); 425 } 426 427 RETURN_ERROR(error); 428 } 429 430 431 ssize_t 432 UnixFifo::Write(const iovec* vecs, size_t vecCount, 433 ancillary_data_container* ancillaryData, 434 const struct sockaddr_storage* address, bigtime_t timeout) 435 { 436 TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME 437 ")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData, 438 timeout); 439 440 if (IsWriteShutdown()) 441 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 442 443 if (IsReadShutdown()) 444 RETURN_ERROR(EPIPE); 445 446 UnixRequest request(vecs, vecCount, ancillaryData, 447 (struct sockaddr_storage*)address); 448 fWriters.Add(&request); 449 fWriteRequested += request.TotalSize(); 450 451 status_t error = _Write(request, timeout); 452 453 bool firstInQueue = fWriters.Head() == &request; 454 fWriters.Remove(&request); 455 fWriteRequested -= request.TotalSize(); 456 457 if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0 458 && !IsWriteShutdown()) { 459 // There's more space for writing, other writers, and we were first in 460 // the queue. So we need to notify the others. 461 fWriteCondition.NotifyAll(); 462 } 463 464 if (request.BytesTransferred() > 0 && !fReaders.IsEmpty() 465 && !IsReadShutdown()) { 466 // We've written something and there are readers. Notify them. 467 fReadCondition.NotifyAll(); 468 } 469 470 if (request.BytesTransferred() > 0) { 471 if (request.BytesTransferred() > SSIZE_MAX) 472 RETURN_ERROR(SSIZE_MAX); 473 RETURN_ERROR((ssize_t)request.BytesTransferred()); 474 } 475 476 RETURN_ERROR(error); 477 } 478 479 480 size_t 481 UnixFifo::Readable() const 482 { 483 size_t readable = fBuffer.Readable(); 484 return (off_t)readable > fReadRequested ? readable - fReadRequested : 0; 485 } 486 487 488 size_t 489 UnixFifo::Writable() const 490 { 491 size_t writable = fBuffer.Writable(); 492 return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0; 493 } 494 495 496 status_t 497 UnixFifo::SetBufferCapacity(size_t capacity) 498 { 499 // check against allowed minimal/maximal value 500 if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY) 501 capacity = UNIX_FIFO_MAXIMAL_CAPACITY; 502 else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY) 503 capacity = UNIX_FIFO_MINIMAL_CAPACITY; 504 505 size_t oldCapacity = fBuffer.Capacity(); 506 if (capacity == oldCapacity) 507 return B_OK; 508 509 // set capacity 510 status_t error = fBuffer.SetCapacity(capacity); 511 if (error != B_OK) 512 return error; 513 514 // wake up waiting writers, if the capacity increased 515 if (!fWriters.IsEmpty() && !IsWriteShutdown()) 516 fWriteCondition.NotifyAll(); 517 518 return B_OK; 519 } 520 521 522 status_t 523 UnixFifo::_Read(UnixRequest& request, bigtime_t timeout) 524 { 525 // wait for the request to reach the front of the queue 526 if (fReaders.Head() != &request && timeout == 0) 527 RETURN_ERROR(B_WOULD_BLOCK); 528 529 while (fReaders.Head() != &request 530 && !(IsReadShutdown() && fBuffer.Readable() == 0)) { 531 ConditionVariableEntry entry; 532 fReadCondition.Add(&entry); 533 534 mutex_unlock(&fLock); 535 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 536 timeout); 537 mutex_lock(&fLock); 538 539 if (error != B_OK) 540 RETURN_ERROR(error); 541 } 542 543 if (fBuffer.Readable() == 0) { 544 if (IsReadShutdown()) 545 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 546 547 if (IsWriteShutdown()) 548 RETURN_ERROR(0); 549 550 if (timeout == 0) 551 RETURN_ERROR(B_WOULD_BLOCK); 552 } 553 554 // wait for any data to become available 555 // TODO: Support low water marks! 556 while (fBuffer.Readable() == 0 557 && !IsReadShutdown() && !IsWriteShutdown()) { 558 ConditionVariableEntry entry; 559 fReadCondition.Add(&entry); 560 561 mutex_unlock(&fLock); 562 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 563 timeout); 564 mutex_lock(&fLock); 565 566 if (error != B_OK) 567 RETURN_ERROR(error); 568 } 569 570 if (fBuffer.Readable() == 0) { 571 if (IsReadShutdown()) 572 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 573 if (IsWriteShutdown()) 574 RETURN_ERROR(0); 575 } 576 577 RETURN_ERROR(fBuffer.Read(request)); 578 } 579 580 581 status_t 582 UnixFifo::_Write(UnixRequest& request, bigtime_t timeout) 583 { 584 if (timeout == 0) 585 RETURN_ERROR(_WriteNonBlocking(request)); 586 587 // wait for the request to reach the front of the queue 588 while (fWriters.Head() != &request && !IsWriteShutdown()) { 589 ConditionVariableEntry entry; 590 fWriteCondition.Add(&entry); 591 592 mutex_unlock(&fLock); 593 status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, 594 timeout); 595 mutex_lock(&fLock); 596 597 if (error != B_OK) 598 RETURN_ERROR(error); 599 } 600 601 if (IsWriteShutdown()) 602 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 603 604 if (IsReadShutdown()) 605 RETURN_ERROR(EPIPE); 606 607 if (request.TotalSize() == 0) 608 return 0; 609 610 status_t error = B_OK; 611 612 while (error == B_OK && request.BytesRemaining() > 0) { 613 // wait for any space to become available 614 while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request) 615 && !IsWriteShutdown() && !IsReadShutdown()) { 616 ConditionVariableEntry entry; 617 fWriteCondition.Add(&entry); 618 619 mutex_unlock(&fLock); 620 error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 621 mutex_lock(&fLock); 622 623 if (error != B_OK) 624 RETURN_ERROR(error); 625 } 626 627 if (IsWriteShutdown()) 628 RETURN_ERROR(UNIX_FIFO_SHUTDOWN); 629 630 if (IsReadShutdown()) 631 RETURN_ERROR(EPIPE); 632 633 // write as much as we can 634 error = fBuffer.Write(request); 635 636 if (error == B_OK) { 637 // TODO: Whenever we've successfully written a part, we should reset the 638 // timeout! 639 } 640 } 641 642 RETURN_ERROR(error); 643 } 644 645 646 status_t 647 UnixFifo::_WriteNonBlocking(UnixRequest& request) 648 { 649 // We need to be first in queue and space should be available right now, 650 // otherwise we need to fail. 651 if (fWriters.Head() != &request || fBuffer.Writable() < _MinimumWritableSize(request)) 652 RETURN_ERROR(B_WOULD_BLOCK); 653 654 if (request.TotalSize() == 0) 655 return 0; 656 657 // Write as much as we can. 658 RETURN_ERROR(fBuffer.Write(request)); 659 } 660 661 662 size_t 663 UnixFifo::_MinimumWritableSize(const UnixRequest& request) const 664 { 665 switch (fType) { 666 case UnixFifoType::Datagram: 667 return request.TotalSize(); 668 case UnixFifoType::Stream: 669 default: 670 return 1; 671 } 672 } 673