xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp (revision 3634f142352af2428aed187781fc9d75075e9140)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixFifo.h"
7 
8 #include <new>
9 
10 #include <AutoDeleter.h>
11 
12 #include <net_stack.h>
13 #include <util/ring_buffer.h>
14 
15 #include "unix.h"
16 
17 
18 #define UNIX_FIFO_DEBUG_LEVEL	0
19 #define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
20 #include "UnixDebug.h"
21 
22 
23 // #pragma mark - UnixRequest
24 
25 
26 UnixRequest::UnixRequest(const iovec* vecs, size_t count,
27 		ancillary_data_container* ancillaryData,
28 		struct sockaddr_storage* address)
29 	:
30 	fVecs(vecs),
31 	fVecCount(count),
32 	fAncillaryData(ancillaryData),
33 	fTotalSize(0),
34 	fBytesTransferred(0),
35 	fVecIndex(0),
36 	fVecOffset(0),
37 	fAddress(address)
38 {
39 	for (size_t i = 0; i < fVecCount; i++)
40 		fTotalSize += fVecs[i].iov_len;
41 }
42 
43 
44 void
45 UnixRequest::AddBytesTransferred(size_t size)
46 {
47 	fBytesTransferred += size;
48 
49 	// also adjust the current iovec index/offset
50 	while (fVecIndex < fVecCount
51 			&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
52 		size -= fVecs[fVecIndex].iov_len - fVecOffset;
53 		fVecIndex++;
54 		fVecOffset = 0;
55 	}
56 
57 	if (fVecIndex < fVecCount)
58 		fVecOffset += size;
59 }
60 
61 
62 bool
63 UnixRequest::GetCurrentChunk(void*& data, size_t& size)
64 {
65 	while (fVecIndex < fVecCount
66 			&& fVecOffset >= fVecs[fVecIndex].iov_len) {
67 		fVecIndex++;
68 		fVecOffset = 0;
69 	}
70 	if (fVecIndex >= fVecCount)
71 		return false;
72 
73 	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
74 	size = fVecs[fVecIndex].iov_len - fVecOffset;
75 	return true;
76 }
77 
78 
void
UnixRequest::SetAncillaryData(ancillary_data_container* data)
{
	// Replaces the attached ancillary data pointer without freeing any
	// previously set container. Write() passes NULL here after queuing the
	// container, transferring ownership away from the request.
	fAncillaryData = data;
}
84 
85 
86 void
87 UnixRequest::AddAncillaryData(ancillary_data_container* data)
88 {
89 	if (fAncillaryData != NULL) {
90 		gStackModule->move_ancillary_data(data, fAncillaryData);
91 		gStackModule->delete_ancillary_data_container(data);
92 	} else
93 		fAncillaryData = data;
94 }
95 
96 
97 // #pragma mark - UnixBufferQueue
98 
99 
UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type)
	:
	fBuffer(NULL),
	fCapacity(capacity),
	fType(type)
{
	// The ring buffer is allocated lazily in Init(), so construction itself
	// cannot fail.
}
107 
108 
109 UnixBufferQueue::~UnixBufferQueue()
110 {
111 	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
112 		gStackModule->delete_ancillary_data_container(entry->data);
113 		delete entry;
114 	}
115 
116 	delete_ring_buffer(fBuffer);
117 }
118 
119 
120 status_t
121 UnixBufferQueue::Init()
122 {
123 	fBuffer = create_ring_buffer(fCapacity);
124 	if (fBuffer == NULL)
125 		return B_NO_MEMORY;
126 	return B_OK;
127 }
128 
129 
size_t
UnixBufferQueue::Readable() const
{
	// Number of bytes currently buffered and available for reading.
	return ring_buffer_readable(fBuffer);
}
135 
136 
size_t
UnixBufferQueue::Writable() const
{
	// Number of bytes of free space left in the ring buffer.
	return ring_buffer_writable(fBuffer);
}
142 
143 
status_t
UnixBufferQueue::Read(UnixRequest& request)
{
	// Copies buffered data into the request's iovecs. Ancillary data whose
	// position falls within the consumed byte range is attached to the
	// request. For datagram fifos at most the head datagram is consumed,
	// and it is consumed completely -- any part that does not fit into the
	// request's buffers is discarded.
	bool user = gStackModule->is_syscall();
		// syscall origin => the iovec pointers are userland addresses

	size_t readable = Readable();
	void* data;
	size_t size;

	DatagramEntry* datagramEntry = NULL;
	if (fType == UnixFifoType::Datagram) {
		datagramEntry = fDatagrams.Head();
		if (datagramEntry == NULL)
			return B_ERROR;

		// Never read across a datagram boundary: cap the readable amount at
		// the head datagram's size.
		if (datagramEntry->size > readable)
			TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, "
				"but only %lu bytes are readable\n", datagramEntry->size, readable);
		else
			readable = datagramEntry->size;
	}

	while (readable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > readable)
			size = readable;

		ssize_t bytesRead;
		if (user)
			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
		else
			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);

		if (bytesRead < 0)
			return bytesRead;
		if (bytesRead == 0)
			return B_ERROR;

		// Adjust ancillary data entry offsets, respectively attach the ones
		// that belong to the read data to the request. Entry offsets are
		// relative to the previous entry (see Write()).
		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
			size_t offsetDelta = bytesRead;
			while (entry != NULL && offsetDelta > entry->offset) {
				// entry data have been read -- add ancillary data to request
				fAncillaryData.RemoveHead();
				offsetDelta -= entry->offset;
				request.AddAncillaryData(entry->data);
				delete entry;

				entry = fAncillaryData.Head();
			}

			if (entry != NULL)
				entry->offset -= offsetDelta;
		}

		request.AddBytesTransferred(bytesRead);
		readable -= bytesRead;
	}

	if (fType == UnixFifoType::Datagram) {
		// The head datagram is consumed, even if the request's buffers were
		// too small to receive all of it.
		fDatagrams.RemoveHead();

		// NOTE(review): assumes request.Address() is non-NULL for datagram
		// reads -- confirm that all callers pass an address storage.
		memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address));
		delete datagramEntry;

		if (readable > 0) {
			// Discard the unread remainder of the datagram, dropping any
			// ancillary data entries that belonged to the discarded bytes.
			ring_buffer_flush(fBuffer, readable);
			if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
				size_t offsetDelta = readable;
				while (entry != NULL && offsetDelta > entry->offset) {
					fAncillaryData.RemoveHead();
					offsetDelta -= entry->offset;
					delete entry;

					entry = fAncillaryData.Head();
				}

				if (entry != NULL)
					entry->offset -= offsetDelta;
			}
		}
	}

	return B_OK;
}
229 
230 
231 status_t
232 UnixBufferQueue::Write(UnixRequest& request)
233 {
234 	bool user = gStackModule->is_syscall();
235 
236 	size_t writable = Writable();
237 	void* data;
238 	size_t size;
239 
240 	DatagramEntry* datagramEntry = NULL;
241 	ObjectDeleter<DatagramEntry> datagramEntryDeleter;
242 	if (fType == UnixFifoType::Datagram) {
243 		datagramEntry = new(std::nothrow) DatagramEntry;
244 		if (datagramEntry == NULL)
245 			return B_NO_MEMORY;
246 
247 		datagramEntryDeleter.SetTo(datagramEntry);
248 		memcpy(&datagramEntry->address, request.Address(),
249 			sizeof(datagramEntry->address));
250 		datagramEntry->size = request.TotalSize();
251 
252 		// This should have been handled in UnixFifo
253 		if (writable < datagramEntry->size) {
254 			TRACE("UnixBufferQueue::Write(): not enough space for"
255 				"datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable);
256 			return B_ERROR;
257 		}
258 	}
259 
260 	// If the request has ancillary data create an entry first.
261 	AncillaryDataEntry* ancillaryEntry = NULL;
262 	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
263 	if (writable > 0 && request.AncillaryData() != NULL) {
264 		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
265 		if (ancillaryEntry == NULL)
266 			return B_NO_MEMORY;
267 
268 		ancillaryEntryDeleter.SetTo(ancillaryEntry);
269 		ancillaryEntry->data = request.AncillaryData();
270 		ancillaryEntry->offset = Readable();
271 
272 		// The offsets are relative to the previous entry.
273 		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
274 		while (AncillaryDataEntry* entry = it.Next())
275 			ancillaryEntry->offset -= entry->offset;
276 			// TODO: This is inefficient when the list is long. Rather also
277 			// store and maintain the absolute offset of the last queued entry.
278 	}
279 
280 	// write as much as we can
281 	while (writable > 0 && request.GetCurrentChunk(data, size)) {
282 		if (size > writable)
283 			size = writable;
284 
285 		ssize_t bytesWritten;
286 		if (user)
287 			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
288 		else
289 			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);
290 
291 		if (bytesWritten < 0)
292 			return bytesWritten;
293 		if (bytesWritten == 0)
294 			return B_ERROR;
295 
296 		if (ancillaryEntry != NULL) {
297 			fAncillaryData.Add(ancillaryEntry);
298 			ancillaryEntryDeleter.Detach();
299 			request.SetAncillaryData(NULL);
300 			ancillaryEntry = NULL;
301 		}
302 
303 		request.AddBytesTransferred(bytesWritten);
304 		writable -= bytesWritten;
305 	}
306 
307 	if (fType == UnixFifoType::Datagram) {
308 		fDatagrams.Add(datagramEntry);
309 		datagramEntryDeleter.Detach();
310 	}
311 
312 	return B_OK;
313 }
314 
315 
316 status_t
317 UnixBufferQueue::SetCapacity(size_t capacity)
318 {
319 	if (capacity <= fCapacity)
320 		return B_OK;
321 
322 	ring_buffer* newBuffer = create_ring_buffer(capacity);
323 	if (newBuffer == NULL)
324 		return B_NO_MEMORY;
325 
326 	ring_buffer_move(newBuffer, ring_buffer_readable(fBuffer), fBuffer);
327 	delete_ring_buffer(fBuffer);
328 
329 	fBuffer = newBuffer;
330 	fCapacity = capacity;
331 
332 	return B_OK;
333 }
334 
335 
336 // #pragma mark -
337 
338 
339 UnixFifo::UnixFifo(size_t capacity, UnixFifoType type)
340 	:
341 	fBuffer(capacity, type),
342 	fReaders(),
343 	fWriters(),
344 	fReadRequested(0),
345 	fWriteRequested(0),
346 	fShutdown(0)
347 
348 {
349 	fReadCondition.Init(this, "unix fifo read");
350 	fWriteCondition.Init(this, "unix fifo write");
351 	mutex_init(&fLock, "unix fifo");
352 }
353 
354 
UnixFifo::~UnixFifo()
{
	// NOTE(review): assumes no threads still wait on the condition
	// variables at this point -- verify callers shut down/drain first.
	mutex_destroy(&fLock);
}
359 
360 
status_t
UnixFifo::Init()
{
	// Allocates the underlying buffer queue's ring buffer; must succeed
	// before the fifo is used.
	return fBuffer.Init();
}
366 
367 
368 void
369 UnixFifo::Shutdown(uint32 shutdown)
370 {
371 	TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
372 		find_thread(NULL), this, shutdown);
373 
374 	fShutdown |= shutdown;
375 
376 	if (shutdown != 0) {
377 		// Shutting down either end also effects the other, so notify both.
378 		fReadCondition.NotifyAll();
379 		fWriteCondition.NotifyAll();
380 	}
381 }
382 
383 
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData,
	struct sockaddr_storage* address, bigtime_t timeout)
{
	// Queues a read request, blocks (up to the absolute timeout) until it
	// has been served in _Read(), and returns the number of bytes read or a
	// negative error. The caller must hold fLock -- _Read() temporarily
	// releases it while waiting.
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n",
		find_thread(NULL), this, vecs, vecCount, timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	UnixRequest request(vecs, vecCount, NULL, address);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	status_t error = _Read(request, timeout);

