xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixFifo.cpp (revision e81a954787e50e56a7f06f72705b7859b6ab06d1)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixFifo.h"
7 
8 #include <new>
9 
10 #include <AutoDeleter.h>
11 
12 #include <net_stack.h>
13 #include <util/ring_buffer.h>
14 
15 #include "unix.h"
16 
17 
18 #define UNIX_FIFO_DEBUG_LEVEL	0
19 #define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
20 #include "UnixDebug.h"
21 
22 
23 // #pragma mark - UnixRequest
24 
25 
26 UnixRequest::UnixRequest(const iovec* vecs, size_t count,
27 		ancillary_data_container* ancillaryData)
28 	:
29 	fVecs(vecs),
30 	fVecCount(count),
31 	fAncillaryData(ancillaryData),
32 	fTotalSize(0),
33 	fBytesTransferred(0),
34 	fVecIndex(0),
35 	fVecOffset(0)
36 {
37 	for (size_t i = 0; i < fVecCount; i++)
38 		fTotalSize += fVecs[i].iov_len;
39 }
40 
41 
42 void
43 UnixRequest::AddBytesTransferred(size_t size)
44 {
45 	fBytesTransferred += size;
46 
47 	// also adjust the current iovec index/offset
48 	while (fVecIndex < fVecCount
49 			&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
50 		size -= fVecs[fVecIndex].iov_len - fVecOffset;
51 		fVecIndex++;
52 		fVecOffset = 0;
53 	}
54 
55 	if (fVecIndex < fVecCount)
56 		fVecOffset += size;
57 }
58 
59 
60 bool
61 UnixRequest::GetCurrentChunk(void*& data, size_t& size)
62 {
63 	while (fVecIndex < fVecCount
64 			&& fVecOffset >= fVecs[fVecIndex].iov_len) {
65 		fVecIndex++;
66 		fVecOffset = 0;
67 	}
68 	if (fVecIndex >= fVecCount)
69 		return false;
70 
71 	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
72 	size = fVecs[fVecIndex].iov_len - fVecOffset;
73 	return true;
74 }
75 
76 
77 void
78 UnixRequest::SetAncillaryData(ancillary_data_container* data)
79 {
80 	fAncillaryData = data;
81 }
82 
83 
84 void
85 UnixRequest::AddAncillaryData(ancillary_data_container* data)
86 {
87 	if (fAncillaryData != NULL) {
88 		gStackModule->move_ancillary_data(data, fAncillaryData);
89 		gStackModule->delete_ancillary_data_container(data);
90 	} else
91 		fAncillaryData = data;
92 }
93 
94 
95 // #pragma mark - UnixBufferQueue
96 
97 
98 UnixBufferQueue::UnixBufferQueue(size_t capacity)
99 	:
100 	fBuffer(NULL),
101 	fCapacity(capacity)
102 {
103 }
104 
105 
106 UnixBufferQueue::~UnixBufferQueue()
107 {
108 	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
109 		gStackModule->delete_ancillary_data_container(entry->data);
110 		delete entry;
111 	}
112 
113 	delete_ring_buffer(fBuffer);
114 }
115 
116 
117 status_t
118 UnixBufferQueue::Init()
119 {
120 	fBuffer = create_ring_buffer(fCapacity);
121 	if (fBuffer == NULL)
122 		return B_NO_MEMORY;
123 	return B_OK;
124 }
125 
126 
127 size_t
128 UnixBufferQueue::Readable() const
129 {
130 	return ring_buffer_readable(fBuffer);
131 }
132 
133 
134 size_t
135 UnixBufferQueue::Writable() const
136 {
137 	return ring_buffer_writable(fBuffer);
138 }
139 
140 
141 status_t
142 UnixBufferQueue::Read(UnixRequest& request)
143 {
144 	bool user = gStackModule->is_syscall();
145 
146 	size_t readable = Readable();
147 	void* data;
148 	size_t size;
149 
150 	while (readable > 0 && request.GetCurrentChunk(data, size)) {
151 		if (size > readable)
152 			size = readable;
153 
154 		ssize_t bytesRead;
155 		if (user)
156 			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
157 		else
158 			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);
159 
160 		if (bytesRead < 0)
161 			return bytesRead;
162 		if (bytesRead == 0)
163 			return B_ERROR;
164 
165 		// Adjust ancillary data entry offsets, respectively attach the ones
166 		// that belong to the read data to the request.
167 		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
168 			size_t offsetDelta = bytesRead;
169 			while (entry != NULL && offsetDelta > entry->offset) {
170 				// entry data have been read -- add ancillary data to request
171 				fAncillaryData.RemoveHead();
172 				offsetDelta -= entry->offset;
173 				request.AddAncillaryData(entry->data);
174 				delete entry;
175 
176 				entry = fAncillaryData.Head();
177 			}
178 
179 			if (entry != NULL)
180 				entry->offset -= offsetDelta;
181 		}
182 
183 		request.AddBytesTransferred(bytesRead);
184 		readable -= bytesRead;
185 	}
186 
187 	return B_OK;
188 }
189 
190 
191 status_t
192 UnixBufferQueue::Write(UnixRequest& request)
193 {
194 	bool user = gStackModule->is_syscall();
195 
196 	size_t writable = Writable();
197 	void* data;
198 	size_t size;
199 
200 	// If the request has ancillary data create an entry first.
201 	AncillaryDataEntry* ancillaryEntry = NULL;
202 	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
203 	if (writable > 0 && request.AncillaryData() != NULL) {
204 		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
205 		if (ancillaryEntry == NULL)
206 			return B_NO_MEMORY;
207 
208 		ancillaryEntryDeleter.SetTo(ancillaryEntry);
209 		ancillaryEntry->data = request.AncillaryData();
210 		ancillaryEntry->offset = Readable();
211 
212 		// The offsets are relative to the previous entry.
213 		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
214 		while (AncillaryDataEntry* entry = it.Next())
215 			ancillaryEntry->offset -= entry->offset;
216 			// TODO: This is inefficient when the list is long. Rather also
217 			// store and maintain the absolute offset of the last queued entry.
218 	}
219 
220 	// write as much as we can
221 	while (writable > 0 && request.GetCurrentChunk(data, size)) {
222 		if (size > writable)
223 			size = writable;
224 
225 		ssize_t bytesWritten;
226 		if (user)
227 			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
228 		else
229 			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);
230 
231 		if (bytesWritten < 0)
232 			return bytesWritten;
233 		if (bytesWritten == 0)
234 			return B_ERROR;
235 
236 		if (ancillaryEntry != NULL) {
237 			fAncillaryData.Add(ancillaryEntry);
238 			ancillaryEntryDeleter.Detach();
239 			request.SetAncillaryData(NULL);
240 			ancillaryEntry = NULL;
241 		}
242 
243 		request.AddBytesTransferred(bytesWritten);
244 		writable -= bytesWritten;
245 	}
246 
247 	return B_OK;
248 }
249 
250 
251 status_t
252 UnixBufferQueue::SetCapacity(size_t capacity)
253 {
254 // TODO:...
255 return B_ERROR;
256 }
257 
258 
259 // #pragma mark -
260 
261 
262 UnixFifo::UnixFifo(size_t capacity)
263 	:
264 	fBuffer(capacity),
265 	fReaders(),
266 	fWriters(),
267 	fReadRequested(0),
268 	fWriteRequested(0),
269 	fShutdown(0)
270 
271 {
272 	fReadCondition.Init(this, "unix fifo read");
273 	fWriteCondition.Init(this, "unix fifo write");
274 	mutex_init(&fLock, "unix fifo");
275 }
276 
277 
278 UnixFifo::~UnixFifo()
279 {
280 	mutex_destroy(&fLock);
281 }
282 
283 
284 status_t
285 UnixFifo::Init()
286 {
287 	return fBuffer.Init();
288 }
289 
290 
291 void
292 UnixFifo::Shutdown(uint32 shutdown)
293 {
294 	TRACE("[%ld] %p->UnixFifo::Shutdown(0x%lx)\n", find_thread(NULL), this,
295 		shutdown);
296 
297 	fShutdown |= shutdown;
298 
299 	if (shutdown != 0) {
300 		// Shutting down either end also effects the other, so notify both.
301 		fReadCondition.NotifyAll();
302 		fWriteCondition.NotifyAll();
303 	}
304 }
305 
306 
307 ssize_t
308 UnixFifo::Read(const iovec* vecs, size_t vecCount,
309 	ancillary_data_container** _ancillaryData, bigtime_t timeout)
310 {
311 	TRACE("[%ld] %p->UnixFifo::Read(%p, %ld, %lld)\n", find_thread(NULL),
312 		this, vecs, vecCount, timeout);
313 
314 	if (IsReadShutdown() && fBuffer.Readable() == 0)
315 		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
316 
317 	UnixRequest request(vecs, vecCount, NULL);
318 	fReaders.Add(&request);
319 	fReadRequested += request.TotalSize();
320 
321 	status_t error = _Read(request, timeout);
322 
323 	bool firstInQueue = fReaders.Head() == &request;
324 	fReaders.Remove(&request);
325 	fReadRequested -= request.TotalSize();
326 
327 	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
328 			&& !IsReadShutdown()) {
329 		// There's more to read, other readers, and we were first in the queue.
330 		// So we need to notify the others.
331 		fReadCondition.NotifyAll();
332 	}
333 
334 	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
335 			&& !IsWriteShutdown()) {
336 		// We read something and there are writers. Notify them
337 		fWriteCondition.NotifyAll();
338 	}
339 
340 	*_ancillaryData = request.AncillaryData();
341 
342 	if (request.BytesTransferred() > 0) {
343 		if (request.BytesTransferred() > SSIZE_MAX)
344 			RETURN_ERROR(SSIZE_MAX);
345 		RETURN_ERROR((ssize_t)request.BytesTransferred());
346 	}
347 
348 	RETURN_ERROR(error);
349 }
350 
351 
352 ssize_t
353 UnixFifo::Write(const iovec* vecs, size_t vecCount,
354 	ancillary_data_container* ancillaryData, bigtime_t timeout)
355 {
356 	TRACE("[%ld] %p->UnixFifo::Write(%p, %ld, %p, %lld)\n", find_thread(NULL),
357 		this, vecs, vecCount, ancillaryData, timeout);
358 
359 	if (IsWriteShutdown())
360 		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
361 
362 	if (IsReadShutdown())
363 		RETURN_ERROR(EPIPE);
364 
365 	UnixRequest request(vecs, vecCount, ancillaryData);
366 	fWriters.Add(&request);
367 	fWriteRequested += request.TotalSize();
368 
369 	status_t error = _Write(request, timeout);
370 
371 	bool firstInQueue = fWriters.Head() == &request;
372 	fWriters.Remove(&request);
373 	fWriteRequested -= request.TotalSize();
374 
375 	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
376 			&& !IsWriteShutdown()) {
377 		// There's more space for writing, other writers, and we were first in
378 		// the queue. So we need to notify the others.
379 		fWriteCondition.NotifyAll();
380 	}
381 
382 	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
383 			&& !IsReadShutdown()) {
384 		// We've written something and there are readers. Notify them.
385 		fReadCondition.NotifyAll();
386 	}
387 
388 	if (request.BytesTransferred() > 0) {
389 		if (request.BytesTransferred() > SSIZE_MAX)
390 			RETURN_ERROR(SSIZE_MAX);
391 		RETURN_ERROR((ssize_t)request.BytesTransferred());
392 	}
393 
394 	RETURN_ERROR(error);
395 }
396 
397 
398 size_t
399 UnixFifo::Readable() const
400 {
401 	size_t readable = fBuffer.Readable();
402 	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
403 }
404 
405 
406 size_t
407 UnixFifo::Writable() const
408 {
409 	size_t writable = fBuffer.Writable();
410 	return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0;
411 }
412 
413 
414 status_t
415 UnixFifo::SetBufferCapacity(size_t capacity)
416 {
417 	// check against allowed minimal/maximal value
418 	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
419 		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
420 	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
421 		capacity = UNIX_FIFO_MINIMAL_CAPACITY;
422 
423 	size_t oldCapacity = fBuffer.Capacity();
424 	if (capacity == oldCapacity)
425 		return B_OK;
426 
427 	// set capacity
428 	status_t error = fBuffer.SetCapacity(capacity);
429 	if (error != B_OK)
430 		return error;
431 
432 	// wake up waiting writers, if the capacity increased
433 	if (!fWriters.IsEmpty() && !IsWriteShutdown())
434 		fWriteCondition.NotifyAll();
435 
436 	return B_OK;
437 }
438 
439 
440 status_t
441 UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
442 {
443 	// wait for the request to reach the front of the queue
444 	if (fReaders.Head() != &request && timeout == 0)
445 		RETURN_ERROR(B_WOULD_BLOCK);
446 
447 	while (fReaders.Head() != &request
448 		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
449 		ConditionVariableEntry entry;
450 		fReadCondition.Add(&entry);
451 
452 		mutex_unlock(&fLock);
453 		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
454 			timeout);
455 		mutex_lock(&fLock);
456 
457 		if (error != B_OK)
458 			RETURN_ERROR(error);
459 	}
460 
461 	if (fBuffer.Readable() == 0) {
462 		if (IsReadShutdown())
463 			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
464 
465 		if (IsWriteShutdown())
466 			RETURN_ERROR(0);
467 
468 		if (timeout == 0)
469 			RETURN_ERROR(B_WOULD_BLOCK);
470 	}
471 
472 	// wait for any data to become available
473 // TODO: Support low water marks!
474 	while (fBuffer.Readable() == 0
475 			&& !IsReadShutdown() && !IsWriteShutdown()) {
476 		ConditionVariableEntry entry;
477 		fReadCondition.Add(&entry);
478 
479 		mutex_unlock(&fLock);
480 		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
481 			timeout);
482 		mutex_lock(&fLock);
483 
484 		if (error != B_OK)
485 			RETURN_ERROR(error);
486 	}
487 
488 	if (fBuffer.Readable() == 0) {
489 		if (IsReadShutdown())
490 			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
491 		if (IsWriteShutdown())
492 			RETURN_ERROR(0);
493 	}
494 
495 	RETURN_ERROR(fBuffer.Read(request));
496 }
497 
498 
499 status_t
500 UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
501 {
502 	if (timeout == 0)
503 		RETURN_ERROR(_WriteNonBlocking(request));
504 
505 	// wait for the request to reach the front of the queue
506 	while (fWriters.Head() != &request && !IsWriteShutdown()) {
507 		ConditionVariableEntry entry;
508 		fWriteCondition.Add(&entry);
509 
510 		mutex_unlock(&fLock);
511 		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
512 			timeout);
513 		mutex_lock(&fLock);
514 
515 		if (error != B_OK)
516 			RETURN_ERROR(error);
517 	}
518 
519 	if (IsWriteShutdown())
520 		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
521 
522 	if (IsReadShutdown())
523 		RETURN_ERROR(EPIPE);
524 
525 	if (request.TotalSize() == 0)
526 		return 0;
527 
528 	status_t error = B_OK;
529 
530 	while (error == B_OK && request.BytesRemaining() > 0) {
531 		// wait for any space to become available
532 		while (error == B_OK && fBuffer.Writable() == 0 && !IsWriteShutdown()
533 				&& !IsReadShutdown()) {
534 			ConditionVariableEntry entry;
535 			fWriteCondition.Add(&entry);
536 
537 			mutex_unlock(&fLock);
538 			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
539 			mutex_lock(&fLock);
540 
541 			if (error != B_OK)
542 				RETURN_ERROR(error);
543 		}
544 
545 		if (IsWriteShutdown())
546 			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
547 
548 		if (IsReadShutdown())
549 			RETURN_ERROR(EPIPE);
550 
551 		// write as much as we can
552 		error = fBuffer.Write(request);
553 
554 		if (error == B_OK) {
555 // TODO: Whenever we've successfully written a part, we should reset the
556 // timeout!
557 		}
558 	}
559 
560 	RETURN_ERROR(error);
561 }
562 
563 
564 status_t
565 UnixFifo::_WriteNonBlocking(UnixRequest& request)
566 {
567 	// We need to be first in queue and space should be available right now,
568 	// otherwise we need to fail.
569 	if (fWriters.Head() != &request || fBuffer.Writable() == 0)
570 		RETURN_ERROR(B_WOULD_BLOCK);
571 
572 	if (request.TotalSize() == 0)
573 		return 0;
574 
575 	// Write as much as we can.
576 	RETURN_ERROR(fBuffer.Write(request));
577 }
578 
579