xref: /haiku/src/servers/app/drawing/interface/remote/StreamingRingBuffer.cpp (revision 5ed41cffff30e38974623d0586c36524209cb08e)
1 /*
2  * Copyright 2009, 2017, Haiku, Inc.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Michael Lotz <mmlr@mlotz.ch>
7  */
8 
9 #include "StreamingRingBuffer.h"
10 
11 #include <Autolock.h>
12 
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 
17 
// Tracing helpers. Every message is prefixed with "StreamingRingBuffer: ".
// When CLIENT_COMPILE is defined the output goes through printf(), otherwise
// through debug_printf().
#if defined(CLIENT_COMPILE)
#	define TRACE_ALWAYS(x...)	printf("StreamingRingBuffer: " x)
#else
#	define TRACE_ALWAYS(x...)	debug_printf("StreamingRingBuffer: " x)
#endif

// TRACE() expands to nothing (its arguments are never evaluated); remove the
// comment markers around its body to turn verbose tracing on.
#define TRACE(x...)				/*TRACE_ALWAYS(x)*/
// Errors are always reported.
#define TRACE_ERROR(x...)		TRACE_ALWAYS(x)
26 
27 
StreamingRingBuffer(size_t bufferSize)28 StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
29 	:
30 	fReaderWaiting(false),
31 	fWriterWaiting(false),
32 	fCancelRead(false),
33 	fCancelWrite(false),
34 	fReaderNotifier(-1),
35 	fWriterNotifier(-1),
36 	fReaderLocker("StreamingRingBuffer reader"),
37 	fWriterLocker("StreamingRingBuffer writer"),
38 	fDataLocker("StreamingRingBuffer data"),
39 	fBuffer(NULL),
40 	fBufferSize(bufferSize),
41 	fReadable(0),
42 	fReadPosition(0),
43 	fWritePosition(0)
44 {
45 	fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
46 	fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");
47 
48 	fBuffer = (uint8 *)malloc(fBufferSize);
49 	if (fBuffer == NULL)
50 		fBufferSize = 0;
51 }
52 
53 
~StreamingRingBuffer()54 StreamingRingBuffer::~StreamingRingBuffer()
55 {
56 	delete_sem(fReaderNotifier);
57 	delete_sem(fWriterNotifier);
58 	free(fBuffer);
59 }
60 
61 
62 status_t
InitCheck()63 StreamingRingBuffer::InitCheck()
64 {
65 	if (fReaderNotifier < 0)
66 		return fReaderNotifier;
67 	if (fWriterNotifier < 0)
68 		return fWriterNotifier;
69 	if (fBuffer == NULL)
70 		return B_NO_MEMORY;
71 
72 	return B_OK;
73 }
74 
75 
76 int32
Read(void * buffer,size_t length,bool onlyBlockOnNoData)77 StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
78 {
79 	BAutolock readerLock(fReaderLocker);
80 	if (!readerLock.IsLocked())
81 		return B_ERROR;
82 
83 	BAutolock dataLock(fDataLocker);
84 	if (!dataLock.IsLocked())
85 		return B_ERROR;
86 
87 	int32 readSize = 0;
88 	while (length > 0) {
89 		size_t copyLength = min_c(length, fBufferSize - fReadPosition);
90 		copyLength = min_c(copyLength, fReadable);
91 
92 		if (copyLength == 0) {
93 			if (onlyBlockOnNoData && readSize > 0)
94 				return readSize;
95 
96 			fReaderWaiting = true;
97 			dataLock.Unlock();
98 
99 			status_t result;
100 			do {
101 				TRACE("waiting in reader\n");
102 				result = acquire_sem(fReaderNotifier);
103 				TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
104 					result);
105 			} while (result == B_INTERRUPTED);
106 
107 			if (result != B_OK)
108 				return result;
109 
110 			if (!dataLock.Lock()) {
111 				TRACE_ERROR("failed to acquire data lock\n");
112 				return B_ERROR;
113 			}
114 
115 			if (fCancelRead) {
116 				TRACE("read canceled\n");
117 				fCancelRead = false;
118 				return B_CANCELED;
119 			}
120 
121 			continue;
122 		}
123 
124 		// support discarding input
125 		if (buffer != NULL) {
126 			memcpy(buffer, fBuffer + fReadPosition, copyLength);
127 			buffer = (uint8 *)buffer + copyLength;
128 		}
129 
130 		fReadPosition = (fReadPosition + copyLength) % fBufferSize;
131 		fReadable -= copyLength;
132 		readSize += copyLength;
133 		length -= copyLength;
134 
135 		if (fWriterWaiting) {
136 			release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
137 			fWriterWaiting = false;
138 		}
139 	}
140 
141 	return readSize;
142 }
143 
144 
145 status_t
Write(const void * buffer,size_t length)146 StreamingRingBuffer::Write(const void *buffer, size_t length)
147 {
148 	BAutolock writerLock(fWriterLocker);
149 	if (!writerLock.IsLocked())
150 		return B_ERROR;
151 
152 	BAutolock dataLock(fDataLocker);
153 	if (!dataLock.IsLocked())
154 		return B_ERROR;
155 
156 	while (length > 0) {
157 		size_t copyLength = min_c(length, fBufferSize - fWritePosition);
158 		copyLength = min_c(copyLength, fBufferSize - fReadable);
159 
160 		if (copyLength == 0) {
161 			fWriterWaiting = true;
162 			dataLock.Unlock();
163 
164 			status_t result;
165 			do {
166 				TRACE("waiting in writer\n");
167 				result = acquire_sem(fWriterNotifier);
168 				TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
169 					result);
170 			} while (result == B_INTERRUPTED);
171 
172 			if (result != B_OK)
173 				return result;
174 
175 			if (!dataLock.Lock()) {
176 				TRACE_ERROR("failed to acquire data lock\n");
177 				return B_ERROR;
178 			}
179 
180 			if (fCancelWrite) {
181 				TRACE("write canceled\n");
182 				fCancelWrite = false;
183 				return B_CANCELED;
184 			}
185 
186 			continue;
187 		}
188 
189 		memcpy(fBuffer + fWritePosition, buffer, copyLength);
190 		fWritePosition = (fWritePosition + copyLength) % fBufferSize;
191 		fReadable += copyLength;
192 
193 		buffer = (uint8 *)buffer + copyLength;
194 		length -= copyLength;
195 
196 		if (fReaderWaiting) {
197 			release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
198 			fReaderWaiting = false;
199 		}
200 	}
201 
202 	return B_OK;
203 }
204 
205 
206 void
MakeEmpty()207 StreamingRingBuffer::MakeEmpty()
208 {
209 	BAutolock dataLock(fDataLocker);
210 	if (!dataLock.IsLocked())
211 		return;
212 
213 	fReadPosition = fWritePosition = 0;
214 	fReadable = 0;
215 
216 	if (fWriterWaiting) {
217 		release_sem_etc(fWriterNotifier, 1, 0);
218 		fWriterWaiting = false;
219 		fCancelWrite = true;
220 	}
221 
222 	if (fReaderWaiting) {
223 		release_sem_etc(fReaderNotifier, 1, 0);
224 		fReaderWaiting = false;
225 		fCancelRead = true;
226 	}
227 }
228