xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 65abebfaa73cc5833b38358e5d281a8f05914018)
1 /*
2  * Copyright 2007-2013, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15 #include <sys/stat.h>
16 
17 #include <new>
18 
19 #include <KernelExport.h>
20 #include <NodeMonitor.h>
21 #include <Select.h>
22 
23 #include <condition_variable.h>
24 #include <debug_hex_dump.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <syscall_restart.h>
28 #include <team.h>
29 #include <thread.h>
30 #include <util/DoublyLinkedList.h>
31 #include <util/AutoLock.h>
32 #include <util/ring_buffer.h>
33 #include <vfs.h>
34 #include <vfs_defs.h>
35 #include <vm/vm.h>
36 
37 
38 //#define TRACE_FIFO
39 #ifdef TRACE_FIFO
40 #	define TRACE(x...) dprintf(x)
41 #else
42 #	define TRACE(x...)
43 #endif
44 
45 
46 namespace fifo {
47 
48 
49 struct file_cookie;
50 class Inode;
51 
52 
// Thin wrapper around the kernel ring_buffer that tolerates a missing
// buffer: the underlying buffer only exists while the FIFO is "active"
// (has both readers and writers; see Inode::Open()/Close()).
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			status_t			CreateBuffer();
			void				DeleteBuffer();

			ssize_t				Write(const void* buffer, size_t length,
									bool isUser);
			ssize_t				Read(void* buffer, size_t length, bool isUser);
			ssize_t				Peek(size_t offset, void* buffer,
									size_t length) const;

			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
				// NULL while the FIFO is inactive
};
73 
74 
// A reader blocked (or about to block) in Inode::ReadDataFromBuffer().
// fLock guards fNotified so that a notifier and the sleeping thread cannot
// race on the thread_unblock() handshake (see WaitForReadRequest()).
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
			// starts "notified": a stray Notify() is a no-op until
			// WaitForReadRequest() arms the request via SetNotified(false)
	{
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	// Arms (false) or disarms (true) the request.
	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Wakes the waiting thread at most once per arming; redundant calls
	// are no-ops.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			thread_unblock(fThread, status);
			fNotified = true;
		}
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;		// guards fNotified
	Thread*			fThread;	// the thread that issued the read
	file_cookie*	fCookie;	// cookie the read was issued through
	volatile bool	fNotified;
};
119 
120 
121 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
122 public:
123 	WriteRequest(Thread* thread, size_t minimalWriteCount)
124 		:
125 		fThread(thread),
126 		fMinimalWriteCount(minimalWriteCount)
127 	{
128 	}
129 
130 	Thread* GetThread() const
131 	{
132 		return fThread;
133 	}
134 
135 	size_t MinimalWriteCount() const
136 	{
137 		return fMinimalWriteCount;
138 	}
139 
140 private:
141 	Thread*	fThread;
142 	size_t	fMinimalWriteCount;
143 };
144 
145 
146 typedef DoublyLinkedList<ReadRequest> ReadRequestList;
147 typedef DoublyLinkedList<WriteRequest> WriteRequestList;
148 
149 
// The state shared by all open file descriptors of one FIFO. Almost all
// members are guarded by fRequestLock; blocked readers are tracked via
// ReadRequest entries, blocked writers via WriteRequest entries plus the
// fWriteCondition condition variable.
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// true while the FIFO has been opened by both ends at least once
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			// the big lock guarding buffer, counts, and request lists
			mutex*				RequestLock() { return &fRequestLock; }

			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking,
									bool isUser);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, bool isUser,
									ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

			// kernel debugger support ("fifo" command)
			void				Dump(bool dumpData) const;
	static	int					Dump(int argc, char** argv);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;
				// falls back to false once all FDs are closed

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
221 
222 
// An Inode that wraps ("overlays") the vnode of the underlying file system
// on which the FIFO special node lives. The super vnode is kept so that
// stat and vnode lifetime hooks can be forwarded to it.
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;	// copy of the wrapped file system's vnode
};
237 
238 
// Per-open-FD state: just the open mode (O_RDONLY/O_WRONLY plus flags).
struct file_cookie {
	int	open_mode;
			// guarded by Inode::fRequestLock

	// Sets or clears O_NONBLOCK in open_mode, leaving all other flags alone.
	void SetNonBlocking(bool nonBlocking)
	{
		int mode = open_mode & ~(int)O_NONBLOCK;
		if (nonBlocking)
			mode |= O_NONBLOCK;
		open_mode = mode;
	}
};
251 
252 
253 // #pragma mark -
254 
255 
// Starts without a backing buffer; CreateBuffer() allocates it lazily.
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
261 
262 
RingBuffer::~RingBuffer()
{
	// releases the kernel ring_buffer, if still allocated
	DeleteBuffer();
}
267 
268 
// Allocates the backing ring buffer (VFS_FIFO_BUFFER_CAPACITY bytes).
// Idempotent: an existing buffer is kept untouched.
status_t
RingBuffer::CreateBuffer()
{
	if (fBuffer != NULL)
		return B_OK;

	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
}
278 
279 
// Frees the backing buffer (if any) and returns to the inactive state.
void
RingBuffer::DeleteBuffer()
{
	if (fBuffer != NULL) {
		delete_ring_buffer(fBuffer);
		fBuffer = NULL;
	}
}
288 
289 
290 inline ssize_t
291 RingBuffer::Write(const void* buffer, size_t length, bool isUser)
292 {
293 	if (fBuffer == NULL)
294 		return B_NO_MEMORY;
295 	if (isUser && !IS_USER_ADDRESS(buffer))
296 		return B_BAD_ADDRESS;
297 
298 	return isUser
299 		? ring_buffer_user_write(fBuffer, (const uint8*)buffer, length)
300 		: ring_buffer_write(fBuffer, (const uint8*)buffer, length);
301 }
302 
303 
304 inline ssize_t
305 RingBuffer::Read(void* buffer, size_t length, bool isUser)
306 {
307 	if (fBuffer == NULL)
308 		return B_NO_MEMORY;
309 	if (isUser && !IS_USER_ADDRESS(buffer))
310 		return B_BAD_ADDRESS;
311 
312 	return isUser
313 		? ring_buffer_user_read(fBuffer, (uint8*)buffer, length)
314 		: ring_buffer_read(fBuffer, (uint8*)buffer, length);
315 }
316 
317 
// Copies up to \a length bytes starting at \a offset without consuming
// them (used by the kernel debugger hex dump only).
inline ssize_t
RingBuffer::Peek(size_t offset, void* buffer, size_t length) const
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_peek(fBuffer, offset, (uint8*)buffer, length);
}
326 
327 
328 inline size_t
329 RingBuffer::Readable() const
330 {
331 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
332 }
333 
334 
335 inline size_t
336 RingBuffer::Writable() const
337 {
338 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
339 }
340 
341 
342 //	#pragma mark -
343 
344 
// Note: the ring buffer itself is not allocated here; it is created in
// Open() once the FIFO becomes active (both ends opened).
Inode::Inode()
	:
	fReadRequests(),
	fWriteRequests(),
	fReaderCount(0),
	fWriterCount(0),
	fActive(false),
	fReadSelectSyncPool(NULL),
	fWriteSelectSyncPool(NULL)
{
	fWriteCondition.Publish(this, "pipe");
	mutex_init(&fRequestLock, "pipe request");

	// stamp creation == modification time from the real-time clock (µs)
	bigtime_t time = real_time_clock();
	fModificationTime.tv_sec = time / 1000000;
	fModificationTime.tv_nsec = (time % 1000000) * 1000;
	fCreationTime = fModificationTime;
}
363 
364 
Inode::~Inode()
{
	// By the time the inode dies all FDs are closed, so no waiters remain.
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
370 
371 
// Nothing in the constructor can fail, so there is nothing to check.
status_t
Inode::InitCheck()
{
	return B_OK;
}
377 
378 
379 /*!	Writes the specified data bytes to the inode's ring buffer. The
380 	request lock must be held when calling this method.
381 	Notifies readers if necessary, so that blocking readers will get started.
382 	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
383 	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
384 	the returned length is > 0, the returned error code can be ignored.
385 */
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking,
	bool isUser)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// (Also waits while the FIFO is not yet active, i.e. O_WRONLY open
		// blocked until a reader shows up.)
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			// Register with the condition variable *before* dropping the
			// request lock so a concurrent notify cannot be missed.
			ConditionVariableEntry entry;
			entry.Add(this);

			WriteRequest request(thread_get_current_thread(), minToWrite);
			fWriteRequests.Add(&request);

			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		// (POSIX: write on a pipe with no readers raises SIGPIPE / EPIPE)
		if (fActive && fReaderCount == 0) {
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0) {
			ssize_t bytesWritten = fBuffer.Write(data, toWrite, isUser);
			if (bytesWritten < 0)
				return bytesWritten;
		}

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
455 
456 
/*!	Reads up to *_length bytes from the ring buffer into \a data. The
	request lock must be held; \a request must already have been queued via
	AddReadRequest(). Readers are served strictly in FIFO order: the call
	blocks (unless \a nonBlocking) until \a request is the head of the queue
	and data is available. Returns B_OK with *_length == 0 on EOF (no
	writers left and buffer empty).
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	bool isUser, ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writers and nothing buffered: end-of-file
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	ssize_t bytesRead = fBuffer.Read(data, toRead, isUser);
	if (bytesRead < 0)
		return bytesRead;

	// wake writers / select()ors now that space freed up
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
509 
510 
// Queues \a request at the tail; callers must hold the request lock.
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
516 
517 
// Unqueues \a request; callers must hold the request lock.
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
523 
524 
/*!	Blocks the current thread until \a request is notified (or the wait is
	interrupted). Called with the request lock held; the lock is dropped
	during the wait and re-acquired afterwards.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// arm the request so Notify() will actually unblock us
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
546 
547 
/*!	Called (with the request lock held) after \a bytes were consumed from
	the buffer, to wake writers whose space requirement is now satisfiable.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means exactly these bytes freed the buffer up)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
575 
576 
/*!	Called (with the request lock held) after a reader dequeued its
	request, so the next queued reader gets its turn if data remains.
*/
void
Inode::NotifyReadDone()
{
	// notify next reader, if there's still something to be read
	if (fBuffer.Readable() > 0) {
		if (ReadRequest* request = fReadRequests.First())
			request->Notify();
	}
}
586 
587 
/*!	Called (with the request lock held) after \a bytes were appended to the
	buffer; wakes the head reader and read-select()ors on the empty->non-empty
	transition (Readable() == bytes means the buffer was empty before).
*/
void
Inode::NotifyBytesWritten(size_t bytes)
{
	// notify reader, if something can be read now
	if (bytes > 0 && fBuffer.Readable() == bytes) {
		if (fReadSelectSyncPool)
			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);

		if (ReadRequest* request = fReadRequests.First())
			request->Notify();
	}
}
600 
601 
602 void
603 Inode::NotifyEndClosed(bool writer)
604 {
605 	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
606 		writer ? "writer" : "reader");
607 
608 	if (writer) {
609 		// Our last writer has been closed; if the pipe
610 		// contains no data, unlock all waiting readers
611 		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
612 		if (fBuffer.Readable() == 0) {
613 			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
614 			while (ReadRequest* request = iterator.Next())
615 				request->Notify();
616 
617 			if (fReadSelectSyncPool)
618 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_DISCONNECTED);
619 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_ERROR);
620 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
621 		}
622 	} else {
623 		// Last reader is gone. Wake up all writers.
624 		fWriteCondition.NotifyAll();
625 
626 		if (fWriteSelectSyncPool) {
627 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_DISCONNECTED);
628 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
629 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
630 		}
631 	}
632 }
633 
634 
/*!	Registers a new open FD with the given \a openMode. Once both a reader
	and a writer exist, the FIFO becomes active: the ring buffer is
	allocated and writers blocked in WriteDataToBuffer() are released.
	Note: O_RDWR counts as a reader only, matching Close().
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		TRACE("Inode %p::Open(): fifo becomes active\n", this);
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
657 
658 
659 void
660 Inode::Close(file_cookie* cookie)
661 {
662 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
663 
664 	MutexLocker locker(RequestLock());
665 
666 	int openMode = cookie->open_mode;
667 
668 	// Notify all currently reading file descriptors
669 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
670 	while (ReadRequest* request = iterator.Next()) {
671 		if (request->Cookie() == cookie)
672 			request->Notify(B_FILE_ERROR);
673 	}
674 
675 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
676 		NotifyEndClosed(true);
677 
678 	if ((openMode & O_ACCMODE) == O_RDONLY
679 		|| (openMode & O_ACCMODE) == O_RDWR) {
680 		if (--fReaderCount == 0)
681 			NotifyEndClosed(false);
682 	}
683 
684 	if (fWriterCount == 0) {
685 		// Notify any still reading writers to stop
686 		// TODO: This only works reliable if there is only one writer - we could
687 		// do the same thing done for the read requests.
688 		fWriteCondition.NotifyAll(B_FILE_ERROR);
689 	}
690 
691 	if (fReaderCount == 0 && fWriterCount == 0) {
692 		fActive = false;
693 		fBuffer.DeleteBuffer();
694 	}
695 }
696 
697 
/*!	Adds \a sync to the read or write select pool (depending on
	\a openMode) and immediately delivers any event whose condition already
	holds. Called with the request lock held (see fifo_select()).
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	// O_RDWR FDs are rejected: a select() on them would be ambiguous here
	if ((openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		if (event == B_SELECT_WRITE && fBuffer.Writable() > 0)
			return notify_select_event(sync, event);
		if (fReaderCount == 0) {
			// no readers: report error or disconnect right away
			if (event == B_SELECT_ERROR)
				return notify_select_event(sync, event);
			return notify_select_event(sync, B_SELECT_DISCONNECTED);
		}
	} else {
		if (event == B_SELECT_READ && fBuffer.Readable() > 0)
			return notify_select_event(sync, event);
		if (fWriterCount == 0) {
			// no writers: report error or disconnect right away
			if (event == B_SELECT_ERROR)
				return notify_select_event(sync, event);
			return notify_select_event(sync, B_SELECT_DISCONNECTED);
		}
	}

	return B_OK;
}
735 
736 
737 status_t
738 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
739 {
740 	select_sync_pool** pool;
741 	if ((openMode & O_RWMASK) == O_RDONLY) {
742 		pool = &fReadSelectSyncPool;
743 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
744 		pool = &fWriteSelectSyncPool;
745 	} else
746 		return B_NOT_ALLOWED;
747 
748 	remove_select_sync_pool_entry(pool, sync, event);
749 	return B_OK;
750 }
751 
752 
/*!	Kernel debugger helper: prints this FIFO's state and, with
	\a dumpData, a hex dump of the buffered bytes. Runs in KDL context,
	so no locking is done.
*/
void
Inode::Dump(bool dumpData) const
{
	kprintf("FIFO %p\n", this);
	kprintf("  active:        %s\n", fActive ? "true" : "false");
	kprintf("  readers:       %" B_PRId32 "\n", fReaderCount);
	kprintf("  writers:       %" B_PRId32 "\n", fWriterCount);

	if (!fReadRequests.IsEmpty()) {
		kprintf(" pending readers:\n");
		for (ReadRequestList::ConstIterator it = fReadRequests.GetIterator();
			ReadRequest* request = it.Next();) {
			kprintf("    %p: thread %" B_PRId32 ", cookie: %p\n", request,
				request->GetThread()->id, request->Cookie());
		}
	}

	if (!fWriteRequests.IsEmpty()) {
		kprintf(" pending writers:\n");
		for (WriteRequestList::ConstIterator it = fWriteRequests.GetIterator();
			WriteRequest* request = it.Next();) {
			kprintf("    %p:  thread %" B_PRId32 ", min count: %zu\n", request,
				request->GetThread()->id, request->MinimalWriteCount());
		}
	}

	kprintf("  %zu bytes buffered\n", fBuffer.Readable());

	if (dumpData && fBuffer.Readable() > 0) {
		// Adapter feeding the buffered bytes one by one into the kernel's
		// generic hex dump, using Peek() so nothing is consumed.
		struct DataProvider : BKernel::HexDumpDataProvider {
			DataProvider(const RingBuffer& buffer)
				:
				fBuffer(buffer),
				fOffset(0)
			{
			}

			virtual bool HasMoreData() const
			{
				return fOffset < fBuffer.Readable();
			}

			virtual uint8 NextByte()
			{
				uint8 byte = '\0';
				if (fOffset < fBuffer.Readable()) {
					fBuffer.Peek(fOffset, &byte, 1);
					fOffset++;
				}
				return byte;
			}

			virtual bool GetAddressString(char* buffer, size_t bufferSize) const
			{
				snprintf(buffer, bufferSize, "    %4zx", fOffset);
				return true;
			}

		private:
			const RingBuffer&	fBuffer;
			size_t				fOffset;
		};

		DataProvider dataProvider(fBuffer);
		BKernel::print_hex_dump(dataProvider, fBuffer.Readable());
	}
}
820 
821 
822 /*static*/ int
823 Inode::Dump(int argc, char** argv)
824 {
825 	bool dumpData = false;
826 	int argi = 1;
827 	if (argi < argc && strcmp(argv[argi], "-d") == 0) {
828 		dumpData = true;
829 		argi++;
830 	}
831 
832 	if (argi >= argc || argi + 2 < argc) {
833 		print_debugger_command_usage(argv[0]);
834 		return 0;
835 	}
836 
837 	Inode* node = (Inode*)parse_expression(argv[argi]);
838 	if (IS_USER_ADDRESS(node)) {
839 		kprintf("invalid FIFO address\n");
840 		return 0;
841 	}
842 
843 	node->Dump(dumpData);
844 	return 0;
845 }
846 
847 
848 //	#pragma mark - vnode API
849 
850 
851 static status_t
852 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
853 {
854 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
855 	fs_vnode* superVnode = fifo->SuperVnode();
856 
857 	status_t error = B_OK;
858 	if (superVnode->ops->put_vnode != NULL)
859 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
860 
861 	delete fifo;
862 
863 	return error;
864 }
865 
866 
867 static status_t
868 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
869 {
870 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
871 	fs_vnode* superVnode = fifo->SuperVnode();
872 
873 	status_t error = B_OK;
874 	if (superVnode->ops->remove_vnode != NULL)
875 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
876 
877 	delete fifo;
878 
879 	return error;
880 }
881 
882 
883 static status_t
884 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
885 	void** _cookie)
886 {
887 	Inode* inode = (Inode*)_node->private_node;
888 
889 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
890 
891 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
892 	if (cookie == NULL)
893 		return B_NO_MEMORY;
894 
895 	TRACE("  open cookie = %p\n", cookie);
896 	cookie->open_mode = openMode;
897 	inode->Open(openMode);
898 
899 	*_cookie = (void*)cookie;
900 
901 	return B_OK;
902 }
903 
904 
905 static status_t
906 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
907 {
908 	file_cookie* cookie = (file_cookie*)_cookie;
909 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
910 
911 	fifo->Close(cookie);
912 
913 	return B_OK;
914 }
915 
916 
917 static status_t
918 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
919 {
920 	file_cookie* cookie = (file_cookie*)_cookie;
921 
922 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
923 
924 	free(cookie);
925 
926 	return B_OK;
927 }
928 
929 
// Nothing to sync: a FIFO has no backing store.
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_OK;
}
935 
936 
// read() hook: queues a ReadRequest so concurrent readers are served in
// FIFO order, then pulls data from the ring buffer. Partial reads report
// B_OK with the byte count; position is ignored (FIFOs are not seekable).
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	MutexLocker locker(inode->RequestLock());

	// only FDs opened read-only may read (O_RDWR is not supported for I/O)
	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall(),
		request);

	inode->RemoveReadRequest(request);
	// give the next queued reader its turn if data remains
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	// a partial read is a success, whatever error ended it
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
984 
985 
// write() hook: pushes data into the ring buffer, honoring the POSIX
// atomic-write guarantee for small writes (see WriteDataToBuffer()).
// Partial writes report B_OK with the byte count; position is ignored.
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	MutexLocker locker(inode->RequestLock());

	// only FDs opened write-only may write (O_RDWR is not supported for I/O)
	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall());

	// a partial write is a success, whatever error ended it
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
1015 
1016 
// stat() hook: lets the wrapped file system fill in the stat, then
// overrides the FIFO-specific fields (size = buffered bytes, times).
static status_t
fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
{
	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
	fs_vnode* superVnode = fifo->SuperVnode();

	if (superVnode->ops->read_stat == NULL)
		return B_BAD_VALUE;

	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
	if (error != B_OK)
		return error;


	MutexLocker locker(fifo->RequestLock());

	st->st_size = fifo->BytesAvailable();

	st->st_blksize = 4096;

	// TODO: Just pass the changes to our modification time on to the super node.
	st->st_atim.tv_sec = time(NULL);
	st->st_atim.tv_nsec = 0;
	st->st_mtim = st->st_ctim = fifo->ModificationTime();

	return B_OK;
}
1044 
1045 
1046 static status_t
1047 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
1048 	uint32 statMask)
1049 {
1050 	// we cannot change the size of anything
1051 	if ((statMask & B_STAT_SIZE) != 0)
1052 		return B_BAD_VALUE;
1053 
1054 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1055 	fs_vnode* superVnode = fifo->SuperVnode();
1056 
1057 	if (superVnode->ops->write_stat == NULL)
1058 		return B_BAD_VALUE;
1059 
1060 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
1061 		statMask);
1062 	if (error != B_OK)
1063 		return error;
1064 
1065 	return B_OK;
1066 }
1067 
1068 
1069 static status_t
1070 fifo_ioctl(fs_volume* _volume, fs_vnode* _node, void* _cookie, uint32 op,
1071 	void* buffer, size_t length)
1072 {
1073 	file_cookie* cookie = (file_cookie*)_cookie;
1074 	Inode* inode = (Inode*)_node->private_node;
1075 
1076 	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
1077 		_vnode, _cookie, op, buffer, length);
1078 
1079 	switch (op) {
1080 		case FIONBIO:
1081 		{
1082 			if (buffer == NULL)
1083 				return B_BAD_VALUE;
1084 
1085 			int value;
1086 			if (is_called_via_syscall()) {
1087 				if (!IS_USER_ADDRESS(buffer)
1088 					|| user_memcpy(&value, buffer, sizeof(int)) != B_OK) {
1089 					return B_BAD_ADDRESS;
1090 				}
1091 			} else
1092 				value = *(int*)buffer;
1093 
1094 			MutexLocker locker(inode->RequestLock());
1095 			cookie->SetNonBlocking(value != 0);
1096 			return B_OK;
1097 		}
1098 
1099 		case FIONREAD:
1100 		{
1101 			if (buffer == NULL)
1102 				return B_BAD_VALUE;
1103 
1104 			MutexLocker locker(inode->RequestLock());
1105 			int available = (int)inode->BytesAvailable();
1106 			locker.Unlock();
1107 
1108 			if (is_called_via_syscall()) {
1109 				if (!IS_USER_ADDRESS(buffer)
1110 					|| user_memcpy(buffer, &available, sizeof(available))
1111 						!= B_OK) {
1112 					return B_BAD_ADDRESS;
1113 				}
1114 			} else
1115 				*(int*)buffer = available;
1116 
1117 			return B_OK;
1118 		}
1119 
1120 		case B_SET_BLOCKING_IO:
1121 		case B_SET_NONBLOCKING_IO:
1122 		{
1123 			MutexLocker locker(inode->RequestLock());
1124 			cookie->SetNonBlocking(op == B_SET_NONBLOCKING_IO);
1125 			return B_OK;
1126 		}
1127 	}
1128 
1129 	return EINVAL;
1130 }
1131 
1132 
1133 static status_t
1134 fifo_set_flags(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1135 	int flags)
1136 {
1137 	Inode* inode = (Inode*)_node->private_node;
1138 	file_cookie* cookie = (file_cookie*)_cookie;
1139 
1140 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
1141 
1142 	MutexLocker locker(inode->RequestLock());
1143 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
1144 	return B_OK;
1145 }
1146 
1147 
1148 static status_t
1149 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1150 	uint8 event, selectsync* sync)
1151 {
1152 	file_cookie* cookie = (file_cookie*)_cookie;
1153 
1154 	TRACE("fifo_select(vnode = %p)\n", _node);
1155 	Inode* inode = (Inode*)_node->private_node;
1156 	if (!inode)
1157 		return B_ERROR;
1158 
1159 	MutexLocker locker(inode->RequestLock());
1160 	return inode->Select(event, sync, cookie->open_mode);
1161 }
1162 
1163 
1164 static status_t
1165 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1166 	uint8 event, selectsync* sync)
1167 {
1168 	file_cookie* cookie = (file_cookie*)_cookie;
1169 
1170 	TRACE("fifo_deselect(vnode = %p)\n", _node);
1171 	Inode* inode = (Inode*)_node->private_node;
1172 	if (inode == NULL)
1173 		return B_ERROR;
1174 
1175 	MutexLocker locker(inode->RequestLock());
1176 	return inode->Deselect(event, sync, cookie->open_mode);
1177 }
1178 
1179 
// FIFOs have no pageable backing store.
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1185 
1186 
// Paged I/O is not supported on FIFOs.
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1193 
1194 
// Paged I/O is not supported on FIFOs.
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1201 
1202 
1203 static status_t
1204 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1205 	fs_vnode* _superVnode)
1206 {
1207 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1208 	fs_vnode* superVnode = fifo->SuperVnode();
1209 
1210 	if (superVnode->ops->get_super_vnode != NULL) {
1211 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1212 			_superVnode);
1213 	}
1214 
1215 	*_superVnode = *superVnode;
1216 
1217 	return B_OK;
1218 }
1219 
1220 
// The vnode operations vector installed on FIFO nodes by
// create_fifo_vnode(). Unimplemented hooks are NULL; lifetime and stat
// hooks forward to the wrapped file system's vnode.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1298 
1299 
1300 }	// namespace fifo
1301 
1302 
1303 using namespace fifo;
1304 
1305 
1306 // #pragma mark -
1307 
1308 
1309 status_t
1310 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1311 {
1312 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1313 	if (fifo == NULL)
1314 		return B_NO_MEMORY;
1315 
1316 	status_t status = fifo->InitCheck();
1317 	if (status != B_OK) {
1318 		delete fifo;
1319 		return status;
1320 	}
1321 
1322 	vnode->private_node = fifo;
1323 	vnode->ops = &sFIFOVnodeOps;
1324 
1325 	return B_OK;
1326 }
1327 
1328 
// Registers the "fifo" kernel debugger command (see Inode::Dump()).
void
fifo_init()
{
	add_debugger_command_etc("fifo", &Inode::Dump,
		"Print info about the specified FIFO node",
		"[ \"-d\" ] <address>\n"
		"Prints information about the FIFO node specified by address\n"
		"<address>. If \"-d\" is given, the data in the FIFO's ring buffer\n"
		"hexdumped as well.\n",
		0);
}
1340