xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp (revision 59234b36ce027000c55abc7ec2f6ff5b8f6f816b)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixFifo.h"
7 
8 #include "unix.h"
9 
10 
11 #define UNIX_FIFO_DEBUG_LEVEL	2
12 #define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
13 #include "UnixDebug.h"
14 
15 
16 UnixBufferQueue::UnixBufferQueue(size_t capacity)
17 	:
18 	fSize(0),
19 	fCapacity(capacity)
20 {
21 }
22 
23 
24 UnixBufferQueue::~UnixBufferQueue()
25 {
26 	while (net_buffer* buffer = fBuffers.RemoveHead())
27 		gBufferModule->free(buffer);
28 }
29 
30 
31 status_t
32 UnixBufferQueue::Read(size_t size, net_buffer** _buffer)
33 {
34 	if (size > fSize)
35 		size = fSize;
36 
37 	if (size == 0)
38 		return B_BAD_VALUE;
39 
40 	// If the first buffer has the right size or is smaller, we can just
41 	// dequeue it.
42 	net_buffer* buffer = fBuffers.Head();
43 	if (buffer->size <= size) {
44 		fBuffers.RemoveHead();
45 		fSize -= buffer->size;
46 		*_buffer = buffer;
47 
48 		if (buffer->size == size)
49 			return B_OK;
50 
51 		// buffer is too small
52 
53  		size_t bytesLeft = size - buffer->size;
54 
55 		// Append from the following buffers, until we've read as much as we're
56 		// supposed to.
57 		while (bytesLeft > 0) {
58 			net_buffer* nextBuffer = fBuffers.Head();
59 			size_t toCopy = min_c(bytesLeft, nextBuffer->size);
60 
61 			if (gBufferModule->append_cloned(buffer, nextBuffer, 0, toCopy)
62 					!= B_OK) {
63 				// Too bad, but we've got some data, so we don't fail.
64 				return B_OK;
65 			}
66 
67 			// transfer the ancillary data
68 			gBufferModule->transfer_ancillary_data(nextBuffer, buffer);
69 
70 			if (nextBuffer->size > toCopy) {
71 				// remove the part we've copied
72 				gBufferModule->remove_header(nextBuffer, toCopy);
73 			} else {
74 				// get rid of the buffer completely
75 				fBuffers.RemoveHead();
76 				gBufferModule->free(nextBuffer);
77 			}
78 
79 			bytesLeft -= toCopy;
80 		}
81 
82 		return B_OK;
83 	}
84 
85 	// buffer is too big
86 
87 	// Create a new buffer, and copy into it, as much as we need.
88 	net_buffer* newBuffer = gBufferModule->create(256);
89 	if (newBuffer == NULL)
90 		return ENOBUFS;
91 
92 	status_t error = gBufferModule->append_cloned(newBuffer, buffer, 0, size);
93 	if (error != B_OK) {
94 		gBufferModule->free(newBuffer);
95 		return error;
96 	}
97 
98 	// transfer the ancillary data
99 	gBufferModule->transfer_ancillary_data(buffer, newBuffer);
100 
101 	// remove the part we've copied
102 	gBufferModule->remove_header(buffer, size);
103 
104 	fSize -= size;
105 	*_buffer = newBuffer;
106 
107 	return B_OK;
108 }
109 
110 
111 status_t
112 UnixBufferQueue::Write(net_buffer* buffer)
113 {
114 	if (buffer->size > Writable())
115 		return ENOBUFS;
116 
117 	fBuffers.Add(buffer);
118 	fSize += buffer->size;
119 
120 	return B_OK;
121 }
122 
123 
124 void
125 UnixBufferQueue::SetCapacity(size_t capacity)
126 {
127 	fCapacity = capacity;
128 }
129 
130 
131 // #pragma mark -
132 
133 
134 UnixFifo::UnixFifo(size_t capacity)
135 	:
136 	fBuffer(capacity),
137 	fReaders(),
138 	fWriters(),
139 	fReadRequested(0),
140 	fWriteRequested(0),
141 	fReaderSem(-1),
142 	fWriterSem(-1),
143 	fShutdown(0)
144 
145 {
146 	fLock.sem = -1;
147 }
148 
149 
150 UnixFifo::~UnixFifo()
151 {
152 	if (fReaderSem >= 0)
153 		delete_sem(fReaderSem);
154 
155 	if (fWriterSem >= 0)
156 		delete_sem(fWriterSem);
157 
158 	if (fLock.sem >= 0)
159 		benaphore_destroy(&fLock);
160 }
161 
162 
163 status_t
164 UnixFifo::Init()
165 {
166 	status_t error = benaphore_init(&fLock, "unix fifo");
167 
168 	fReaderSem = create_sem(0, "unix fifo readers");
169 	fWriterSem = create_sem(0, "unix fifo writers");
170 
171 	if (error != B_OK || fReaderSem < 0 || fWriterSem < 0)
172 		return ENOBUFS;
173 
174 	return B_OK;
175 }
176 
177 
178 void
179 UnixFifo::Shutdown(uint32 shutdown)
180 {
181 	fShutdown |= shutdown;
182 
183 	if (shutdown != 0) {
184 		// Shutting down either end also effects the other, so notify both.
185 		release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
186 		release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
187 	}
188 }
189 
190 
191 status_t
192 UnixFifo::Read(size_t numBytes, bigtime_t timeout, net_buffer** _buffer)
193 {
194 	TRACE("[%ld] UnixFifo::Read(%lu, %lld)\n", find_thread(NULL), numBytes,
195 		timeout);
196 
197 	if (IsReadShutdown())
198 		return UNIX_FIFO_SHUTDOWN;
199 
200 	Request request(numBytes);
201 	fReaders.Add(&request);
202 	fReadRequested += request.size;
203 
204 	status_t error = _Read(request, numBytes, timeout, _buffer);
205 
206 	bool firstInQueue = fReaders.Head() == &request;
207 	fReaders.Remove(&request);
208 	fReadRequested -= request.size;
209 
210 	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
211 			&& !IsReadShutdown()) {
212 		// There's more to read, other readers, and we were first in the queue.
213 		// So we need to notify the others.
214 		release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
215 	}
216 
217 	if (error == B_OK && *_buffer != NULL && (*_buffer)->size > 0
218 			&& !fWriters.IsEmpty() && !IsWriteShutdown()) {
219 		// We read something and there are writers. Notify them
220 		release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
221 	}
222 
223 	RETURN_ERROR(error);
224 }
225 
226 
227 status_t
228 UnixFifo::Write(net_buffer* buffer, bigtime_t timeout)
229 {
230 	if (IsWriteShutdown())
231 		return UNIX_FIFO_SHUTDOWN;
232 
233 	Request request(buffer->size);
234 	fWriters.Add(&request);
235 	fWriteRequested += request.size;
236 
237 	status_t error = _Write(request, buffer, timeout);
238 
239 	bool firstInQueue = fWriters.Head() == &request;
240 	fWriters.Remove(&request);
241 	fWriteRequested -= request.size;
242 
243 	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
244 			&& !IsWriteShutdown()) {
245 		// There's more space for writing, other writers, and we were first in
246 		// the queue. So we need to notify the others.
247 		release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
248 	}
249 
250 	if (error == B_OK && request.size > 0 && !fReaders.IsEmpty()
251 			&& !IsReadShutdown()) {
252 		// We've written something and there are readers. Notify them
253 		release_sem_etc(fReaderSem, 1, B_RELEASE_ALL);
254 	}
255 
256 	RETURN_ERROR(error);
257 }
258 
259 
260 size_t
261 UnixFifo::Readable() const
262 {
263 	size_t readable = fBuffer.Readable();
264 	return readable > fReadRequested ? readable - fReadRequested : 0;
265 }
266 
267 
268 size_t
269 UnixFifo::Writable() const
270 {
271 	size_t writable = fBuffer.Writable();
272 	return writable > fWriteRequested ? writable - fWriteRequested : 0;
273 }
274 
275 
276 void
277 UnixFifo::SetBufferCapacity(size_t capacity)
278 {
279 	// check against allowed minimal/maximal value
280 	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
281 		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
282 	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
283 		capacity = UNIX_FIFO_MINIMAL_CAPACITY;
284 
285 	size_t oldCapacity = fBuffer.Capacity();
286 	if (capacity == oldCapacity)
287 		return;
288 
289 	// set capacity
290 	fBuffer.SetCapacity(capacity);
291 
292 	// wake up waiting writers, if the capacity increased
293 	if (!fWriters.IsEmpty() && !IsWriteShutdown())
294 		release_sem_etc(fWriterSem, 1, B_RELEASE_ALL);
295 }
296 
297 
298 status_t
299 UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
300 	net_buffer** _buffer)
301 {
302 	// wait for the request to reach the front of the queue
303 	if (fReaders.Head() != &request && timeout == 0)
304 		RETURN_ERROR(B_WOULD_BLOCK);
305 
306 	while (fReaders.Head() != &request && !IsReadShutdown()) {
307 		benaphore_unlock(&fLock);
308 
309 		status_t error = acquire_sem_etc(fReaderSem, 1,
310 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
311 
312 		benaphore_lock(&fLock);
313 
314 		if (error != B_OK)
315 			RETURN_ERROR(error);
316 	}
317 
318 	if (IsReadShutdown())
319 		return UNIX_FIFO_SHUTDOWN;
320 
321 	if (fBuffer.Readable() == 0) {
322 		if (IsWriteShutdown()) {
323 			*_buffer = NULL;
324 			RETURN_ERROR(B_OK);
325 		}
326 
327 		if (timeout == 0)
328 			RETURN_ERROR(B_WOULD_BLOCK);
329 	}
330 
331 	// wait for any data to become available
332 // TODO: Support low water marks!
333 	while (fBuffer.Readable() == 0
334 			&& !IsReadShutdown() && !IsWriteShutdown()) {
335 		benaphore_unlock(&fLock);
336 
337 		status_t error = acquire_sem_etc(fReaderSem, 1,
338 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
339 
340 		benaphore_lock(&fLock);
341 
342 		if (error != B_OK)
343 			RETURN_ERROR(error);
344 	}
345 
346 	if (IsReadShutdown())
347 		return UNIX_FIFO_SHUTDOWN;
348 
349 	if (fBuffer.Readable() == 0 && IsWriteShutdown()) {
350 		*_buffer = NULL;
351 		RETURN_ERROR(B_OK);
352 	}
353 
354 	RETURN_ERROR(fBuffer.Read(numBytes, _buffer));
355 }
356 
357 
358 status_t
359 UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
360 {
361 	// wait for the request to reach the front of the queue
362 	if (fWriters.Head() != &request && timeout == 0)
363 		RETURN_ERROR(B_WOULD_BLOCK);
364 
365 	while (fWriters.Head() != &request && !IsWriteShutdown()) {
366 		benaphore_unlock(&fLock);
367 
368 		status_t error = acquire_sem_etc(fWriterSem, 1,
369 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
370 
371 		benaphore_lock(&fLock);
372 
373 		if (error != B_OK)
374 			RETURN_ERROR(error);
375 	}
376 
377 	if (IsWriteShutdown())
378 		return UNIX_FIFO_SHUTDOWN;
379 
380 	if (IsReadShutdown())
381 		return EPIPE;
382 
383 	// wait for any space to become available
384 	if (fBuffer.Writable() < request.size && timeout == 0)
385 		RETURN_ERROR(B_WOULD_BLOCK);
386 
387 	while (fBuffer.Writable() < request.size && !IsWriteShutdown()
388 			&& !IsReadShutdown()) {
389 		benaphore_unlock(&fLock);
390 
391 		status_t error = acquire_sem_etc(fWriterSem, 1,
392 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
393 
394 		benaphore_lock(&fLock);
395 
396 		if (error != B_OK)
397 			RETURN_ERROR(error);
398 	}
399 
400 	if (IsWriteShutdown())
401 		return UNIX_FIFO_SHUTDOWN;
402 
403 	if (IsReadShutdown())
404 		return EPIPE;
405 
406 	RETURN_ERROR(fBuffer.Write(buffer));
407 }
408