xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 8019fdbab89c0b320e8f70583182eae2189bc913)
1 /*
2  * Copyright 2007-2013, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15 #include <sys/stat.h>
16 
17 #include <new>
18 
19 #include <KernelExport.h>
20 #include <NodeMonitor.h>
21 #include <Select.h>
22 
23 #include <condition_variable.h>
24 #include <debug_hex_dump.h>
25 #include <khash.h>
26 #include <lock.h>
27 #include <select_sync_pool.h>
28 #include <syscall_restart.h>
29 #include <team.h>
30 #include <thread.h>
31 #include <util/DoublyLinkedList.h>
32 #include <util/AutoLock.h>
33 #include <util/ring_buffer.h>
34 #include <vfs.h>
35 #include <vfs_defs.h>
36 #include <vm/vm.h>
37 
38 
39 //#define TRACE_FIFO
40 #ifdef TRACE_FIFO
41 #	define TRACE(x...) dprintf(x)
42 #else
43 #	define TRACE(x...)
44 #endif
45 
46 
47 #define PIPEFS_HASH_SIZE		16
48 
49 
50 namespace fifo {
51 
52 
53 struct file_cookie;
54 class Inode;
55 
56 
// Thin wrapper around the kernel's ring_buffer API. The backing buffer is
// allocated lazily via CreateBuffer() (when the FIFO becomes active) and
// released via DeleteBuffer(); all accessors tolerate the not-yet-allocated
// state (they return B_NO_MEMORY resp. 0).
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			status_t			CreateBuffer();
			void				DeleteBuffer();

			ssize_t				Write(const void* buffer, size_t length);
			ssize_t				Read(void* buffer, size_t length);
			ssize_t				Peek(size_t offset, void* buffer,
									size_t length) const;
			ssize_t				UserWrite(const void* buffer, ssize_t length);
			ssize_t				UserRead(void* buffer, ssize_t length);

			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
				// NULL while the FIFO is inactive
};
78 
79 
// Represents one (potentially) blocked reader. Requests are queued on the
// Inode so that readers are served in arrival order and can be woken
// individually. The fNotified flag (guarded by fLock) ensures a thread is
// unblocked at most once per wait cycle.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
		// starts out "notified" so a stray Notify() before the reader
		// actually blocks is a no-op
	{
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	// Arms (notified == false) or disarms the request for wake-up.
	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Unblocks the waiting thread with the given status, once per wait
	// cycle. Safe to call whether or not the thread is blocked yet.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			SpinLocker schedulerLocker(gSchedulerLock);
			thread_unblock_locked(fThread, status);
			fNotified = true;
		}
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;
	Thread*			fThread;
	file_cookie*	fCookie;
	volatile bool	fNotified;
};
125 
126 
127 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
128 public:
129 	WriteRequest(Thread* thread, size_t minimalWriteCount)
130 		:
131 		fThread(thread),
132 		fMinimalWriteCount(minimalWriteCount)
133 	{
134 	}
135 
136 	Thread* GetThread() const
137 	{
138 		return fThread;
139 	}
140 
141 	size_t MinimalWriteCount() const
142 	{
143 		return fMinimalWriteCount;
144 	}
145 
146 private:
147 	Thread*	fThread;
148 	size_t	fMinimalWriteCount;
149 };
150 
151 
152 typedef DoublyLinkedList<ReadRequest> ReadRequestList;
153 typedef DoublyLinkedList<WriteRequest> WriteRequestList;
154 
155 
// Core FIFO state: the ring buffer, reader/writer counts, the queues of
// blocked readers/writers and the select() pools. All mutable state is
// guarded by fRequestLock unless noted otherwise.
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// A FIFO is "active" once it has (or had) both a reader and a
			// writer; see Open()/Close().
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			mutex*				RequestLock() { return &fRequestLock; }

			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

			// Kernel-debugger dump helpers (no locking!).
			void				Dump(bool dumpData) const;
	static	int					Dump(int argc, char** argv);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			// Writers block on this; readers block via ReadRequest.
			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
225 
226 
227 class FIFOInode : public Inode {
228 public:
229 	FIFOInode(fs_vnode* vnode)
230 		:
231 		Inode(),
232 		fSuperVnode(*vnode)
233 	{
234 	}
235 
236 	fs_vnode*	SuperVnode() { return &fSuperVnode; }
237 
238 private:
239 	fs_vnode	fSuperVnode;
240 };
241 
242 
// Per-open-file state: just the open mode (O_RDONLY/O_WRONLY/... plus
// flags such as O_NONBLOCK).
struct file_cookie {
	int	open_mode;
			// guarded by Inode::fRequestLock

	// Sets or clears O_NONBLOCK in open_mode.
	void SetNonBlocking(bool nonBlocking)
	{
		if (nonBlocking)
			open_mode |= O_NONBLOCK;
		else
			open_mode &= ~(int)O_NONBLOCK;
	}
};
255 
256 
257 // #pragma mark -
258 
259 
// Starts without a backing buffer; CreateBuffer() allocates it on demand.
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
265 
266 
RingBuffer::~RingBuffer()
{
	// Frees the backing buffer, if still allocated.
	DeleteBuffer();
}
271 
272 
273 status_t
274 RingBuffer::CreateBuffer()
275 {
276 	if (fBuffer != NULL)
277 		return B_OK;
278 
279 	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
280 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
281 }
282 
283 
284 void
285 RingBuffer::DeleteBuffer()
286 {
287 	if (fBuffer != NULL) {
288 		delete_ring_buffer(fBuffer);
289 		fBuffer = NULL;
290 	}
291 }
292 
293 
294 inline ssize_t
295 RingBuffer::Write(const void* buffer, size_t length)
296 {
297 	if (fBuffer == NULL)
298 		return B_NO_MEMORY;
299 
300 	return ring_buffer_write(fBuffer, (const uint8*)buffer, length);
301 }
302 
303 
304 inline ssize_t
305 RingBuffer::Read(void* buffer, size_t length)
306 {
307 	if (fBuffer == NULL)
308 		return B_NO_MEMORY;
309 
310 	return ring_buffer_read(fBuffer, (uint8*)buffer, length);
311 }
312 
313 
314 inline ssize_t
315 RingBuffer::Peek(size_t offset, void* buffer, size_t length) const
316 {
317 	if (fBuffer == NULL)
318 		return B_NO_MEMORY;
319 
320 	return ring_buffer_peek(fBuffer, offset, (uint8*)buffer, length);
321 }
322 
323 
324 inline ssize_t
325 RingBuffer::UserWrite(const void* buffer, ssize_t length)
326 {
327 	if (fBuffer == NULL)
328 		return B_NO_MEMORY;
329 
330 	return ring_buffer_user_write(fBuffer, (const uint8*)buffer, length);
331 }
332 
333 
334 inline ssize_t
335 RingBuffer::UserRead(void* buffer, ssize_t length)
336 {
337 	if (fBuffer == NULL)
338 		return B_NO_MEMORY;
339 
340 	return ring_buffer_user_read(fBuffer, (uint8*)buffer, length);
341 }
342 
343 
344 inline size_t
345 RingBuffer::Readable() const
346 {
347 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
348 }
349 
350 
351 inline size_t
352 RingBuffer::Writable() const
353 {
354 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
355 }
356 
357 
358 //	#pragma mark -
359 
360 
Inode::Inode()
	:
	fReadRequests(),
	fWriteRequests(),
	fReaderCount(0),
	fWriterCount(0),
	fActive(false),
	fReadSelectSyncPool(NULL),
	fWriteSelectSyncPool(NULL)
{
	fWriteCondition.Publish(this, "pipe");
	mutex_init(&fRequestLock, "pipe request");

	// Initialize both time stamps from the current real-time clock
	// (microseconds -> timespec).
	bigtime_t time = real_time_clock();
	fModificationTime.tv_sec = time / 1000000;
	fModificationTime.tv_nsec = (time % 1000000) * 1000;
	fCreationTime = fModificationTime;
}
379 
380 
Inode::~Inode()
{
	// Tear down in reverse order of construction.
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
386 
387 
388 status_t
389 Inode::InitCheck()
390 {
391 	return B_OK;
392 }
393 
394 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method.
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// (Also waits while the FIFO is not yet active, i.e. no reader has
		// opened it yet.)
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			ConditionVariableEntry entry;
			entry.Add(this);

			WriteRequest request(thread_get_current_thread(), minToWrite);
			fWriteRequests.Add(&request);

			// The request lock is dropped while blocking; state must be
			// re-checked after reacquiring it (hence the loop).
			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// POSIX: writing to a pipe with no readers raises SIGPIPE and
			// fails with EPIPE (unless something was already written).
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < 0)
			return B_BAD_ADDRESS;

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
467 
468 
/*!	Reads up to *_length bytes from the ring buffer into `data` (a userland
	address). The request lock must be held; `request` must already be queued
	via AddReadRequest(). On return *_length is the number of bytes read.
	Returns B_WOULD_BLOCK in non-blocking mode, B_BAD_ADDRESS if the user
	copy failed, or an interruption error from waiting.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writers left: report end-of-file (0 bytes read)
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	if (fBuffer.UserRead(data, toRead) < 0)
		return B_BAD_ADDRESS;

	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
520 
521 
// Queues the request at the tail of the reader queue (request lock held).
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
527 
528 
// Unlinks the request from the reader queue (request lock held).
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
534 
535 
/*!	Blocks the current thread until the request is notified (or the wait is
	interrupted). Called with the request lock held; the lock is dropped
	while blocking and reacquired before returning.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// arm the request so Notify() will unblock us
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
557 
558 
/*!	Called after `bytes` bytes have been consumed from the buffer, to wake
	writers for which space has now become available (request lock held).
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means exactly the just-read bytes are free)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
586 
587 
588 void
589 Inode::NotifyReadDone()
590 {
591 	// notify next reader, if there's still something to be read
592 	if (fBuffer.Readable() > 0) {
593 		if (ReadRequest* request = fReadRequests.First())
594 			request->Notify();
595 	}
596 }
597 
598 
599 void
600 Inode::NotifyBytesWritten(size_t bytes)
601 {
602 	// notify reader, if something can be read now
603 	if (bytes > 0 && fBuffer.Readable() == bytes) {
604 		if (fReadSelectSyncPool)
605 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
606 
607 		if (ReadRequest* request = fReadRequests.First())
608 			request->Notify();
609 	}
610 }
611 
612 
/*!	Called when the last reader or the last writer has gone away, to wake
	everyone waiting on the now-dead end (request lock held).
*/
void
Inode::NotifyEndClosed(bool writer)
{
	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
		writer ? "writer" : "reader");

	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		// (they will see end-of-file)
		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
		if (fBuffer.Readable() == 0) {
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while (ReadRequest* request = iterator.Next())
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
		}
	} else {
		// Last reader is gone. Wake up all writers.
		// (They will fail with EPIPE/SIGPIPE.)
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool) {
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
		}
	}
}
641 
642 
/*!	Accounts for a new open file descriptor and activates the FIFO once both
	ends are present.
	NOTE(review): O_RDWR is counted as a reader only, not as a writer
	(writers are counted for O_WRONLY exclusively) — confirm this asymmetry
	is intended.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		TRACE("Inode %p::Open(): fifo becomes active\n", this);
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
665 
666 
667 void
668 Inode::Close(file_cookie* cookie)
669 {
670 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
671 
672 	MutexLocker locker(RequestLock());
673 
674 	int openMode = cookie->open_mode;
675 
676 	// Notify all currently reading file descriptors
677 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
678 	while (ReadRequest* request = iterator.Next()) {
679 		if (request->Cookie() == cookie)
680 			request->Notify(B_FILE_ERROR);
681 	}
682 
683 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
684 		NotifyEndClosed(true);
685 
686 	if ((openMode & O_ACCMODE) == O_RDONLY
687 		|| (openMode & O_ACCMODE) == O_RDWR) {
688 		if (--fReaderCount == 0)
689 			NotifyEndClosed(false);
690 	}
691 
692 	if (fWriterCount == 0) {
693 		// Notify any still reading writers to stop
694 		// TODO: This only works reliable if there is only one writer - we could
695 		// do the same thing done for the read requests.
696 		fWriteCondition.NotifyAll(B_FILE_ERROR);
697 	}
698 
699 	if (fReaderCount == 0 && fWriterCount == 0) {
700 		fActive = false;
701 		fBuffer.DeleteBuffer();
702 	}
703 }
704 
705 
/*!	Registers `sync` for the given select event on the pool matching the
	descriptor's direction, and fires the event immediately if its condition
	already holds (request lock held by the caller, fifo_select()).
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	if ((openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		// writable space, or no readers (write would fail -> also an event)
		if ((event == B_SELECT_WRITE
				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		// data pending, or no writers (read would return EOF immediately)
		if (event == B_SELECT_READ
				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
738 
739 
740 status_t
741 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
742 {
743 	select_sync_pool** pool;
744 	if ((openMode & O_RWMASK) == O_RDONLY) {
745 		pool = &fReadSelectSyncPool;
746 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
747 		pool = &fWriteSelectSyncPool;
748 	} else
749 		return B_NOT_ALLOWED;
750 
751 	remove_select_sync_pool_entry(pool, sync, event);
752 	return B_OK;
753 }
754 
755 
/*!	Kernel-debugger dump of this FIFO's state. Takes no locks (KDL context);
	optionally hexdumps the buffered data via Peek().
*/
void
Inode::Dump(bool dumpData) const
{
	kprintf("FIFO %p\n", this);
	kprintf("  active:        %s\n", fActive ? "true" : "false");
	kprintf("  readers:       %" B_PRId32 "\n", fReaderCount);
	kprintf("  writers:       %" B_PRId32 "\n", fWriterCount);

	if (!fReadRequests.IsEmpty()) {
		kprintf(" pending readers:\n");
		for (ReadRequestList::ConstIterator it = fReadRequests.GetIterator();
			ReadRequest* request = it.Next();) {
			kprintf("    %p: thread %" B_PRId32 ", cookie: %p\n", request,
				request->GetThread()->id, request->Cookie());
		}
	}

	if (!fWriteRequests.IsEmpty()) {
		kprintf(" pending writers:\n");
		for (WriteRequestList::ConstIterator it = fWriteRequests.GetIterator();
			WriteRequest* request = it.Next();) {
			kprintf("    %p:  thread %" B_PRId32 ", min count: %zu\n", request,
				request->GetThread()->id, request->MinimalWriteCount());
		}
	}

	kprintf("  %zu bytes buffered\n", fBuffer.Readable());

	if (dumpData && fBuffer.Readable() > 0) {
		// Adapter feeding the buffered bytes to the kernel hex dumper,
		// non-destructively via Peek().
		struct DataProvider : BKernel::HexDumpDataProvider {
			DataProvider(const RingBuffer& buffer)
				:
				fBuffer(buffer),
				fOffset(0)
			{
			}

			virtual bool HasMoreData() const
			{
				return fOffset < fBuffer.Readable();
			}

			virtual uint8 NextByte()
			{
				uint8 byte = '\0';
				if (fOffset < fBuffer.Readable()) {
					fBuffer.Peek(fOffset, &byte, 1);
					fOffset++;
				}
				return byte;
			}

			virtual bool GetAddressString(char* buffer, size_t bufferSize) const
			{
				snprintf(buffer, bufferSize, "    %4zx", fOffset);
				return true;
			}

		private:
			const RingBuffer&	fBuffer;
			size_t				fOffset;
		};

		DataProvider dataProvider(fBuffer);
		BKernel::print_hex_dump(dataProvider, fBuffer.Readable());
	}
}
823 
824 
825 /*static*/ int
826 Inode::Dump(int argc, char** argv)
827 {
828 	bool dumpData = false;
829 	int argi = 1;
830 	if (argi < argc && strcmp(argv[argi], "-d") == 0) {
831 		dumpData = true;
832 		argi++;
833 	}
834 
835 	if (argi >= argc || argi + 2 < argc) {
836 		print_debugger_command_usage(argv[0]);
837 		return 0;
838 	}
839 
840 	Inode* node = (Inode*)parse_expression(argv[argi]);
841 	if (IS_USER_ADDRESS(node)) {
842 		kprintf("invalid FIFO address\n");
843 		return 0;
844 	}
845 
846 	node->Dump(dumpData);
847 	return 0;
848 }
849 
850 
851 //	#pragma mark - vnode API
852 
853 
854 static status_t
855 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
856 {
857 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
858 	fs_vnode* superVnode = fifo->SuperVnode();
859 
860 	status_t error = B_OK;
861 	if (superVnode->ops->put_vnode != NULL)
862 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
863 
864 	delete fifo;
865 
866 	return error;
867 }
868 
869 
870 static status_t
871 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
872 {
873 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
874 	fs_vnode* superVnode = fifo->SuperVnode();
875 
876 	status_t error = B_OK;
877 	if (superVnode->ops->remove_vnode != NULL)
878 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
879 
880 	delete fifo;
881 
882 	return error;
883 }
884 
885 
// Allocates a per-descriptor cookie recording the open mode and registers
// the new reader/writer with the inode.
static status_t
fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
	void** _cookie)
{
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);

	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
	if (cookie == NULL)
		return B_NO_MEMORY;

	TRACE("  open cookie = %p\n", cookie);
	cookie->open_mode = openMode;
	inode->Open(openMode);

	*_cookie = (void*)cookie;

	return B_OK;
}
906 
907 
908 static status_t
909 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
910 {
911 	file_cookie* cookie = (file_cookie*)_cookie;
912 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
913 
914 	fifo->Close(cookie);
915 
916 	return B_OK;
917 }
918 
919 
920 static status_t
921 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
922 {
923 	file_cookie* cookie = (file_cookie*)_cookie;
924 
925 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
926 
927 	free(cookie);
928 
929 	return B_OK;
930 }
931 
932 
// FIFOs have no persistent storage, so syncing is a no-op.
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_OK;
}
938 
939 
/*!	Reads from the FIFO. Returns 0 bytes (EOF) if the FIFO is active but has
	no writers and no buffered data. Otherwise queues a ReadRequest so
	concurrent readers are served in order, then delegates to
	Inode::ReadDataFromBuffer(). Partial reads override the error status.
*/
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	MutexLocker locker(inode->RequestLock());

	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, request);

	inode->RemoveReadRequest(request);
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	// a partial read is a success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
986 
987 
/*!	Writes to the FIFO via Inode::WriteDataToBuffer(). A zero-length write
	succeeds immediately; a partial write overrides the error status.
*/
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	MutexLocker locker(inode->RequestLock());

	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0);

	// a partial write is a success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
