xref: /haiku/src/system/kernel/fs/fifo.cpp (revision c8dd9f7780c426e592a3ccb231e6bfab51f15eb9)
1 /*
2  * Copyright 2007-2011, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/stat.h>
15 
16 #include <new>
17 
18 #include <KernelExport.h>
19 #include <NodeMonitor.h>
20 #include <Select.h>
21 
22 #include <condition_variable.h>
23 #include <debug.h>
24 #include <khash.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <team.h>
28 #include <thread.h>
29 #include <util/DoublyLinkedList.h>
30 #include <util/AutoLock.h>
31 #include <util/ring_buffer.h>
32 #include <vfs.h>
33 #include <vm/vm.h>
34 
35 
// Uncomment the following line to route TRACE() output to dprintf().
//#define TRACE_FIFO
#if defined(TRACE_FIFO)
#	define TRACE(x...) dprintf(x)
#else
#	define TRACE(x...)
#endif


// Capacity in bytes that is requested from create_ring_buffer() per FIFO.
#define PIPEFS_MAX_BUFFER_SIZE	32768
#define PIPEFS_HASH_SIZE		16


// Writes of up to PIPE_BUF bytes are kept together (see
// Inode::WriteDataToBuffer()).
// TODO: PIPE_BUF is supposed to be defined somewhere else.
#define PIPE_BUF	_POSIX_PIPE_BUF
50 
51 
52 namespace fifo {
53 
54 
55 struct file_cookie;
56 class Inode;
57 
58 
59 class RingBuffer {
60 public:
61 								RingBuffer();
62 								~RingBuffer();
63 
64 			status_t			CreateBuffer();
65 			void				DeleteBuffer();
66 
67 			size_t				Write(const void* buffer, size_t length);
68 			size_t				Read(void* buffer, size_t length);
69 			ssize_t				UserWrite(const void* buffer, ssize_t length);
70 			ssize_t				UserRead(void* buffer, ssize_t length);
71 
72 			size_t				Readable() const;
73 			size_t				Writable() const;
74 
75 private:
76 			struct ring_buffer*	fBuffer;
77 };
78 
79 
80 class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
81 public:
82 	ReadRequest(file_cookie* cookie)
83 		:
84 		fThread(thread_get_current_thread()),
85 		fCookie(cookie),
86 		fNotified(true)
87 	{
88 		B_INITIALIZE_SPINLOCK(&fLock);
89 	}
90 
91 	void SetNotified(bool notified)
92 	{
93 		InterruptsSpinLocker _(fLock);
94 		fNotified = notified;
95 	}
96 
97 	void Notify(status_t status = B_OK)
98 	{
99 		InterruptsSpinLocker _(fLock);
100 		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);
101 
102 		if (!fNotified) {
103 			thread_unblock(fThread, status);
104 			fNotified = true;
105 		}
106 	}
107 
108 	file_cookie* Cookie() const
109 	{
110 		return fCookie;
111 	}
112 
113 private:
114 	spinlock		fLock;
115 	Thread*			fThread;
116 	file_cookie*	fCookie;
117 	volatile bool	fNotified;
118 };
119 
120 
121 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
122 public:
123 	WriteRequest(size_t minimalWriteCount)
124 		:
125 		fMinimalWriteCount(minimalWriteCount)
126 	{
127 	}
128 
129 	size_t MinimalWriteCount() const
130 	{
131 		return fMinimalWriteCount;
132 	}
133 
134 private:
135 	size_t	fMinimalWriteCount;
136 };
137 
138 
139 typedef DoublyLinkedList<ReadRequest> ReadRequestList;
140 typedef DoublyLinkedList<WriteRequest> WriteRequestList;
141 
142 
143 class Inode {
144 public:
145 								Inode();
146 								~Inode();
147 
148 			status_t			InitCheck();
149 
150 			bool				IsActive() const { return fActive; }
151 			timespec			CreationTime() const { return fCreationTime; }
152 			void				SetCreationTime(timespec creationTime)
153 									{ fCreationTime = creationTime; }
154 			timespec			ModificationTime() const
155 									{ return fModificationTime; }
156 			void				SetModificationTime(timespec modificationTime)
157 									{ fModificationTime = modificationTime; }
158 
159 			mutex*				RequestLock() { return &fRequestLock; }
160 
161 			status_t			WriteDataToBuffer(const void* data,
162 									size_t* _length, bool nonBlocking);
163 			status_t			ReadDataFromBuffer(void* data, size_t* _length,
164 									bool nonBlocking, ReadRequest& request);
165 			size_t				BytesAvailable() const
166 									{ return fBuffer.Readable(); }
167 			size_t				BytesWritable() const
168 									{ return fBuffer.Writable(); }
169 
170 			void				AddReadRequest(ReadRequest& request);
171 			void				RemoveReadRequest(ReadRequest& request);
172 			status_t			WaitForReadRequest(ReadRequest& request);
173 
174 			void				NotifyBytesRead(size_t bytes);
175 			void				NotifyReadDone();
176 			void				NotifyBytesWritten(size_t bytes);
177 			void				NotifyEndClosed(bool writer);
178 
179 			void				Open(int openMode);
180 			void				Close(int openMode, file_cookie* cookie);
181 			int32				ReaderCount() const { return fReaderCount; }
182 			int32				WriterCount() const { return fWriterCount; }
183 
184 			status_t			Select(uint8 event, selectsync* sync,
185 									int openMode);
186 			status_t			Deselect(uint8 event, selectsync* sync,
187 									int openMode);
188 
189 private:
190 			timespec			fCreationTime;
191 			timespec			fModificationTime;
192 
193 			RingBuffer			fBuffer;
194 
195 			ReadRequestList		fReadRequests;
196 			WriteRequestList	fWriteRequests;
197 
198 			mutex				fRequestLock;
199 
200 			ConditionVariable	fWriteCondition;
201 
202 			int32				fReaderCount;
203 			int32				fWriterCount;
204 			bool				fActive;
205 
206 			select_sync_pool*	fReadSelectSyncPool;
207 			select_sync_pool*	fWriteSelectSyncPool;
208 };
209 
210 
211 class FIFOInode : public Inode {
212 public:
213 	FIFOInode(fs_vnode* vnode)
214 		:
215 		Inode(),
216 		fSuperVnode(*vnode)
217 	{
218 	}
219 
220 	fs_vnode*	SuperVnode() { return &fSuperVnode; }
221 
222 private:
223 	fs_vnode	fSuperVnode;
224 };
225 
226 
// Per-open state of a FIFO file descriptor.
struct file_cookie {
	// The open mode passed to fifo_open(), later adjusted by fifo_set_flags().
	int	open_mode;
};
230 
231 
232 // #pragma mark -
233 
234 
235 RingBuffer::RingBuffer()
236 	:
237 	fBuffer(NULL)
238 {
239 }
240 
241 
242 RingBuffer::~RingBuffer()
243 {
244 	DeleteBuffer();
245 }
246 
247 
248 status_t
249 RingBuffer::CreateBuffer()
250 {
251 	if (fBuffer != NULL)
252 		return B_OK;
253 
254 	fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
255 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
256 }
257 
258 
259 void
260 RingBuffer::DeleteBuffer()
261 {
262 	if (fBuffer != NULL) {
263 		delete_ring_buffer(fBuffer);
264 		fBuffer = NULL;
265 	}
266 }
267 
268 
269 inline size_t
270 RingBuffer::Write(const void* buffer, size_t length)
271 {
272 	if (fBuffer == NULL)
273 		return B_NO_MEMORY;
274 
275 	return ring_buffer_write(fBuffer, (const uint8*)buffer, length);
276 }
277 
278 
279 inline size_t
280 RingBuffer::Read(void* buffer, size_t length)
281 {
282 	if (fBuffer == NULL)
283 		return B_NO_MEMORY;
284 
285 	return ring_buffer_read(fBuffer, (uint8*)buffer, length);
286 }
287 
288 
289 inline ssize_t
290 RingBuffer::UserWrite(const void* buffer, ssize_t length)
291 {
292 	if (fBuffer == NULL)
293 		return B_NO_MEMORY;
294 
295 	return ring_buffer_user_write(fBuffer, (const uint8*)buffer, length);
296 }
297 
298 
299 inline ssize_t
300 RingBuffer::UserRead(void* buffer, ssize_t length)
301 {
302 	if (fBuffer == NULL)
303 		return B_NO_MEMORY;
304 
305 	return ring_buffer_user_read(fBuffer, (uint8*)buffer, length);
306 }
307 
308 
309 inline size_t
310 RingBuffer::Readable() const
311 {
312 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
313 }
314 
315 
316 inline size_t
317 RingBuffer::Writable() const
318 {
319 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
320 }
321 
322 
323 //	#pragma mark -
324 
325 
326 Inode::Inode()
327 	:
328 	fReadRequests(),
329 	fWriteRequests(),
330 	fReaderCount(0),
331 	fWriterCount(0),
332 	fActive(false),
333 	fReadSelectSyncPool(NULL),
334 	fWriteSelectSyncPool(NULL)
335 {
336 	fWriteCondition.Publish(this, "pipe");
337 	mutex_init(&fRequestLock, "pipe request");
338 
339 	bigtime_t time = real_time_clock();
340 	fModificationTime.tv_sec = time / 1000000;
341 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
342 	fCreationTime = fModificationTime;
343 }
344 
345 
346 Inode::~Inode()
347 {
348 	fWriteCondition.Unpublish();
349 	mutex_destroy(&fRequestLock);
350 }
351 
352 
353 status_t
354 Inode::InitCheck()
355 {
356 	return B_OK;
357 }
358 
359 
360 /*!	Writes the specified data bytes to the inode's ring buffer. The
361 	request lock must be held when calling this method.
362 	Notifies readers if necessary, so that blocking readers will get started.
363 	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
364 	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
365 	the returned length is > 0, the returned error code can be ignored.
366 */
367 status_t
368 Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking)
369 {
370 	const uint8* data = (const uint8*)_data;
371 	size_t dataSize = *_length;
372 	size_t& written = *_length;
373 	written = 0;
374 
375 	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
376 		dataSize);
377 
378 	// According to the standard, request up to PIPE_BUF bytes shall not be
379 	// interleaved with other writer's data.
380 	size_t minToWrite = 1;
381 	if (dataSize <= PIPE_BUF)
382 		minToWrite = dataSize;
383 
384 	while (dataSize > 0) {
385 		// Wait until enough space in the buffer is available.
386 		while (!fActive
387 				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
388 			if (nonBlocking)
389 				return B_WOULD_BLOCK;
390 
391 			ConditionVariableEntry entry;
392 			entry.Add(this);
393 
394 			WriteRequest request(minToWrite);
395 			fWriteRequests.Add(&request);
396 
397 			mutex_unlock(&fRequestLock);
398 			status_t status = entry.Wait(B_CAN_INTERRUPT);
399 			mutex_lock(&fRequestLock);
400 
401 			fWriteRequests.Remove(&request);
402 
403 			if (status != B_OK)
404 				return status;
405 		}
406 
407 		// write only as long as there are readers left
408 		if (fActive && fReaderCount == 0) {
409 			if (written == 0)
410 				send_signal(find_thread(NULL), SIGPIPE);
411 			return EPIPE;
412 		}
413 
414 		// write as much as we can
415 
416 		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
417 		if (toWrite > dataSize)
418 			toWrite = dataSize;
419 
420 		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
421 			return B_BAD_ADDRESS;
422 
423 		data += toWrite;
424 		dataSize -= toWrite;
425 		written += toWrite;
426 
427 		NotifyBytesWritten(toWrite);
428 	}
429 
430 	return B_OK;
431 }
432 
433 
434 status_t
435 Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
436 	ReadRequest& request)
437 {
438 	size_t dataSize = *_length;
439 	*_length = 0;
440 
441 	// wait until our request is first in queue
442 	status_t error;
443 	if (fReadRequests.Head() != &request) {
444 		if (nonBlocking)
445 			return B_WOULD_BLOCK;
446 
447 		TRACE("Inode %p::%s(): wait for request %p to become the first "
448 			"request.\n", this, __FUNCTION__, &request);
449 
450 		error = WaitForReadRequest(request);
451 		if (error != B_OK)
452 			return error;
453 	}
454 
455 	// wait until data are available
456 	while (fBuffer.Readable() == 0) {
457 		if (nonBlocking)
458 			return B_WOULD_BLOCK;
459 
460 		if (fActive && fWriterCount == 0)
461 			return B_OK;
462 
463 		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
464 			&request);
465 
466 		error = WaitForReadRequest(request);
467 		if (error != B_OK)
468 			return error;
469 	}
470 
471 	// read as much as we can
472 	size_t toRead = fBuffer.Readable();
473 	if (toRead > dataSize)
474 		toRead = dataSize;
475 
476 	if (fBuffer.UserRead(data, toRead) < B_OK)
477 		return B_BAD_ADDRESS;
478 
479 	NotifyBytesRead(toRead);
480 
481 	*_length = toRead;
482 
483 	return B_OK;
484 }
485 
486 
487 void
488 Inode::AddReadRequest(ReadRequest& request)
489 {
490 	fReadRequests.Add(&request);
491 }
492 
493 
494 void
495 Inode::RemoveReadRequest(ReadRequest& request)
496 {
497 	fReadRequests.Remove(&request);
498 }
499 
500 
501 status_t
502 Inode::WaitForReadRequest(ReadRequest& request)
503 {
504 	// add the entry to wait on
505 	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
506 		THREAD_BLOCK_TYPE_OTHER, "fifo read request");
507 
508 	request.SetNotified(false);
509 
510 	// wait
511 	mutex_unlock(&fRequestLock);
512 	status_t status = thread_block();
513 
514 	// Before going to lock again, we need to make sure no one tries to
515 	// unblock us. Otherwise that would screw with mutex_lock().
516 	request.SetNotified(true);
517 
518 	mutex_lock(&fRequestLock);
519 
520 	return status;
521 }
522 
523 
524 void
525 Inode::NotifyBytesRead(size_t bytes)
526 {
527 	// notify writer, if something can be written now
528 	size_t writable = fBuffer.Writable();
529 	if (bytes > 0) {
530 		// notify select()ors only, if nothing was writable before
531 		if (writable == bytes) {
532 			if (fWriteSelectSyncPool)
533 				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
534 		}
535 
536 		// If any of the waiting writers has a minimal write count that has
537 		// now become satisfied, we notify all of them (condition variables
538 		// don't support doing that selectively).
539 		WriteRequest* request;
540 		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
541 		while ((request = iterator.Next()) != NULL) {
542 			size_t minWriteCount = request->MinimalWriteCount();
543 			if (minWriteCount > 0 && minWriteCount <= writable
544 					&& minWriteCount > writable - bytes) {
545 				fWriteCondition.NotifyAll();
546 				break;
547 			}
548 		}
549 	}
550 }
551 
552 
553 void
554 Inode::NotifyReadDone()
555 {
556 	// notify next reader, if there's still something to be read
557 	if (fBuffer.Readable() > 0) {
558 		if (ReadRequest* request = fReadRequests.First())
559 			request->Notify();
560 	}
561 }
562 
563 
564 void
565 Inode::NotifyBytesWritten(size_t bytes)
566 {
567 	// notify reader, if something can be read now
568 	if (bytes > 0 && fBuffer.Readable() == bytes) {
569 		if (fReadSelectSyncPool)
570 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
571 
572 		if (ReadRequest* request = fReadRequests.First())
573 			request->Notify();
574 	}
575 }
576 
577 
578 void
579 Inode::NotifyEndClosed(bool writer)
580 {
581 	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
582 		writer ? "writer" : "reader");
583 
584 	if (writer) {
585 		// Our last writer has been closed; if the pipe
586 		// contains no data, unlock all waiting readers
587 		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
588 		if (fBuffer.Readable() == 0) {
589 			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
590 			while (ReadRequest* request = iterator.Next())
591 				request->Notify();
592 
593 			if (fReadSelectSyncPool)
594 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
595 		}
596 	} else {
597 		// Last reader is gone. Wake up all writers.
598 		fWriteCondition.NotifyAll();
599 
600 		if (fWriteSelectSyncPool) {
601 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
602 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
603 		}
604 	}
605 }
606 
607 
608 void
609 Inode::Open(int openMode)
610 {
611 	MutexLocker locker(RequestLock());
612 
613 	if ((openMode & O_ACCMODE) == O_WRONLY)
614 		fWriterCount++;
615 
616 	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
617 		fReaderCount++;
618 
619 	if (fReaderCount > 0 && fWriterCount > 0) {
620 		TRACE("Inode %p::Open(): fifo becomes active\n", this);
621 		fBuffer.CreateBuffer();
622 		fActive = true;
623 
624 		// notify all waiting writers that they can start
625 		if (fWriteSelectSyncPool)
626 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
627 		fWriteCondition.NotifyAll();
628 	}
629 }
630 
631 
632 void
633 Inode::Close(int openMode, file_cookie* cookie)
634 {
635 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
636 
637 	MutexLocker locker(RequestLock());
638 
639 	// Notify all currently reading file descriptors
640 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
641 	while (ReadRequest* request = iterator.Next()) {
642 		if (request->Cookie() == cookie)
643 			request->Notify(B_FILE_ERROR);
644 	}
645 
646 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
647 		NotifyEndClosed(true);
648 
649 	if ((openMode & O_ACCMODE) == O_RDONLY
650 		|| (openMode & O_ACCMODE) == O_RDWR) {
651 		if (--fReaderCount == 0)
652 			NotifyEndClosed(false);
653 	}
654 
655 	if (fWriterCount == 0) {
656 		// Notify any still reading writers to stop
657 		// TODO: This only works reliable if there is only one writer - we could
658 		// do the same thing done for the read requests.
659 		fWriteCondition.NotifyAll(B_FILE_ERROR);
660 	}
661 
662 	if (fReaderCount == 0 && fWriterCount == 0) {
663 		fActive = false;
664 		fBuffer.DeleteBuffer();
665 	}
666 }
667 
668 
669 status_t
670 Inode::Select(uint8 event, selectsync* sync, int openMode)
671 {
672 	bool writer = true;
673 	select_sync_pool** pool;
674 	if ((openMode & O_RWMASK) == O_RDONLY) {
675 		pool = &fReadSelectSyncPool;
676 		writer = false;
677 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
678 		pool = &fWriteSelectSyncPool;
679 	} else
680 		return B_NOT_ALLOWED;
681 
682 	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
683 		return B_ERROR;
684 
685 	// signal right away, if the condition holds already
686 	if (writer) {
687 		if ((event == B_SELECT_WRITE
688 				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
689 			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
690 			return notify_select_event(sync, event);
691 		}
692 	} else {
693 		if (event == B_SELECT_READ
694 				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
695 			return notify_select_event(sync, event);
696 		}
697 	}
698 
699 	return B_OK;
700 }
701 
702 
703 status_t
704 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
705 {
706 	select_sync_pool** pool;
707 	if ((openMode & O_RWMASK) == O_RDONLY) {
708 		pool = &fReadSelectSyncPool;
709 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
710 		pool = &fWriteSelectSyncPool;
711 	} else
712 		return B_NOT_ALLOWED;
713 
714 	remove_select_sync_pool_entry(pool, sync, event);
715 	return B_OK;
716 }
717 
718 
719 //	#pragma mark - vnode API
720 
721 
722 static status_t
723 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
724 {
725 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
726 	fs_vnode* superVnode = fifo->SuperVnode();
727 
728 	status_t error = B_OK;
729 	if (superVnode->ops->put_vnode != NULL)
730 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
731 
732 	delete fifo;
733 
734 	return error;
735 }
736 
737 
738 static status_t
739 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
740 {
741 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
742 	fs_vnode* superVnode = fifo->SuperVnode();
743 
744 	status_t error = B_OK;
745 	if (superVnode->ops->remove_vnode != NULL)
746 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
747 
748 	delete fifo;
749 
750 	return error;
751 }
752 
753 
754 static status_t
755 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
756 	void** _cookie)
757 {
758 	Inode* inode = (Inode*)_node->private_node;
759 
760 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
761 
762 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
763 	if (cookie == NULL)
764 		return B_NO_MEMORY;
765 
766 	TRACE("  open cookie = %p\n", cookie);
767 	cookie->open_mode = openMode;
768 	inode->Open(openMode);
769 
770 	*_cookie = (void*)cookie;
771 
772 	return B_OK;
773 }
774 
775 
776 static status_t
777 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
778 {
779 	file_cookie* cookie = (file_cookie*)_cookie;
780 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
781 
782 	fifo->Close(cookie->open_mode, cookie);
783 
784 	return B_OK;
785 }
786 
787 
788 static status_t
789 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
790 {
791 	file_cookie* cookie = (file_cookie*)_cookie;
792 
793 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
794 
795 	free(cookie);
796 
797 	return B_OK;
798 }
799 
800 
801 static status_t
802 fifo_fsync(fs_volume* _volume, fs_vnode* _node)
803 {
804 	return B_OK;
805 }
806 
807 
808 static status_t
809 fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
810 	off_t /*pos*/, void* buffer, size_t* _length)
811 {
812 	file_cookie* cookie = (file_cookie*)_cookie;
813 	Inode* inode = (Inode*)_node->private_node;
814 
815 	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
816 		inode, cookie, *_length, cookie->open_mode);
817 
818 	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
819 		return B_NOT_ALLOWED;
820 
821 	MutexLocker locker(inode->RequestLock());
822 
823 	if (inode->IsActive() && inode->WriterCount() == 0) {
824 		// as long there is no writer, and the pipe is empty,
825 		// we always just return 0 to indicate end of file
826 		if (inode->BytesAvailable() == 0) {
827 			*_length = 0;
828 			return B_OK;
829 		}
830 	}
831 
832 	// issue read request
833 
834 	ReadRequest request(cookie);
835 	inode->AddReadRequest(request);
836 
837 	TRACE("  issue read request %p\n", &request);
838 
839 	size_t length = *_length;
840 	status_t status = inode->ReadDataFromBuffer(buffer, &length,
841 		(cookie->open_mode & O_NONBLOCK) != 0, request);
842 
843 	inode->RemoveReadRequest(request);
844 	inode->NotifyReadDone();
845 
846 	TRACE("  done reading request %p, length %zu\n", &request, length);
847 
848 	if (length > 0)
849 		status = B_OK;
850 
851 	*_length = length;
852 	return status;
853 }
854 
855 
856 static status_t
857 fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
858 	off_t /*pos*/, const void* buffer, size_t* _length)
859 {
860 	file_cookie* cookie = (file_cookie*)_cookie;
861 	Inode* inode = (Inode*)_node->private_node;
862 
863 	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
864 		_node, cookie, *_length);
865 
866 	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
867 		return B_NOT_ALLOWED;
868 
869 	MutexLocker locker(inode->RequestLock());
870 
871 	size_t length = *_length;
872 	if (length == 0)
873 		return B_OK;
874 
875 	// copy data into ring buffer
876 	status_t status = inode->WriteDataToBuffer(buffer, &length,
877 		(cookie->open_mode & O_NONBLOCK) != 0);
878 
879 	if (length > 0)
880 		status = B_OK;
881 
882 	*_length = length;
883 	return status;
884 }
885 
886 
887 static status_t
888 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
889 {
890 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
891 	fs_vnode* superVnode = fifo->SuperVnode();
892 
893 	if (superVnode->ops->read_stat == NULL)
894 		return B_BAD_VALUE;
895 
896 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
897 	if (error != B_OK)
898 		return error;
899 
900 
901 	MutexLocker locker(fifo->RequestLock());
902 
903 	st->st_size = fifo->BytesAvailable();
904 
905 	st->st_blksize = 4096;
906 
907 	// TODO: Just pass the changes to our modification time on to the super node.
908 	st->st_atim.tv_sec = time(NULL);
909 	st->st_atim.tv_nsec = 0;
910 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
911 
912 	return B_OK;
913 }
914 
915 
916 static status_t
917 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
918 	uint32 statMask)
919 {
920 	// we cannot change the size of anything
921 	if ((statMask & B_STAT_SIZE) != 0)
922 		return B_BAD_VALUE;
923 
924 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
925 	fs_vnode* superVnode = fifo->SuperVnode();
926 
927 	if (superVnode->ops->write_stat == NULL)
928 		return B_BAD_VALUE;
929 
930 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
931 		statMask);
932 	if (error != B_OK)
933 		return error;
934 
935 	return B_OK;
936 }
937 
938 
939 static status_t
940 fifo_ioctl(fs_volume* _volume, fs_vnode* _vnode, void* _cookie, uint32 op,
941 	void* buffer, size_t length)
942 {
943 	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
944 		_vnode, _cookie, op, buffer, length);
945 
946 	return EINVAL;
947 }
948 
949 
950 static status_t
951 fifo_set_flags(fs_volume* _volume, fs_vnode* _vnode, void* _cookie,
952 	int flags)
953 {
954 	file_cookie* cookie = (file_cookie*)_cookie;
955 
956 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
957 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
958 	return B_OK;
959 }
960 
961 
962 static status_t
963 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
964 	uint8 event, selectsync* sync)
965 {
966 	file_cookie* cookie = (file_cookie*)_cookie;
967 
968 	TRACE("fifo_select(vnode = %p)\n", _node);
969 	Inode* inode = (Inode*)_node->private_node;
970 	if (!inode)
971 		return B_ERROR;
972 
973 	MutexLocker locker(inode->RequestLock());
974 	return inode->Select(event, sync, cookie->open_mode);
975 }
976 
977 
978 static status_t
979 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
980 	uint8 event, selectsync* sync)
981 {
982 	file_cookie* cookie = (file_cookie*)_cookie;
983 
984 	TRACE("fifo_deselect(vnode = %p)\n", _node);
985 	Inode* inode = (Inode*)_node->private_node;
986 	if (inode == NULL)
987 		return B_ERROR;
988 
989 	MutexLocker locker(inode->RequestLock());
990 	return inode->Deselect(event, sync, cookie->open_mode);
991 }
992 
993 
994 static bool
995 fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
996 {
997 	return false;
998 }
999 
1000 
1001 static status_t
1002 fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
1003 	const iovec* vecs, size_t count, size_t* _numBytes)
1004 {
1005 	return B_NOT_ALLOWED;
1006 }
1007 
1008 
1009 static status_t
1010 fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
1011 	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
1012 {
1013 	return B_NOT_ALLOWED;
1014 }
1015 
1016 
1017 static status_t
1018 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1019 	fs_vnode* _superVnode)
1020 {
1021 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1022 	fs_vnode* superVnode = fifo->SuperVnode();
1023 
1024 	if (superVnode->ops->get_super_vnode != NULL) {
1025 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1026 			_superVnode);
1027 	}
1028 
1029 	*_superVnode = *superVnode;
1030 
1031 	return B_OK;
1032 }
1033 
1034 
1035 static fs_vnode_ops sFIFOVnodeOps = {
1036 	NULL,	// lookup
1037 	NULL,	// get_vnode_name
1038 					// TODO: This is suboptimal! We'd need to forward the
1039 					// super node's hook, if it has got one.
1040 
1041 	&fifo_put_vnode,
1042 	&fifo_remove_vnode,
1043 
1044 	&fifo_can_page,
1045 	&fifo_read_pages,
1046 	&fifo_write_pages,
1047 
1048 	NULL,	// io()
1049 	NULL,	// cancel_io()
1050 
1051 	NULL,	// get_file_map
1052 
1053 	/* common */
1054 	&fifo_ioctl,
1055 	&fifo_set_flags,
1056 	&fifo_select,
1057 	&fifo_deselect,
1058 	&fifo_fsync,
1059 
1060 	NULL,	// fs_read_link
1061 	NULL,	// fs_symlink
1062 	NULL,	// fs_link
1063 	NULL,	// unlink
1064 	NULL,	// rename
1065 
1066 	NULL,	// fs_access()
1067 	&fifo_read_stat,
1068 	&fifo_write_stat,
1069 	NULL,
1070 
1071 	/* file */
1072 	NULL,	// create()
1073 	&fifo_open,
1074 	&fifo_close,
1075 	&fifo_free_cookie,
1076 	&fifo_read,
1077 	&fifo_write,
1078 
1079 	/* directory */
1080 	NULL,	// create_dir
1081 	NULL,	// remove_dir
1082 	NULL,	// open_dir
1083 	NULL,	// close_dir
1084 	NULL,	// free_dir_cookie
1085 	NULL,	// read_dir
1086 	NULL,	// rewind_dir
1087 
1088 	/* attribute directory operations */
1089 	NULL,	// open_attr_dir
1090 	NULL,	// close_attr_dir
1091 	NULL,	// free_attr_dir_cookie
1092 	NULL,	// read_attr_dir
1093 	NULL,	// rewind_attr_dir
1094 
1095 	/* attribute operations */
1096 	NULL,	// create_attr
1097 	NULL,	// open_attr
1098 	NULL,	// close_attr
1099 	NULL,	// free_attr_cookie
1100 	NULL,	// read_attr
1101 	NULL,	// write_attr
1102 
1103 	NULL,	// read_attr_stat
1104 	NULL,	// write_attr_stat
1105 	NULL,	// rename_attr
1106 	NULL,	// remove_attr
1107 
1108 	/* support for node and FS layers */
1109 	NULL,	// create_special_node
1110 	&fifo_get_super_vnode,
1111 };
1112 
1113 
1114 }	// namespace fifo
1115 
1116 
1117 using namespace fifo;
1118 
1119 
1120 // #pragma mark -
1121 
1122 
1123 status_t
1124 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1125 {
1126 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1127 	if (fifo == NULL)
1128 		return B_NO_MEMORY;
1129 
1130 	status_t status = fifo->InitCheck();
1131 	if (status != B_OK) {
1132 		delete fifo;
1133 		return status;
1134 	}
1135 
1136 	vnode->private_node = fifo;
1137 	vnode->ops = &sFIFOVnodeOps;
1138 
1139 	return B_OK;
1140 }
1141