xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 702e7b011cf52494980191097592b219b3f88c06)
1 /*
2  * Copyright 2007-2013, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15 #include <sys/stat.h>
16 
17 #include <new>
18 
19 #include <KernelExport.h>
20 #include <NodeMonitor.h>
21 #include <Select.h>
22 
23 #include <condition_variable.h>
24 #include <debug_hex_dump.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <syscall_restart.h>
28 #include <team.h>
29 #include <thread.h>
30 #include <util/DoublyLinkedList.h>
31 #include <util/AutoLock.h>
32 #include <util/ring_buffer.h>
33 #include <vfs.h>
34 #include <vfs_defs.h>
35 #include <vm/vm.h>
36 
37 
38 //#define TRACE_FIFO
39 #ifdef TRACE_FIFO
40 #	define TRACE(x...) dprintf(x)
41 #else
42 #	define TRACE(x...)
43 #endif
44 
45 
46 namespace fifo {
47 
48 
49 struct file_cookie;
50 class Inode;
51 
52 
// Thin wrapper around the kernel's ring_buffer API. The backing buffer is
// created lazily (CreateBuffer()) when the FIFO becomes active and freed
// again (DeleteBuffer()) once both ends are closed; all methods tolerate a
// NULL fBuffer. Not internally locked — callers serialize access through
// Inode::fRequestLock.
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			// allocate/free the backing store (idempotent)
			status_t			CreateBuffer();
			void				DeleteBuffer();

			// transfer helpers; isUser selects the user-memory-safe
			// ring_buffer_user_*() variants
			ssize_t				Write(const void* buffer, size_t length,
									bool isUser);
			ssize_t				Read(void* buffer, size_t length, bool isUser);
			ssize_t				Peek(size_t offset, void* buffer,
									size_t length) const;

			// bytes currently buffered / space left (0 if no buffer)
			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
};
73 
74 
// A reader blocked (or about to block) in Inode::ReadDataFromBuffer().
// Requests are queued FIFO on the inode so concurrent readers are served in
// order. The fNotified flag, guarded by the spinlock, ensures a request is
// unblocked at most once between SetNotified(false) and the next wakeup —
// see Inode::WaitForReadRequest() for the ordering this protects.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
			// starts "notified" so a stray Notify() before the reader
			// actually blocks is a no-op
	{
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Unblock the waiting thread with the given status, but only once per
	// SetNotified(false) cycle.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			thread_unblock(fThread, status);
			fNotified = true;
		}
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;
	Thread*			fThread;
	file_cookie*	fCookie;
	volatile bool	fNotified;
};
119 
120 
121 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
122 public:
123 	WriteRequest(Thread* thread, size_t minimalWriteCount)
124 		:
125 		fThread(thread),
126 		fMinimalWriteCount(minimalWriteCount)
127 	{
128 	}
129 
130 	Thread* GetThread() const
131 	{
132 		return fThread;
133 	}
134 
135 	size_t MinimalWriteCount() const
136 	{
137 		return fMinimalWriteCount;
138 	}
139 
140 private:
141 	Thread*	fThread;
142 	size_t	fMinimalWriteCount;
143 };
144 
145 
146 typedef DoublyLinkedList<ReadRequest> ReadRequestList;
147 typedef DoublyLinkedList<WriteRequest> WriteRequestList;
148 
149 
// The actual FIFO state machine: a ring buffer plus reader/writer
// bookkeeping. All mutable state is guarded by fRequestLock. Blocked
// readers are tracked individually via fReadRequests (served in order);
// blocked writers wait collectively on fWriteCondition.
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// "active" means both a reader and a writer have the FIFO open
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			// lock guarding all of the state below; held across the
			// WriteDataToBuffer()/ReadDataFromBuffer() calls
			mutex*				RequestLock() { return &fRequestLock; }

			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking,
									bool isUser);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, bool isUser,
									ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			// wakeup helpers, called with fRequestLock held
			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

			// kernel debugger support ("fifo" KDL command)
			void				Dump(bool dumpData) const;
	static	int					Dump(int argc, char** argv);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;

			// select() subscribers for the read and write ends
			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
221 
222 
223 class FIFOInode : public Inode {
224 public:
225 	FIFOInode(fs_vnode* vnode)
226 		:
227 		Inode(),
228 		fSuperVnode(*vnode)
229 	{
230 	}
231 
232 	fs_vnode*	SuperVnode() { return &fSuperVnode; }
233 
234 private:
235 	fs_vnode	fSuperVnode;
236 };
237 
238 
// Per-open-file state: just the open mode (including O_NONBLOCK).
struct file_cookie {
	int	open_mode;
			// guarded by Inode::fRequestLock

	// Set or clear O_NONBLOCK in open_mode.
	void SetNonBlocking(bool nonBlocking)
	{
		const int mask = O_NONBLOCK;
		open_mode = nonBlocking ? (open_mode | mask) : (open_mode & ~mask);
	}
};
251 
252 
253 // #pragma mark -
254 
255 
// The ring buffer starts without backing store; it is allocated on demand
// by CreateBuffer() when the FIFO becomes active.
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
261 
262 
// Releases the backing store, if any (DeleteBuffer() handles NULL).
RingBuffer::~RingBuffer()
{
	DeleteBuffer();
}
267 
268 
269 status_t
270 RingBuffer::CreateBuffer()
271 {
272 	if (fBuffer != NULL)
273 		return B_OK;
274 
275 	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
276 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
277 }
278 
279 
280 void
281 RingBuffer::DeleteBuffer()
282 {
283 	if (fBuffer != NULL) {
284 		delete_ring_buffer(fBuffer);
285 		fBuffer = NULL;
286 	}
287 }
288 
289 
290 inline ssize_t
291 RingBuffer::Write(const void* buffer, size_t length, bool isUser)
292 {
293 	if (fBuffer == NULL)
294 		return B_NO_MEMORY;
295 	if (isUser && !IS_USER_ADDRESS(buffer))
296 		return B_BAD_ADDRESS;
297 
298 	return isUser
299 		? ring_buffer_user_write(fBuffer, (const uint8*)buffer, length)
300 		: ring_buffer_write(fBuffer, (const uint8*)buffer, length);
301 }
302 
303 
304 inline ssize_t
305 RingBuffer::Read(void* buffer, size_t length, bool isUser)
306 {
307 	if (fBuffer == NULL)
308 		return B_NO_MEMORY;
309 	if (isUser && !IS_USER_ADDRESS(buffer))
310 		return B_BAD_ADDRESS;
311 
312 	return isUser
313 		? ring_buffer_user_read(fBuffer, (uint8*)buffer, length)
314 		: ring_buffer_read(fBuffer, (uint8*)buffer, length);
315 }
316 
317 
// Copies up to length bytes starting at offset into buffer without
// consuming them. Kernel addresses only (used by the KDL hex dump).
inline ssize_t
RingBuffer::Peek(size_t offset, void* buffer, size_t length) const
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_peek(fBuffer, offset, (uint8*)buffer, length);
}
326 
327 
328 inline size_t
329 RingBuffer::Readable() const
330 {
331 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
332 }
333 
334 
335 inline size_t
336 RingBuffer::Writable() const
337 {
338 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
339 }
340 
341 
342 //	#pragma mark -
343 
344 
// Sets up the request lock and the writers' condition variable and stamps
// creation/modification time. The ring buffer itself is only allocated when
// the FIFO becomes active (see Open()).
Inode::Inode()
	:
	fReadRequests(),
	fWriteRequests(),
	fReaderCount(0),
	fWriterCount(0),
	fActive(false),
	fReadSelectSyncPool(NULL),
	fWriteSelectSyncPool(NULL)
{
	fWriteCondition.Publish(this, "pipe");
	mutex_init(&fRequestLock, "pipe request");

	// NOTE(review): the conversion below treats the value as microseconds;
	// verify that real_time_clock() indeed returns microseconds here and not
	// seconds (in which case real_time_clock_usecs() would be intended).
	bigtime_t time = real_time_clock();
	fModificationTime.tv_sec = time / 1000000;
	fModificationTime.tv_nsec = (time % 1000000) * 1000;
	fCreationTime = fModificationTime;
}
363 
364 
// By the time the inode is deleted no one holds fRequestLock and no
// requests are queued anymore (both ends are closed).
Inode::~Inode()
{
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
370 
371 
// Second phase of construction; nothing can currently fail, but callers
// (create_fifo_vnode()) check the result anyway.
status_t
Inode::InitCheck()
{
	return B_OK;
}
377 
378 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method.
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking,
	bool isUser)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// (Also waits while the FIFO is not yet active, i.e. no reader has
		// opened it so far.)
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			ConditionVariableEntry entry;
			entry.Add(this);

			WriteRequest request(thread_get_current_thread(), minToWrite);
			fWriteRequests.Add(&request);

			// The request lock must not be held while blocking; the entry
			// was added before unlocking, so no wakeup can be missed.
			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// POSIX: writing to a pipe with no readers raises SIGPIPE and
			// fails with EPIPE -- but only if nothing was written yet.
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0) {
			ssize_t bytesWritten = fBuffer.Write(data, toWrite, isUser);
			if (bytesWritten < 0)
				return bytesWritten;
		}

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
455 
456 
/*!	Reads up to *_length bytes from the ring buffer into \a data. The
	request lock must be held, and \a request must already be queued via
	AddReadRequest(). Readers are served strictly in queue order; the call
	blocks (unless \a nonBlocking) until the request is first in line and
	data is available. Returns B_OK with *_length == 0 on EOF (no writers
	left and the buffer is empty), B_WOULD_BLOCK in non-blocking mode, or a
	wait/copy error.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	bool isUser, ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writers left and nothing buffered: end of file
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	ssize_t bytesRead = fBuffer.Read(data, toRead, isUser);
	if (bytesRead < 0)
		return bytesRead;

	// wake writers whose minimal write count is now satisfied
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
509 
510 
// Appends the request to the reader queue (request lock must be held).
// Readers are served in the order they were queued.
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
516 
517 
// Removes the request from the reader queue (request lock must be held).
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
523 
524 
/*!	Blocks the current thread until the given (already queued) read request
	is notified. Called with the request lock held; the lock is released
	while blocked and re-acquired before returning. The exact ordering of
	the SetNotified() calls around the block is essential and must not be
	changed.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// Arm the request *after* prepare_to_block: a Notify() from here on
	// will find fNotified == false and unblock us exactly once.
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
546 
547 
/*!	Called (with the request lock held) after \a bytes have been consumed
	from the buffer; wakes writers that can now make progress.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means the buffer was completely full before
		// this read)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			// wake only if the threshold was crossed by *this* read
			// (minWriteCount was not yet satisfied before it)
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
575 
576 
577 void
578 Inode::NotifyReadDone()
579 {
580 	// notify next reader, if there's still something to be read
581 	if (fBuffer.Readable() > 0) {
582 		if (ReadRequest* request = fReadRequests.First())
583 			request->Notify();
584 	}
585 }
586 
587 
/*!	Called (with the request lock held) after \a bytes have been appended
	to the buffer; wakes the first queued reader and read-select()ors.
*/
void
Inode::NotifyBytesWritten(size_t bytes)
{
	// notify reader, if something can be read now
	// (Readable() == bytes means the buffer was empty before this write,
	// so readers may have been waiting)
	if (bytes > 0 && fBuffer.Readable() == bytes) {
		if (fReadSelectSyncPool)
			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);

		if (ReadRequest* request = fReadRequests.First())
			request->Notify();
	}
}
600 
601 
/*!	Called (with the request lock held) when the last file descriptor of
	one end was closed.
	\param writer \c true if the writing end closed (readers see EOF once
		the buffer drains), \c false if the reading end closed (writers get
		EPIPE).
*/
void
Inode::NotifyEndClosed(bool writer)
{
	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
		writer ? "writer" : "reader");

	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
		if (fBuffer.Readable() == 0) {
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while (ReadRequest* request = iterator.Next())
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
		}
	} else {
		// Last reader is gone. Wake up all writers.
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool) {
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
		}
	}
}
630 
631 
/*!	Accounts a newly opened file descriptor. Once both a reader and a
	writer are present the FIFO becomes active: the ring buffer is
	allocated and waiting writers are released.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY || (openMode & O_ACCMODE) == O_RDWR)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		TRACE("Inode %p::Open(): fifo becomes active\n", this);
		// NOTE(review): the B_NO_MEMORY result of CreateBuffer() is ignored
		// here; subsequent Write()/Read() calls on the NULL buffer do return
		// B_NO_MEMORY, but consider propagating the failure to the caller.
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
654 
655 
/*!	Accounts a closed file descriptor: aborts reads still pending on this
	cookie, updates the reader/writer counts, and — when the respective
	count drops to zero — notifies the other end. Once both counts reach
	zero the FIFO is deactivated and its buffer freed.
*/
void
Inode::Close(file_cookie* cookie)
{

	MutexLocker locker(RequestLock());

	int openMode = cookie->open_mode;
	TRACE("Inode %p::Close(openMode = %" B_PRId32 ")\n", this, openMode);

	// Notify all currently reading file descriptors
	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
	while (ReadRequest* request = iterator.Next()) {
		if (request->Cookie() == cookie)
			request->Notify(B_FILE_ERROR);
	}

	if ((openMode & O_ACCMODE) == O_WRONLY || (openMode & O_ACCMODE) == O_RDWR) {
		if (--fWriterCount == 0)
			NotifyEndClosed(true);
	}

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
		if (--fReaderCount == 0)
			NotifyEndClosed(false);
	}

	if (fWriterCount == 0) {
		// Notify any still reading writers to stop
		// TODO: This only works reliable if there is only one writer - we could
		// do the same thing done for the read requests.
		fWriteCondition.NotifyAll(B_FILE_ERROR);
	}

	if (fReaderCount == 0 && fWriterCount == 0) {
		fActive = false;
		fBuffer.DeleteBuffer();
	}
}
694 
695 
/*!	Registers \a sync for \a event on the pool matching the fd's access
	mode and immediately delivers the event if its condition already holds.
	Called with the request lock held (see fifo_select()).
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	// B_SELECT_READ can happen on write-only opened fds, so restrain B_SELECT_READ to O_RDWR
	if ((event == B_SELECT_READ && (openMode & O_RWMASK) == O_RDWR)
		|| (openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_RDWR || (openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		// writable space, or no readers (which makes write/error pending)
		if ((event == B_SELECT_WRITE
				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		// buffered data, or EOF (no writers left)
		if (event == B_SELECT_READ
				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
730 
731 
732 status_t
733 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
734 {
735 	select_sync_pool** pool;
736 	if ((event == B_SELECT_READ && (openMode & O_RWMASK) == O_RDWR)
737 		|| (openMode & O_RWMASK) == O_RDONLY) {
738 		pool = &fReadSelectSyncPool;
739 	} else if ((openMode & O_RWMASK) == O_RDWR || (openMode & O_RWMASK) == O_WRONLY) {
740 		pool = &fWriteSelectSyncPool;
741 	} else
742 		return B_NOT_ALLOWED;
743 
744 	remove_select_sync_pool_entry(pool, sync, event);
745 	return B_OK;
746 }
747 
748 
/*!	Prints the FIFO's state to the kernel debugger console.
	\param dumpData if \c true, the buffered bytes are hex-dumped as well.
	Only safe to call from KDL context (uses kprintf()).
*/
void
Inode::Dump(bool dumpData) const
{
	kprintf("FIFO %p\n", this);
	kprintf("  active:        %s\n", fActive ? "true" : "false");
	kprintf("  readers:       %" B_PRId32 "\n", fReaderCount);
	kprintf("  writers:       %" B_PRId32 "\n", fWriterCount);

	if (!fReadRequests.IsEmpty()) {
		kprintf(" pending readers:\n");
		for (ReadRequestList::ConstIterator it = fReadRequests.GetIterator();
			ReadRequest* request = it.Next();) {
			kprintf("    %p: thread %" B_PRId32 ", cookie: %p\n", request,
				request->GetThread()->id, request->Cookie());
		}
	}

	if (!fWriteRequests.IsEmpty()) {
		kprintf(" pending writers:\n");
		for (WriteRequestList::ConstIterator it = fWriteRequests.GetIterator();
			WriteRequest* request = it.Next();) {
			kprintf("    %p:  thread %" B_PRId32 ", min count: %zu\n", request,
				request->GetThread()->id, request->MinimalWriteCount());
		}
	}

	kprintf("  %zu bytes buffered\n", fBuffer.Readable());

	if (dumpData && fBuffer.Readable() > 0) {
		// Adapter feeding the buffered bytes to the kernel's hex-dump
		// helper via non-destructive Peek()s.
		struct DataProvider : BKernel::HexDumpDataProvider {
			DataProvider(const RingBuffer& buffer)
				:
				fBuffer(buffer),
				fOffset(0)
			{
			}

			virtual bool HasMoreData() const
			{
				return fOffset < fBuffer.Readable();
			}

			virtual uint8 NextByte()
			{
				uint8 byte = '\0';
				if (fOffset < fBuffer.Readable()) {
					fBuffer.Peek(fOffset, &byte, 1);
					fOffset++;
				}
				return byte;
			}

			virtual bool GetAddressString(char* buffer, size_t bufferSize) const
			{
				snprintf(buffer, bufferSize, "    %4zx", fOffset);
				return true;
			}

		private:
			const RingBuffer&	fBuffer;
			size_t				fOffset;
		};

		DataProvider dataProvider(fBuffer);
		BKernel::print_hex_dump(dataProvider, fBuffer.Readable());
	}
}
816 
817 
818 /*static*/ int
819 Inode::Dump(int argc, char** argv)
820 {
821 	bool dumpData = false;
822 	int argi = 1;
823 	if (argi < argc && strcmp(argv[argi], "-d") == 0) {
824 		dumpData = true;
825 		argi++;
826 	}
827 
828 	if (argi >= argc || argi + 2 < argc) {
829 		print_debugger_command_usage(argv[0]);
830 		return 0;
831 	}
832 
833 	Inode* node = (Inode*)parse_expression(argv[argi]);
834 	if (IS_USER_ADDRESS(node)) {
835 		kprintf("invalid FIFO address\n");
836 		return 0;
837 	}
838 
839 	node->Dump(dumpData);
840 	return 0;
841 }
842 
843 
844 //	#pragma mark - vnode API
845 
846 
847 static status_t
848 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
849 {
850 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
851 	fs_vnode* superVnode = fifo->SuperVnode();
852 
853 	status_t error = B_OK;
854 	if (superVnode->ops->put_vnode != NULL)
855 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
856 
857 	delete fifo;
858 
859 	return error;
860 }
861 
862 
863 static status_t
864 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
865 {
866 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
867 	fs_vnode* superVnode = fifo->SuperVnode();
868 
869 	status_t error = B_OK;
870 	if (superVnode->ops->remove_vnode != NULL)
871 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
872 
873 	delete fifo;
874 
875 	return error;
876 }
877 
878 
879 static status_t
880 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
881 	void** _cookie)
882 {
883 	Inode* inode = (Inode*)_node->private_node;
884 
885 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
886 
887 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
888 	if (cookie == NULL)
889 		return B_NO_MEMORY;
890 
891 	TRACE("  open cookie = %p\n", cookie);
892 	cookie->open_mode = openMode;
893 	inode->Open(openMode);
894 
895 	*_cookie = (void*)cookie;
896 
897 	return B_OK;
898 }
899 
900 
901 static status_t
902 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
903 {
904 	file_cookie* cookie = (file_cookie*)_cookie;
905 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
906 
907 	fifo->Close(cookie);
908 
909 	return B_OK;
910 }
911 
912 
913 static status_t
914 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
915 {
916 	file_cookie* cookie = (file_cookie*)_cookie;
917 
918 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
919 
920 	free(cookie);
921 
922 	return B_OK;
923 }
924 
925 
// fsync() is not meaningful on a FIFO; reject it.
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_BAD_VALUE;
}
931 
932 
/*!	read() hook: queues a ReadRequest on the inode and reads from the ring
	buffer. The position argument is ignored (FIFOs are not seekable).
	Returns the bytes read via *_length; a short-but-positive read masks
	any late error, matching POSIX short-read semantics.
*/
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	MutexLocker locker(inode->RequestLock());

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall(),
		request);

	// dequeue and hand remaining data to the next queued reader
	inode->RemoveReadRequest(request);
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
977 
978 
/*!	write() hook: copies the data into the ring buffer, blocking as needed
	(unless O_NONBLOCK). The position argument is ignored. A partial write
	(> 0 bytes) reports success, matching POSIX short-write semantics.
*/
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	MutexLocker locker(inode->RequestLock());

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall());

	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
