/*
 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
 * Distributed under the terms of the MIT License.
 */

#include "UnixFifo.h"

#include <new>

#include <AutoDeleter.h>

#include <net_stack.h>
#include <util/ring_buffer.h>

#include "unix.h"


#define UNIX_FIFO_DEBUG_LEVEL	0
#define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
#include "UnixDebug.h"


// #pragma mark - UnixRequest


UnixRequest::UnixRequest(const iovec* vecs, size_t count,
	ancillary_data_container* ancillaryData)
	:
	fVecs(vecs),
	fVecCount(count),
	fAncillaryData(ancillaryData),
	fTotalSize(0),
	fBytesTransferred(0),
	fVecIndex(0),
	fVecOffset(0)
{
	for (size_t i = 0; i < fVecCount; i++)
		fTotalSize += fVecs[i].iov_len;
}


void
UnixRequest::AddBytesTransferred(size_t size)
{
	fBytesTransferred += size;

	// also adjust the current iovec index/offset
	while (fVecIndex < fVecCount
		&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
		size -= fVecs[fVecIndex].iov_len - fVecOffset;
		fVecIndex++;
		fVecOffset = 0;
	}

	if (fVecIndex < fVecCount)
		fVecOffset += size;
}


bool
UnixRequest::GetCurrentChunk(void*& data, size_t& size)
{
	while (fVecIndex < fVecCount
		&& fVecOffset >= fVecs[fVecIndex].iov_len) {
		fVecIndex++;
		fVecOffset = 0;
	}
	if (fVecIndex >= fVecCount)
		return false;

	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
	size = fVecs[fVecIndex].iov_len - fVecOffset;
	return true;
}


void
UnixRequest::SetAncillaryData(ancillary_data_container* data)
{
	fAncillaryData = data;
}


void
UnixRequest::AddAncillaryData(ancillary_data_container* data)
{
	if (fAncillaryData != NULL) {
		gStackModule->move_ancillary_data(data, fAncillaryData);
		gStackModule->delete_ancillary_data_container(data);
	} else
		fAncillaryData = data;
}


// #pragma mark - UnixBufferQueue


UnixBufferQueue::UnixBufferQueue(size_t capacity)
	:
	fBuffer(NULL),
	fCapacity(capacity)
{
}


UnixBufferQueue::~UnixBufferQueue()
{
	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
		gStackModule->delete_ancillary_data_container(entry->data);
		delete entry;
	}

	delete_ring_buffer(fBuffer);
}


status_t
UnixBufferQueue::Init()
{
	fBuffer = create_ring_buffer(fCapacity);
	if (fBuffer == NULL)
		return B_NO_MEMORY;
	return B_OK;
}


size_t
UnixBufferQueue::Readable() const
{
	return ring_buffer_readable(fBuffer);
}


size_t
UnixBufferQueue::Writable() const
{
	return ring_buffer_writable(fBuffer);
}


status_t
UnixBufferQueue::Read(UnixRequest& request)
{
	bool user = gStackModule->is_syscall();

	size_t readable = Readable();
	void* data;
	size_t size;

	while (readable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > readable)
			size = readable;

		ssize_t bytesRead;
		if (user)
			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
		else
			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);

		if (bytesRead < 0)
			return bytesRead;
		if (bytesRead == 0)
			return B_ERROR;

		// Adjust the ancillary data entry offsets, and attach to the request
		// the entries that belong to the data just read.
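		// For illustration: if two entries are queued with relative offsets
		// 5 and 3 (absolute positions 5 and 8) and bytesRead is 6, the first
		// entry's ancillary data is attached to the request and the second
		// entry's relative offset becomes 2 -- its new absolute position.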
		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
			size_t offsetDelta = bytesRead;
			while (entry != NULL && offsetDelta > entry->offset) {
				// entry data have been read -- add ancillary data to request
				fAncillaryData.RemoveHead();
				offsetDelta -= entry->offset;
				request.AddAncillaryData(entry->data);
				delete entry;

				entry = fAncillaryData.Head();
			}

			if (entry != NULL)
				entry->offset -= offsetDelta;
		}

		request.AddBytesTransferred(bytesRead);
		readable -= bytesRead;
	}

	return B_OK;
}


status_t
UnixBufferQueue::Write(UnixRequest& request)
{
	bool user = gStackModule->is_syscall();

	size_t writable = Writable();
	void* data;
	size_t size;

	// If the request has ancillary data, create an entry first.
	AncillaryDataEntry* ancillaryEntry = NULL;
	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
	if (writable > 0 && request.AncillaryData() != NULL) {
		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
		if (ancillaryEntry == NULL)
			return B_NO_MEMORY;

		ancillaryEntryDeleter.SetTo(ancillaryEntry);
		ancillaryEntry->data = request.AncillaryData();
		ancillaryEntry->offset = Readable();

		// The offsets are relative to the previous entry.
		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
		while (AncillaryDataEntry* entry = it.Next())
			ancillaryEntry->offset -= entry->offset;
			// TODO: This is inefficient when the list is long. Rather also
			// store and maintain the absolute offset of the last queued entry.
	}

	// write as much as we can
	while (writable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > writable)
			size = writable;

		ssize_t bytesWritten;
		if (user)
			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
		else
			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);

		if (bytesWritten < 0)
			return bytesWritten;
		if (bytesWritten == 0)
			return B_ERROR;

		if (ancillaryEntry != NULL) {
			fAncillaryData.Add(ancillaryEntry);
			ancillaryEntryDeleter.Detach();
			request.SetAncillaryData(NULL);
			ancillaryEntry = NULL;
		}

		request.AddBytesTransferred(bytesWritten);
		writable -= bytesWritten;
	}

	return B_OK;
}


status_t
UnixBufferQueue::SetCapacity(size_t capacity)
{
	// TODO:...
	return B_ERROR;
}


// #pragma mark - UnixFifo


UnixFifo::UnixFifo(size_t capacity)
	:
	fBuffer(capacity),
	fReaders(),
	fWriters(),
	fReadRequested(0),
	fWriteRequested(0),
	fShutdown(0)
{
	fReadCondition.Init(this, "unix fifo read");
	fWriteCondition.Init(this, "unix fifo write");
	mutex_init(&fLock, "unix fifo");
}


UnixFifo::~UnixFifo()
{
	mutex_destroy(&fLock);
}


status_t
UnixFifo::Init()
{
	return fBuffer.Init();
}


void
UnixFifo::Shutdown(uint32 shutdown)
{
	TRACE("[%ld] %p->UnixFifo::Shutdown(0x%lx)\n", find_thread(NULL), this,
		shutdown);

	fShutdown |= shutdown;

	if (shutdown != 0) {
		// Shutting down either end also affects the other, so notify both.
		fReadCondition.NotifyAll();
		fWriteCondition.NotifyAll();
	}
}
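

// Locking and queueing overview: the Read() and Write() paths evidently
// expect the caller to hold fLock already -- both _Read() and _Write()
// temporarily drop the lock around their condition variable waits. Requests
// are served in FIFO order through the fReaders/fWriters queues: the request
// at the head of a queue transfers data and, on its way out, wakes any
// remaining waiters.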
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData, bigtime_t timeout)
{
	TRACE("[%ld] %p->UnixFifo::Read(%p, %ld, %lld)\n", find_thread(NULL),
		this, vecs, vecCount, timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	UnixRequest request(vecs, vecCount, NULL);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	status_t error = _Read(request, timeout);

	bool firstInQueue = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
		&& !IsReadShutdown()) {
		// There's more to read, there are other readers, and we were first in
		// the queue, so we need to notify the others.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
		&& !IsWriteShutdown()) {
		// We read something and there are writers. Notify them.
		fWriteCondition.NotifyAll();
	}

	*_ancillaryData = request.AncillaryData();

	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}


ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData, bigtime_t timeout)
{
	TRACE("[%ld] %p->UnixFifo::Write(%p, %ld, %p, %lld)\n", find_thread(NULL),
		this, vecs, vecCount, ancillaryData, timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	UnixRequest request(vecs, vecCount, ancillaryData);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	status_t error = _Write(request, timeout);

	bool firstInQueue = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
		&& !IsWriteShutdown()) {
		// There's more space for writing, there are other writers, and we
		// were first in the queue, so we need to notify the others.
		fWriteCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
		&& !IsReadShutdown()) {
		// We've written something and there are readers. Notify them.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
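

// Readable() and Writable() subtract the total size of the requests that are
// still queued (fReadRequested/fWriteRequested), so bytes or buffer space
// already claimed by pending requests are not reported as available again --
// presumably to keep select()-style readiness checks honest while earlier
// requests are still waiting.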
size_t
UnixFifo::Readable() const
{
	size_t readable = fBuffer.Readable();
	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
}


size_t
UnixFifo::Writable() const
{
	size_t writable = fBuffer.Writable();
	return (off_t)writable > fWriteRequested
		? writable - fWriteRequested : 0;
}


status_t
UnixFifo::SetBufferCapacity(size_t capacity)
{
	// check against allowed minimal/maximal value
	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
		capacity = UNIX_FIFO_MINIMAL_CAPACITY;

	size_t oldCapacity = fBuffer.Capacity();
	if (capacity == oldCapacity)
		return B_OK;

	// set capacity
	status_t error = fBuffer.SetCapacity(capacity);
	if (error != B_OK)
		return error;

	// wake up waiting writers, if the capacity increased
	if (!fWriters.IsEmpty() && !IsWriteShutdown())
		fWriteCondition.NotifyAll();

	return B_OK;
}


status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
	// wait for the request to reach the front of the queue
	if (fReaders.Head() != &request && timeout == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	while (fReaders.Head() != &request
		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsWriteShutdown())
			RETURN_ERROR(0);

		if (timeout == 0)
			RETURN_ERROR(B_WOULD_BLOCK);
	}

	// wait for any data to become available
	// TODO: Support low water marks!
	while (fBuffer.Readable() == 0
		&& !IsReadShutdown() && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
		if (IsWriteShutdown())
			RETURN_ERROR(0);
	}

	RETURN_ERROR(fBuffer.Read(request));
}


status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
	if (timeout == 0)
		RETURN_ERROR(_WriteNonBlocking(request));

	// wait for the request to reach the front of the queue
	while (fWriters.Head() != &request && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fWriteCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	if (request.TotalSize() == 0)
		return 0;

	status_t error = B_OK;

	while (error == B_OK && request.BytesRemaining() > 0) {
		// wait for any space to become available
		while (error == B_OK && fBuffer.Writable() == 0 && !IsWriteShutdown()
			&& !IsReadShutdown()) {
			ConditionVariableEntry entry;
			fWriteCondition.Add(&entry);

			mutex_unlock(&fLock);
			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
			mutex_lock(&fLock);

			if (error != B_OK)
				RETURN_ERROR(error);
		}

		if (IsWriteShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsReadShutdown())
			RETURN_ERROR(EPIPE);

		// write as much as we can
		error = fBuffer.Write(request);

		if (error == B_OK) {
			// TODO: Whenever we've successfully written a part, we should
			// reset the timeout!
		}
	}

	RETURN_ERROR(error);
}


status_t
UnixFifo::_WriteNonBlocking(UnixRequest& request)
{
	// We need to be first in queue and space should be available right now,
	// otherwise we need to fail.
	if (fWriters.Head() != &request || fBuffer.Writable() == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	if (request.TotalSize() == 0)
		return 0;

	// Write as much as we can.
	RETURN_ERROR(fBuffer.Write(request));
}