xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 4f2fd49bdc6078128b1391191e4edac647044c3d)
1 /*
2  * Copyright 2007-2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2007, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 #include <limits.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <sys/stat.h>
12 
13 #include <new>
14 
15 #include <KernelExport.h>
16 #include <NodeMonitor.h>
17 #include <Select.h>
18 
19 #include <condition_variable.h>
20 #include <debug.h>
21 #include <khash.h>
22 #include <lock.h>
23 #include <select_sync_pool.h>
24 #include <team.h>
25 #include <thread.h>
26 #include <util/DoublyLinkedList.h>
27 #include <util/AutoLock.h>
28 #include <util/ring_buffer.h>
29 #include <vfs.h>
30 #include <vm.h>
31 
32 #include "fifo.h"
33 
34 
35 //#define TRACE_FIFO
36 #ifdef TRACE_FIFO
37 #	define TRACE(x) dprintf x
38 #else
39 #	define TRACE(x)
40 #endif
41 
42 
43 #define PIPEFS_HASH_SIZE		16
44 #define PIPEFS_MAX_BUFFER_SIZE	32768
45 
46 
47 // TODO: PIPE_BUF is supposed to be defined somewhere else.
48 #define PIPE_BUF	_POSIX_PIPE_BUF
49 
50 
51 namespace fifo {
52 
53 class Inode;
54 
// Thin wrapper around the kernel ring_buffer API. The buffer is allocated
// lazily (CreateBuffer()) when the FIFO becomes active and freed again when
// the last end is closed, so an idle FIFO consumes no buffer memory.
class RingBuffer {
	public:
		RingBuffer();
		~RingBuffer();

		status_t CreateBuffer();
		void DeleteBuffer();

		// kernel-space transfers
		size_t Write(const void *buffer, size_t length);
		size_t Read(void *buffer, size_t length);
		// userland transfers; return a negative error code on fault
		ssize_t UserWrite(const void *buffer, ssize_t length);
		ssize_t UserRead(void *buffer, ssize_t length);

		// bytes currently buffered resp. free space left (0 if no buffer)
		size_t Readable() const;
		size_t Writable() const;

	private:
		struct ring_buffer	*fBuffer;	// NULL while the FIFO is inactive
};
74 
75 
// Represents one reader thread that is blocked (or about to block) on the
// FIFO. Requests are queued at the inode and served strictly in FIFO order.
// fLock guards fNotified, so that the reader (SetNotified()) and a waking
// writer (Notify()) can race safely.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
	public:
		ReadRequest()
			:
			fThread(thread_get_current_thread()),
			fNotified(true)
		{
			B_INITIALIZE_SPINLOCK(&fLock);
		}

		// The reader sets this to false right before blocking and back to
		// true before re-locking the request mutex (cf.
		// Inode::WaitForReadRequest()).
		void SetNotified(bool notified)
		{
			InterruptsSpinLocker _(fLock);
			fNotified = notified;
		}

		// Unblocks the waiting reader thread, unless it was already
		// notified (or never blocked).
		void Notify()
		{
			InterruptsSpinLocker _(fLock);

			if (!fNotified) {
				SpinLocker threadLocker(gThreadSpinlock);
				thread_unblock_locked(fThread, B_OK);
				fNotified = true;
			}
		}

	private:
		spinlock			fLock;
		struct thread*		fThread;	// the blocked reader
		volatile bool		fNotified;	// protected by fLock
};
108 
109 
// A queued writer waiting for buffer space. It only records how many bytes
// must be writable at once before the writer may proceed (PIPE_BUF
// atomicity, cf. Inode::WriteDataToBuffer()); the actual blocking happens
// on the inode's write condition variable.
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
	public:
		WriteRequest(size_t minimalWriteCount)
			:
			fMinimalWriteCount(minimalWriteCount)
		{
		}

		size_t MinimalWriteCount() const
		{
			return fMinimalWriteCount;
		}

	private:
		size_t	fMinimalWriteCount;
};
126 
127 
// Queues of blocked readers resp. waiting writers, in arrival order.
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
130 
131 
// The state shared by all open ends of one FIFO: the ring buffer, reader/
// writer bookkeeping, and the queues of blocked readers and writers. All
// mutating methods must be called with the request lock (RequestLock())
// held, unless noted otherwise.
class Inode {
	public:
		Inode();
		~Inode();

		status_t	InitCheck();

		// the FIFO is "active" while it has both readers and writers
		bool		IsActive() const { return fActive; }
		time_t		CreationTime() const { return fCreationTime; }
		void		SetCreationTime(time_t creationTime)
						{ fCreationTime = creationTime; }
		time_t		ModificationTime() const { return fModificationTime; }
		void		SetModificationTime(time_t modificationTime)
						{ fModificationTime = modificationTime; }

		// guards all mutable state of this inode
		mutex		*RequestLock() { return &fRequestLock; }

		status_t	WriteDataToBuffer(const void *data, size_t *_length,
						bool nonBlocking);
		status_t	ReadDataFromBuffer(void *data, size_t *_length,
						bool nonBlocking, ReadRequest &request);
		size_t		BytesAvailable() const { return fBuffer.Readable(); }
		size_t		BytesWritable() const { return fBuffer.Writable(); }

		void		AddReadRequest(ReadRequest &request);
		void		RemoveReadRequest(ReadRequest &request);
		status_t	WaitForReadRequest(ReadRequest &request);

		void		NotifyBytesRead(size_t bytes);
		void		NotifyReadDone();
		void		NotifyBytesWritten(size_t bytes);
		void		NotifyEndClosed(bool writer);

		void		Open(int openMode);
		void		Close(int openMode);
		int32		ReaderCount() const { return fReaderCount; }
		int32		WriterCount() const { return fWriterCount; }

		status_t	Select(uint8 event, selectsync *sync, int openMode);
		status_t	Deselect(uint8 event, selectsync *sync, int openMode);

	private:
		time_t		fCreationTime;
		time_t		fModificationTime;

		RingBuffer	fBuffer;

		// blocked readers (served in order) and waiting writers
		ReadRequestList		fReadRequests;
		WriteRequestList	fWriteRequests;

		mutex		fRequestLock;

		// writers block here until buffer space becomes available
		ConditionVariable fWriteCondition;

		int32		fReaderCount;
		int32		fWriterCount;
		bool		fActive;

		select_sync_pool	*fReadSelectSyncPool;
		select_sync_pool	*fWriteSelectSyncPool;
};
193 
194 
// An Inode that wraps a vnode of the hosting file system. The original
// fs_vnode (ops plus private node) is preserved, so hooks we do not handle
// ourselves can be forwarded to it (see fifo_read_stat() etc.).
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	// the wrapped vnode of the underlying file system
	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;
};
209 
210 
// Per-open state. Only the open mode (access mode plus status flags like
// O_NONBLOCK) is needed; it is updated by fifo_set_flags().
struct file_cookie {
	int				open_mode;
};
214 
215 
216 //---------------------
217 
218 
// The actual buffer is only allocated on demand in CreateBuffer().
RingBuffer::RingBuffer()
	: fBuffer(NULL)
{
}
223 
224 
// Releases the ring buffer, if it is still allocated.
RingBuffer::~RingBuffer()
{
	DeleteBuffer();
}
229 
230 
231 status_t
232 RingBuffer::CreateBuffer()
233 {
234 	if (fBuffer != NULL)
235 		return B_OK;
236 
237 	fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
238 	return (fBuffer != NULL ? B_OK : B_NO_MEMORY);
239 }
240 
241 
242 void
243 RingBuffer::DeleteBuffer()
244 {
245 	if (fBuffer != NULL) {
246 		delete_ring_buffer(fBuffer);
247 		fBuffer = NULL;
248 	}
249 }
250 
251 
252 inline size_t
253 RingBuffer::Write(const void *buffer, size_t length)
254 {
255 	if (fBuffer == NULL)
256 		return B_NO_MEMORY;
257 
258 	return ring_buffer_write(fBuffer, (const uint8 *)buffer, length);
259 }
260 
261 
262 inline size_t
263 RingBuffer::Read(void *buffer, size_t length)
264 {
265 	if (fBuffer == NULL)
266 		return B_NO_MEMORY;
267 
268 	return ring_buffer_read(fBuffer, (uint8 *)buffer, length);
269 }
270 
271 
272 inline ssize_t
273 RingBuffer::UserWrite(const void *buffer, ssize_t length)
274 {
275 	if (fBuffer == NULL)
276 		return B_NO_MEMORY;
277 
278 	return ring_buffer_user_write(fBuffer, (const uint8 *)buffer, length);
279 }
280 
281 
282 inline ssize_t
283 RingBuffer::UserRead(void *buffer, ssize_t length)
284 {
285 	if (fBuffer == NULL)
286 		return B_NO_MEMORY;
287 
288 	return ring_buffer_user_read(fBuffer, (uint8 *)buffer, length);
289 }
290 
291 
292 inline size_t
293 RingBuffer::Readable() const
294 {
295 	return (fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0);
296 }
297 
298 
299 inline size_t
300 RingBuffer::Writable() const
301 {
302 	return (fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0);
303 }
304 
305 
306 //	#pragma mark -
307 
308 
Inode::Inode()
	:
	fReadRequests(),
	fWriteRequests(),
	fReaderCount(0),
	fWriterCount(0),
	fActive(false),
	fReadSelectSyncPool(NULL),
	fWriteSelectSyncPool(NULL)
{
	// writers block on this (published) condition variable for free space
	fWriteCondition.Publish(this, "pipe");
	mutex_init(&fRequestLock, "pipe request");

	fCreationTime = fModificationTime = time(NULL);
}
324 
325 
// Tears down the synchronization primitives set up in the constructor.
Inode::~Inode()
{
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
331 
332 
status_t
Inode::InitCheck()
{
	// Construction cannot fail (the ring buffer is allocated lazily on
	// first open), so there is nothing to check here.
	return B_OK;
}
338 
339 
340 /*!	Writes the specified data bytes to the inode's ring buffer. The
341 	request lock must be held when calling this method.
342 	Notifies readers if necessary, so that blocking readers will get started.
343 	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
344 	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
345 	the returned length is > 0, the returned error code can be ignored.
346 */
347 status_t
348 Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
349 {
350 	const uint8* data = (const uint8*)_data;
351 	size_t dataSize = *_length;
352 	size_t& written = *_length;
353 	written = 0;
354 
355 	TRACE(("Inode::WriteDataToBuffer(data = %p, bytes = %lu)\n",
356 		data, dataSize));
357 
358 	// According to the standard, request up to PIPE_BUF bytes shall not be
359 	// interleaved with other writer's data.
360 	size_t minToWrite = 1;
361 	if (dataSize <= PIPE_BUF)
362 		minToWrite = dataSize;
363 
364 	while (dataSize > 0) {
365 		// Wait until enough space in the buffer is available.
366 		while (!fActive
367 				|| fBuffer.Writable() < minToWrite && fReaderCount > 0) {
368 			if (nonBlocking)
369 				return B_WOULD_BLOCK;
370 
371 			ConditionVariableEntry entry;
372 			entry.Add(this);
373 
374 			WriteRequest request(minToWrite);
375 			fWriteRequests.Add(&request);
376 
377 			mutex_unlock(&fRequestLock);
378 			status_t status = entry.Wait(B_CAN_INTERRUPT);
379 			mutex_lock(&fRequestLock);
380 
381 			fWriteRequests.Remove(&request);
382 
383 			if (status != B_OK)
384 				return status;
385 		}
386 
387 		// write only as long as there are readers left
388 		if (fReaderCount == 0 && fActive) {
389 			if (written == 0)
390 				send_signal(find_thread(NULL), SIGPIPE);
391 			return EPIPE;
392 		}
393 
394 		// write as much as we can
395 
396 		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
397 		if (toWrite > dataSize)
398 			toWrite = dataSize;
399 
400 		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
401 			return B_BAD_ADDRESS;
402 
403 		data += toWrite;
404 		dataSize -= toWrite;
405 		written += toWrite;
406 
407 		NotifyBytesWritten(toWrite);
408 	}
409 
410 	return B_OK;
411 }
412 
413 
/*!	Reads up to *_length bytes from the ring buffer into \a data (userland).
	The request lock must be held and \a request must have been queued via
	AddReadRequest() -- requests are served strictly in FIFO order.
	Blocks (unless \a nonBlocking) until the request is first in queue and
	data is available. Returns B_OK with *_length == 0 for end-of-file,
	i.e. when the FIFO is active, all writers are gone, and the buffer is
	drained. Returns B_BAD_ADDRESS if copying to userland failed.
*/
status_t
Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
	ReadRequest &request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writers left and nothing buffered: end-of-file
		if (fActive && fWriterCount == 0)
			return B_OK;

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	if (fBuffer.UserRead(data, toRead) < B_OK)
		return B_BAD_ADDRESS;

	// wake writers whose minimal write count may now be satisfied
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
459 
460 
// Appends \a request to the reader queue. The request lock must be held.
void
Inode::AddReadRequest(ReadRequest &request)
{
	fReadRequests.Add(&request);
}
466 
467 
// Removes \a request from the reader queue. The request lock must be held.
void
Inode::RemoveReadRequest(ReadRequest &request)
{
	fReadRequests.Remove(&request);
}
473 
474 
/*!	Blocks the calling thread until \a request is notified (or the wait is
	interrupted). The request lock must be held; it is released while
	blocked and re-acquired before returning. Returns the thread_block()
	result (e.g. B_INTERRUPTED).
*/
status_t
Inode::WaitForReadRequest(ReadRequest &request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
496 
497 
/*!	To be called (with the request lock held) after \a bytes have been read
	from the buffer: wakes writers/select()ors for which space has just
	become available.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means exactly these bytes freed the buffer up)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest *request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
525 
526 
527 void
528 Inode::NotifyReadDone()
529 {
530 	// notify next reader, if there's still something to be read
531 	if (fBuffer.Readable() > 0) {
532 		if (ReadRequest* request = fReadRequests.First())
533 			request->Notify();
534 	}
535 }
536 
537 
538 void
539 Inode::NotifyBytesWritten(size_t bytes)
540 {
541 	// notify reader, if something can be read now
542 	if (bytes > 0 && fBuffer.Readable() == bytes) {
543 		if (fReadSelectSyncPool)
544 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
545 
546 		if (ReadRequest* request = fReadRequests.First())
547 			request->Notify();
548 	}
549 }
550 
551 
552 void
553 Inode::NotifyEndClosed(bool writer)
554 {
555 	if (writer) {
556 		// Our last writer has been closed; if the pipe
557 		// contains no data, unlock all waiting readers
558 		if (fBuffer.Readable() == 0) {
559 			ReadRequest *request;
560 			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
561 			while ((request = iterator.Next()) != NULL)
562 				request->Notify();
563 
564 			if (fReadSelectSyncPool)
565 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
566 		}
567 	} else {
568 		// Last reader is gone. Wake up all writers.
569 		fWriteCondition.NotifyAll();
570 
571 		if (fWriteSelectSyncPool) {
572 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
573 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
574 		}
575 	}
576 }
577 
578 
/*!	Registers a new open end with mode \a openMode: updates the reader/
	writer counts and activates the FIFO once both ends are connected.
	Acquires the request lock itself.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	// NOTE(review): O_RDWR opens are counted as readers but not as writers
	// (the writer check above matches O_WRONLY only) -- confirm that this
	// asymmetry is intended; Close() decrements with the same checks, so
	// the counts stay consistent either way.
	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		// both ends are connected now
		// NOTE(review): a CreateBuffer() failure is silently ignored here
		// -- presumably writers would then simply find no writable space;
		// verify how an allocation failure should be reported.
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
600 
601 
/*!	Unregisters an open end with mode \a openMode: updates the reader/
	writer counts, wakes up the opposite end when the last reader or writer
	disappears, and frees the buffer once the FIFO is completely idle.
	Acquires the request lock itself.
*/
void
Inode::Close(int openMode)
{
	TRACE(("Inode::Close(openMode = %d)\n", openMode));

	MutexLocker locker(RequestLock());

	// last writer gone: blocked readers get end-of-file
	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
		NotifyEndClosed(true);

	// last reader gone: blocked writers get EPIPE
	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
		if (--fReaderCount == 0)
			NotifyEndClosed(false);
	}

	if (fReaderCount == 0 && fWriterCount == 0) {
		// the FIFO is idle again -- release the buffer memory
		fActive = false;
		fBuffer.DeleteBuffer();
	}
}
622 
623 
624 status_t
625 Inode::Select(uint8 event, selectsync *sync, int openMode)
626 {
627 	bool writer = true;
628 	select_sync_pool** pool;
629 	if ((openMode & O_RWMASK) == O_RDONLY) {
630 		pool = &fReadSelectSyncPool;
631 		writer = false;
632 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
633 		pool = &fWriteSelectSyncPool;
634 	} else
635 		return B_NOT_ALLOWED;
636 
637 	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
638 		return B_ERROR;
639 
640 	// signal right away, if the condition holds already
641 	if (writer) {
642 		if (event == B_SELECT_WRITE
643 				&& (fBuffer.Writable() > 0 || fReaderCount == 0)
644 			|| event == B_SELECT_ERROR && fReaderCount == 0) {
645 			return notify_select_event(sync, event);
646 		}
647 	} else {
648 		if (event == B_SELECT_READ
649 				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
650 			return notify_select_event(sync, event);
651 		}
652 	}
653 
654 	return B_OK;
655 }
656 
657 
658 status_t
659 Inode::Deselect(uint8 event, selectsync *sync, int openMode)
660 {
661 	select_sync_pool** pool;
662 	if ((openMode & O_RWMASK) == O_RDONLY) {
663 		pool = &fReadSelectSyncPool;
664 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
665 		pool = &fWriteSelectSyncPool;
666 	} else
667 		return B_NOT_ALLOWED;
668 
669 	remove_select_sync_pool_entry(pool, sync, event);
670 	return B_OK;
671 }
672 
673 
674 //	#pragma mark -
675 
676 
677 static status_t
678 fifo_put_vnode(fs_volume *volume, fs_vnode *vnode, bool reenter)
679 {
680 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
681 	fs_vnode* superVnode = fifo->SuperVnode();
682 
683 	status_t error = B_OK;
684 	if (superVnode->ops->put_vnode != NULL)
685 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
686 
687 	delete fifo;
688 
689 	return error;
690 }
691 
692 
693 static status_t
694 fifo_remove_vnode(fs_volume *volume, fs_vnode *vnode, bool reenter)
695 {
696 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
697 	fs_vnode* superVnode = fifo->SuperVnode();
698 
699 	status_t error = B_OK;
700 	if (superVnode->ops->remove_vnode != NULL)
701 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
702 
703 	delete fifo;
704 
705 	return error;
706 }
707 
708 
// Creates the per-open cookie (which only remembers the open mode) and
// registers the new open end with the inode.
static status_t
fifo_open(fs_volume *_volume, fs_vnode *_node, int openMode,
	void **_cookie)
{
	Inode *inode = (Inode *)_node->private_node;

	TRACE(("fifo_open(): node = %p, openMode = %d\n", inode, openMode));

	file_cookie *cookie = (file_cookie *)malloc(sizeof(file_cookie));
	if (cookie == NULL)
		return B_NO_MEMORY;

	TRACE(("  open cookie = %p\n", cookie));
	cookie->open_mode = openMode;
	// updates reader/writer counts and may activate the FIFO
	inode->Open(openMode);

	*_cookie = (void *)cookie;

	return B_OK;
}
729 
730 
731 static status_t
732 fifo_close(fs_volume *volume, fs_vnode *vnode, void *_cookie)
733 {
734 	file_cookie *cookie = (file_cookie *)_cookie;
735 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
736 
737 	fifo->Close(cookie->open_mode);
738 
739 	return B_OK;
740 }
741 
742 
// Releases the cookie allocated in fifo_open().
static status_t
fifo_free_cookie(fs_volume *_volume, fs_vnode *_node, void *_cookie)
{
	file_cookie *cookie = (file_cookie *)_cookie;

	TRACE(("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie));

	free(cookie);

	return B_OK;
}
754 
755 
// FIFOs have no persistent backing store, so there is nothing to sync.
static status_t
fifo_fsync(fs_volume *_volume, fs_vnode *_v)
{
	return B_OK;
}
761 
762 
// Reads from the FIFO; the position is ignored (FIFOs are not seekable).
// Returns 0 bytes (end-of-file) when the FIFO is active but has no writers
// and no buffered data. A partial read is reported as success.
static status_t
fifo_read(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	off_t /*pos*/, void *buffer, size_t *_length)
{
	file_cookie *cookie = (file_cookie *)_cookie;
	Inode *inode = (Inode *)_node->private_node;

	TRACE(("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode));

	// NOTE(review): this rejects O_RDWR cookies although Inode::Open()
	// counts them as readers -- confirm that this is intended.
	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request;
	inode->AddReadRequest(request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, request);

	// remove the request again and pass the turn to the next reader
	inode->RemoveReadRequest(request);
	inode->NotifyReadDone();

	// a partial read counts as success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
805 
806 
// Writes to the FIFO; the position is ignored (FIFOs are not seekable).
// Writing without readers raises SIGPIPE/EPIPE (see WriteDataToBuffer()).
// A partial write is reported as success.
static status_t
fifo_write(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	off_t /*pos*/, const void *buffer, size_t *_length)
{
	file_cookie *cookie = (file_cookie *)_cookie;
	Inode *inode = (Inode *)_node->private_node;

	TRACE(("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length));

	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	// a zero-length write is a no-op and always succeeds
	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0);

	// a partial write counts as success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
836 
837 
838 static status_t
839 fifo_read_stat(fs_volume *volume, fs_vnode *vnode, struct ::stat *st)
840 {
841 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
842 	fs_vnode* superVnode = fifo->SuperVnode();
843 
844 	if (superVnode->ops->read_stat == NULL)
845 		return B_BAD_VALUE;
846 
847 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
848 	if (error != B_OK)
849 		return error;
850 
851 
852 	MutexLocker locker(fifo->RequestLock());
853 
854 	st->st_size = fifo->BytesAvailable();
855 
856 	st->st_blksize = 4096;
857 
858 // TODO: Just pass the changes to our modification time on to the super node.
859 	st->st_atime = time(NULL);
860 	st->st_mtime = st->st_ctime = fifo->ModificationTime();
861 //	st->st_crtime = inode->CreationTime();
862 
863 	return B_OK;
864 }
865 
866 
867 static status_t
868 fifo_write_stat(fs_volume *volume, fs_vnode *vnode, const struct ::stat *st,
869 	uint32 statMask)
870 {
871 	// we cannot change the size of anything
872 	if (statMask & B_STAT_SIZE)
873 		return B_BAD_VALUE;
874 
875 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
876 	fs_vnode* superVnode = fifo->SuperVnode();
877 
878 	if (superVnode->ops->write_stat == NULL)
879 		return B_BAD_VALUE;
880 
881 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
882 		statMask);
883 	if (error != B_OK)
884 		return error;
885 
886 	return B_OK;
887 }
888 
889 
// There are no FIFO-specific ioctls; every request is rejected.
static status_t
fifo_ioctl(fs_volume *_volume, fs_vnode *_vnode, void *_cookie, ulong op,
	void *buffer, size_t length)
{
	TRACE(("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
		_vnode, _cookie, op, buffer, length));

	return EINVAL;
}
899 
900 
// Updates the status flags (O_APPEND/O_NONBLOCK) of an open cookie.
// NOTE(review): \a flags is OR'ed in without masking -- presumably the VFS
// only passes settable status flags here; verify against the caller.
static status_t
fifo_set_flags(fs_volume *_volume, fs_vnode *_vnode, void *_cookie,
	int flags)
{
	file_cookie *cookie = (file_cookie *)_cookie;

	TRACE(("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags));
	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
	return B_OK;
}
911 
912 
// Delegates select() to the inode, with its request lock held.
static status_t
fifo_select(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	uint8 event, selectsync *sync)
{
	file_cookie *cookie = (file_cookie *)_cookie;

	TRACE(("fifo_select(vnode = %p)\n", _node));
	Inode *inode = (Inode *)_node->private_node;
	if (!inode)
		return B_ERROR;

	MutexLocker locker(inode->RequestLock());
	return inode->Select(event, sync, cookie->open_mode);
}
927 
928 
// Delegates deselect() to the inode, with its request lock held.
static status_t
fifo_deselect(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	uint8 event, selectsync *sync)
{
	file_cookie *cookie = (file_cookie *)_cookie;

	TRACE(("fifo_deselect(vnode = %p)\n", _node));
	Inode *inode = (Inode *)_node->private_node;
	if (!inode)
		return B_ERROR;

	MutexLocker locker(inode->RequestLock());
	return inode->Deselect(event, sync, cookie->open_mode);
}
943 
944 
// Paging is not supported on FIFOs.
static bool
fifo_can_page(fs_volume *_volume, fs_vnode *_v, void *cookie)
{
	return false;
}
950 
951 
// Paged I/O is not supported on FIFOs.
static status_t
fifo_read_pages(fs_volume *_volume, fs_vnode *_v, void *cookie, off_t pos,
	const iovec *vecs, size_t count, size_t *_numBytes)
{
	return B_NOT_ALLOWED;
}
958 
959 
// Paged I/O is not supported on FIFOs.
static status_t
fifo_write_pages(fs_volume *_volume, fs_vnode *_v, void *cookie,
	off_t pos, const iovec *vecs, size_t count, size_t *_numBytes)
{
	return B_NOT_ALLOWED;
}
966 
967 
968 static status_t
969 fifo_get_super_vnode(fs_volume *volume, fs_vnode *vnode, fs_volume *superVolume,
970 	fs_vnode *_superVnode)
971 {
972 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
973 	fs_vnode* superVnode = fifo->SuperVnode();
974 
975 	if (superVnode->ops->get_super_vnode != NULL) {
976 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
977 			_superVnode);
978 	}
979 
980 	*_superVnode = *superVnode;
981 
982 	return B_OK;
983 }
984 
985 
// The vnode hooks installed for FIFO nodes. Hooks left NULL are either not
// meaningful for a FIFO or (see the TODOs) should forward to the super node.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1062 
1063 
1064 }	// namespace fifo
1065 
1066 using namespace fifo;
1067 
1068 
1069 // #pragma mark -
1070 
1071 
/*!	Turns the given \a vnode into a FIFO: its current ops and private node
	are preserved inside a FIFOInode wrapper, and the FIFO hooks are
	installed instead. On failure the vnode is left untouched.
	Returns B_NO_MEMORY if the wrapper cannot be allocated.
*/
status_t
create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
{
	FIFOInode *fifo = new(std::nothrow) FIFOInode(vnode);
	if (fifo == NULL)
		return B_NO_MEMORY;

	status_t status = fifo->InitCheck();
	if (status != B_OK) {
		delete fifo;
		return status;
	}

	// install our hooks; the original node is reachable via SuperVnode()
	vnode->private_node = fifo;
	vnode->ops = &sFIFOVnodeOps;

	return B_OK;
}
1090