	bool firstInQueue = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
			&& !IsReadShutdown()) {
		// There's more to read, other readers, and we were first in the queue.
		// So we need to notify the others.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
			&& !IsWriteShutdown()) {
		// We read something and there are writers. Notify them
		fWriteCondition.NotifyAll();
	}

	// Hand any received ancillary data over to the caller (may be NULL).
	*_ancillaryData = request.AncillaryData();

	// A partial transfer wins over an error: if anything was read, report
	// the byte count (clamped to SSIZE_MAX) instead of the error.
	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
428 
429 
ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData,
	const struct sockaddr_storage* address, bigtime_t timeout)
{
	// Queues a write request, blocks (up to the absolute timeout) until it
	// has been served in _Write(), and returns the number of bytes written
	// or a negative error (EPIPE if the read side has shut down). The
	// caller must hold fLock -- _Write() temporarily releases it.
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME
		")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData,
		timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	UnixRequest request(vecs, vecCount, ancillaryData,
		(struct sockaddr_storage*)address);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	status_t error = _Write(request, timeout);

	bool firstInQueue = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
			&& !IsWriteShutdown()) {
		// There's more space for writing, other writers, and we were first in
		// the queue. So we need to notify the others.
		fWriteCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
			&& !IsReadShutdown()) {
		// We've written something and there are readers. Notify them.
		fReadCondition.NotifyAll();
	}

	// A partial transfer wins over an error: if anything was written,
	// report the byte count (clamped to SSIZE_MAX) instead of the error.
	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}
