1 /* 2 * Copyright 2009, 2017, Haiku, Inc. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Michael Lotz <mmlr@mlotz.ch> 7 */ 8 9 #include "StreamingRingBuffer.h" 10 11 #include <Autolock.h> 12 13 #include <stdio.h> 14 #include <stdlib.h> 15 #include <string.h> 16 17 18 #ifdef CLIENT_COMPILE 19 #define TRACE_ALWAYS(x...) printf("StreamingRingBuffer: " x) 20 #else 21 #define TRACE_ALWAYS(x...) debug_printf("StreamingRingBuffer: " x) 22 #endif 23 24 #define TRACE(x...) /*TRACE_ALWAYS(x)*/ 25 #define TRACE_ERROR(x...) TRACE_ALWAYS(x) 26 27 28 StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize) 29 : 30 fReaderWaiting(false), 31 fWriterWaiting(false), 32 fCancelRead(false), 33 fCancelWrite(false), 34 fReaderNotifier(-1), 35 fWriterNotifier(-1), 36 fReaderLocker("StreamingRingBuffer reader"), 37 fWriterLocker("StreamingRingBuffer writer"), 38 fDataLocker("StreamingRingBuffer data"), 39 fBuffer(NULL), 40 fBufferSize(bufferSize), 41 fReadable(0), 42 fReadPosition(0), 43 fWritePosition(0) 44 { 45 fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify"); 46 fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify"); 47 48 fBuffer = (uint8 *)malloc(fBufferSize); 49 if (fBuffer == NULL) 50 fBufferSize = 0; 51 } 52 53 54 StreamingRingBuffer::~StreamingRingBuffer() 55 { 56 delete_sem(fReaderNotifier); 57 delete_sem(fWriterNotifier); 58 free(fBuffer); 59 } 60 61 62 status_t 63 StreamingRingBuffer::InitCheck() 64 { 65 if (fReaderNotifier < 0) 66 return fReaderNotifier; 67 if (fWriterNotifier < 0) 68 return fWriterNotifier; 69 if (fBuffer == NULL) 70 return B_NO_MEMORY; 71 72 return B_OK; 73 } 74 75 76 int32 77 StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData) 78 { 79 BAutolock readerLock(fReaderLocker); 80 if (!readerLock.IsLocked()) 81 return B_ERROR; 82 83 BAutolock dataLock(fDataLocker); 84 if (!dataLock.IsLocked()) 85 return B_ERROR; 86 87 int32 readSize = 0; 88 while (length > 0) { 89 size_t copyLength = min_c(length, fBufferSize - fReadPosition); 90 copyLength = min_c(copyLength, fReadable); 91 92 if (copyLength == 0) { 93 if (onlyBlockOnNoData && readSize > 0) 94 return readSize; 95 96 fReaderWaiting = true; 97 dataLock.Unlock(); 98 99 status_t result; 100 do { 101 TRACE("waiting in reader\n"); 102 result = acquire_sem(fReaderNotifier); 103 TRACE("done waiting in reader with status: %#" B_PRIx32 "\n", 104 result); 105 } while (result == B_INTERRUPTED); 106 107 if (result != B_OK) 108 return result; 109 110 if (!dataLock.Lock()) { 111 TRACE_ERROR("failed to acquire data lock\n"); 112 return B_ERROR; 113 } 114 115 if (fCancelRead) { 116 TRACE("read canceled\n"); 117 fCancelRead = false; 118 return B_CANCELED; 119 } 120 121 continue; 122 } 123 124 // support discarding input 125 if (buffer != NULL) { 126 memcpy(buffer, fBuffer + fReadPosition, copyLength); 127 buffer = (uint8 *)buffer + copyLength; 128 } 129 130 fReadPosition = (fReadPosition + copyLength) % fBufferSize; 131 fReadable -= copyLength; 132 readSize += copyLength; 133 length -= copyLength; 134 135 if (fWriterWaiting) { 136 release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE); 137 fWriterWaiting = false; 138 } 139 } 140 141 return readSize; 142 } 143 144 145 status_t 146 StreamingRingBuffer::Write(const void *buffer, size_t length) 147 { 148 BAutolock writerLock(fWriterLocker); 149 if (!writerLock.IsLocked()) 150 return B_ERROR; 151 152 BAutolock dataLock(fDataLocker); 153 if (!dataLock.IsLocked()) 154 return B_ERROR; 155 156 while (length > 0) { 157 size_t copyLength = min_c(length, fBufferSize - fWritePosition); 158 copyLength = min_c(copyLength, fBufferSize - fReadable); 159 160 if (copyLength == 0) { 161 fWriterWaiting = true; 162 dataLock.Unlock(); 163 164 status_t result; 165 do { 166 TRACE("waiting in writer\n"); 167 result = acquire_sem(fWriterNotifier); 168 TRACE("done waiting in writer with status: %#" B_PRIx32 "\n", 169 result); 170 } while (result == B_INTERRUPTED); 171 172 if (result != B_OK) 173 return result; 174 175 if (!dataLock.Lock()) { 176 TRACE_ERROR("failed to acquire data lock\n"); 177 return B_ERROR; 178 } 179 180 if (fCancelWrite) { 181 TRACE("write canceled\n"); 182 fCancelWrite = false; 183 return B_CANCELED; 184 } 185 186 continue; 187 } 188 189 memcpy(fBuffer + fWritePosition, buffer, copyLength); 190 fWritePosition = (fWritePosition + copyLength) % fBufferSize; 191 fReadable += copyLength; 192 193 buffer = (uint8 *)buffer + copyLength; 194 length -= copyLength; 195 196 if (fReaderWaiting) { 197 release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE); 198 fReaderWaiting = false; 199 } 200 } 201 202 return B_OK; 203 } 204 205 206 void 207 StreamingRingBuffer::MakeEmpty() 208 { 209 BAutolock dataLock(fDataLocker); 210 if (!dataLock.IsLocked()) 211 return; 212 213 fReadPosition = fWritePosition = 0; 214 fReadable = 0; 215 216 if (fWriterWaiting) { 217 release_sem_etc(fWriterNotifier, 1, 0); 218 fWriterWaiting = false; 219 fCancelWrite = true; 220 } 221 222 if (fReaderWaiting) { 223 release_sem_etc(fReaderNotifier, 1, 0); 224 fReaderWaiting = false; 225 fCancelRead = true; 226 } 227 } 228