xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp (revision 9a6a20d4689307142a7ed26a1437ba47e244e73f)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixFifo.h"
7 
8 #include <new>
9 
10 #include <AutoDeleter.h>
11 
12 #include <net_stack.h>
13 #include <util/ring_buffer.h>
14 
15 #include "unix.h"
16 
17 
18 #define UNIX_FIFO_DEBUG_LEVEL	0
19 #define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
20 #include "UnixDebug.h"
21 
22 
23 // #pragma mark - UnixRequest
24 
25 
26 UnixRequest::UnixRequest(const iovec* vecs, size_t count,
27 		ancillary_data_container* ancillaryData,
28 		struct sockaddr_storage* address)
29 	:
30 	fVecs(vecs),
31 	fVecCount(count),
32 	fAncillaryData(ancillaryData),
33 	fTotalSize(0),
34 	fBytesTransferred(0),
35 	fVecIndex(0),
36 	fVecOffset(0),
37 	fAddress(address)
38 {
39 	for (size_t i = 0; i < fVecCount; i++)
40 		fTotalSize += fVecs[i].iov_len;
41 }
42 
43 
44 void
45 UnixRequest::AddBytesTransferred(size_t size)
46 {
47 	fBytesTransferred += size;
48 
49 	// also adjust the current iovec index/offset
50 	while (fVecIndex < fVecCount
51 			&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
52 		size -= fVecs[fVecIndex].iov_len - fVecOffset;
53 		fVecIndex++;
54 		fVecOffset = 0;
55 	}
56 
57 	if (fVecIndex < fVecCount)
58 		fVecOffset += size;
59 }
60 
61 
62 bool
63 UnixRequest::GetCurrentChunk(void*& data, size_t& size)
64 {
65 	while (fVecIndex < fVecCount
66 			&& fVecOffset >= fVecs[fVecIndex].iov_len) {
67 		fVecIndex++;
68 		fVecOffset = 0;
69 	}
70 	if (fVecIndex >= fVecCount)
71 		return false;
72 
73 	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
74 	size = fVecs[fVecIndex].iov_len - fVecOffset;
75 	return true;
76 }
77 
78 
79 void
80 UnixRequest::UnsetAncillaryData()
81 {
82 	fAncillaryData = NULL;
83 }
84 
85 
86 void
87 UnixRequest::AddAncillaryData(ancillary_data_container* data)
88 {
89 	if (fAncillaryData != NULL) {
90 		gStackModule->move_ancillary_data(data, fAncillaryData);
91 		gStackModule->delete_ancillary_data_container(data);
92 	} else
93 		fAncillaryData = data;
94 }
95 
96 
97 // #pragma mark - UnixBufferQueue
98 
99 
100 UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type)
101 	:
102 	fBuffer(NULL),
103 	fCapacity(capacity),
104 	fType(type)
105 {
106 }
107 
108 
109 UnixBufferQueue::~UnixBufferQueue()
110 {
111 	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
112 		gStackModule->delete_ancillary_data_container(entry->data);
113 		delete entry;
114 	}
115 
116 	delete_ring_buffer(fBuffer);
117 }
118 
119 
120 status_t
121 UnixBufferQueue::Init()
122 {
123 	fBuffer = create_ring_buffer(fCapacity);
124 	if (fBuffer == NULL)
125 		return B_NO_MEMORY;
126 	return B_OK;
127 }
128 
129 
130 size_t
131 UnixBufferQueue::Readable() const
132 {
133 	return ring_buffer_readable(fBuffer);
134 }
135 
136 
137 size_t
138 UnixBufferQueue::Writable() const
139 {
140 	return ring_buffer_writable(fBuffer);
141 }
142 
143 
/*!	Reads buffered data into the given request's I/O vectors.

	Stream FIFOs transfer as much as is both buffered and fits the request.
	Datagram FIFOs consume exactly the head datagram: the sender address is
	copied to the request, and any part of the datagram that does not fit
	the request's buffers is discarded, together with the ancillary data
	belonging to the discarded bytes.

	Ancillary data attached to the read byte range is moved to the request;
	the remaining entries' relative offsets are adjusted.

	\return \c B_OK on success, \c B_ERROR if nothing could be transferred
		(e.g. no queued datagram), or a negative error code from the ring
		buffer copy.
*/
status_t
UnixBufferQueue::Read(UnixRequest& request)
{
	// When called from a syscall the iovecs point to userland memory, so the
	// user-safe ring buffer copy must be used.
	bool user = gStackModule->is_syscall();

	size_t readable = Readable();
	void* data;
	size_t size;

	DatagramEntry* datagramEntry = NULL;
	if (fType == UnixFifoType::Datagram) {
		datagramEntry = fDatagrams.Head();
		if (datagramEntry == NULL)
			return B_ERROR;

		if (datagramEntry->size > readable)
			TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, "
				"but only %lu bytes are readable\n", datagramEntry->size, readable);
		else
			readable = datagramEntry->size;
				// never read past the end of the head datagram
	}

	while (readable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > readable)
			size = readable;

		ssize_t bytesRead;
		if (user)
			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
		else
			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);

		if (bytesRead < 0)
			return bytesRead;
		if (bytesRead == 0)
			return B_ERROR;

		// Adjust ancillary data entry offsets, respectively attach the ones
		// that belong to the read data to the request.
		// Note: offsets are relative to the previous entry in the list.
		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
			size_t offsetDelta = bytesRead;
			while (entry != NULL && offsetDelta > entry->offset) {
				// entry data have been read -- add ancillary data to request
				fAncillaryData.RemoveHead();
				offsetDelta -= entry->offset;
				request.AddAncillaryData(entry->data);
				delete entry;

				entry = fAncillaryData.Head();
			}

			if (entry != NULL)
				entry->offset -= offsetDelta;
		}

		request.AddBytesTransferred(bytesRead);
		readable -= bytesRead;
	}

	if (fType == UnixFifoType::Datagram) {
		// The head datagram is consumed in full, even if the request's
		// buffers were too small to hold all of it.
		fDatagrams.RemoveHead();

		memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address));
		delete datagramEntry;

		if (readable > 0) {
			// Discard the part of the datagram that did not fit, along with
			// any ancillary data belonging to the discarded bytes.
			ring_buffer_flush(fBuffer, readable);
			if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
				size_t offsetDelta = readable;
				while (entry != NULL && offsetDelta > entry->offset) {
					fAncillaryData.RemoveHead();
					offsetDelta -= entry->offset;
					gStackModule->delete_ancillary_data_container(entry->data);
					delete entry;

					entry = fAncillaryData.Head();
				}

				if (entry != NULL)
					entry->offset -= offsetDelta;
			}
		}
	}

	return B_OK;
}
230 
231 
232 status_t
233 UnixBufferQueue::Write(UnixRequest& request)
234 {
235 	bool user = gStackModule->is_syscall();
236 
237 	size_t writable = Writable();
238 	void* data;
239 	size_t size;
240 
241 	DatagramEntry* datagramEntry = NULL;
242 	ObjectDeleter<DatagramEntry> datagramEntryDeleter;
243 	if (fType == UnixFifoType::Datagram) {
244 		datagramEntry = new(std::nothrow) DatagramEntry;
245 		if (datagramEntry == NULL)
246 			return B_NO_MEMORY;
247 
248 		datagramEntryDeleter.SetTo(datagramEntry);
249 		memcpy(&datagramEntry->address, request.Address(),
250 			sizeof(datagramEntry->address));
251 		datagramEntry->size = request.TotalSize();
252 
253 		// This should have been handled in UnixFifo
254 		if (writable < datagramEntry->size) {
255 			TRACE("UnixBufferQueue::Write(): not enough space for"
256 				"datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable);
257 			return B_ERROR;
258 		}
259 	}
260 
261 	// If the request has ancillary data create an entry first.
262 	AncillaryDataEntry* ancillaryEntry = NULL;
263 	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
264 	if (writable > 0 && request.AncillaryData() != NULL) {
265 		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
266 		if (ancillaryEntry == NULL)
267 			return B_NO_MEMORY;
268 
269 		ancillaryEntryDeleter.SetTo(ancillaryEntry);
270 		ancillaryEntry->data = request.AncillaryData();
271 		ancillaryEntry->offset = Readable();
272 
273 		// The offsets are relative to the previous entry.
274 		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
275 		while (AncillaryDataEntry* entry = it.Next())
276 			ancillaryEntry->offset -= entry->offset;
277 			// TODO: This is inefficient when the list is long. Rather also
278 			// store and maintain the absolute offset of the last queued entry.
279 	}
280 
281 	// write as much as we can
282 	while (writable > 0 && request.GetCurrentChunk(data, size)) {
283 		if (size > writable)
284 			size = writable;
285 
286 		ssize_t bytesWritten;
287 		if (user)
288 			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
289 		else
290 			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);
291 
292 		if (bytesWritten < 0)
293 			return bytesWritten;
294 		if (bytesWritten == 0)
295 			return B_ERROR;
296 
297 		if (ancillaryEntry != NULL) {
298 			fAncillaryData.Add(ancillaryEntry);
299 			ancillaryEntryDeleter.Detach();
300 			request.UnsetAncillaryData();
301 			ancillaryEntry = NULL;
302 		}
303 
304 		request.AddBytesTransferred(bytesWritten);
305 		writable -= bytesWritten;
306 	}
307 
308 	if (fType == UnixFifoType::Datagram) {
309 		fDatagrams.Add(datagramEntry);
310 		datagramEntryDeleter.Detach();
311 	}
312 
313 	return B_OK;
314 }
315 
316 
317 status_t
318 UnixBufferQueue::SetCapacity(size_t capacity)
319 {
320 	if (capacity <= fCapacity)
321 		return B_OK;
322 
323 	ring_buffer* newBuffer = create_ring_buffer(capacity);
324 	if (newBuffer == NULL)
325 		return B_NO_MEMORY;
326 
327 	ring_buffer_move(newBuffer, ring_buffer_readable(fBuffer), fBuffer);
328 	delete_ring_buffer(fBuffer);
329 
330 	fBuffer = newBuffer;
331 	fCapacity = capacity;
332 
333 	return B_OK;
334 }
335 
336 
337 // #pragma mark -
338 
339 
340 UnixFifo::UnixFifo(size_t capacity, UnixFifoType type)
341 	:
342 	fBuffer(capacity, type),
343 	fReaders(),
344 	fWriters(),
345 	fReadRequested(0),
346 	fWriteRequested(0),
347 	fShutdown(0)
348 
349 {
350 	fReadCondition.Init(this, "unix fifo read");
351 	fWriteCondition.Init(this, "unix fifo write");
352 	mutex_init(&fLock, "unix fifo");
353 }
354 
355 
356 UnixFifo::~UnixFifo()
357 {
358 	mutex_destroy(&fLock);
359 }
360 
361 
362 status_t
363 UnixFifo::Init()
364 {
365 	return fBuffer.Init();
366 }
367 
368 
369 void
370 UnixFifo::Shutdown(uint32 shutdown)
371 {
372 	TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
373 		find_thread(NULL), this, shutdown);
374 
375 	fShutdown |= shutdown;
376 
377 	if (shutdown != 0) {
378 		// Shutting down either end also effects the other, so notify both.
379 		fReadCondition.NotifyAll();
380 		fWriteCondition.NotifyAll();
381 	}
382 }
383 
384 
/*!	Reads from the FIFO into the given I/O vectors.

	Queues a request in the reader queue, performs the (potentially
	blocking) read via _Read(), and afterwards wakes other waiters as
	appropriate. Ancillary data received is returned via \a _ancillaryData;
	for datagram FIFOs the sender's address is copied to \a address.

	The FIFO lock must be held by the caller (it is temporarily released
	while waiting in _Read()).

	\return the number of bytes read (> 0), \c 0 on end-of-file, or a
		negative error code (e.g. \c UNIX_FIFO_SHUTDOWN, \c B_WOULD_BLOCK).
*/
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData,
	struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n",
		find_thread(NULL), this, vecs, vecCount, timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	UnixRequest request(vecs, vecCount, NULL, address);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	status_t error = _Read(request, timeout);

	// Determine our queue position before removing ourselves.
	bool firstInQueue = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
			&& !IsReadShutdown()) {
		// There's more to read, other readers, and we were first in the queue.
		// So we need to notify the others.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
			&& !IsWriteShutdown()) {
		// We read something and there are writers. Notify them
		fWriteCondition.NotifyAll();
	}

	*_ancillaryData = request.AncillaryData();

	// A partial transfer counts as success, even if _Read() reported an
	// error afterwards.
	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