1005 
1006 
1007 static status_t
1008 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
1009 {
1010 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1011 	fs_vnode* superVnode = fifo->SuperVnode();
1012 
1013 	if (superVnode->ops->read_stat == NULL)
1014 		return B_BAD_VALUE;
1015 
1016 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
1017 	if (error != B_OK)
1018 		return error;
1019 
1020 
1021 	MutexLocker locker(fifo->RequestLock());
1022 
1023 	st->st_size = fifo->BytesAvailable();
1024 
1025 	st->st_blksize = 4096;
1026 
1027 	// TODO: Just pass the changes to our modification time on to the super node.
1028 	st->st_atim.tv_sec = time(NULL);
1029 	st->st_atim.tv_nsec = 0;
1030 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
1031 
1032 	return B_OK;
1033 }
1034 
1035 
1036 static status_t
1037 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
1038 	uint32 statMask)
1039 {
1040 	// we cannot change the size of anything
1041 	if ((statMask & B_STAT_SIZE) != 0)
1042 		return B_BAD_VALUE;
1043 
1044 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1045 	fs_vnode* superVnode = fifo->SuperVnode();
1046 
1047 	if (superVnode->ops->write_stat == NULL)
1048 		return B_BAD_VALUE;
1049 
1050 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
1051 		statMask);
1052 	if (error != B_OK)
1053 		return error;
1054 
1055 	return B_OK;
1056 }
1057 
1058 
/*!	ioctl() hook. Supports FIONBIO / B_SET_(NON)BLOCKING_IO to toggle
	O_NONBLOCK on the cookie and FIONREAD to query the buffered byte count.
	User buffers are validated and copied with user_memcpy() when the call
	originates from a syscall.
*/
static status_t
fifo_ioctl(fs_volume* _volume, fs_vnode* _node, void* _cookie, uint32 op,
	void* buffer, size_t length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_ioctl: vnode %p, cookie %p, op %" B_PRId32 ", buf %p, len %ld\n",
		_node, _cookie, op, buffer, length);

	switch (op) {
		case FIONBIO:
		{
			// set/clear non-blocking mode; *buffer is an int flag
			if (buffer == NULL)
				return B_BAD_VALUE;

			int value;
			if (is_called_via_syscall()) {
				if (!IS_USER_ADDRESS(buffer)
					|| user_memcpy(&value, buffer, sizeof(int)) != B_OK) {
					return B_BAD_ADDRESS;
				}
			} else
				value = *(int*)buffer;

			MutexLocker locker(inode->RequestLock());
			cookie->SetNonBlocking(value != 0);
			return B_OK;
		}

		case FIONREAD:
		{
			// report the number of bytes currently readable
			if (buffer == NULL)
				return B_BAD_VALUE;

			MutexLocker locker(inode->RequestLock());
			int available = (int)inode->BytesAvailable();
			locker.Unlock();

			if (is_called_via_syscall()) {
				if (!IS_USER_ADDRESS(buffer)
					|| user_memcpy(buffer, &available, sizeof(available))
						!= B_OK) {
					return B_BAD_ADDRESS;
				}
			} else
				*(int*)buffer = available;

			return B_OK;
		}

		case B_SET_BLOCKING_IO:
		case B_SET_NONBLOCKING_IO:
		{
			// Haiku-native equivalents of FIONBIO, no argument needed
			MutexLocker locker(inode->RequestLock());
			cookie->SetNonBlocking(op == B_SET_NONBLOCKING_IO);
			return B_OK;
		}
	}

	return EINVAL;
}
1121 
1122 
/*!	set_flags() hook (fcntl(F_SETFL)): replaces the changeable status
	flags (O_APPEND, O_NONBLOCK) in the cookie's open mode with \a flags.
*/
static status_t
fifo_set_flags(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	int flags)
{
	Inode* inode = (Inode*)_node->private_node;
	file_cookie* cookie = (file_cookie*)_cookie;

	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _node, flags);

	MutexLocker locker(inode->RequestLock());
	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
	return B_OK;
}
1136 
1137 
1138 static status_t
1139 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1140 	uint8 event, selectsync* sync)
1141 {
1142 	file_cookie* cookie = (file_cookie*)_cookie;
1143 
1144 	TRACE("fifo_select(vnode = %p)\n", _node);
1145 	Inode* inode = (Inode*)_node->private_node;
1146 	if (!inode)
1147 		return B_ERROR;
1148 
1149 	MutexLocker locker(inode->RequestLock());
1150 	return inode->Select(event, sync, cookie->open_mode);
1151 }
1152 
1153 
1154 static status_t
1155 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1156 	uint8 event, selectsync* sync)
1157 {
1158 	file_cookie* cookie = (file_cookie*)_cookie;
1159 
1160 	TRACE("fifo_deselect(vnode = %p)\n", _node);
1161 	Inode* inode = (Inode*)_node->private_node;
1162 	if (inode == NULL)
1163 		return B_ERROR;
1164 
1165 	MutexLocker locker(inode->RequestLock());
1166 	return inode->Deselect(event, sync, cookie->open_mode);
1167 }
1168 
1169 
// FIFOs have no backing pages; paging is never possible.
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1175 
1176 
// Page-based I/O is not supported on FIFOs.
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1183 
1184 
// Page-based I/O is not supported on FIFOs.
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1191 
1192 
1193 static status_t
1194 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1195 	fs_vnode* _superVnode)
1196 {
1197 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1198 	fs_vnode* superVnode = fifo->SuperVnode();
1199 
1200 	if (superVnode->ops->get_super_vnode != NULL) {
1201 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1202 			_superVnode);
1203 	}
1204 
1205 	*_superVnode = *superVnode;
1206 
1207 	return B_OK;
1208 }
1209 
1210 
// Operation table installed on FIFO vnodes (see create_fifo_vnode()).
// Unset hooks fall back to the VFS defaults; a few are forwarded to the
// wrapped super vnode explicitly.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1288 
1289 
1290 }	// namespace fifo
1291 
1292 
1293 using namespace fifo;
1294 
1295 
1296 // #pragma mark -
1297 
1298 
1299 status_t
1300 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1301 {
1302 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1303 	if (fifo == NULL)
1304 		return B_NO_MEMORY;
1305 
1306 	status_t status = fifo->InitCheck();
1307 	if (status != B_OK) {
1308 		delete fifo;
1309 		return status;
1310 	}
1311 
1312 	vnode->private_node = fifo;
1313 	vnode->ops = &sFIFOVnodeOps;
1314 
1315 	return B_OK;
1316 }
1317 
1318 
1319 void
1320 fifo_init()
1321 {
1322 	add_debugger_command_etc("fifo", &Inode::Dump,
1323 		"Print info about the specified FIFO node",
1324 		"[ \"-d\" ] <address>\n"
1325 		"Prints information about the FIFO node specified by address\n"
1326 		"<address>. If \"-d\" is given, the data in the FIFO's ring buffer\n"
1327 		"hexdumped as well.\n",
1328 		0);
1329 }
1330