1 /*
2 * Copyright 2009, 2017, Haiku, Inc.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Michael Lotz <mmlr@mlotz.ch>
7 */
8
9 #include "StreamingRingBuffer.h"
10
11 #include <Autolock.h>
12
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16
17
18 #ifdef CLIENT_COMPILE
19 #define TRACE_ALWAYS(x...) printf("StreamingRingBuffer: " x)
20 #else
21 #define TRACE_ALWAYS(x...) debug_printf("StreamingRingBuffer: " x)
22 #endif
23
24 #define TRACE(x...) /*TRACE_ALWAYS(x)*/
25 #define TRACE_ERROR(x...) TRACE_ALWAYS(x)
26
27
StreamingRingBuffer(size_t bufferSize)28 StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
29 :
30 fReaderWaiting(false),
31 fWriterWaiting(false),
32 fCancelRead(false),
33 fCancelWrite(false),
34 fReaderNotifier(-1),
35 fWriterNotifier(-1),
36 fReaderLocker("StreamingRingBuffer reader"),
37 fWriterLocker("StreamingRingBuffer writer"),
38 fDataLocker("StreamingRingBuffer data"),
39 fBuffer(NULL),
40 fBufferSize(bufferSize),
41 fReadable(0),
42 fReadPosition(0),
43 fWritePosition(0)
44 {
45 fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
46 fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");
47
48 fBuffer = (uint8 *)malloc(fBufferSize);
49 if (fBuffer == NULL)
50 fBufferSize = 0;
51 }
52
53
~StreamingRingBuffer()54 StreamingRingBuffer::~StreamingRingBuffer()
55 {
56 delete_sem(fReaderNotifier);
57 delete_sem(fWriterNotifier);
58 free(fBuffer);
59 }
60
61
62 status_t
InitCheck()63 StreamingRingBuffer::InitCheck()
64 {
65 if (fReaderNotifier < 0)
66 return fReaderNotifier;
67 if (fWriterNotifier < 0)
68 return fWriterNotifier;
69 if (fBuffer == NULL)
70 return B_NO_MEMORY;
71
72 return B_OK;
73 }
74
75
76 int32
Read(void * buffer,size_t length,bool onlyBlockOnNoData)77 StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
78 {
79 BAutolock readerLock(fReaderLocker);
80 if (!readerLock.IsLocked())
81 return B_ERROR;
82
83 BAutolock dataLock(fDataLocker);
84 if (!dataLock.IsLocked())
85 return B_ERROR;
86
87 int32 readSize = 0;
88 while (length > 0) {
89 size_t copyLength = min_c(length, fBufferSize - fReadPosition);
90 copyLength = min_c(copyLength, fReadable);
91
92 if (copyLength == 0) {
93 if (onlyBlockOnNoData && readSize > 0)
94 return readSize;
95
96 fReaderWaiting = true;
97 dataLock.Unlock();
98
99 status_t result;
100 do {
101 TRACE("waiting in reader\n");
102 result = acquire_sem(fReaderNotifier);
103 TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
104 result);
105 } while (result == B_INTERRUPTED);
106
107 if (result != B_OK)
108 return result;
109
110 if (!dataLock.Lock()) {
111 TRACE_ERROR("failed to acquire data lock\n");
112 return B_ERROR;
113 }
114
115 if (fCancelRead) {
116 TRACE("read canceled\n");
117 fCancelRead = false;
118 return B_CANCELED;
119 }
120
121 continue;
122 }
123
124 // support discarding input
125 if (buffer != NULL) {
126 memcpy(buffer, fBuffer + fReadPosition, copyLength);
127 buffer = (uint8 *)buffer + copyLength;
128 }
129
130 fReadPosition = (fReadPosition + copyLength) % fBufferSize;
131 fReadable -= copyLength;
132 readSize += copyLength;
133 length -= copyLength;
134
135 if (fWriterWaiting) {
136 release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
137 fWriterWaiting = false;
138 }
139 }
140
141 return readSize;
142 }
143
144
145 status_t
Write(const void * buffer,size_t length)146 StreamingRingBuffer::Write(const void *buffer, size_t length)
147 {
148 BAutolock writerLock(fWriterLocker);
149 if (!writerLock.IsLocked())
150 return B_ERROR;
151
152 BAutolock dataLock(fDataLocker);
153 if (!dataLock.IsLocked())
154 return B_ERROR;
155
156 while (length > 0) {
157 size_t copyLength = min_c(length, fBufferSize - fWritePosition);
158 copyLength = min_c(copyLength, fBufferSize - fReadable);
159
160 if (copyLength == 0) {
161 fWriterWaiting = true;
162 dataLock.Unlock();
163
164 status_t result;
165 do {
166 TRACE("waiting in writer\n");
167 result = acquire_sem(fWriterNotifier);
168 TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
169 result);
170 } while (result == B_INTERRUPTED);
171
172 if (result != B_OK)
173 return result;
174
175 if (!dataLock.Lock()) {
176 TRACE_ERROR("failed to acquire data lock\n");
177 return B_ERROR;
178 }
179
180 if (fCancelWrite) {
181 TRACE("write canceled\n");
182 fCancelWrite = false;
183 return B_CANCELED;
184 }
185
186 continue;
187 }
188
189 memcpy(fBuffer + fWritePosition, buffer, copyLength);
190 fWritePosition = (fWritePosition + copyLength) % fBufferSize;
191 fReadable += copyLength;
192
193 buffer = (uint8 *)buffer + copyLength;
194 length -= copyLength;
195
196 if (fReaderWaiting) {
197 release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
198 fReaderWaiting = false;
199 }
200 }
201
202 return B_OK;
203 }
204
205
206 void
MakeEmpty()207 StreamingRingBuffer::MakeEmpty()
208 {
209 BAutolock dataLock(fDataLocker);
210 if (!dataLock.IsLocked())
211 return;
212
213 fReadPosition = fWritePosition = 0;
214 fReadable = 0;
215
216 if (fWriterWaiting) {
217 release_sem_etc(fWriterNotifier, 1, 0);
218 fWriterWaiting = false;
219 fCancelWrite = true;
220 }
221
222 if (fReaderWaiting) {
223 release_sem_etc(fReaderNotifier, 1, 0);
224 fReaderWaiting = false;
225 fCancelRead = true;
226 }
227 }
228