1017 
1018 
/*!	Delegates to the super vnode's read_stat hook, then overrides the fields
	that are FIFO-specific (size = buffered bytes, timestamps).
*/
static status_t
fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
{
	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
	fs_vnode* superVnode = fifo->SuperVnode();

	if (superVnode->ops->read_stat == NULL)
		return B_BAD_VALUE;

	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
	if (error != B_OK)
		return error;


	MutexLocker locker(fifo->RequestLock());

	st->st_size = fifo->BytesAvailable();

	st->st_blksize = 4096;

	// TODO: Just pass the changes to our modification time on to the super node.
	st->st_atim.tv_sec = time(NULL);
	st->st_atim.tv_nsec = 0;
	st->st_mtim = st->st_ctim = fifo->ModificationTime();

	return B_OK;
}
1046 
1047 
1048 static status_t
1049 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
1050 	uint32 statMask)
1051 {
1052 	// we cannot change the size of anything
1053 	if ((statMask & B_STAT_SIZE) != 0)
1054 		return B_BAD_VALUE;
1055 
1056 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1057 	fs_vnode* superVnode = fifo->SuperVnode();
1058 
1059 	if (superVnode->ops->write_stat == NULL)
1060 		return B_BAD_VALUE;
1061 
1062 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
1063 		statMask);
1064 	if (error != B_OK)
1065 		return error;
1066 
1067 	return B_OK;
1068 }
1069 
1070 
1071 static status_t
1072 fifo_ioctl(fs_volume* _volume, fs_vnode* _node, void* _cookie, uint32 op,
1073 	void* buffer, size_t length)
1074 {
1075 	file_cookie* cookie = (file_cookie*)_cookie;
1076 	Inode* inode = (Inode*)_node->private_node;
1077 
1078 	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
1079 		_vnode, _cookie, op, buffer, length);
1080 
1081 	switch (op) {
1082 		case FIONBIO:
1083 		{
1084 			if (buffer == NULL)
1085 				return B_BAD_VALUE;
1086 
1087 			int value;
1088 			if (is_called_via_syscall()) {
1089 				if (!IS_USER_ADDRESS(buffer)
1090 					|| user_memcpy(&value, buffer, sizeof(int)) != B_OK) {
1091 					return B_BAD_ADDRESS;
1092 				}
1093 			} else
1094 				value = *(int*)buffer;
1095 
1096 			MutexLocker locker(inode->RequestLock());
1097 			cookie->SetNonBlocking(value != 0);
1098 			return B_OK;
1099 		}
1100 
1101 		case FIONREAD:
1102 		{
1103 			if (buffer == NULL)
1104 				return B_BAD_VALUE;
1105 
1106 			MutexLocker locker(inode->RequestLock());
1107 			int available = (int)inode->BytesAvailable();
1108 			locker.Unlock();
1109 
1110 			if (is_called_via_syscall()) {
1111 				if (!IS_USER_ADDRESS(buffer)
1112 					|| user_memcpy(buffer, &available, sizeof(available))
1113 						!= B_OK) {
1114 					return B_BAD_ADDRESS;
1115 				}
1116 			} else
1117 				*(int*)buffer = available;
1118 
1119 			return B_OK;
1120 		}
1121 
1122 		case B_SET_BLOCKING_IO:
1123 		case B_SET_NONBLOCKING_IO:
1124 		{
1125 			MutexLocker locker(inode->RequestLock());
1126 			cookie->SetNonBlocking(op == B_SET_NONBLOCKING_IO);
1127 			return B_OK;
1128 		}
1129 	}
1130 
1131 	return EINVAL;
1132 }
1133 
1134 
1135 static status_t
1136 fifo_set_flags(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1137 	int flags)
1138 {
1139 	Inode* inode = (Inode*)_node->private_node;
1140 	file_cookie* cookie = (file_cookie*)_cookie;
1141 
1142 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
1143 
1144 	MutexLocker locker(inode->RequestLock());
1145 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
1146 	return B_OK;
1147 }
1148 
1149 
1150 static status_t
1151 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1152 	uint8 event, selectsync* sync)
1153 {
1154 	file_cookie* cookie = (file_cookie*)_cookie;
1155 
1156 	TRACE("fifo_select(vnode = %p)\n", _node);
1157 	Inode* inode = (Inode*)_node->private_node;
1158 	if (!inode)
1159 		return B_ERROR;
1160 
1161 	MutexLocker locker(inode->RequestLock());
1162 	return inode->Select(event, sync, cookie->open_mode);
1163 }
1164 
1165 
1166 static status_t
1167 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1168 	uint8 event, selectsync* sync)
1169 {
1170 	file_cookie* cookie = (file_cookie*)_cookie;
1171 
1172 	TRACE("fifo_deselect(vnode = %p)\n", _node);
1173 	Inode* inode = (Inode*)_node->private_node;
1174 	if (inode == NULL)
1175 		return B_ERROR;
1176 
1177 	MutexLocker locker(inode->RequestLock());
1178 	return inode->Deselect(event, sync, cookie->open_mode);
1179 }
1180 
1181 
// FIFOs have no backing store, so paging is never possible.
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1187 
1188 
// Page-based I/O is not supported on FIFOs.
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1195 
1196 
// Page-based I/O is not supported on FIFOs.
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1203 
1204 
1205 static status_t
1206 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1207 	fs_vnode* _superVnode)
1208 {
1209 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1210 	fs_vnode* superVnode = fifo->SuperVnode();
1211 
1212 	if (superVnode->ops->get_super_vnode != NULL) {
1213 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1214 			_superVnode);
1215 	}
1216 
1217 	*_superVnode = *superVnode;
1218 
1219 	return B_OK;
1220 }
1221 
1222 
// The vnode operations table installed on FIFO overlay nodes. Entries are
// positional (fs_vnode_ops layout); NULL means "operation not supported".
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,	// fs_preallocate

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1300 
1301 
1302 }	// namespace fifo
1303 
1304 
1305 using namespace fifo;
1306 
1307 
1308 // #pragma mark -
1309 
1310 
1311 status_t
1312 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1313 {
1314 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1315 	if (fifo == NULL)
1316 		return B_NO_MEMORY;
1317 
1318 	status_t status = fifo->InitCheck();
1319 	if (status != B_OK) {
1320 		delete fifo;
1321 		return status;
1322 	}
1323 
1324 	vnode->private_node = fifo;
1325 	vnode->ops = &sFIFOVnodeOps;
1326 
1327 	return B_OK;
1328 }
1329 
1330 
// Registers the "fifo" kernel-debugger command (see Inode::Dump()).
void
fifo_init()
{
	add_debugger_command_etc("fifo", &Inode::Dump,
		"Print info about the specified FIFO node",
		"[ \"-d\" ] <address>\n"
		"Prints information about the FIFO node specified by address\n"
		"<address>. If \"-d\" is given, the data in the FIFO's ring buffer\n"
		"hexdumped as well.\n",
		0);
}
1342