xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp (revision 1deede7388b04dbeec5af85cae7164735ea9e70d)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixFifo.h"
7 
8 #include <new>
9 
10 #include <AutoDeleter.h>
11 
12 #include <net_stack.h>
13 #include <util/ring_buffer.h>
14 
15 #include "unix.h"
16 
17 
18 #define UNIX_FIFO_DEBUG_LEVEL	0
19 #define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
20 #include "UnixDebug.h"
21 
22 
23 // #pragma mark - UnixRequest
24 
25 
26 UnixRequest::UnixRequest(const iovec* vecs, size_t count,
27 		ancillary_data_container* ancillaryData)
28 	:
29 	fVecs(vecs),
30 	fVecCount(count),
31 	fAncillaryData(ancillaryData),
32 	fTotalSize(0),
33 	fBytesTransferred(0),
34 	fVecIndex(0),
35 	fVecOffset(0)
36 {
37 	for (size_t i = 0; i < fVecCount; i++)
38 		fTotalSize += fVecs[i].iov_len;
39 }
40 
41 
42 void
43 UnixRequest::AddBytesTransferred(size_t size)
44 {
45 	fBytesTransferred += size;
46 
47 	// also adjust the current iovec index/offset
48 	while (fVecIndex < fVecCount
49 			&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
50 		size -= fVecs[fVecIndex].iov_len - fVecOffset;
51 		fVecIndex++;
52 		fVecOffset = 0;
53 	}
54 
55 	if (fVecIndex < fVecCount)
56 		fVecOffset += size;
57 }
58 
59 
60 bool
61 UnixRequest::GetCurrentChunk(void*& data, size_t& size)
62 {
63 	while (fVecIndex < fVecCount
64 			&& fVecOffset >= fVecs[fVecIndex].iov_len) {
65 		fVecIndex++;
66 		fVecOffset = 0;
67 	}
68 	if (fVecIndex >= fVecCount)
69 		return false;
70 
71 	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
72 	size = fVecs[fVecIndex].iov_len - fVecOffset;
73 	return true;
74 }
75 
76 
/*!	Sets the request's ancillary data container to \a data.
	This is a plain overwrite -- a previously set container is not freed
	here (cf. AddAncillaryData(), which merges instead).
*/
void
UnixRequest::SetAncillaryData(ancillary_data_container* data)
{
	fAncillaryData = data;
}
82 
83 
84 void
85 UnixRequest::AddAncillaryData(ancillary_data_container* data)
86 {
87 	if (fAncillaryData != NULL) {
88 		gStackModule->move_ancillary_data(data, fAncillaryData);
89 		gStackModule->delete_ancillary_data_container(data);
90 	} else
91 		fAncillaryData = data;
92 }
93 
94 
95 // #pragma mark - UnixBufferQueue
96 
97 
/*!	Creates a buffer queue with the given capacity.
	The ring buffer itself is not allocated here; Init() must be called
	(and succeed) before the queue is used.
*/
UnixBufferQueue::UnixBufferQueue(size_t capacity)
	:
	fBuffer(NULL),
	fCapacity(capacity)
{
}
104 
105 
106 UnixBufferQueue::~UnixBufferQueue()
107 {
108 	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
109 		gStackModule->delete_ancillary_data_container(entry->data);
110 		delete entry;
111 	}
112 
113 	delete_ring_buffer(fBuffer);
114 }
115 
116 
117 status_t
118 UnixBufferQueue::Init()
119 {
120 	fBuffer = create_ring_buffer(fCapacity);
121 	if (fBuffer == NULL)
122 		return B_NO_MEMORY;
123 	return B_OK;
124 }
125 
126 
/*!	Returns the number of bytes currently buffered, i.e. available for
	reading.
*/
size_t
UnixBufferQueue::Readable() const
{
	return ring_buffer_readable(fBuffer);
}
132 
133 
/*!	Returns the number of bytes of free space in the buffer, i.e. how much
	can be written without blocking.
*/
size_t
UnixBufferQueue::Writable() const
{
	return ring_buffer_writable(fBuffer);
}
139 
140 
/*!	Transfers as much buffered data as possible into the request's I/O
	vectors, attaching any ancillary data that belongs to the consumed
	bytes to the request.
	Ancillary data entry offsets are stored relative to the previous entry
	in the list (the first entry's offset is relative to the start of the
	buffered data, cf. Write()), so after consuming entries the head
	entry's offset is adjusted by the remaining delta.
	\return \c B_OK on success (even if nothing was read), \c B_ERROR or a
		negative ring buffer error code otherwise.
*/
status_t
UnixBufferQueue::Read(UnixRequest& request)
{
	// Use the userland-safe copy routine when invoked from a syscall.
	bool user = gStackModule->is_syscall();

	size_t readable = Readable();
	void* data;
	size_t size;

	while (readable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > readable)
			size = readable;

		ssize_t bytesRead;
		if (user)
			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
		else
			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);

		if (bytesRead < 0)
			return bytesRead;
		if (bytesRead == 0)
			return B_ERROR;

		// Adjust ancillary data entry offsets, respectively attach the ones
		// that belong to the read data to the request.
		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
			size_t offsetDelta = bytesRead;
			while (entry != NULL && offsetDelta > entry->offset) {
				// entry data have been read -- add ancillary data to request
				fAncillaryData.RemoveHead();
				offsetDelta -= entry->offset;
				request.AddAncillaryData(entry->data);
				delete entry;

				entry = fAncillaryData.Head();
			}

			// the new head keeps its place relative to the remaining data
			if (entry != NULL)
				entry->offset -= offsetDelta;
		}

		request.AddBytesTransferred(bytesRead);
		readable -= bytesRead;
	}

	return B_OK;
}
189 
190 
/*!	Writes as much of the request's data into the ring buffer as fits.
	If the request carries ancillary data, an AncillaryDataEntry is
	prepared first; its offset is computed as the current amount of
	buffered data minus the offsets of all already queued entries, i.e.
	relative to the last entry (cf. the matching arithmetic in Read()).
	The entry is only actually queued -- and ownership taken from the
	request -- after the first chunk has been written successfully.
	\return \c B_OK on success (even a partial write), \c B_NO_MEMORY,
		\c B_ERROR, or a negative ring buffer error code otherwise.
*/
status_t
UnixBufferQueue::Write(UnixRequest& request)
{
	// Use the userland-safe copy routine when invoked from a syscall.
	bool user = gStackModule->is_syscall();

	size_t writable = Writable();
	void* data;
	size_t size;

	// If the request has ancillary data create an entry first.
	AncillaryDataEntry* ancillaryEntry = NULL;
	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
	if (writable > 0 && request.AncillaryData() != NULL) {
		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
		if (ancillaryEntry == NULL)
			return B_NO_MEMORY;

		ancillaryEntryDeleter.SetTo(ancillaryEntry);
		ancillaryEntry->data = request.AncillaryData();
		ancillaryEntry->offset = Readable();

		// The offsets are relative to the previous entry.
		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
		while (AncillaryDataEntry* entry = it.Next())
			ancillaryEntry->offset -= entry->offset;
			// TODO: This is inefficient when the list is long. Rather also
			// store and maintain the absolute offset of the last queued entry.
	}

	// write as much as we can
	while (writable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > writable)
			size = writable;

		ssize_t bytesWritten;
		if (user)
			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
		else
			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);

		if (bytesWritten < 0)
			return bytesWritten;
		if (bytesWritten == 0)
			return B_ERROR;

		// Something was written, so the ancillary data (if any) now has a
		// valid anchor point -- queue the entry and release ownership.
		if (ancillaryEntry != NULL) {
			fAncillaryData.Add(ancillaryEntry);
			ancillaryEntryDeleter.Detach();
			request.SetAncillaryData(NULL);
			ancillaryEntry = NULL;
		}

		request.AddBytesTransferred(bytesWritten);
		writable -= bytesWritten;
	}

	return B_OK;
}
249 
250 
/*!	Resizes the ring buffer to \a capacity.
	Not implemented yet -- always fails, so callers currently cannot change
	the capacity after Init().
	\return \c B_ERROR always.
*/
status_t
UnixBufferQueue::SetCapacity(size_t capacity)
{
// TODO:...
return B_ERROR;
}
257 
258 
259 // #pragma mark -
260 
261 
262 UnixFifo::UnixFifo(size_t capacity)
263 	:
264 	fBuffer(capacity),
265 	fReaders(),
266 	fWriters(),
267 	fReadRequested(0),
268 	fWriteRequested(0),
269 	fShutdown(0)
270 
271 {
272 	fReadCondition.Init(this, "unix fifo read");
273 	fWriteCondition.Init(this, "unix fifo write");
274 	mutex_init(&fLock, "unix fifo");
275 }
276 
277 
UnixFifo::~UnixFifo()
{
	// Only the lock needs explicit teardown; fBuffer cleans up in its own
	// destructor.
	mutex_destroy(&fLock);
}
282 
283 
/*!	Allocates the FIFO's ring buffer.
	\return \c B_OK on success, \c B_NO_MEMORY if the buffer allocation
		failed (cf. UnixBufferQueue::Init()).
*/
status_t
UnixFifo::Init()
{
	return fBuffer.Init();
}
289 
290 
291 void
292 UnixFifo::Shutdown(uint32 shutdown)
293 {
294 	TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
295 		find_thread(NULL), this, shutdown);
296 
297 	fShutdown |= shutdown;
298 
299 	if (shutdown != 0) {
300 		// Shutting down either end also effects the other, so notify both.
301 		fReadCondition.NotifyAll();
302 		fWriteCondition.NotifyAll();
303 	}
304 }
305 
306 
/*!	Reads data from the FIFO into the given I/O vectors.
	The caller must hold fLock (it is temporarily released inside _Read()
	while waiting). \a timeout is an absolute deadline (cf. the
	B_ABSOLUTE_TIMEOUT waits in _Read()); 0 means don't block.
	\param vecs The I/O vectors to read into.
	\param vecCount The number of vectors.
	\param _ancillaryData Set to the ancillary data attached to the read
		data, or \c NULL if there is none.
	\param timeout Absolute timeout; 0 for non-blocking.
	\return The number of bytes read (> 0), 0/UNIX_FIFO_SHUTDOWN on
		shutdown, or a negative error code. A partial transfer is reported
		as success even if an error occurred afterwards.
*/
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n",
		find_thread(NULL), this, vecs, vecCount, timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	// enqueue the request -- requests are served strictly in FIFO order
	UnixRequest request(vecs, vecCount, NULL);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	status_t error = _Read(request, timeout);

	bool firstInQueue = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
			&& !IsReadShutdown()) {
		// There's more to read, other readers, and we were first in the queue.
		// So we need to notify the others.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
			&& !IsWriteShutdown()) {
		// We read something and there are writers. Notify them
		fWriteCondition.NotifyAll();
	}

	*_ancillaryData = request.AncillaryData();

	if (request.BytesTransferred() > 0) {
		// clamp: the byte count must fit into the ssize_t return value
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
350 
351 
/*!	Writes data from the given I/O vectors into the FIFO.
	The caller must hold fLock (it is temporarily released inside _Write()
	while waiting). \a timeout is an absolute deadline (cf. the
	B_ABSOLUTE_TIMEOUT waits in _Write()); 0 means don't block.
	\param vecs The I/O vectors to write from.
	\param vecCount The number of vectors.
	\param ancillaryData Ancillary data to attach to the written data; the
		request takes it along (ownership passes once queued in the
		buffer).
	\param timeout Absolute timeout; 0 for non-blocking.
	\return The number of bytes written (> 0), UNIX_FIFO_SHUTDOWN/EPIPE on
		shutdown, or a negative error code. A partial transfer is reported
		as success even if an error occurred afterwards.
*/
ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME
		")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData,
		timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	// enqueue the request -- requests are served strictly in FIFO order
	UnixRequest request(vecs, vecCount, ancillaryData);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	status_t error = _Write(request, timeout);

	bool firstInQueue = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
			&& !IsWriteShutdown()) {
		// There's more space for writing, other writers, and we were first in
		// the queue. So we need to notify the others.
		fWriteCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
			&& !IsReadShutdown()) {
		// We've written something and there are readers. Notify them.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0) {
		// clamp: the byte count must fit into the ssize_t return value
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
397 
398 
399 size_t
400 UnixFifo::Readable() const
401 {
402 	size_t readable = fBuffer.Readable();
403 	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
404 }
405 
406 
407 size_t
408 UnixFifo::Writable() const
409 {
410 	size_t writable = fBuffer.Writable();
411 	return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0;
412 }
413 
414 
415 status_t
416 UnixFifo::SetBufferCapacity(size_t capacity)
417 {
418 	// check against allowed minimal/maximal value
419 	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
420 		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
421 	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
422 		capacity = UNIX_FIFO_MINIMAL_CAPACITY;
423 
424 	size_t oldCapacity = fBuffer.Capacity();
425 	if (capacity == oldCapacity)
426 		return B_OK;
427 
428 	// set capacity
429 	status_t error = fBuffer.SetCapacity(capacity);
430 	if (error != B_OK)
431 		return error;
432 
433 	// wake up waiting writers, if the capacity increased
434 	if (!fWriters.IsEmpty() && !IsWriteShutdown())
435 		fWriteCondition.NotifyAll();
436 
437 	return B_OK;
438 }
439 
440 
/*!	Implements Read(): waits until \a request is first in the readers queue
	and data is available, then transfers it from the buffer.
	Called with fLock held; the lock is released around each wait. The two
	wait loops re-check their conditions after waking, since the waits can
	be interrupted (B_CAN_INTERRUPT) and several waiters may be notified.
	\a timeout is an absolute deadline; 0 means non-blocking.
	\return \c B_OK (or the buffer's read result) on success, 0 on write
		shutdown with an empty buffer, UNIX_FIFO_SHUTDOWN on read shutdown,
		B_WOULD_BLOCK for a non-blocking request that would have to wait,
		or the wait error (e.g. timeout/interrupt).
*/
status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
	// wait for the request to reach the front of the queue
	if (fReaders.Head() != &request && timeout == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	while (fReaders.Head() != &request
		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	// no data yet -- check for shutdown and non-blocking mode before waiting
	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsWriteShutdown())
			RETURN_ERROR(0);

		if (timeout == 0)
			RETURN_ERROR(B_WOULD_BLOCK);
	}

	// wait for any data to become available
// TODO: Support low water marks!
	while (fBuffer.Readable() == 0
			&& !IsReadShutdown() && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	// determine why the wait loop ended: shutdown vs. data arrived
	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
		if (IsWriteShutdown())
			RETURN_ERROR(0);
	}

	RETURN_ERROR(fBuffer.Read(request));
}
498 
499 
/*!	Implements Write(): waits until \a request is first in the writers
	queue, then writes chunk by chunk until the whole request has been
	transferred, a shutdown occurs, or an error is encountered.
	Called with fLock held; the lock is released around each wait.
	\a timeout is an absolute deadline; 0 delegates to _WriteNonBlocking().
	\return \c B_OK on success, UNIX_FIFO_SHUTDOWN on write shutdown, EPIPE
		on read shutdown, or the wait/buffer error.
*/
status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
	if (timeout == 0)
		RETURN_ERROR(_WriteNonBlocking(request));

	// wait for the request to reach the front of the queue
	while (fWriters.Head() != &request && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fWriteCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	// an empty request trivially succeeds
	if (request.TotalSize() == 0)
		return 0;

	status_t error = B_OK;

	// write in chunks, blocking whenever the buffer fills up
	while (error == B_OK && request.BytesRemaining() > 0) {
		// wait for any space to become available
		while (error == B_OK && fBuffer.Writable() == 0 && !IsWriteShutdown()
				&& !IsReadShutdown()) {
			ConditionVariableEntry entry;
			fWriteCondition.Add(&entry);

			mutex_unlock(&fLock);
			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
			mutex_lock(&fLock);

			if (error != B_OK)
				RETURN_ERROR(error);
		}

		// re-check shutdown after waking -- it may have happened meanwhile
		if (IsWriteShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsReadShutdown())
			RETURN_ERROR(EPIPE);

		// write as much as we can
		error = fBuffer.Write(request);

		if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
		}
	}

	RETURN_ERROR(error);
}
563 
564 
565 status_t
566 UnixFifo::_WriteNonBlocking(UnixRequest& request)
567 {
568 	// We need to be first in queue and space should be available right now,
569 	// otherwise we need to fail.
570 	if (fWriters.Head() != &request || fBuffer.Writable() == 0)
571 		RETURN_ERROR(B_WOULD_BLOCK);
572 
573 	if (request.TotalSize() == 0)
574 		return 0;
575 
576 	// Write as much as we can.
577 	RETURN_ERROR(fBuffer.Write(request));
578 }
579 
580