xref: /haiku/src/system/kernel/fs/fifo.cpp (revision fb52b1f8b4be971f708d17b2ad2454120cd46d81)
1 /*
2  * Copyright 2007-2011, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/stat.h>
15 
16 #include <new>
17 
18 #include <KernelExport.h>
19 #include <NodeMonitor.h>
20 #include <Select.h>
21 
22 #include <condition_variable.h>
23 #include <debug.h>
24 #include <khash.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <team.h>
28 #include <thread.h>
29 #include <util/DoublyLinkedList.h>
30 #include <util/AutoLock.h>
31 #include <util/ring_buffer.h>
32 #include <vfs.h>
33 #include <vfs_defs.h>
34 #include <vm/vm.h>
35 
36 
// Debug tracing switch: when TRACE_FIFO is defined, TRACE() forwards its
// arguments to dprintf(); otherwise TRACE() expands to nothing.
//#define TRACE_FIFO
#ifdef TRACE_FIFO
#	define TRACE(x...) dprintf(x)
#else
#	define TRACE(x...)
#endif
43 
44 
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably a leftover from an earlier implementation -- confirm before
// removing.
#define PIPEFS_HASH_SIZE		16
46 
47 
48 namespace fifo {
49 
50 
51 struct file_cookie;
52 class Inode;
53 
54 
/*!	Thin wrapper around a lazily allocated \c ring_buffer.
	The buffer does not exist until CreateBuffer() has been called. While it
	is missing, the read/write methods fail with \c B_NO_MEMORY and
	Readable()/Writable() report 0.
*/
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			// Allocate (no-op if already allocated) respectively free (no-op
			// if not allocated) the underlying ring buffer.
			status_t			CreateBuffer();
			void				DeleteBuffer();

			// Transfer data via the ring_buffer_write()/ring_buffer_read()
			// functions respectively their ring_buffer_user_*() variants.
			ssize_t				Write(const void* buffer, size_t length);
			ssize_t				Read(void* buffer, size_t length);
			ssize_t				UserWrite(const void* buffer, ssize_t length);
			ssize_t				UserRead(void* buffer, ssize_t length);

			// Number of bytes that can currently be read/written.
			size_t				Readable() const;
			size_t				Writable() const;

private:
			// NULL as long as no buffer has been created
			struct ring_buffer*	fBuffer;
};
74 
75 
76 class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
77 public:
78 	ReadRequest(file_cookie* cookie)
79 		:
80 		fThread(thread_get_current_thread()),
81 		fCookie(cookie),
82 		fNotified(true)
83 	{
84 		B_INITIALIZE_SPINLOCK(&fLock);
85 	}
86 
87 	void SetNotified(bool notified)
88 	{
89 		InterruptsSpinLocker _(fLock);
90 		fNotified = notified;
91 	}
92 
93 	void Notify(status_t status = B_OK)
94 	{
95 		InterruptsSpinLocker _(fLock);
96 		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);
97 
98 		if (!fNotified) {
99 			SpinLocker schedulerLocker(gSchedulerLock);
100 			thread_unblock_locked(fThread, status);
101 			fNotified = true;
102 		}
103 	}
104 
105 	file_cookie* Cookie() const
106 	{
107 		return fCookie;
108 	}
109 
110 private:
111 	spinlock		fLock;
112 	Thread*			fThread;
113 	file_cookie*	fCookie;
114 	volatile bool	fNotified;
115 };
116 
117 
/*!	A pending write on a FIFO, queued in the inode's list of write requests
	while the writer waits for buffer space. It only records how many bytes
	the writer needs to be able to write at once before it can proceed;
	Inode::NotifyBytesRead() inspects that value to decide whether waking the
	writers is worthwhile.
*/
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
public:
	WriteRequest(size_t minimalWriteCount)
		:
		fMinimalWriteCount(minimalWriteCount)
	{
	}

	//! Minimum number of writable bytes the waiting writer requires.
	size_t MinimalWriteCount() const
	{
		return fMinimalWriteCount;
	}

