xref: /haiku/src/system/kernel/fs/fifo.cpp (revision de5c16f10ad133de6e8f864e3bbb16679d6ad900)
1 /*
2  * Copyright 2007-2013, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15 #include <sys/stat.h>
16 
17 #include <new>
18 
19 #include <KernelExport.h>
20 #include <NodeMonitor.h>
21 #include <Select.h>
22 
23 #include <condition_variable.h>
24 #include <debug_hex_dump.h>
25 #include <khash.h>
26 #include <lock.h>
27 #include <select_sync_pool.h>
28 #include <syscall_restart.h>
29 #include <team.h>
30 #include <thread.h>
31 #include <util/DoublyLinkedList.h>
32 #include <util/AutoLock.h>
33 #include <util/ring_buffer.h>
34 #include <vfs.h>
35 #include <vfs_defs.h>
36 #include <vm/vm.h>
37 
38 
39 //#define TRACE_FIFO
40 #ifdef TRACE_FIFO
41 #	define TRACE(x...) dprintf(x)
42 #else
43 #	define TRACE(x...)
44 #endif
45 
46 
47 #define PIPEFS_HASH_SIZE		16
48 
49 
50 namespace fifo {
51 
52 
53 struct file_cookie;
54 class Inode;
55 
56 
// Thin wrapper around the kernel's ring_buffer API. The backing buffer is
// created lazily (CreateBuffer()) when the FIFO becomes active and freed
// again (DeleteBuffer()) once both ends are closed; all methods tolerate
// fBuffer == NULL. Synchronization is the caller's job (Inode::fRequestLock).
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			status_t			CreateBuffer();
			void				DeleteBuffer();

			ssize_t				Write(const void* buffer, size_t length,
									bool isUser);
			ssize_t				Read(void* buffer, size_t length, bool isUser);
			ssize_t				Peek(size_t offset, void* buffer,
									size_t length) const;

			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
};
77 
78 
// One (potentially) blocked reader. Requests are queued on the inode
// (Inode::fReadRequests) so that readers are served in arrival order and so
// that Inode::Close() can wake exactly the requests belonging to a closing
// cookie.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
	{
		// fNotified starts out true: until the reader actually prepares to
		// block (SetNotified(false) in Inode::WaitForReadRequest()), any
		// Notify() shall be a no-op.
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	// Marks whether this request still needs a wake-up. Guarded by fLock.
	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Unblocks the waiting thread at most once (until SetNotified(false) is
	// called again). \a status becomes the thread_block() return value.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			SpinLocker schedulerLocker(gSchedulerLock);
			thread_unblock_locked(fThread, status);
			fNotified = true;
		}
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;		// protects fNotified
	Thread*			fThread;	// the reading thread (set at construction)
	file_cookie*	fCookie;	// cookie the read was issued through
	volatile bool	fNotified;	// true once the thread has been unblocked
};
124 
125 
// A writer waiting for buffer space. Unlike readers, writers block on the
// inode's condition variable; the request object merely records how many
// bytes must become writable before waking the writer is useful (see
// Inode::NotifyBytesRead()).
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
public:
	WriteRequest(Thread* thread, size_t minimalWriteCount)
		:
		fThread(thread),
		fMinimalWriteCount(minimalWriteCount)
	{
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	size_t MinimalWriteCount() const
	{
		return fMinimalWriteCount;
	}

private:
	Thread*	fThread;
	size_t	fMinimalWriteCount;
};
149 
150 
// FIFO-ordered queues of pending requests, anchored in Inode.
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
153 
154 
// The actual FIFO implementation: a ring buffer plus reader/writer
// bookkeeping. All state is guarded by fRequestLock unless noted otherwise.
// The FIFO becomes "active" once both a reader and a writer have opened it
// and is deactivated when the last file descriptor is closed.
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			mutex*				RequestLock() { return &fRequestLock; }

			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking,
									bool isUser);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, bool isUser,
									ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

			void				Dump(bool dumpData) const;
	static	int					Dump(int argc, char** argv);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			ConditionVariable	fWriteCondition;
				// writers block here until buffer space becomes available

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;
				// set once both ends have been opened

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
226 
227 
// Couples an Inode with the vnode of the underlying file system, so that
// stat and similar requests can be forwarded to the "super" vnode while all
// data I/O is handled by the FIFO logic.
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;	// copy of the wrapped node (made at creation)
};
242 
243 
244 struct file_cookie {
245 	int	open_mode;
246 			// guarded by Inode::fRequestLock
247 
248 	void SetNonBlocking(bool nonBlocking)
249 	{
250 		if (nonBlocking)
251 			open_mode |= O_NONBLOCK;
252 		else
253 			open_mode &= ~(int)O_NONBLOCK;
254 	}
255 };
256 
257 
258 // #pragma mark -
259 
260 
// The backing ring buffer is only allocated on demand, see CreateBuffer().
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
266 
267 
RingBuffer::~RingBuffer()
{
	// Frees the backing buffer, if still allocated.
	DeleteBuffer();
}
272 
273 
274 status_t
275 RingBuffer::CreateBuffer()
276 {
277 	if (fBuffer != NULL)
278 		return B_OK;
279 
280 	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
281 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
282 }
283 
284 
285 void
286 RingBuffer::DeleteBuffer()
287 {
288 	if (fBuffer != NULL) {
289 		delete_ring_buffer(fBuffer);
290 		fBuffer = NULL;
291 	}
292 }
293 
294 
295 inline ssize_t
296 RingBuffer::Write(const void* buffer, size_t length, bool isUser)
297 {
298 	if (fBuffer == NULL)
299 		return B_NO_MEMORY;
300 	if (isUser && !IS_USER_ADDRESS(buffer))
301 		return B_BAD_ADDRESS;
302 
303 	return isUser
304 		? ring_buffer_user_write(fBuffer, (const uint8*)buffer, length)
305 		: ring_buffer_write(fBuffer, (const uint8*)buffer, length);
306 }
307 
308 
309 inline ssize_t
310 RingBuffer::Read(void* buffer, size_t length, bool isUser)
311 {
312 	if (fBuffer == NULL)
313 		return B_NO_MEMORY;
314 	if (isUser && !IS_USER_ADDRESS(buffer))
315 		return B_BAD_ADDRESS;
316 
317 	return isUser
318 		? ring_buffer_user_read(fBuffer, (uint8*)buffer, length)
319 		: ring_buffer_read(fBuffer, (uint8*)buffer, length);
320 }
321 
322 
/*!	Copies up to \a length buffered bytes, starting \a offset bytes into the
	readable data, without consuming them. Used by the debugger dump code.
*/
inline ssize_t
RingBuffer::Peek(size_t offset, void* buffer, size_t length) const
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_peek(fBuffer, offset, (uint8*)buffer, length);
}
331 
332 
333 inline size_t
334 RingBuffer::Readable() const
335 {
336 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
337 }
338 
339 
340 inline size_t
341 RingBuffer::Writable() const
342 {
343 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
344 }
345 
346 
347 //	#pragma mark -
348 
349 
350 Inode::Inode()
351 	:
352 	fReadRequests(),
353 	fWriteRequests(),
354 	fReaderCount(0),
355 	fWriterCount(0),
356 	fActive(false),
357 	fReadSelectSyncPool(NULL),
358 	fWriteSelectSyncPool(NULL)
359 {
360 	fWriteCondition.Publish(this, "pipe");
361 	mutex_init(&fRequestLock, "pipe request");
362 
363 	bigtime_t time = real_time_clock();
364 	fModificationTime.tv_sec = time / 1000000;
365 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
366 	fCreationTime = fModificationTime;
367 }
368 
369 
Inode::~Inode()
{
	// NOTE(review): assumes no thread is still blocked on the condition
	// variable or the mutex when the vnode is deleted — confirm the VFS
	// guarantees this.
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
375 
376 
status_t
Inode::InitCheck()
{
	// Construction cannot fail (the ring buffer is allocated lazily in
	// Open()), so there is nothing to check.
	return B_OK;
}
382 
383 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method.
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking,
	bool isUser)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
		// aliases *_length, so the out-value is current even on early return
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// (!fActive also covers the not-yet-active FIFO: block until a
		// reader shows up.)
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			// Add the condition variable entry before dropping the lock, so
			// a concurrent notification cannot be missed.
			ConditionVariableEntry entry;
			entry.Add(this);

			WriteRequest request(thread_get_current_thread(), minToWrite);
			fWriteRequests.Add(&request);

			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// POSIX pipe semantics: deliver SIGPIPE (only if nothing was
			// written yet) and fail with EPIPE.
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0) {
			ssize_t bytesWritten = fBuffer.Write(data, toWrite, isUser);
			if (bytesWritten < 0)
				return bytesWritten;
		}

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		// Wake readers/select()ors now that data is available.
		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
