xref: /haiku/src/servers/app/drawing/interface/remote/StreamingRingBuffer.cpp (revision dcaec19c9a11eec63fd86420326935c5097e48e9)
1 /*
2  * Copyright 2009, Haiku, Inc.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Michael Lotz <mmlr@mlotz.ch>
7  */
8 
9 #include "StreamingRingBuffer.h"
10 
11 #include <Autolock.h>
12 
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 
// Debug output helpers. TRACE() expands to nothing by default; swap in the
// commented-out body to enable verbose tracing. The blank between the prefix
// literal and the macro argument is required: since C++11 a string literal
// immediately followed by an identifier is lexed as a user-defined literal
// suffix instead of being concatenated with the caller's format string.
#define TRACE(x...)			/*debug_printf("StreamingRingBuffer: " x)*/
#define TRACE_ERROR(x...)	debug_printf("StreamingRingBuffer: " x)
19 
20 
21 StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
22 	:
23 	fReaderWaiting(false),
24 	fWriterWaiting(false),
25 	fReaderNotifier(-1),
26 	fWriterNotifier(-1),
27 	fReaderLocker("StreamingRingBuffer reader"),
28 	fWriterLocker("StreamingRingBuffer writer"),
29 	fDataLocker("StreamingRingBuffer data"),
30 	fBuffer(NULL),
31 	fBufferSize(bufferSize),
32 	fReadable(0),
33 	fReadPosition(0),
34 	fWritePosition(0)
35 {
36 	fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
37 	fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");
38 
39 	fBuffer = (uint8 *)malloc(fBufferSize);
40 	if (fBuffer == NULL)
41 		fBufferSize = 0;
42 }
43 
44 
45 StreamingRingBuffer::~StreamingRingBuffer()
46 {
47 	delete_sem(fReaderNotifier);
48 	delete_sem(fWriterNotifier);
49 	free(fBuffer);
50 }
51 
52 
53 status_t
54 StreamingRingBuffer::InitCheck()
55 {
56 	if (fReaderNotifier < 0)
57 		return fReaderNotifier;
58 	if (fWriterNotifier < 0)
59 		return fWriterNotifier;
60 	if (fBuffer == NULL)
61 		return B_NO_MEMORY;
62 
63 	return B_OK;
64 }
65 
66 
67 int32
68 StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
69 {
70 	BAutolock readerLock(fReaderLocker);
71 	if (!readerLock.IsLocked())
72 		return B_ERROR;
73 
74 	BAutolock dataLock(fDataLocker);
75 	if (!dataLock.IsLocked())
76 		return B_ERROR;
77 
78 	int32 readSize = 0;
79 	while (length > 0) {
80 		size_t copyLength = min_c(length, fBufferSize - fReadPosition);
81 		copyLength = min_c(copyLength, fReadable);
82 
83 		if (copyLength == 0) {
84 			if (onlyBlockOnNoData && readSize > 0)
85 				return readSize;
86 
87 			fReaderWaiting = true;
88 			dataLock.Unlock();
89 
90 			status_t result;
91 			do {
92 				TRACE("waiting in reader\n");
93 				result = acquire_sem(fReaderNotifier);
94 				TRACE("done waiting in reader with status: 0x%08lx\n", result);
95 			} while (result == B_INTERRUPTED);
96 
97 			if (result != B_OK)
98 				return result;
99 
100 			if (!dataLock.Lock())
101 				return B_ERROR;
102 
103 			continue;
104 		}
105 
106 		// support discarding input
107 		if (buffer != NULL) {
108 			memcpy(buffer, fBuffer + fReadPosition, copyLength);
109 			buffer = (uint8 *)buffer + copyLength;
110 		}
111 
112 		fReadPosition = (fReadPosition + copyLength) % fBufferSize;
113 		fReadable -= copyLength;
114 		readSize += copyLength;
115 		length -= copyLength;
116 
117 		if (fWriterWaiting) {
118 			release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
119 			fWriterWaiting = false;
120 		}
121 	}
122 
123 	return readSize;
124 }
125 
126 
127 status_t
128 StreamingRingBuffer::Write(const void *buffer, size_t length)
129 {
130 	BAutolock writerLock(fWriterLocker);
131 	if (!writerLock.IsLocked())
132 		return B_ERROR;
133 
134 	BAutolock dataLock(fDataLocker);
135 	if (!dataLock.IsLocked())
136 		return B_ERROR;
137 
138 	while (length > 0) {
139 		size_t copyLength = min_c(length, fBufferSize - fWritePosition);
140 		copyLength = min_c(copyLength, fBufferSize - fReadable);
141 
142 		if (copyLength == 0) {
143 			fWriterWaiting = true;
144 			dataLock.Unlock();
145 
146 			status_t result;
147 			do {
148 				TRACE("waiting in writer\n");
149 				result = acquire_sem(fWriterNotifier);
150 				TRACE("done waiting in writer with status: 0x%08lx\n", result);
151 			} while (result == B_INTERRUPTED);
152 
153 			if (result != B_OK)
154 				return result;
155 
156 			if (!dataLock.Lock())
157 				return B_ERROR;
158 
159 			continue;
160 		}
161 
162 		memcpy(fBuffer + fWritePosition, buffer, copyLength);
163 		fWritePosition = (fWritePosition + copyLength) % fBufferSize;
164 		fReadable += copyLength;
165 
166 		buffer = (uint8 *)buffer + copyLength;
167 		length -= copyLength;
168 
169 		if (fReaderWaiting) {
170 			release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
171 			fReaderWaiting = false;
172 		}
173 	}
174 
175 	return B_OK;
176 }
177