477 
478 
479 size_t
480 UnixFifo::Readable() const
481 {
482 	size_t readable = fBuffer.Readable();
483 	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
484 }
485 
486 
487 size_t
488 UnixFifo::Writable() const
489 {
490 	size_t writable = fBuffer.Writable();
491 	return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0;
492 }
493 
494 
495 status_t
496 UnixFifo::SetBufferCapacity(size_t capacity)
497 {
498 	// check against allowed minimal/maximal value
499 	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
500 		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
501 	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
502 		capacity = UNIX_FIFO_MINIMAL_CAPACITY;
503 
504 	size_t oldCapacity = fBuffer.Capacity();
505 	if (capacity == oldCapacity)
506 		return B_OK;
507 
508 	// set capacity
509 	status_t error = fBuffer.SetCapacity(capacity);
510 	if (error != B_OK)
511 		return error;
512 
513 	// wake up waiting writers, if the capacity increased
514 	if (!fWriters.IsEmpty() && !IsWriteShutdown())
515 		fWriteCondition.NotifyAll();
516 
517 	return B_OK;
518 }
519 
520 
status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
	// Serves a queued read request. Readers are served strictly in FIFO
	// order: the request first waits to reach the head of fReaders, then
	// waits for data to arrive. The caller holds fLock; it is released
	// around each Wait(). The timeout is an absolute deadline
	// (B_ABSOLUTE_TIMEOUT), so repeated waits share one deadline.

	// wait for the request to reach the front of the queue
	if (fReaders.Head() != &request && timeout == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	while (fReaders.Head() != &request
		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		// Write side closed and nothing buffered: end-of-file, i.e. 0.
		if (IsWriteShutdown())
			RETURN_ERROR(0);

		if (timeout == 0)
			RETURN_ERROR(B_WOULD_BLOCK);
	}

	// wait for any data to become available
// TODO: Support low water marks!
	while (fBuffer.Readable() == 0
			&& !IsReadShutdown() && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	// Re-check why the wait loop exited: shutdown beats reading nothing.
	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
		if (IsWriteShutdown())
			RETURN_ERROR(0);
	}

	RETURN_ERROR(fBuffer.Read(request));
}
578 
579 
status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
	// Serves a queued write request: waits to reach the head of fWriters,
	// then writes chunk-wise, waiting for buffer space between chunks,
	// until the whole request is transferred or an error/shutdown occurs.
	// The caller holds fLock; it is released around each Wait(). The
	// timeout is an absolute deadline (B_ABSOLUTE_TIMEOUT).
	if (timeout == 0)
		RETURN_ERROR(_WriteNonBlocking(request));

	// wait for the request to reach the front of the queue
	while (fWriters.Head() != &request && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fWriteCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	// Zero-size requests complete trivially.
	if (request.TotalSize() == 0)
		return 0;

	status_t error = B_OK;

	while (error == B_OK && request.BytesRemaining() > 0) {
		// wait for any space to become available
		// (for datagram fifos the whole remaining datagram must fit --
		// see _MinimumWritableSize())
		while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request)
				&& !IsWriteShutdown() && !IsReadShutdown()) {
			ConditionVariableEntry entry;
			fWriteCondition.Add(&entry);

			mutex_unlock(&fLock);
			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
			mutex_lock(&fLock);

			if (error != B_OK)
				RETURN_ERROR(error);
		}

		// Shutdown may have happened while we were waiting.
		if (IsWriteShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsReadShutdown())
			RETURN_ERROR(EPIPE);

		// write as much as we can
		error = fBuffer.Write(request);

		if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
		}
	}

	RETURN_ERROR(error);
}
643 
644 
645 status_t
646 UnixFifo::_WriteNonBlocking(UnixRequest& request)
647 {
648 	// We need to be first in queue and space should be available right now,
649 	// otherwise we need to fail.
650 	if (fWriters.Head() != &request || fBuffer.Writable() < _MinimumWritableSize(request))
651 		RETURN_ERROR(B_WOULD_BLOCK);
652 
653 	if (request.TotalSize() == 0)
654 		return 0;
655 
656 	// Write as much as we can.
657 	RETURN_ERROR(fBuffer.Write(request));
658 }
659 
660 
661 size_t
662 UnixFifo::_MinimumWritableSize(const UnixRequest& request) const
663 {
664 	switch (fType) {
665 		case UnixFifoType::Datagram:
666 			return request.TotalSize();
667 		case UnixFifoType::Stream:
668 		default:
669 			return 1;
670 	}
671 }
672