private:
	size_t	fMinimalWriteCount;
};
134 
135 
// Intrusive lists of the requests currently pending on an inode.
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
138 
139 
/*!	State shared by all open file descriptors of one FIFO: the ring buffer
	with the data in transit, the queues of pending read and write requests,
	the reader/writer counts, and the select() pools.
	The hooks in this file acquire the request lock (see RequestLock())
	before they access that state; Open() and Close() acquire it themselves.
*/
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// Becomes true in Open() once both a reader and a writer are
			// present; reset in Close() when neither is left.
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			mutex*				RequestLock() { return &fRequestLock; }

			// Data transfer; both may block and temporarily release the
			// request lock while doing so.
			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			// Management of the queue of pending reads.
			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			// Wake-up of waiting readers, writers, and select()ors.
			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(int openMode, file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			// writers waiting for buffer space block on this
			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;

			// select() pools for descriptors opened read-only respectively
			// write-only (see Select())
			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
206 
207 
/*!	Inode of a FIFO that is layered on top of another file system's node.
	It keeps a copy of the fs_vnode it was created from (the "super" vnode),
	to which several hooks in this file forward their calls.
*/
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	//! The copy of the underlying file system's vnode.
	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;
};
222 
223 
/*!	Per-open-descriptor state: the open mode passed to fifo_open(), with the
	O_APPEND/O_NONBLOCK bits possibly updated by fifo_set_flags().
*/
struct file_cookie {
	int				open_mode;
};
227 
228 
229 // #pragma mark -
230 
231 
//!	Creates the wrapper without allocating a buffer; see CreateBuffer().
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
237 
238 
//!	Frees the underlying buffer, if one has been created.
RingBuffer::~RingBuffer()
{
	DeleteBuffer();
}
243 
244 
245 status_t
246 RingBuffer::CreateBuffer()
247 {
248 	if (fBuffer != NULL)
249 		return B_OK;
250 
251 	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
252 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
253 }
254 
255 
256 void
257 RingBuffer::DeleteBuffer()
258 {
259 	if (fBuffer != NULL) {
260 		delete_ring_buffer(fBuffer);
261 		fBuffer = NULL;
262 	}
263 }
264 
265 
266 inline ssize_t
267 RingBuffer::Write(const void* buffer, size_t length)
268 {
269 	if (fBuffer == NULL)
270 		return B_NO_MEMORY;
271 
272 	return ring_buffer_write(fBuffer, (const uint8*)buffer, length);
273 }
274 
275 
276 inline ssize_t
277 RingBuffer::Read(void* buffer, size_t length)
278 {
279 	if (fBuffer == NULL)
280 		return B_NO_MEMORY;
281 
282 	return ring_buffer_read(fBuffer, (uint8*)buffer, length);
283 }
284 
285 
286 inline ssize_t
287 RingBuffer::UserWrite(const void* buffer, ssize_t length)
288 {
289 	if (fBuffer == NULL)
290 		return B_NO_MEMORY;
291 
292 	return ring_buffer_user_write(fBuffer, (const uint8*)buffer, length);
293 }
294 
295 
296 inline ssize_t
297 RingBuffer::UserRead(void* buffer, ssize_t length)
298 {
299 	if (fBuffer == NULL)
300 		return B_NO_MEMORY;
301 
302 	return ring_buffer_user_read(fBuffer, (uint8*)buffer, length);
303 }
304 
305 
306 inline size_t
307 RingBuffer::Readable() const
308 {
309 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
310 }
311 
312 
313 inline size_t
314 RingBuffer::Writable() const
315 {
316 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
317 }
318 
319 
320 //	#pragma mark -
321 
322 
323 Inode::Inode()
324 	:
325 	fReadRequests(),
326 	fWriteRequests(),
327 	fReaderCount(0),
328 	fWriterCount(0),
329 	fActive(false),
330 	fReadSelectSyncPool(NULL),
331 	fWriteSelectSyncPool(NULL)
332 {
333 	fWriteCondition.Publish(this, "pipe");
334 	mutex_init(&fRequestLock, "pipe request");
335 
336 	bigtime_t time = real_time_clock();
337 	fModificationTime.tv_sec = time / 1000000;
338 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
339 	fCreationTime = fModificationTime;
340 }
341 
342 
/*!	Undoes what the constructor set up: unpublishes the writers' condition
	variable and destroys the request lock.
*/
Inode::~Inode()
{
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
348 
349 
/*!	Reports whether construction succeeded. The constructor currently has no
	way to fail, so this always returns \c B_OK.
*/
status_t
Inode::InitCheck()
{
	return B_OK;
}
355 
356 
357 /*!	Writes the specified data bytes to the inode's ring buffer. The
358 	request lock must be held when calling this method.
359 	Notifies readers if necessary, so that blocking readers will get started.
360 	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
361 	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
362 	the returned length is > 0, the returned error code can be ignored.
363 */
364 status_t
365 Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking)
366 {
367 	const uint8* data = (const uint8*)_data;
368 	size_t dataSize = *_length;
369 	size_t& written = *_length;
370 	written = 0;
371 
372 	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
373 		dataSize);
374 
375 	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
376 	// interleaved with other writer's data.
377 	size_t minToWrite = 1;
378 	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
379 		minToWrite = dataSize;
380 
381 	while (dataSize > 0) {
382 		// Wait until enough space in the buffer is available.
383 		while (!fActive
384 				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
385 			if (nonBlocking)
386 				return B_WOULD_BLOCK;
387 
388 			ConditionVariableEntry entry;
389 			entry.Add(this);
390 
391 			WriteRequest request(minToWrite);
392 			fWriteRequests.Add(&request);
393 
394 			mutex_unlock(&fRequestLock);
395 			status_t status = entry.Wait(B_CAN_INTERRUPT);
396 			mutex_lock(&fRequestLock);
397 
398 			fWriteRequests.Remove(&request);
399 
400 			if (status != B_OK)
401 				return status;
402 		}
403 
404 		// write only as long as there are readers left
405 		if (fActive && fReaderCount == 0) {
406 			if (written == 0)
407 				send_signal(find_thread(NULL), SIGPIPE);
408 			return EPIPE;
409 		}
410 
411 		// write as much as we can
412 
413 		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
414 		if (toWrite > dataSize)
415 			toWrite = dataSize;
416 
417 		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < 0)
418 			return B_BAD_ADDRESS;
419 
420 		data += toWrite;
421 		dataSize -= toWrite;
422 		written += toWrite;
423 
424 		NotifyBytesWritten(toWrite);
425 	}
426 
427 	return B_OK;
428 }
429 
430 
431 status_t
432 Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
433 	ReadRequest& request)
434 {
435 	size_t dataSize = *_length;
436 	*_length = 0;
437 
438 	// wait until our request is first in queue
439 	status_t error;
440 	if (fReadRequests.Head() != &request) {
441 		if (nonBlocking)
442 			return B_WOULD_BLOCK;
443 
444 		TRACE("Inode %p::%s(): wait for request %p to become the first "
445 			"request.\n", this, __FUNCTION__, &request);
446 
447 		error = WaitForReadRequest(request);
448 		if (error != B_OK)
449 			return error;
450 	}
451 
452 	// wait until data are available
453 	while (fBuffer.Readable() == 0) {
454 		if (nonBlocking)
455 			return B_WOULD_BLOCK;
456 
457 		if (fActive && fWriterCount == 0)
458 			return B_OK;
459 
460 		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
461 			&request);
462 
463 		error = WaitForReadRequest(request);
464 		if (error != B_OK)
465 			return error;
466 	}
467 
468 	// read as much as we can
469 	size_t toRead = fBuffer.Readable();
470 	if (toRead > dataSize)
471 		toRead = dataSize;
472 
473 	if (fBuffer.UserRead(data, toRead) < 0)
474 		return B_BAD_ADDRESS;
475 
476 	NotifyBytesRead(toRead);
477 
478 	*_length = toRead;
479 
480 	return B_OK;
481 }
482 
483 
//!	Appends \a request to the queue of pending read requests.
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
489 
490 
//!	Removes \a request from the queue of pending read requests.
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
496 
497 
/*!	Blocks the calling thread until \a request gets notified or the wait is
	interrupted.
	The request lock is released for the time of blocking and re-acquired
	before returning.
	\return The status thread_block() returned.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// Announce that we're going to block (interruptibly) before releasing
	// the lock.
	// NOTE(review): presumably this is what prevents a Notify() arriving
	// between mutex_unlock() and thread_block() from getting lost -- confirm
	// against the thread_prepare_to_block() semantics.
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// From now on ReadRequest::Notify() will actually unblock this thread.
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
519 
520 
521 void
522 Inode::NotifyBytesRead(size_t bytes)
523 {
524 	// notify writer, if something can be written now
525 	size_t writable = fBuffer.Writable();
526 	if (bytes > 0) {
527 		// notify select()ors only, if nothing was writable before
528 		if (writable == bytes) {
529 			if (fWriteSelectSyncPool)
530 				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
531 		}
532 
533 		// If any of the waiting writers has a minimal write count that has
534 		// now become satisfied, we notify all of them (condition variables
535 		// don't support doing that selectively).
536 		WriteRequest* request;
537 		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
538 		while ((request = iterator.Next()) != NULL) {
539 			size_t minWriteCount = request->MinimalWriteCount();
540 			if (minWriteCount > 0 && minWriteCount <= writable
541 					&& minWriteCount > writable - bytes) {
542 				fWriteCondition.NotifyAll();
543 				break;
544 			}
545 		}
546 	}
547 }
548 
549 
550 void
551 Inode::NotifyReadDone()
552 {
553 	// notify next reader, if there's still something to be read
554 	if (fBuffer.Readable() > 0) {
555 		if (ReadRequest* request = fReadRequests.First())
556 			request->Notify();
557 	}
558 }
559 
560 
561 void
562 Inode::NotifyBytesWritten(size_t bytes)
563 {
564 	// notify reader, if something can be read now
565 	if (bytes > 0 && fBuffer.Readable() == bytes) {
566 		if (fReadSelectSyncPool)
567 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
568 
569 		if (ReadRequest* request = fReadRequests.First())
570 			request->Notify();
571 	}
572 }
573 
574 
575 void
576 Inode::NotifyEndClosed(bool writer)
577 {
578 	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
579 		writer ? "writer" : "reader");
580 
581 	if (writer) {
582 		// Our last writer has been closed; if the pipe
583 		// contains no data, unlock all waiting readers
584 		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
585 		if (fBuffer.Readable() == 0) {
586 			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
587 			while (ReadRequest* request = iterator.Next())
588 				request->Notify();
589 
590 			if (fReadSelectSyncPool)
591 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
592 		}
593 	} else {
594 		// Last reader is gone. Wake up all writers.
595 		fWriteCondition.NotifyAll();
596 
597 		if (fWriteSelectSyncPool) {
598 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
599 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
600 		}
601 	}
602 }
603 
604 
605 void
606 Inode::Open(int openMode)
607 {
608 	MutexLocker locker(RequestLock());
609 
610 	if ((openMode & O_ACCMODE) == O_WRONLY)
611 		fWriterCount++;
612 
613 	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
614 		fReaderCount++;
615 
616 	if (fReaderCount > 0 && fWriterCount > 0) {
617 		TRACE("Inode %p::Open(): fifo becomes active\n", this);
618 		fBuffer.CreateBuffer();
619 		fActive = true;
620 
621 		// notify all waiting writers that they can start
622 		if (fWriteSelectSyncPool)
623 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
624 		fWriteCondition.NotifyAll();
625 	}
626 }
627 
628 
629 void
630 Inode::Close(int openMode, file_cookie* cookie)
631 {
632 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
633 
634 	MutexLocker locker(RequestLock());
635 
636 	// Notify all currently reading file descriptors
637 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
638 	while (ReadRequest* request = iterator.Next()) {
639 		if (request->Cookie() == cookie)
640 			request->Notify(B_FILE_ERROR);
641 	}
642 
643 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
644 		NotifyEndClosed(true);
645 
646 	if ((openMode & O_ACCMODE) == O_RDONLY
647 		|| (openMode & O_ACCMODE) == O_RDWR) {
648 		if (--fReaderCount == 0)
649 			NotifyEndClosed(false);
650 	}
651 
652 	if (fWriterCount == 0) {
653 		// Notify any still reading writers to stop
654 		// TODO: This only works reliable if there is only one writer - we could
655 		// do the same thing done for the read requests.
656 		fWriteCondition.NotifyAll(B_FILE_ERROR);
657 	}
658 
659 	if (fReaderCount == 0 && fWriterCount == 0) {
660 		fActive = false;
661 		fBuffer.DeleteBuffer();
662 	}
663 }
664 
665 
666 status_t
667 Inode::Select(uint8 event, selectsync* sync, int openMode)
668 {
669 	bool writer = true;
670 	select_sync_pool** pool;
671 	if ((openMode & O_RWMASK) == O_RDONLY) {
672 		pool = &fReadSelectSyncPool;
673 		writer = false;
674 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
675 		pool = &fWriteSelectSyncPool;
676 	} else
677 		return B_NOT_ALLOWED;
678 
679 	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
680 		return B_ERROR;
681 
682 	// signal right away, if the condition holds already
683 	if (writer) {
684 		if ((event == B_SELECT_WRITE
685 				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
686 			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
687 			return notify_select_event(sync, event);
688 		}
689 	} else {
690 		if (event == B_SELECT_READ
691 				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
692 			return notify_select_event(sync, event);
693 		}
694 	}
695 
696 	return B_OK;
697 }
698 
699 
700 status_t
701 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
702 {
703 	select_sync_pool** pool;
704 	if ((openMode & O_RWMASK) == O_RDONLY) {
705 		pool = &fReadSelectSyncPool;
706 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
707 		pool = &fWriteSelectSyncPool;
708 	} else
709 		return B_NOT_ALLOWED;
710 
711 	remove_select_sync_pool_entry(pool, sync, event);
712 	return B_OK;
713 }
714 
715 
716 //	#pragma mark - vnode API
717 
718 
719 static status_t
720 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
721 {
722 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
723 	fs_vnode* superVnode = fifo->SuperVnode();
724 
725 	status_t error = B_OK;
726 	if (superVnode->ops->put_vnode != NULL)
727 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
728 
729 	delete fifo;
730 
731 	return error;
732 }
733 
734 
735 static status_t
736 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
737 {
738 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
739 	fs_vnode* superVnode = fifo->SuperVnode();
740 
741 	status_t error = B_OK;
742 	if (superVnode->ops->remove_vnode != NULL)
743 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
744 
745 	delete fifo;
746 
747 	return error;
748 }
749 
750 
751 static status_t
752 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
753 	void** _cookie)
754 {
755 	Inode* inode = (Inode*)_node->private_node;
756 
757 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
758 
759 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
760 	if (cookie == NULL)
761 		return B_NO_MEMORY;
762 
763 	TRACE("  open cookie = %p\n", cookie);
764 	cookie->open_mode = openMode;
765 	inode->Open(openMode);
766 
767 	*_cookie = (void*)cookie;
768 
769 	return B_OK;
770 }
771 
772 
773 static status_t
774 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
775 {
776 	file_cookie* cookie = (file_cookie*)_cookie;
777 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
778 
779 	fifo->Close(cookie->open_mode, cookie);
780 
781 	return B_OK;
782 }
783 
784 
785 static status_t
786 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
787 {
788 	file_cookie* cookie = (file_cookie*)_cookie;
789 
790 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
791 
792 	free(cookie);
793 
794 	return B_OK;
795 }
796 
797 
//!	fsync() hook: does nothing and always reports success.
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_OK;
}
803 
804 
805 static status_t
806 fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
807 	off_t /*pos*/, void* buffer, size_t* _length)
808 {
809 	file_cookie* cookie = (file_cookie*)_cookie;
810 	Inode* inode = (Inode*)_node->private_node;
811 
812 	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
813 		inode, cookie, *_length, cookie->open_mode);
814 
815 	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
816 		return B_NOT_ALLOWED;
817 
818 	MutexLocker locker(inode->RequestLock());
819 
820 	if (inode->IsActive() && inode->WriterCount() == 0) {
821 		// as long there is no writer, and the pipe is empty,
822 		// we always just return 0 to indicate end of file
823 		if (inode->BytesAvailable() == 0) {
824 			*_length = 0;
825 			return B_OK;
826 		}
827 	}
828 
829 	// issue read request
830 
831 	ReadRequest request(cookie);
832 	inode->AddReadRequest(request);
833 
834 	TRACE("  issue read request %p\n", &request);
835 
836 	size_t length = *_length;
837 	status_t status = inode->ReadDataFromBuffer(buffer, &length,
838 		(cookie->open_mode & O_NONBLOCK) != 0, request);
839 
840 	inode->RemoveReadRequest(request);
841 	inode->NotifyReadDone();
842 
843 	TRACE("  done reading request %p, length %zu\n", &request, length);
844 
845 	if (length > 0)
846 		status = B_OK;
847 
848 	*_length = length;
849 	return status;
850 }
851 
852 
853 static status_t
854 fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
855 	off_t /*pos*/, const void* buffer, size_t* _length)
856 {
857 	file_cookie* cookie = (file_cookie*)_cookie;
858 	Inode* inode = (Inode*)_node->private_node;
859 
860 	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
861 		_node, cookie, *_length);
862 
863 	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
864 		return B_NOT_ALLOWED;
865 
866 	MutexLocker locker(inode->RequestLock());
867 
868 	size_t length = *_length;
869 	if (length == 0)
870 		return B_OK;
871 
872 	// copy data into ring buffer
873 	status_t status = inode->WriteDataToBuffer(buffer, &length,
874 		(cookie->open_mode & O_NONBLOCK) != 0);
875 
876 	if (length > 0)
877 		status = B_OK;
878 
879 	*_length = length;
880 	return status;
881 }
882 
883 
884 static status_t
885 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
886 {
887 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
888 	fs_vnode* superVnode = fifo->SuperVnode();
889 
890 	if (superVnode->ops->read_stat == NULL)
891 		return B_BAD_VALUE;
892 
893 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
894 	if (error != B_OK)
895 		return error;
896 
897 
898 	MutexLocker locker(fifo->RequestLock());
899 
900 	st->st_size = fifo->BytesAvailable();
901 
902 	st->st_blksize = 4096;
903 
904 	// TODO: Just pass the changes to our modification time on to the super node.
905 	st->st_atim.tv_sec = time(NULL);
906 	st->st_atim.tv_nsec = 0;
907 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
908 
909 	return B_OK;
910 }
911 
912 
913 static status_t
914 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
915 	uint32 statMask)
916 {
917 	// we cannot change the size of anything
918 	if ((statMask & B_STAT_SIZE) != 0)
919 		return B_BAD_VALUE;
920 
921 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
922 	fs_vnode* superVnode = fifo->SuperVnode();
923 
924 	if (superVnode->ops->write_stat == NULL)
925 		return B_BAD_VALUE;
926 
927 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
928 		statMask);
929 	if (error != B_OK)
930 		return error;
931 
932 	return B_OK;
933 }
934 
935 
//!	ioctl() hook: no operation is supported, so this always fails with EINVAL.
static status_t
fifo_ioctl(fs_volume* _volume, fs_vnode* _vnode, void* _cookie, uint32 op,
	void* buffer, size_t length)
{
	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
		_vnode, _cookie, op, buffer, length);

	return EINVAL;
}
945 
946 
947 static status_t
948 fifo_set_flags(fs_volume* _volume, fs_vnode* _vnode, void* _cookie,
949 	int flags)
950 {
951 	file_cookie* cookie = (file_cookie*)_cookie;
952 
953 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
954 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
955 	return B_OK;
956 }
957 
958 
959 static status_t
960 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
961 	uint8 event, selectsync* sync)
962 {
963 	file_cookie* cookie = (file_cookie*)_cookie;
964 
965 	TRACE("fifo_select(vnode = %p)\n", _node);
966 	Inode* inode = (Inode*)_node->private_node;
967 	if (!inode)
968 		return B_ERROR;
969 
970 	MutexLocker locker(inode->RequestLock());
971 	return inode->Select(event, sync, cookie->open_mode);
972 }
973 
974 
975 static status_t
976 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
977 	uint8 event, selectsync* sync)
978 {
979 	file_cookie* cookie = (file_cookie*)_cookie;
980 
981 	TRACE("fifo_deselect(vnode = %p)\n", _node);
982 	Inode* inode = (Inode*)_node->private_node;
983 	if (inode == NULL)
984 		return B_ERROR;
985 
986 	MutexLocker locker(inode->RequestLock());
987 	return inode->Deselect(event, sync, cookie->open_mode);
988 }
989 
990 
//!	can_page() hook: always returns false.
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
996 
997 
//!	read_pages() hook: not supported, always returns B_NOT_ALLOWED.
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1004 
1005 
//!	write_pages() hook: not supported, always returns B_NOT_ALLOWED.
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1012 
1013 
1014 static status_t
1015 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1016 	fs_vnode* _superVnode)
1017 {
1018 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1019 	fs_vnode* superVnode = fifo->SuperVnode();
1020 
1021 	if (superVnode->ops->get_super_vnode != NULL) {
1022 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1023 			_superVnode);
1024 	}
1025 
1026 	*_superVnode = *superVnode;
1027 
1028 	return B_OK;
1029 }
1030 
1031 
// The vnode operations installed by create_fifo_vnode(). Hooks the FIFO
// layer does not implement are NULL. The initializers are positional, so
// their order has to match the member order of fs_vnode_ops.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,	// NOTE(review): unlabeled slot -- presumably preallocate();
			// confirm against the fs_vnode_ops declaration

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1109 
1110 
1111 }	// namespace fifo
1112 
1113 
1114 using namespace fifo;
1115 
1116 
1117 // #pragma mark -
1118 
1119 
1120 status_t
1121 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1122 {
1123 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1124 	if (fifo == NULL)
1125 		return B_NO_MEMORY;
1126 
1127 	status_t status = fifo->InitCheck();
1128 	if (status != B_OK) {
1129 		delete fifo;
1130 		return status;
1131 	}
1132 
1133 	vnode->private_node = fifo;
1134 	vnode->ops = &sFIFOVnodeOps;
1135 
1136 	return B_OK;
1137 }
1138