1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 #include "UnixFifo.h" 7 8 #include "unix.h" 9 10 11 #define UNIX_FIFO_DEBUG_LEVEL 2 12 #define UNIX_DEBUG_LEVEL UNIX_FIFO_DEBUG_LEVEL 13 #include "UnixDebug.h" 14 15 16 UnixBufferQueue::UnixBufferQueue(size_t capacity) 17 : 18 fSize(0), 19 fCapacity(capacity) 20 { 21 } 22 23 24 UnixBufferQueue::~UnixBufferQueue() 25 { 26 while (net_buffer* buffer = fBuffers.RemoveHead()) 27 gBufferModule->free(buffer); 28 } 29 30 31 status_t 32 UnixBufferQueue::Read(size_t size, net_buffer** _buffer) 33 { 34 if (size > fSize) 35 size = fSize; 36 37 if (size == 0) 38 return B_BAD_VALUE; 39 40 // If the first buffer has the right size or is smaller, we can just 41 // dequeue it. 42 net_buffer* buffer = fBuffers.Head(); 43 if (buffer->size <= size) { 44 fBuffers.RemoveHead(); 45 fSize -= buffer->size; 46 *_buffer = buffer; 47 48 if (buffer->size == size) 49 return B_OK; 50 51 // buffer is too small 52 53 size_t bytesLeft = size - buffer->size; 54 55 // Append from the following buffers, until we've read as much as we're 56 // supposed to. 57 while (bytesLeft > 0) { 58 net_buffer* nextBuffer = fBuffers.Head(); 59 size_t toCopy = min_c(bytesLeft, nextBuffer->size); 60 61 if (gBufferModule->append_cloned(buffer, nextBuffer, 0, toCopy) 62 != B_OK) { 63 // Too bad, but we've got some data, so we don't fail. 64 return B_OK; 65 } 66 67 // transfer the ancillary data 68 gBufferModule->transfer_ancillary_data(nextBuffer, buffer); 69 70 if (nextBuffer->size > toCopy) { 71 // remove the part we've copied 72 gBufferModule->remove_header(nextBuffer, toCopy); 73 } else { 74 // get rid of the buffer completely 75 fBuffers.RemoveHead(); 76 gBufferModule->free(nextBuffer); 77 } 78 79 bytesLeft -= toCopy; 80 } 81 82 return B_OK; 83 } 84 85 // buffer is too big 86 87 // Create a new buffer, and copy into it, as much as we need. 88 net_buffer* newBuffer = gBufferModule->create(256); 89 if (newBuffer == NULL) 90 return ENOBUFS; 91 92 status_t error = gBufferModule->append_cloned(newBuffer, buffer, 0, size); 93 if (error != B_OK) { 94 gBufferModule->free(newBuffer); 95 return error; 96 } 97 98 // transfer the ancillary data 99 gBufferModule->transfer_ancillary_data(buffer, newBuffer); 100 101 // remove the part we've copied 102 gBufferModule->remove_header(buffer, size); 103 104 fSize -= size; 105 *_buffer = newBuffer; 106 107 return B_OK; 108 } 109 110 111 status_t 112 UnixBufferQueue::Write(net_buffer* buffer) 113 { 114 if (buffer->size > Writable()) 115 return ENOBUFS; 116 117 fBuffers.Add(buffer); 118 fSize += buffer->size; 119 120 return B_OK; 121 } 122 123 124 void 125 UnixBufferQueue::SetCapacity(size_t capacity) 126 { 127 fCapacity = capacity; 128 } 129 130 131 // #pragma mark - 132 133 134 UnixFifo::UnixFifo(size_t capacity) 135 : 136 fBuffer(capacity), 137 fReaders(), 138 fWriters(), 139 fReadRequested(0), 140 fWriteRequested(0), 141 fReaderSem(-1), 142 fWriterSem(-1), 143 fShutdown(0) 144 145 { 146 fLock.sem = -1; 147 } 148 149 150 UnixFifo::~UnixFifo() 151 { 152 if (fReaderSem >= 0) 153 delete_sem(fReaderSem); 154 155 if (fWriterSem >= 0) 156 delete_sem(fWriterSem); 157 158 if (fLock.sem >= 0) 159 benaphore_destroy(&fLock); 160 } 161 162 163 status_t 164 UnixFifo::Init() 165 { 166 status_t error = benaphore_init(&fLock, "unix fifo"); 167 168 fReaderSem = create_sem(0, "unix fifo readers"); 169 fWriterSem = create_sem(0, "unix fifo writers"); 170 171 if (error != B_OK || fReaderSem < 0 || fWriterSem < 0) 172 return ENOBUFS; 173 174 return B_OK; 175 } 176 177 178 void 179 UnixFifo::Shutdown(uint32 shutdown) 180 { 181 fShutdown |= shutdown; 182 183 if (shutdown != 0) { 184 // Shutting down either end also effects the other, so notify both. 185 release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); 186 release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); 187 } 188 } 189 190 191 status_t 192 UnixFifo::Read(size_t numBytes, bigtime_t timeout, net_buffer** _buffer) 193 { 194 TRACE("[%ld] UnixFifo::Read(%lu, %lld)\n", find_thread(NULL), numBytes, 195 timeout); 196 197 if (IsReadShutdown()) 198 return UNIX_FIFO_SHUTDOWN; 199 200 Request request(numBytes); 201 fReaders.Add(&request); 202 fReadRequested += request.size; 203 204 status_t error = _Read(request, numBytes, timeout, _buffer); 205 206 bool firstInQueue = fReaders.Head() == &request; 207 fReaders.Remove(&request); 208 fReadRequested -= request.size; 209 210 if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0 211 && !IsReadShutdown()) { 212 // There's more to read, other readers, and we were first in the queue. 213 // So we need to notify the others. 214 release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); 215 } 216 217 if (error == B_OK && *_buffer != NULL && (*_buffer)->size > 0 218 && !fWriters.IsEmpty() && !IsWriteShutdown()) { 219 // We read something and there are writers. Notify them 220 release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); 221 } 222 223 RETURN_ERROR(error); 224 } 225 226 227 status_t 228 UnixFifo::Write(net_buffer* buffer, bigtime_t timeout) 229 { 230 if (IsWriteShutdown()) 231 return UNIX_FIFO_SHUTDOWN; 232 233 Request request(buffer->size); 234 fWriters.Add(&request); 235 fWriteRequested += request.size; 236 237 status_t error = _Write(request, buffer, timeout); 238 239 bool firstInQueue = fWriters.Head() == &request; 240 fWriters.Remove(&request); 241 fWriteRequested -= request.size; 242 243 if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0 244 && !IsWriteShutdown()) { 245 // There's more space for writing, other writers, and we were first in 246 // the queue. So we need to notify the others. 247 release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); 248 } 249 250 if (error == B_OK && request.size > 0 && !fReaders.IsEmpty() 251 && !IsReadShutdown()) { 252 // We've written something and there are readers. Notify them 253 release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); 254 } 255 256 RETURN_ERROR(error); 257 } 258 259 260 size_t 261 UnixFifo::Readable() const 262 { 263 size_t readable = fBuffer.Readable(); 264 return readable > fReadRequested ? readable - fReadRequested : 0; 265 } 266 267 268 size_t 269 UnixFifo::Writable() const 270 { 271 size_t writable = fBuffer.Writable(); 272 return writable > fWriteRequested ? writable - fWriteRequested : 0; 273 } 274 275 276 void 277 UnixFifo::SetBufferCapacity(size_t capacity) 278 { 279 // check against allowed minimal/maximal value 280 if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY) 281 capacity = UNIX_FIFO_MAXIMAL_CAPACITY; 282 else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY) 283 capacity = UNIX_FIFO_MINIMAL_CAPACITY; 284 285 size_t oldCapacity = fBuffer.Capacity(); 286 if (capacity == oldCapacity) 287 return; 288 289 // set capacity 290 fBuffer.SetCapacity(capacity); 291 292 // wake up waiting writers, if the capacity increased 293 if (!fWriters.IsEmpty() && !IsWriteShutdown()) 294 release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); 295 } 296 297 298 status_t 299 UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout, 300 net_buffer** _buffer) 301 { 302 // wait for the request to reach the front of the queue 303 if (fReaders.Head() != &request && timeout == 0) 304 RETURN_ERROR(B_WOULD_BLOCK); 305 306 while (fReaders.Head() != &request && !IsReadShutdown()) { 307 benaphore_unlock(&fLock); 308 309 status_t error = acquire_sem_etc(fReaderSem, 1, 310 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 311 312 benaphore_lock(&fLock); 313 314 if (error != B_OK) 315 RETURN_ERROR(error); 316 } 317 318 if (IsReadShutdown()) 319 return UNIX_FIFO_SHUTDOWN; 320 321 if (fBuffer.Readable() == 0) { 322 if (IsWriteShutdown()) { 323 *_buffer = NULL; 324 RETURN_ERROR(B_OK); 325 } 326 327 if (timeout == 0) 328 RETURN_ERROR(B_WOULD_BLOCK); 329 } 330 331 // wait for any data to become available 332 // TODO: Support low water marks! 333 while (fBuffer.Readable() == 0 334 && !IsReadShutdown() && !IsWriteShutdown()) { 335 benaphore_unlock(&fLock); 336 337 status_t error = acquire_sem_etc(fReaderSem, 1, 338 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 339 340 benaphore_lock(&fLock); 341 342 if (error != B_OK) 343 RETURN_ERROR(error); 344 } 345 346 if (IsReadShutdown()) 347 return UNIX_FIFO_SHUTDOWN; 348 349 if (fBuffer.Readable() == 0 && IsWriteShutdown()) { 350 *_buffer = NULL; 351 RETURN_ERROR(B_OK); 352 } 353 354 RETURN_ERROR(fBuffer.Read(numBytes, _buffer)); 355 } 356 357 358 status_t 359 UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout) 360 { 361 // wait for the request to reach the front of the queue 362 if (fWriters.Head() != &request && timeout == 0) 363 RETURN_ERROR(B_WOULD_BLOCK); 364 365 while (fWriters.Head() != &request && !IsWriteShutdown()) { 366 benaphore_unlock(&fLock); 367 368 status_t error = acquire_sem_etc(fWriterSem, 1, 369 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 370 371 benaphore_lock(&fLock); 372 373 if (error != B_OK) 374 RETURN_ERROR(error); 375 } 376 377 if (IsWriteShutdown()) 378 return UNIX_FIFO_SHUTDOWN; 379 380 if (IsReadShutdown()) 381 return EPIPE; 382 383 // wait for any space to become available 384 if (fBuffer.Writable() < request.size && timeout == 0) 385 RETURN_ERROR(B_WOULD_BLOCK); 386 387 while (fBuffer.Writable() < request.size && !IsWriteShutdown() 388 && !IsReadShutdown()) { 389 benaphore_unlock(&fLock); 390 391 status_t error = acquire_sem_etc(fWriterSem, 1, 392 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 393 394 benaphore_lock(&fLock); 395 396 if (error != B_OK) 397 RETURN_ERROR(error); 398 } 399 400 if (IsWriteShutdown()) 401 return UNIX_FIFO_SHUTDOWN; 402 403 if (IsReadShutdown()) 404 return EPIPE; 405 406 RETURN_ERROR(fBuffer.Write(buffer)); 407 } 408