xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 893988af824e65e49e55f517b157db8386e8002b)
1 /*
2  * Copyright 2007-2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2009, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 #include <limits.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <sys/stat.h>
12 
13 #include <new>
14 
15 #include <KernelExport.h>
16 #include <NodeMonitor.h>
17 #include <Select.h>
18 
19 #include <condition_variable.h>
20 #include <debug.h>
21 #include <khash.h>
22 #include <lock.h>
23 #include <select_sync_pool.h>
24 #include <team.h>
25 #include <thread.h>
26 #include <util/DoublyLinkedList.h>
27 #include <util/AutoLock.h>
28 #include <util/ring_buffer.h>
29 #include <vfs.h>
30 #include <vm.h>
31 
32 #include "fifo.h"
33 
34 
35 //#define TRACE_FIFO
36 #ifdef TRACE_FIFO
37 #	define TRACE(x) dprintf x
38 #else
39 #	define TRACE(x)
40 #endif
41 
42 
43 #define PIPEFS_HASH_SIZE		16
44 #define PIPEFS_MAX_BUFFER_SIZE	32768
45 
46 
47 // TODO: PIPE_BUF is supposed to be defined somewhere else.
48 #define PIPE_BUF	_POSIX_PIPE_BUF
49 
50 
51 namespace fifo {
52 
53 class Inode;
54 
// Thin wrapper around the kernel's ring_buffer that tolerates a missing
// (not yet allocated / already freed) buffer. The buffer is created lazily
// once a FIFO gets both a reader and a writer, and freed again when the
// last file descriptor is closed.
class RingBuffer {
	public:
		RingBuffer();
		~RingBuffer();

		// Lazily allocates the underlying buffer; a no-op if it exists.
		status_t CreateBuffer();
		// Frees the underlying buffer (if any).
		void DeleteBuffer();

		// Kernel-space copy in/out.
		size_t Write(const void *buffer, size_t length);
		size_t Read(void *buffer, size_t length);
		// User-space copy in/out; may return B_BAD_ADDRESS-style errors.
		ssize_t UserWrite(const void *buffer, ssize_t length);
		ssize_t UserRead(void *buffer, ssize_t length);

		// Number of bytes currently buffered / of free space (0 if no
		// buffer is allocated).
		size_t Readable() const;
		size_t Writable() const;

	private:
		struct ring_buffer	*fBuffer;
};
74 
75 
// Represents one reader blocked (or about to block) on the FIFO. The
// requests are kept in a FIFO-ordered list on the Inode so readers are
// served in arrival order.
// The fNotified/fLock pair implements a one-shot wakeup: Notify() unblocks
// the thread at most once until SetNotified(false) re-arms the request.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
	public:
		ReadRequest()
			:
			fThread(thread_get_current_thread()),
			fNotified(true)	// starts "already notified", i.e. disarmed
		{
			B_INITIALIZE_SPINLOCK(&fLock);
		}

		// Arms (false) or disarms (true) the wakeup. Called with interrupts
		// enabled; the spinlock guards against a concurrent Notify().
		void SetNotified(bool notified)
		{
			InterruptsSpinLocker _(fLock);
			fNotified = notified;
		}

		// Unblocks the waiting thread exactly once per arming. Safe to call
		// multiple times; subsequent calls are no-ops until re-armed.
		void Notify()
		{
			InterruptsSpinLocker _(fLock);

			if (!fNotified) {
				SpinLocker threadLocker(gThreadSpinlock);
				thread_unblock_locked(fThread, B_OK);
				fNotified = true;
			}
		}

	private:
		spinlock			fLock;
		struct thread*		fThread;	// the blocked reader
		volatile bool		fNotified;	// true == wakeup already delivered
};
108 
109 
110 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
111 	public:
112 		WriteRequest(size_t minimalWriteCount)
113 			:
114 			fMinimalWriteCount(minimalWriteCount)
115 		{
116 		}
117 
118 		size_t MinimalWriteCount() const
119 		{
120 			return fMinimalWriteCount;
121 		}
122 
123 	private:
124 		size_t	fMinimalWriteCount;
125 };
126 
127 
// FIFO-ordered queues of blocked readers/writers; both are protected by
// the owning Inode's request lock.
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
130 
131 
// The core pipe/FIFO object: a ring buffer plus reader/writer bookkeeping,
// blocking request queues, and select() support. All non-trivial state is
// protected by fRequestLock.
class Inode {
	public:
		Inode();
		~Inode();

		status_t	InitCheck();

		// A FIFO is "active" once it has (or has had) both a reader and a
		// writer; the ring buffer only exists while active.
		bool		IsActive() const { return fActive; }
		timespec	CreationTime() const { return fCreationTime; }
		void		SetCreationTime(timespec creationTime)
						{ fCreationTime = creationTime; }
		timespec	ModificationTime() const { return fModificationTime; }
		void		SetModificationTime(timespec modificationTime)
						{ fModificationTime = modificationTime; }

		// The big lock guarding the buffer, counters, request lists and
		// select pools. Must be held for most methods below.
		mutex		*RequestLock() { return &fRequestLock; }

		// Blocking data transfer; both require fRequestLock to be held
		// and may temporarily drop it while waiting.
		status_t	WriteDataToBuffer(const void *data, size_t *_length,
						bool nonBlocking);
		status_t	ReadDataFromBuffer(void *data, size_t *_length,
						bool nonBlocking, ReadRequest &request);
		size_t		BytesAvailable() const { return fBuffer.Readable(); }
		size_t		BytesWritable() const { return fBuffer.Writable(); }

		// Reader queue management (FIFO order determines who reads next).
		void		AddReadRequest(ReadRequest &request);
		void		RemoveReadRequest(ReadRequest &request);
		status_t	WaitForReadRequest(ReadRequest &request);

		// Wakeup/notification helpers, called after buffer state changes.
		void		NotifyBytesRead(size_t bytes);
		void		NotifyReadDone();
		void		NotifyBytesWritten(size_t bytes);
		void		NotifyEndClosed(bool writer);

		// Per-file-descriptor lifecycle.
		void		Open(int openMode);
		void		Close(int openMode);
		int32		ReaderCount() const { return fReaderCount; }
		int32		WriterCount() const { return fWriterCount; }

		status_t	Select(uint8 event, selectsync *sync, int openMode);
		status_t	Deselect(uint8 event, selectsync *sync, int openMode);

	private:
		timespec	fCreationTime;
		timespec	fModificationTime;

		RingBuffer	fBuffer;

		ReadRequestList		fReadRequests;
		WriteRequestList	fWriteRequests;

		mutex		fRequestLock;

		// Writers block on this; readers are unblocked individually via
		// their ReadRequest instead.
		ConditionVariable fWriteCondition;

		int32		fReaderCount;
		int32		fWriterCount;
		bool		fActive;

		select_sync_pool	*fReadSelectSyncPool;
		select_sync_pool	*fWriteSelectSyncPool;
};
193 
194 
// An Inode that wraps ("stacks on top of") the vnode of the underlying
// file system on which the FIFO special file lives. The super vnode is
// used to forward stat and similar operations.
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)	// copy; the wrapped node's ops/private_node
	{
	}

	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;
};
209 
210 
// Per-open() state: remembers the mode (access mode and status flags)
// this file descriptor was opened with.
struct file_cookie {
	int				open_mode;
};
214 
215 
216 //---------------------
217 
218 
219 RingBuffer::RingBuffer()
220 	: fBuffer(NULL)
221 {
222 }
223 
224 
225 RingBuffer::~RingBuffer()
226 {
227 	DeleteBuffer();
228 }
229 
230 
231 status_t
232 RingBuffer::CreateBuffer()
233 {
234 	if (fBuffer != NULL)
235 		return B_OK;
236 
237 	fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
238 	return (fBuffer != NULL ? B_OK : B_NO_MEMORY);
239 }
240 
241 
242 void
243 RingBuffer::DeleteBuffer()
244 {
245 	if (fBuffer != NULL) {
246 		delete_ring_buffer(fBuffer);
247 		fBuffer = NULL;
248 	}
249 }
250 
251 
252 inline size_t
253 RingBuffer::Write(const void *buffer, size_t length)
254 {
255 	if (fBuffer == NULL)
256 		return B_NO_MEMORY;
257 
258 	return ring_buffer_write(fBuffer, (const uint8 *)buffer, length);
259 }
260 
261 
262 inline size_t
263 RingBuffer::Read(void *buffer, size_t length)
264 {
265 	if (fBuffer == NULL)
266 		return B_NO_MEMORY;
267 
268 	return ring_buffer_read(fBuffer, (uint8 *)buffer, length);
269 }
270 
271 
272 inline ssize_t
273 RingBuffer::UserWrite(const void *buffer, ssize_t length)
274 {
275 	if (fBuffer == NULL)
276 		return B_NO_MEMORY;
277 
278 	return ring_buffer_user_write(fBuffer, (const uint8 *)buffer, length);
279 }
280 
281 
282 inline ssize_t
283 RingBuffer::UserRead(void *buffer, ssize_t length)
284 {
285 	if (fBuffer == NULL)
286 		return B_NO_MEMORY;
287 
288 	return ring_buffer_user_read(fBuffer, (uint8 *)buffer, length);
289 }
290 
291 
292 inline size_t
293 RingBuffer::Readable() const
294 {
295 	return (fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0);
296 }
297 
298 
299 inline size_t
300 RingBuffer::Writable() const
301 {
302 	return (fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0);
303 }
304 
305 
306 //	#pragma mark -
307 
308 
309 Inode::Inode()
310 	:
311 	fReadRequests(),
312 	fWriteRequests(),
313 	fReaderCount(0),
314 	fWriterCount(0),
315 	fActive(false),
316 	fReadSelectSyncPool(NULL),
317 	fWriteSelectSyncPool(NULL)
318 {
319 	fWriteCondition.Publish(this, "pipe");
320 	mutex_init(&fRequestLock, "pipe request");
321 
322 	bigtime_t time = real_time_clock();
323 	fModificationTime.tv_sec = time / 1000000;
324 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
325 	fCreationTime = fModificationTime;
326 }
327 
328 
Inode::~Inode()
{
	// Unpublish before destroying the lock; at this point no file
	// descriptors reference the inode anymore, so no waiters remain.
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
334 
335 
336 status_t
337 Inode::InitCheck()
338 {
339 	return B_OK;
340 }
341 
342 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method (it is temporarily
	dropped while blocking).
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;	// alias: *_length doubles as out-param
	written = 0;

	TRACE(("Inode::WriteDataToBuffer(data = %p, bytes = %lu)\n",
		data, dataSize));

	// According to the standard, request up to PIPE_BUF bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= PIPE_BUF)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// Also waits while the FIFO is not active yet (no reader has
		// opened it). Gives up waiting once all readers are gone.
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			ConditionVariableEntry entry;
			entry.Add(this);

			// Queue our minimal write count so readers know when waking
			// the writers is worthwhile (see NotifyBytesRead()).
			WriteRequest request(minToWrite);
			fWriteRequests.Add(&request);

			// Drop the lock only for the actual wait.
			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fReaderCount == 0 && fActive) {
			// POSIX: writing to a pipe without readers raises SIGPIPE and
			// fails with EPIPE — but only if nothing was written yet.
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
			return B_BAD_ADDRESS;

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		// Wake readers/selectors now that data is available.
		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
415 
416 
/*!	Reads up to *_length bytes from the ring buffer into \a data. The
	request lock must be held (it is dropped while blocking). \a request
	must already have been queued via AddReadRequest(); readers are served
	in FIFO order. Returns B_OK with *_length == 0 to signal end-of-file
	once all writers are gone, B_WOULD_BLOCK in non-blocking mode, or an
	interruption/copy error.
*/
status_t
Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
	ReadRequest &request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writers left and nothing buffered: end-of-file (0 bytes)
		if (fActive && fWriterCount == 0)
			return B_OK;

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	if (fBuffer.UserRead(data, toRead) < B_OK)
		return B_BAD_ADDRESS;

	// wake writers/selectors now that buffer space was freed
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
462 
463 
// Appends the request to the reader queue (FIFO order decides which
// blocked reader gets served first). Caller must hold the request lock.
void
Inode::AddReadRequest(ReadRequest &request)
{
	fReadRequests.Add(&request);
}
469 
470 
// Removes the request from the reader queue again. Caller must hold the
// request lock.
void
Inode::RemoveReadRequest(ReadRequest &request)
{
	fReadRequests.Remove(&request);
}
476 
477 
/*!	Blocks the current thread until the given read request is notified
	(data arrived, writers left, etc.). Called with the request lock held;
	the lock is dropped during the wait and re-acquired afterwards.
	Returns the thread_block() result (B_OK or B_INTERRUPTED).
*/
status_t
Inode::WaitForReadRequest(ReadRequest &request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// arm the request's one-shot wakeup (see ReadRequest::Notify())
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
499 
500 
/*!	Called after \a bytes were consumed from the buffer. Wakes up writers
	waiting for space and select()ors waiting for writability. Caller must
	hold the request lock.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means exactly this read created the space)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest *request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			// threshold crossed by this very read?
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
528 
529 
530 void
531 Inode::NotifyReadDone()
532 {
533 	// notify next reader, if there's still something to be read
534 	if (fBuffer.Readable() > 0) {
535 		if (ReadRequest* request = fReadRequests.First())
536 			request->Notify();
537 	}
538 }
539 
540 
541 void
542 Inode::NotifyBytesWritten(size_t bytes)
543 {
544 	// notify reader, if something can be read now
545 	if (bytes > 0 && fBuffer.Readable() == bytes) {
546 		if (fReadSelectSyncPool)
547 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
548 
549 		if (ReadRequest* request = fReadRequests.First())
550 			request->Notify();
551 	}
552 }
553 
554 
/*!	Called when the last reader (writer == false) or the last writer
	(writer == true) closed its end. Wakes up the blocked threads of the
	opposite end so they can observe EOF resp. EPIPE. Caller must hold the
	request lock.
*/
void
Inode::NotifyEndClosed(bool writer)
{
	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		// (they will see end-of-file, cf. ReadDataFromBuffer())
		if (fBuffer.Readable() == 0) {
			ReadRequest *request;
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while ((request = iterator.Next()) != NULL)
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
		}
	} else {
		// Last reader is gone. Wake up all writers.
		// (they will fail with EPIPE, cf. WriteDataToBuffer())
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool) {
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
		}
	}
}
580 
581 
/*!	Registers a newly opened file descriptor with the FIFO, counting it as
	reader and/or writer depending on \a openMode. Once both ends exist,
	the ring buffer is allocated and the FIFO becomes active.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	// NOTE(review): O_RDWR is counted only as a reader below, not as a
	// writer — POSIX leaves O_RDWR on FIFOs undefined, but confirm this
	// asymmetry is intended (Close() mirrors it, so counts stay balanced).
	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		// TODO: CreateBuffer()'s return value is ignored; on allocation
		// failure the FIFO would become active without a buffer.
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
603 
604 
/*!	Deregisters a file descriptor, mirroring the counting done in Open().
	When the last reader or writer goes away the opposite end is notified
	(EOF/EPIPE); when both counts reach zero the FIFO is deactivated and
	its buffer freed.
*/
void
Inode::Close(int openMode)
{
	TRACE(("Inode::Close(openMode = %d)\n", openMode));

	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
		NotifyEndClosed(true);

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
		if (--fReaderCount == 0)
			NotifyEndClosed(false);
	}

	// last descriptor gone: drop buffered data and the buffer itself
	if (fReaderCount == 0 && fWriterCount == 0) {
		fActive = false;
		fBuffer.DeleteBuffer();
	}
}
625 
626 
/*!	Registers \a sync for \a event on the pool matching the descriptor's
	access mode and immediately delivers the event if its condition
	already holds. O_RDWR descriptors are rejected (B_NOT_ALLOWED), as in
	Deselect(). Caller must hold the request lock (cf. fifo_select()).
*/
status_t
Inode::Select(uint8 event, selectsync *sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	if ((openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		// writable, or all readers gone (EPIPE pending)
		if ((event == B_SELECT_WRITE
				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		// readable, or all writers gone (EOF pending)
		if (event == B_SELECT_READ
				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
659 
660 
661 status_t
662 Inode::Deselect(uint8 event, selectsync *sync, int openMode)
663 {
664 	select_sync_pool** pool;
665 	if ((openMode & O_RWMASK) == O_RDONLY) {
666 		pool = &fReadSelectSyncPool;
667 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
668 		pool = &fWriteSelectSyncPool;
669 	} else
670 		return B_NOT_ALLOWED;
671 
672 	remove_select_sync_pool_entry(pool, sync, event);
673 	return B_OK;
674 }
675 
676 
677 //	#pragma mark -
678 
679 
680 static status_t
681 fifo_put_vnode(fs_volume *volume, fs_vnode *vnode, bool reenter)
682 {
683 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
684 	fs_vnode* superVnode = fifo->SuperVnode();
685 
686 	status_t error = B_OK;
687 	if (superVnode->ops->put_vnode != NULL)
688 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
689 
690 	delete fifo;
691 
692 	return error;
693 }
694 
695 
696 static status_t
697 fifo_remove_vnode(fs_volume *volume, fs_vnode *vnode, bool reenter)
698 {
699 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
700 	fs_vnode* superVnode = fifo->SuperVnode();
701 
702 	status_t error = B_OK;
703 	if (superVnode->ops->remove_vnode != NULL)
704 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
705 
706 	delete fifo;
707 
708 	return error;
709 }
710 
711 
712 static status_t
713 fifo_open(fs_volume *_volume, fs_vnode *_node, int openMode,
714 	void **_cookie)
715 {
716 	Inode *inode = (Inode *)_node->private_node;
717 
718 	TRACE(("fifo_open(): node = %p, openMode = %d\n", inode, openMode));
719 
720 	file_cookie *cookie = (file_cookie *)malloc(sizeof(file_cookie));
721 	if (cookie == NULL)
722 		return B_NO_MEMORY;
723 
724 	TRACE(("  open cookie = %p\n", cookie));
725 	cookie->open_mode = openMode;
726 	inode->Open(openMode);
727 
728 	*_cookie = (void *)cookie;
729 
730 	return B_OK;
731 }
732 
733 
734 static status_t
735 fifo_close(fs_volume *volume, fs_vnode *vnode, void *_cookie)
736 {
737 	file_cookie *cookie = (file_cookie *)_cookie;
738 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
739 
740 	fifo->Close(cookie->open_mode);
741 
742 	return B_OK;
743 }
744 
745 
746 static status_t
747 fifo_free_cookie(fs_volume *_volume, fs_vnode *_node, void *_cookie)
748 {
749 	file_cookie *cookie = (file_cookie *)_cookie;
750 
751 	TRACE(("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie));
752 
753 	free(cookie);
754 
755 	return B_OK;
756 }
757 
758 
759 static status_t
760 fifo_fsync(fs_volume *_volume, fs_vnode *_v)
761 {
762 	return B_OK;
763 }
764 
765 
/*!	read() hook: only descriptors opened O_RDONLY may read. Returns 0
	bytes (EOF) when the FIFO is active but writerless and empty;
	otherwise queues a ReadRequest and blocks in ReadDataFromBuffer()
	(unless O_NONBLOCK is set).
*/
static status_t
fifo_read(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	off_t /*pos*/, void *buffer, size_t *_length)
{
	file_cookie *cookie = (file_cookie *)_cookie;
	Inode *inode = (Inode *)_node->private_node;

	TRACE(("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode));

	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	// stack-allocated; must be removed from the queue before returning
	ReadRequest request;
	inode->AddReadRequest(request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, request);

	inode->RemoveReadRequest(request);
	inode->NotifyReadDone();

	// a partial read counts as success, even if waiting was interrupted
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
808 
809 
/*!	write() hook: only descriptors opened O_WRONLY may write. Zero-length
	writes succeed immediately (per POSIX, without error checking);
	otherwise the data is copied into the ring buffer, blocking as needed
	unless O_NONBLOCK is set.
*/
static status_t
fifo_write(fs_volume *_volume, fs_vnode *_node, void *_cookie,
	off_t /*pos*/, const void *buffer, size_t *_length)
{
	file_cookie *cookie = (file_cookie *)_cookie;
	Inode *inode = (Inode *)_node->private_node;

	TRACE(("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length));

	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0);

	// a partial write counts as success, even if waiting was interrupted
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
839 
840 
841 static status_t
842 fifo_read_stat(fs_volume *volume, fs_vnode *vnode, struct ::stat *st)
843 {
844 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
845 	fs_vnode* superVnode = fifo->SuperVnode();
846 
847 	if (superVnode->ops->read_stat == NULL)
848 		return B_BAD_VALUE;
849 
850 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
851 	if (error != B_OK)
852 		return error;
853 
854 
855 	MutexLocker locker(fifo->RequestLock());
856 
857 	st->st_size = fifo->BytesAvailable();
858 
859 	st->st_blksize = 4096;
860 
861 	// TODO: Just pass the changes to our modification time on to the super node.
862 	st->st_atim.tv_sec = time(NULL);
863 	st->st_atim.tv_nsec = 0;
864 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
865 
866 	return B_OK;
867 }
868 
869 
870 static status_t
871 fifo_write_stat(fs_volume *volume, fs_vnode *vnode, const struct ::stat *st,
872 	uint32 statMask)
873 {
874 	// we cannot change the size of anything
875 	if (statMask & B_STAT_SIZE)
876 		return B_BAD_VALUE;
877 
878 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
879 	fs_vnode* superVnode = fifo->SuperVnode();
880 
881 	if (superVnode->ops->write_stat == NULL)
882 		return B_BAD_VALUE;
883 
884 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
885 		statMask);
886 	if (error != B_OK)
887 		return error;
888 
889 	return B_OK;
890 }
891 
892 
893 static status_t
894 fifo_ioctl(fs_volume *_volume, fs_vnode *_vnode, void *_cookie, ulong op,
895 	void *buffer, size_t length)
896 {
897 	TRACE(("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
898 		_vnode, _cookie, op, buffer, length));
899 
900 	return EINVAL;
901 }
902 
903 
904 static status_t
905 fifo_set_flags(fs_volume *_volume, fs_vnode *_vnode, void *_cookie,
906 	int flags)
907 {
908 	file_cookie *cookie = (file_cookie *)_cookie;
909 
910 	TRACE(("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags));
911 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
912 	return B_OK;
913 }
914 
915 
916 static status_t
917 fifo_select(fs_volume *_volume, fs_vnode *_node, void *_cookie,
918 	uint8 event, selectsync *sync)
919 {
920 	file_cookie *cookie = (file_cookie *)_cookie;
921 
922 	TRACE(("fifo_select(vnode = %p)\n", _node));
923 	Inode *inode = (Inode *)_node->private_node;
924 	if (!inode)
925 		return B_ERROR;
926 
927 	MutexLocker locker(inode->RequestLock());
928 	return inode->Select(event, sync, cookie->open_mode);
929 }
930 
931 
932 static status_t
933 fifo_deselect(fs_volume *_volume, fs_vnode *_node, void *_cookie,
934 	uint8 event, selectsync *sync)
935 {
936 	file_cookie *cookie = (file_cookie *)_cookie;
937 
938 	TRACE(("fifo_deselect(vnode = %p)\n", _node));
939 	Inode *inode = (Inode *)_node->private_node;
940 	if (!inode)
941 		return B_ERROR;
942 
943 	MutexLocker locker(inode->RequestLock());
944 	return inode->Deselect(event, sync, cookie->open_mode);
945 }
946 
947 
948 static bool
949 fifo_can_page(fs_volume *_volume, fs_vnode *_v, void *cookie)
950 {
951 	return false;
952 }
953 
954 
955 static status_t
956 fifo_read_pages(fs_volume *_volume, fs_vnode *_v, void *cookie, off_t pos,
957 	const iovec *vecs, size_t count, size_t *_numBytes)
958 {
959 	return B_NOT_ALLOWED;
960 }
961 
962 
963 static status_t
964 fifo_write_pages(fs_volume *_volume, fs_vnode *_v, void *cookie,
965 	off_t pos, const iovec *vecs, size_t count, size_t *_numBytes)
966 {
967 	return B_NOT_ALLOWED;
968 }
969 
970 
971 static status_t
972 fifo_get_super_vnode(fs_volume *volume, fs_vnode *vnode, fs_volume *superVolume,
973 	fs_vnode *_superVnode)
974 {
975 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
976 	fs_vnode* superVnode = fifo->SuperVnode();
977 
978 	if (superVnode->ops->get_super_vnode != NULL) {
979 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
980 			_superVnode);
981 	}
982 
983 	*_superVnode = *superVnode;
984 
985 	return B_OK;
986 }
987 
988 
// The vnode operations vector installed for FIFO nodes. Hooks left NULL
// are either meaningless for FIFOs or handled by the VFS / the underlying
// file system.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1065 
1066 
1067 }	// namespace fifo
1068 
1069 using namespace fifo;
1070 
1071 
1072 // #pragma mark -
1073 
1074 
1075 status_t
1076 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1077 {
1078 	FIFOInode *fifo = new(std::nothrow) FIFOInode(vnode);
1079 	if (fifo == NULL)
1080 		return B_NO_MEMORY;
1081 
1082 	status_t status = fifo->InitCheck();
1083 	if (status != B_OK) {
1084 		delete fifo;
1085 		return status;
1086 	}
1087 
1088 	vnode->private_node = fifo;
1089 	vnode->ops = &sFIFOVnodeOps;
1090 
1091 	return B_OK;
1092 }
1093