429 
430 
/*!	Writes the given I/O vectors into the FIFO.

	Queues a request in the writer queue, performs the (potentially
	blocking) write via _Write(), and afterwards wakes other waiters as
	appropriate. Ownership of \a ancillaryData passes to the FIFO once any
	data has been written. For datagram FIFOs \a address records the sender.

	The FIFO lock must be held by the caller (it is temporarily released
	while waiting in _Write()).

	\return the number of bytes written (> 0), or a negative error code
		(e.g. \c UNIX_FIFO_SHUTDOWN, \c EPIPE, \c B_WOULD_BLOCK).
*/
ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData,
	const struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME
		")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData,
		timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	UnixRequest request(vecs, vecCount, ancillaryData,
		(struct sockaddr_storage*)address);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	status_t error = _Write(request, timeout);

	// Determine our queue position before removing ourselves.
	bool firstInQueue = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
			&& !IsWriteShutdown()) {
		// There's more space for writing, other writers, and we were first in
		// the queue. So we need to notify the others.
		fWriteCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
			&& !IsReadShutdown()) {
		// We've written something and there are readers. Notify them.
		fReadCondition.NotifyAll();
	}

	// A partial transfer counts as success, even if _Write() reported an
	// error afterwards.
	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
478 
479 
480 size_t
481 UnixFifo::Readable() const
482 {
483 	size_t readable = fBuffer.Readable();
484 	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
485 }
486 
487 
488 size_t
489 UnixFifo::Writable() const
490 {
491 	size_t writable = fBuffer.Writable();
492 	return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0;
493 }
494 
495 
496 status_t
497 UnixFifo::SetBufferCapacity(size_t capacity)
498 {
499 	// check against allowed minimal/maximal value
500 	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
501 		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
502 	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
503 		capacity = UNIX_FIFO_MINIMAL_CAPACITY;
504 
505 	size_t oldCapacity = fBuffer.Capacity();
506 	if (capacity == oldCapacity)
507 		return B_OK;
508 
509 	// set capacity
510 	status_t error = fBuffer.SetCapacity(capacity);
511 	if (error != B_OK)
512 		return error;
513 
514 	// wake up waiting writers, if the capacity increased
515 	if (!fWriters.IsEmpty() && !IsWriteShutdown())
516 		fWriteCondition.NotifyAll();
517 
518 	return B_OK;
519 }
520 
521 
/*!	Implements the blocking part of Read().

	Waits (up to \a timeout, which is an absolute timeout) first until this
	request reaches the head of the reader queue, then until data becomes
	available or the FIFO is shut down. The FIFO lock must be held; it is
	released while waiting on the condition variables.

	\return \c B_OK when data was read, \c 0 for end-of-file (write side
		shut down, nothing buffered), \c UNIX_FIFO_SHUTDOWN if reading was
		shut down, \c B_WOULD_BLOCK for a non-blocking request that would
		have to wait, or the wait error (e.g. timeout/interruption).
*/
status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
	// wait for the request to reach the front of the queue
	if (fReaders.Head() != &request && timeout == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	while (fReaders.Head() != &request
		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		// Drop the FIFO lock while blocked so other threads can progress.
		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		// Write side closed and nothing buffered: end-of-file.
		if (IsWriteShutdown())
			RETURN_ERROR(0);

		if (timeout == 0)
			RETURN_ERROR(B_WOULD_BLOCK);
	}

	// wait for any data to become available
// TODO: Support low water marks!
	while (fBuffer.Readable() == 0
			&& !IsReadShutdown() && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	// Re-check the shutdown state; it may have changed while we waited.
	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
		if (IsWriteShutdown())
			RETURN_ERROR(0);
	}

	RETURN_ERROR(fBuffer.Read(request));
}
579 
580 
/*!	Implements the blocking part of Write().

	Non-blocking requests (\a timeout == 0) are delegated to
	_WriteNonBlocking(). Otherwise waits (up to \a timeout, an absolute
	timeout) until the request is first in the writer queue, then repeatedly
	waits for space and writes until the complete request has been
	transferred, the FIFO is shut down, or an error occurs. The FIFO lock
	must be held; it is released while waiting on the condition variables.

	\return \c B_OK on success, \c UNIX_FIFO_SHUTDOWN if writing was shut
		down, \c EPIPE if reading was shut down, \c B_WOULD_BLOCK for a
		non-blocking request that would have to wait, or the wait error.
*/
status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
	if (timeout == 0)
		RETURN_ERROR(_WriteNonBlocking(request));

	// wait for the request to reach the front of the queue
	while (fWriters.Head() != &request && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fWriteCondition.Add(&entry);

		// Drop the FIFO lock while blocked so other threads can progress.
		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	// Nothing to transfer: trivially done.
	if (request.TotalSize() == 0)
		return 0;

	status_t error = B_OK;

	while (error == B_OK && request.BytesRemaining() > 0) {
		// wait for any space to become available
		// (for datagram FIFOs the whole datagram must fit at once, cf.
		// _MinimumWritableSize(); for streams a single byte suffices)
		while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request)
				&& !IsWriteShutdown() && !IsReadShutdown()) {
			ConditionVariableEntry entry;
			fWriteCondition.Add(&entry);

			mutex_unlock(&fLock);
			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
			mutex_lock(&fLock);

			if (error != B_OK)
				RETURN_ERROR(error);
		}

		// Re-check the shutdown state; it may have changed while we waited.
		if (IsWriteShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsReadShutdown())
			RETURN_ERROR(EPIPE);

		// write as much as we can
		error = fBuffer.Write(request);

		if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
		}
	}

	RETURN_ERROR(error);
}
644 
645 
646 status_t
647 UnixFifo::_WriteNonBlocking(UnixRequest& request)
648 {
649 	// We need to be first in queue and space should be available right now,
650 	// otherwise we need to fail.
651 	if (fWriters.Head() != &request || fBuffer.Writable() < _MinimumWritableSize(request))
652 		RETURN_ERROR(B_WOULD_BLOCK);
653 
654 	if (request.TotalSize() == 0)
655 		return 0;
656 
657 	// Write as much as we can.
658 	RETURN_ERROR(fBuffer.Write(request));
659 }
660 
661 
662 size_t
663 UnixFifo::_MinimumWritableSize(const UnixRequest& request) const
664 {
665 	switch (fType) {
666 		case UnixFifoType::Datagram:
667 			return request.TotalSize();
668 		case UnixFifoType::Stream:
669 		default:
670 			return 1;
671 	}
672 }
673