460 
461 
/*!	Reads at most *_length bytes from the ring buffer into \a data and
	returns the actually read amount in *_length. The request lock must be
	held, and \a request must already have been queued via AddReadRequest().
	Readers are served strictly in queue order: the call first waits until
	\a request is the head of the queue, then until data is available.
	Returns B_OK on success (*_length == 0 then means EOF: no writer left),
	B_WOULD_BLOCK in non-blocking mode, B_BAD_ADDRESS if copying failed, or
	the error that interrupted the wait.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	bool isUser, ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// No writer left and nothing buffered: report EOF via length 0.
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	ssize_t bytesRead = fBuffer.Read(data, toRead, isUser);
	if (bytesRead < 0)
		return bytesRead;

	// Wake writers (and write-select()ors) waiting for buffer space.
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
514 
515 
/*!	Appends \a request to the reader queue. Caller must hold the request
	lock.
*/
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
521 
522 
/*!	Removes \a request from the reader queue. Caller must hold the request
	lock.
*/
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
528 
529 
/*!	Blocks the calling thread until \a request is notified (via
	ReadRequest::Notify()) or the wait is interrupted. The request lock must
	be held; it is dropped while blocked and re-acquired before returning.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// Arm the request: from now on a single Notify() will unblock us.
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
551 
552 
/*!	Called after \a bytes have been consumed from the buffer; wakes writers
	(and write-select()ors) for which enough space has just become
	available. Caller must hold the request lock.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
580 
581 
/*!	Called when a reader has finished its request; hands over to the next
	queued reader if data is still available. Caller must hold the request
	lock.
*/
void
Inode::NotifyReadDone()
{
	// notify next reader, if there's still something to be read
	if (fBuffer.Readable() > 0) {
		if (ReadRequest* request = fReadRequests.First())
			request->Notify();
	}
}
591 
592 
/*!	Called after \a bytes have been written; wakes the first queued reader
	(and read-select()ors), but only when the buffer was empty before this
	write (Readable() == bytes). Caller must hold the request lock.
*/
void
Inode::NotifyBytesWritten(size_t bytes)
{
	// notify reader, if something can be read now
	if (bytes > 0 && fBuffer.Readable() == bytes) {
		if (fReadSelectSyncPool)
			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);

		if (ReadRequest* request = fReadRequests.First())
			request->Notify();
	}
}
605 
606 
/*!	Called when the last writer (\a writer == true) or the last reader
	(\a writer == false) has been closed, so the opposite side can observe
	EOF respectively EPIPE. Caller must hold the request lock.
*/
void
Inode::NotifyEndClosed(bool writer)
{
	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
		writer ? "writer" : "reader");

	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
		if (fBuffer.Readable() == 0) {
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while (ReadRequest* request = iterator.Next())
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
		}
	} else {
		// Last reader is gone. Wake up all writers.
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool) {
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
		}
	}
}
635 
636 
/*!	Accounts a newly opened file descriptor and activates the FIFO once it
	has both a reader and a writer.
	NOTE(review): O_RDWR is counted as a reader but not as a writer here;
	Close() mirrors this asymmetry, so the counts stay balanced — confirm
	this matches the intended semantics for read/write opens.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		TRACE("Inode %p::Open(): fifo becomes active\n", this);
		// Allocate the ring buffer lazily, only once data can actually flow.
		// NOTE(review): a B_NO_MEMORY result is ignored here; the FIFO would
		// then appear active but never buffer any data.
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
659 
660 
661 void
662 Inode::Close(file_cookie* cookie)
663 {
664 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
665 
666 	MutexLocker locker(RequestLock());
667 
668 	int openMode = cookie->open_mode;
669 
670 	// Notify all currently reading file descriptors
671 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
672 	while (ReadRequest* request = iterator.Next()) {
673 		if (request->Cookie() == cookie)
674 			request->Notify(B_FILE_ERROR);
675 	}
676 
677 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
678 		NotifyEndClosed(true);
679 
680 	if ((openMode & O_ACCMODE) == O_RDONLY
681 		|| (openMode & O_ACCMODE) == O_RDWR) {
682 		if (--fReaderCount == 0)
683 			NotifyEndClosed(false);
684 	}
685 
686 	if (fWriterCount == 0) {
687 		// Notify any still reading writers to stop
688 		// TODO: This only works reliable if there is only one writer - we could
689 		// do the same thing done for the read requests.
690 		fWriteCondition.NotifyAll(B_FILE_ERROR);
691 	}
692 
693 	if (fReaderCount == 0 && fWriterCount == 0) {
694 		fActive = false;
695 		fBuffer.DeleteBuffer();
696 	}
697 }
698 
699 
/*!	Adds \a sync to the select pool matching the cookie's access mode (read
	pool for O_RDONLY, write pool for O_WRONLY; O_RDWR cookies are not
	allowed) and delivers the event right away if its condition already
	holds. Caller must hold the request lock.
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	if ((openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		// Writable space, or no readers left (would raise EPIPE/error).
		if ((event == B_SELECT_WRITE
				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		// Data buffered, or no writers left (read would return EOF).
		if (event == B_SELECT_READ
				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
732 
733 
734 status_t
735 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
736 {
737 	select_sync_pool** pool;
738 	if ((openMode & O_RWMASK) == O_RDONLY) {
739 		pool = &fReadSelectSyncPool;
740 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
741 		pool = &fWriteSelectSyncPool;
742 	} else
743 		return B_NOT_ALLOWED;
744 
745 	remove_select_sync_pool_entry(pool, sync, event);
746 	return B_OK;
747 }
748 
749 
/*!	Prints this FIFO's state to the kernel debugger console; with
	\a dumpData the buffered bytes are hex-dumped as well. Runs in the
	kernel debugger, hence no locking.
*/
void
Inode::Dump(bool dumpData) const
{
	kprintf("FIFO %p\n", this);
	kprintf("  active:        %s\n", fActive ? "true" : "false");
	kprintf("  readers:       %" B_PRId32 "\n", fReaderCount);
	kprintf("  writers:       %" B_PRId32 "\n", fWriterCount);

	if (!fReadRequests.IsEmpty()) {
		kprintf(" pending readers:\n");
		for (ReadRequestList::ConstIterator it = fReadRequests.GetIterator();
			ReadRequest* request = it.Next();) {
			kprintf("    %p: thread %" B_PRId32 ", cookie: %p\n", request,
				request->GetThread()->id, request->Cookie());
		}
	}

	if (!fWriteRequests.IsEmpty()) {
		kprintf(" pending writers:\n");
		for (WriteRequestList::ConstIterator it = fWriteRequests.GetIterator();
			WriteRequest* request = it.Next();) {
			kprintf("    %p:  thread %" B_PRId32 ", min count: %zu\n", request,
				request->GetThread()->id, request->MinimalWriteCount());
		}
	}

	kprintf("  %zu bytes buffered\n", fBuffer.Readable());

	if (dumpData && fBuffer.Readable() > 0) {
		// Adapter that feeds the buffered bytes to the kernel's hex dump
		// helper without consuming them (uses Peek()).
		struct DataProvider : BKernel::HexDumpDataProvider {
			DataProvider(const RingBuffer& buffer)
				:
				fBuffer(buffer),
				fOffset(0)
			{
			}

			virtual bool HasMoreData() const
			{
				return fOffset < fBuffer.Readable();
			}

			virtual uint8 NextByte()
			{
				uint8 byte = '\0';
				if (fOffset < fBuffer.Readable()) {
					fBuffer.Peek(fOffset, &byte, 1);
					fOffset++;
				}
				return byte;
			}

			virtual bool GetAddressString(char* buffer, size_t bufferSize) const
			{
				snprintf(buffer, bufferSize, "    %4zx", fOffset);
				return true;
			}

		private:
			const RingBuffer&	fBuffer;
			size_t				fOffset;
		};

		DataProvider dataProvider(fBuffer);
		BKernel::print_hex_dump(dataProvider, fBuffer.Readable());
	}
}
817 
818 
819 /*static*/ int
820 Inode::Dump(int argc, char** argv)
821 {
822 	bool dumpData = false;
823 	int argi = 1;
824 	if (argi < argc && strcmp(argv[argi], "-d") == 0) {
825 		dumpData = true;
826 		argi++;
827 	}
828 
829 	if (argi >= argc || argi + 2 < argc) {
830 		print_debugger_command_usage(argv[0]);
831 		return 0;
832 	}
833 
834 	Inode* node = (Inode*)parse_expression(argv[argi]);
835 	if (IS_USER_ADDRESS(node)) {
836 		kprintf("invalid FIFO address\n");
837 		return 0;
838 	}
839 
840 	node->Dump(dumpData);
841 	return 0;
842 }
843 
844 
845 //	#pragma mark - vnode API
846 
847 
848 static status_t
849 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
850 {
851 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
852 	fs_vnode* superVnode = fifo->SuperVnode();
853 
854 	status_t error = B_OK;
855 	if (superVnode->ops->put_vnode != NULL)
856 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
857 
858 	delete fifo;
859 
860 	return error;
861 }
862 
863 
864 static status_t
865 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
866 {
867 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
868 	fs_vnode* superVnode = fifo->SuperVnode();
869 
870 	status_t error = B_OK;
871 	if (superVnode->ops->remove_vnode != NULL)
872 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
873 
874 	delete fifo;
875 
876 	return error;
877 }
878 
879 
880 static status_t
881 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
882 	void** _cookie)
883 {
884 	Inode* inode = (Inode*)_node->private_node;
885 
886 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
887 
888 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
889 	if (cookie == NULL)
890 		return B_NO_MEMORY;
891 
892 	TRACE("  open cookie = %p\n", cookie);
893 	cookie->open_mode = openMode;
894 	inode->Open(openMode);
895 
896 	*_cookie = (void*)cookie;
897 
898 	return B_OK;
899 }
900 
901 
902 static status_t
903 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
904 {
905 	file_cookie* cookie = (file_cookie*)_cookie;
906 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
907 
908 	fifo->Close(cookie);
909 
910 	return B_OK;
911 }
912 
913 
914 static status_t
915 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
916 {
917 	file_cookie* cookie = (file_cookie*)_cookie;
918 
919 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
920 
921 	free(cookie);
922 
923 	return B_OK;
924 }
925 
926 
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	// Nothing to sync: FIFO data only ever lives in memory.
	return B_OK;
}
932 
933 
/*!	read hook: reads up to *_length bytes into \a buffer, blocking according
	to the cookie's O_NONBLOCK flag. Returns 0 bytes (B_OK) on EOF.
*/
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	MutexLocker locker(inode->RequestLock());

	// Only read-only cookies may read.
	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall(),
		request);

	inode->RemoveReadRequest(request);
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	// A partial read counts as success, whatever error ended it.
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
981 
982 
/*!	write hook: writes up to *_length bytes from \a buffer, blocking
	according to the cookie's O_NONBLOCK flag. Writes of up to
	VFS_FIFO_ATOMIC_WRITE_SIZE bytes are atomic (see WriteDataToBuffer()).
*/
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	MutexLocker locker(inode->RequestLock());

	// Only write-only cookies may write.
	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall());

	// A partial write counts as success, whatever error ended it.
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
1012 
1013 
1014 static status_t
1015 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
1016 {
1017 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1018 	fs_vnode* superVnode = fifo->SuperVnode();
1019 
1020 	if (superVnode->ops->read_stat == NULL)
1021 		return B_BAD_VALUE;
1022 
1023 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
1024 	if (error != B_OK)
1025 		return error;
1026 
1027 
1028 	MutexLocker locker(fifo->RequestLock());
1029 
1030 	st->st_size = fifo->BytesAvailable();
1031 
1032 	st->st_blksize = 4096;
1033 
1034 	// TODO: Just pass the changes to our modification time on to the super node.
1035 	st->st_atim.tv_sec = time(NULL);
1036 	st->st_atim.tv_nsec = 0;
1037 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
1038 
1039 	return B_OK;
1040 }
1041 
1042 
1043 static status_t
1044 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
1045 	uint32 statMask)
1046 {
1047 	// we cannot change the size of anything
1048 	if ((statMask & B_STAT_SIZE) != 0)
1049 		return B_BAD_VALUE;
1050 
1051 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1052 	fs_vnode* superVnode = fifo->SuperVnode();
1053 
1054 	if (superVnode->ops->write_stat == NULL)
1055 		return B_BAD_VALUE;
1056 
1057 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
1058 		statMask);
1059 	if (error != B_OK)
1060 		return error;
1061 
1062 	return B_OK;
1063 }
1064 
1065 
1066 static status_t
1067 fifo_ioctl(fs_volume* _volume, fs_vnode* _node, void* _cookie, uint32 op,
1068 	void* buffer, size_t length)
1069 {
1070 	file_cookie* cookie = (file_cookie*)_cookie;
1071 	Inode* inode = (Inode*)_node->private_node;
1072 
1073 	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
1074 		_vnode, _cookie, op, buffer, length);
1075 
1076 	switch (op) {
1077 		case FIONBIO:
1078 		{
1079 			if (buffer == NULL)
1080 				return B_BAD_VALUE;
1081 
1082 			int value;
1083 			if (is_called_via_syscall()) {
1084 				if (!IS_USER_ADDRESS(buffer)
1085 					|| user_memcpy(&value, buffer, sizeof(int)) != B_OK) {
1086 					return B_BAD_ADDRESS;
1087 				}
1088 			} else
1089 				value = *(int*)buffer;
1090 
1091 			MutexLocker locker(inode->RequestLock());
1092 			cookie->SetNonBlocking(value != 0);
1093 			return B_OK;
1094 		}
1095 
1096 		case FIONREAD:
1097 		{
1098 			if (buffer == NULL)
1099 				return B_BAD_VALUE;
1100 
1101 			MutexLocker locker(inode->RequestLock());
1102 			int available = (int)inode->BytesAvailable();
1103 			locker.Unlock();
1104 
1105 			if (is_called_via_syscall()) {
1106 				if (!IS_USER_ADDRESS(buffer)
1107 					|| user_memcpy(buffer, &available, sizeof(available))
1108 						!= B_OK) {
1109 					return B_BAD_ADDRESS;
1110 				}
1111 			} else
1112 				*(int*)buffer = available;
1113 
1114 			return B_OK;
1115 		}
1116 
1117 		case B_SET_BLOCKING_IO:
1118 		case B_SET_NONBLOCKING_IO:
1119 		{
1120 			MutexLocker locker(inode->RequestLock());
1121 			cookie->SetNonBlocking(op == B_SET_NONBLOCKING_IO);
1122 			return B_OK;
1123 		}
1124 	}
1125 
1126 	return EINVAL;
1127 }
1128 
1129 
1130 static status_t
1131 fifo_set_flags(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1132 	int flags)
1133 {
1134 	Inode* inode = (Inode*)_node->private_node;
1135 	file_cookie* cookie = (file_cookie*)_cookie;
1136 
1137 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
1138 
1139 	MutexLocker locker(inode->RequestLock());
1140 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
1141 	return B_OK;
1142 }
1143 
1144 
1145 static status_t
1146 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1147 	uint8 event, selectsync* sync)
1148 {
1149 	file_cookie* cookie = (file_cookie*)_cookie;
1150 
1151 	TRACE("fifo_select(vnode = %p)\n", _node);
1152 	Inode* inode = (Inode*)_node->private_node;
1153 	if (!inode)
1154 		return B_ERROR;
1155 
1156 	MutexLocker locker(inode->RequestLock());
1157 	return inode->Select(event, sync, cookie->open_mode);
1158 }
1159 
1160 
1161 static status_t
1162 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1163 	uint8 event, selectsync* sync)
1164 {
1165 	file_cookie* cookie = (file_cookie*)_cookie;
1166 
1167 	TRACE("fifo_deselect(vnode = %p)\n", _node);
1168 	Inode* inode = (Inode*)_node->private_node;
1169 	if (inode == NULL)
1170 		return B_ERROR;
1171 
1172 	MutexLocker locker(inode->RequestLock());
1173 	return inode->Deselect(event, sync, cookie->open_mode);
1174 }
1175 
1176 
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	// FIFOs have no backing store to page from.
	return false;
}
1182 
1183 
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	// Paged (positioned) I/O makes no sense on a FIFO.
	return B_NOT_ALLOWED;
}
1190 
1191 
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	// Paged (positioned) I/O makes no sense on a FIFO.
	return B_NOT_ALLOWED;
}
1198 
1199 
1200 static status_t
1201 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1202 	fs_vnode* _superVnode)
1203 {
1204 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1205 	fs_vnode* superVnode = fifo->SuperVnode();
1206 
1207 	if (superVnode->ops->get_super_vnode != NULL) {
1208 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1209 			_superVnode);
1210 	}
1211 
1212 	*_superVnode = *superVnode;
1213 
1214 	return B_OK;
1215 }
1216 
1217 
// Operation table installed on FIFO vnodes (see create_fifo_vnode()).
// Stat and super-vnode related hooks forward to the wrapped node; all
// directory and attribute operations are unsupported (NULL).
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1295 
1296 
1297 }	// namespace fifo
1298 
1299 
1300 using namespace fifo;
1301 
1302 
1303 // #pragma mark -
1304 
1305 
1306 status_t
1307 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1308 {
1309 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1310 	if (fifo == NULL)
1311 		return B_NO_MEMORY;
1312 
1313 	status_t status = fifo->InitCheck();
1314 	if (status != B_OK) {
1315 		delete fifo;
1316 		return status;
1317 	}
1318 
1319 	vnode->private_node = fifo;
1320 	vnode->ops = &sFIFOVnodeOps;
1321 
1322 	return B_OK;
1323 }
1324 
1325 
/*!	Registers the "fifo" kernel debugger command. */
void
fifo_init()
{
	add_debugger_command_etc("fifo", &Inode::Dump,
		"Print info about the specified FIFO node",
		"[ \"-d\" ] <address>\n"
		"Prints information about the FIFO node specified by address\n"
		"<address>. If \"-d\" is given, the data in the FIFO's ring buffer\n"
		"hexdumped as well.\n",
		0);
}
1337