xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 3dfd9cb95ce45f59160d50975210bc55e3fc0709)
1 /*
2  * Copyright 2007-2011, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/stat.h>
15 
16 #include <new>
17 
18 #include <KernelExport.h>
19 #include <NodeMonitor.h>
20 #include <Select.h>
21 
22 #include <condition_variable.h>
23 #include <debug.h>
24 #include <khash.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <team.h>
28 #include <thread.h>
29 #include <util/DoublyLinkedList.h>
30 #include <util/AutoLock.h>
31 #include <util/ring_buffer.h>
32 #include <vfs.h>
33 #include <vm/vm.h>
34 
35 
// Uncomment the following define to route TRACE() output to dprintf().
// While TRACE_FIFO is undefined, TRACE() expands to nothing.
//#define TRACE_FIFO
#ifdef TRACE_FIFO
#	define TRACE(x...) dprintf(x)
#else
#	define TRACE(x...)
#endif
42 
43 
// NOTE(review): PIPEFS_HASH_SIZE is not referenced in the visible part of
// this file; presumably a leftover -- confirm before removing.
#define PIPEFS_HASH_SIZE		16
// Size passed to create_ring_buffer() by RingBuffer::CreateBuffer().
#define PIPEFS_MAX_BUFFER_SIZE	32768


// TODO: PIPE_BUF is supposed to be defined somewhere else.
// Inode::WriteDataToBuffer() waits for room for the whole request when the
// request is at most PIPE_BUF bytes long.
#define PIPE_BUF	_POSIX_PIPE_BUF
50 
51 
52 namespace fifo {
53 
54 
55 struct file_cookie;
56 class Inode;
57 
58 
/*!	Thin wrapper around a \c ring_buffer that is allocated on demand.
	While no buffer exists (before CreateBuffer() has succeeded, or after
	DeleteBuffer()), Readable() and Writable() report 0 and the User*()
	transfer methods return B_NO_MEMORY.
*/
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			// Allocates the buffer, if there is none yet.
			status_t			CreateBuffer();
			// Frees the buffer, if there is one.
			void				DeleteBuffer();

			// Forward to ring_buffer_write()/ring_buffer_read().
			size_t				Write(const void* buffer, size_t length);
			size_t				Read(void* buffer, size_t length);
			// Forward to ring_buffer_user_write()/ring_buffer_user_read().
			ssize_t				UserWrite(const void* buffer, ssize_t length);
			ssize_t				UserRead(void* buffer, ssize_t length);

			// Number of bytes that can currently be read/written; 0 without
			// a buffer.
			size_t				Readable() const;
			size_t				Writable() const;

private:
			// NULL while no buffer is allocated.
			struct ring_buffer*	fBuffer;
};
78 
79 
80 class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
81 public:
82 	ReadRequest(file_cookie* cookie)
83 		:
84 		fThread(thread_get_current_thread()),
85 		fCookie(cookie),
86 		fNotified(true)
87 	{
88 		B_INITIALIZE_SPINLOCK(&fLock);
89 	}
90 
91 	void SetNotified(bool notified)
92 	{
93 		InterruptsSpinLocker _(fLock);
94 		fNotified = notified;
95 	}
96 
97 	void Notify(status_t status = B_OK)
98 	{
99 		InterruptsSpinLocker _(fLock);
100 		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);
101 
102 		if (!fNotified) {
103 			SpinLocker schedulerLocker(gSchedulerLock);
104 			thread_unblock_locked(fThread, status);
105 			fNotified = true;
106 		}
107 	}
108 
109 	file_cookie* Cookie() const
110 	{
111 		return fCookie;
112 	}
113 
114 private:
115 	spinlock		fLock;
116 	Thread*			fThread;
117 	file_cookie*	fCookie;
118 	volatile bool	fNotified;
119 };
120 
121 
122 class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
123 public:
124 	WriteRequest(size_t minimalWriteCount)
125 		:
126 		fMinimalWriteCount(minimalWriteCount)
127 	{
128 	}
129 
130 	size_t MinimalWriteCount() const
131 	{
132 		return fMinimalWriteCount;
133 	}
134 
135 private:
136 	size_t	fMinimalWriteCount;
137 };
138 
139 
// List types used by Inode for its pending read and write requests.
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
142 
143 
/*!	State of one FIFO: the data ring buffer, the lists of waiting readers and
	writers, reader/writer counts, and the select() pools.
	The vnode hooks in this file acquire RequestLock() before calling the
	data transfer and select methods; Open() and Close() lock it themselves.
*/
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// true once Open() has seen at least one reader and one writer;
			// reset by Close() when both counts have dropped to 0.
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			mutex*				RequestLock() { return &fRequestLock; }

			// Data transfer to/from the ring buffer. *_length is the
			// requested size on input and the transferred size on return.
			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			// Management of the list of waiting readers.
			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			// Wake-up of waiting readers, writers, and select()ors.
			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			// Reader/writer accounting for an opened/closed file descriptor.
			void				Open(int openMode);
			void				Close(int openMode, file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			// Blocked writers wait on this condition variable.
			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
210 
211 
212 class FIFOInode : public Inode {
213 public:
214 	FIFOInode(fs_vnode* vnode)
215 		:
216 		Inode(),
217 		fSuperVnode(*vnode)
218 	{
219 	}
220 
221 	fs_vnode*	SuperVnode() { return &fSuperVnode; }
222 
223 private:
224 	fs_vnode	fSuperVnode;
225 };
226 
227 
/*!	Per-open state, allocated in fifo_open() and freed in fifo_free_cookie().
*/
struct file_cookie {
	// Open mode passed to fifo_open(); fifo_set_flags() updates its
	// O_APPEND/O_NONBLOCK bits.
	int				open_mode;
};
231 
232 
233 // #pragma mark -
234 
235 
/*!	Creates the wrapper without allocating a buffer; CreateBuffer() does that.
*/
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
241 
242 
/*!	Frees the buffer, if one is still allocated. */
RingBuffer::~RingBuffer()
{
	DeleteBuffer();
}
247 
248 
249 status_t
250 RingBuffer::CreateBuffer()
251 {
252 	if (fBuffer != NULL)
253 		return B_OK;
254 
255 	fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
256 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
257 }
258 
259 
260 void
261 RingBuffer::DeleteBuffer()
262 {
263 	if (fBuffer != NULL) {
264 		delete_ring_buffer(fBuffer);
265 		fBuffer = NULL;
266 	}
267 }
268 
269 
270 inline size_t
271 RingBuffer::Write(const void* buffer, size_t length)
272 {
273 	if (fBuffer == NULL)
274 		return B_NO_MEMORY;
275 
276 	return ring_buffer_write(fBuffer, (const uint8*)buffer, length);
277 }
278 
279 
280 inline size_t
281 RingBuffer::Read(void* buffer, size_t length)
282 {
283 	if (fBuffer == NULL)
284 		return B_NO_MEMORY;
285 
286 	return ring_buffer_read(fBuffer, (uint8*)buffer, length);
287 }
288 
289 
290 inline ssize_t
291 RingBuffer::UserWrite(const void* buffer, ssize_t length)
292 {
293 	if (fBuffer == NULL)
294 		return B_NO_MEMORY;
295 
296 	return ring_buffer_user_write(fBuffer, (const uint8*)buffer, length);
297 }
298 
299 
300 inline ssize_t
301 RingBuffer::UserRead(void* buffer, ssize_t length)
302 {
303 	if (fBuffer == NULL)
304 		return B_NO_MEMORY;
305 
306 	return ring_buffer_user_read(fBuffer, (uint8*)buffer, length);
307 }
308 
309 
310 inline size_t
311 RingBuffer::Readable() const
312 {
313 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
314 }
315 
316 
317 inline size_t
318 RingBuffer::Writable() const
319 {
320 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
321 }
322 
323 
324 //	#pragma mark -
325 
326 
327 Inode::Inode()
328 	:
329 	fReadRequests(),
330 	fWriteRequests(),
331 	fReaderCount(0),
332 	fWriterCount(0),
333 	fActive(false),
334 	fReadSelectSyncPool(NULL),
335 	fWriteSelectSyncPool(NULL)
336 {
337 	fWriteCondition.Publish(this, "pipe");
338 	mutex_init(&fRequestLock, "pipe request");
339 
340 	bigtime_t time = real_time_clock();
341 	fModificationTime.tv_sec = time / 1000000;
342 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
343 	fCreationTime = fModificationTime;
344 }
345 
346 
/*!	Undoes the constructor: unpublishes the writer condition variable and
	destroys the request lock.
*/
Inode::~Inode()
{
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
352 
353 
/*!	Reports the initialization status. The constructor has no failure path,
	so this always returns \c B_OK.
*/
status_t
Inode::InitCheck()
{
	return B_OK;
}
359 
360 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method; it is temporarily
	released while the method waits for buffer space.
	Notifies readers if necessary, so that blocking readers will get started.

	\param _data The data to be written; it is copied via
		RingBuffer::UserWrite().
	\param _length In: the number of bytes to write. Out: the number of bytes
		actually written.
	\param nonBlocking If \c true, the method returns \c B_WOULD_BLOCK
		instead of waiting.
	\return \c B_OK for success, \c B_BAD_ADDRESS if copying from the buffer
		failed, \c EPIPE if the FIFO is active but has no readers left, or
		the error returned by the interrupted wait. If the returned length
		is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
		// alias: the caller's length doubles as the running "written" count
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// According to the standard, requests of up to PIPE_BUF bytes shall not
	// be interleaved with other writers' data. Such a request therefore
	// waits until it fits into the buffer as a whole.
	size_t minToWrite = 1;
	if (dataSize <= PIPE_BUF)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available. We also wait
		// while the FIFO is not active yet. Once no reader is left, waiting
		// for space is pointless, so the loop is left in that case.
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			// Register with the condition variable before releasing the
			// lock, so that a notification sent in between is not missed.
			ConditionVariableEntry entry;
			entry.Add(this);

			// The request lets NotifyBytesRead() see how much space we need.
			WriteRequest request(minToWrite);
			fWriteRequests.Add(&request);

			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// SIGPIPE is only sent if nothing has been written yet
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
			return B_BAD_ADDRESS;

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
433 
434 
/*!	Reads data from the inode's ring buffer on behalf of \a request.
	fifo_read() calls this with the request lock held and \a request already
	added to the read request list. The lock is temporarily released while
	waiting (see WaitForReadRequest()).

	\param data The buffer the data are copied to via RingBuffer::UserRead().
	\param _length In: the size of \a data. Out: the number of bytes read.
	\param nonBlocking If \c true, the method returns \c B_WOULD_BLOCK
		instead of waiting.
	\param request The caller's read request.
	\return \c B_OK on success -- also with a length of 0, if the FIFO is
		active, empty, and has no writers --, \c B_WOULD_BLOCK in
		non-blocking mode if the method would have to wait,
		\c B_BAD_ADDRESS if copying to \a data failed, or the error the wait
		was ended with.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// no writer left: report end of file (B_OK with a length of 0)
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	if (fBuffer.UserRead(data, toRead) < B_OK)
		return B_BAD_ADDRESS;

	// wake up writers/select()ors that were waiting for buffer space
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
486 
487 
/*!	Adds \a request to the list of read requests. */
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
493 
494 
/*!	Removes \a request from the list of read requests. */
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
500 
501 
/*!	Blocks the calling thread until \a request is notified or the wait is
	interrupted. The request lock is released while blocking and re-acquired
	before returning.
	\return The status thread_block() returned.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// From now on ReadRequest::Notify() will unblock this thread.
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
523 
524 
525 void
526 Inode::NotifyBytesRead(size_t bytes)
527 {
528 	// notify writer, if something can be written now
529 	size_t writable = fBuffer.Writable();
530 	if (bytes > 0) {
531 		// notify select()ors only, if nothing was writable before
532 		if (writable == bytes) {
533 			if (fWriteSelectSyncPool)
534 				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
535 		}
536 
537 		// If any of the waiting writers has a minimal write count that has
538 		// now become satisfied, we notify all of them (condition variables
539 		// don't support doing that selectively).
540 		WriteRequest* request;
541 		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
542 		while ((request = iterator.Next()) != NULL) {
543 			size_t minWriteCount = request->MinimalWriteCount();
544 			if (minWriteCount > 0 && minWriteCount <= writable
545 					&& minWriteCount > writable - bytes) {
546 				fWriteCondition.NotifyAll();
547 				break;
548 			}
549 		}
550 	}
551 }
552 
553 
554 void
555 Inode::NotifyReadDone()
556 {
557 	// notify next reader, if there's still something to be read
558 	if (fBuffer.Readable() > 0) {
559 		if (ReadRequest* request = fReadRequests.First())
560 			request->Notify();
561 	}
562 }
563 
564 
565 void
566 Inode::NotifyBytesWritten(size_t bytes)
567 {
568 	// notify reader, if something can be read now
569 	if (bytes > 0 && fBuffer.Readable() == bytes) {
570 		if (fReadSelectSyncPool)
571 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
572 
573 		if (ReadRequest* request = fReadRequests.First())
574 			request->Notify();
575 	}
576 }
577 
578 
579 void
580 Inode::NotifyEndClosed(bool writer)
581 {
582 	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
583 		writer ? "writer" : "reader");
584 
585 	if (writer) {
586 		// Our last writer has been closed; if the pipe
587 		// contains no data, unlock all waiting readers
588 		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
589 		if (fBuffer.Readable() == 0) {
590 			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
591 			while (ReadRequest* request = iterator.Next())
592 				request->Notify();
593 
594 			if (fReadSelectSyncPool)
595 				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
596 		}
597 	} else {
598 		// Last reader is gone. Wake up all writers.
599 		fWriteCondition.NotifyAll();
600 
601 		if (fWriteSelectSyncPool) {
602 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
603 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
604 		}
605 	}
606 }
607 
608 
609 void
610 Inode::Open(int openMode)
611 {
612 	MutexLocker locker(RequestLock());
613 
614 	if ((openMode & O_ACCMODE) == O_WRONLY)
615 		fWriterCount++;
616 
617 	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
618 		fReaderCount++;
619 
620 	if (fReaderCount > 0 && fWriterCount > 0) {
621 		TRACE("Inode %p::Open(): fifo becomes active\n", this);
622 		fBuffer.CreateBuffer();
623 		fActive = true;
624 
625 		// notify all waiting writers that they can start
626 		if (fWriteSelectSyncPool)
627 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
628 		fWriteCondition.NotifyAll();
629 	}
630 }
631 
632 
633 void
634 Inode::Close(int openMode, file_cookie* cookie)
635 {
636 	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);
637 
638 	MutexLocker locker(RequestLock());
639 
640 	// Notify all currently reading file descriptors
641 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
642 	while (ReadRequest* request = iterator.Next()) {
643 		if (request->Cookie() == cookie)
644 			request->Notify(B_FILE_ERROR);
645 	}
646 
647 	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
648 		NotifyEndClosed(true);
649 
650 	if ((openMode & O_ACCMODE) == O_RDONLY
651 		|| (openMode & O_ACCMODE) == O_RDWR) {
652 		if (--fReaderCount == 0)
653 			NotifyEndClosed(false);
654 	}
655 
656 	if (fWriterCount == 0) {
657 		// Notify any still reading writers to stop
658 		// TODO: This only works reliable if there is only one writer - we could
659 		// do the same thing done for the read requests.
660 		fWriteCondition.NotifyAll(B_FILE_ERROR);
661 	}
662 
663 	if (fReaderCount == 0 && fWriterCount == 0) {
664 		fActive = false;
665 		fBuffer.DeleteBuffer();
666 	}
667 }
668 
669 
670 status_t
671 Inode::Select(uint8 event, selectsync* sync, int openMode)
672 {
673 	bool writer = true;
674 	select_sync_pool** pool;
675 	if ((openMode & O_RWMASK) == O_RDONLY) {
676 		pool = &fReadSelectSyncPool;
677 		writer = false;
678 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
679 		pool = &fWriteSelectSyncPool;
680 	} else
681 		return B_NOT_ALLOWED;
682 
683 	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
684 		return B_ERROR;
685 
686 	// signal right away, if the condition holds already
687 	if (writer) {
688 		if ((event == B_SELECT_WRITE
689 				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
690 			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
691 			return notify_select_event(sync, event);
692 		}
693 	} else {
694 		if (event == B_SELECT_READ
695 				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
696 			return notify_select_event(sync, event);
697 		}
698 	}
699 
700 	return B_OK;
701 }
702 
703 
704 status_t
705 Inode::Deselect(uint8 event, selectsync* sync, int openMode)
706 {
707 	select_sync_pool** pool;
708 	if ((openMode & O_RWMASK) == O_RDONLY) {
709 		pool = &fReadSelectSyncPool;
710 	} else if ((openMode & O_RWMASK) == O_WRONLY) {
711 		pool = &fWriteSelectSyncPool;
712 	} else
713 		return B_NOT_ALLOWED;
714 
715 	remove_select_sync_pool_entry(pool, sync, event);
716 	return B_OK;
717 }
718 
719 
720 //	#pragma mark - vnode API
721 
722 
723 static status_t
724 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
725 {
726 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
727 	fs_vnode* superVnode = fifo->SuperVnode();
728 
729 	status_t error = B_OK;
730 	if (superVnode->ops->put_vnode != NULL)
731 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
732 
733 	delete fifo;
734 
735 	return error;
736 }
737 
738 
739 static status_t
740 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
741 {
742 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
743 	fs_vnode* superVnode = fifo->SuperVnode();
744 
745 	status_t error = B_OK;
746 	if (superVnode->ops->remove_vnode != NULL)
747 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
748 
749 	delete fifo;
750 
751 	return error;
752 }
753 
754 
755 static status_t
756 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
757 	void** _cookie)
758 {
759 	Inode* inode = (Inode*)_node->private_node;
760 
761 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
762 
763 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
764 	if (cookie == NULL)
765 		return B_NO_MEMORY;
766 
767 	TRACE("  open cookie = %p\n", cookie);
768 	cookie->open_mode = openMode;
769 	inode->Open(openMode);
770 
771 	*_cookie = (void*)cookie;
772 
773 	return B_OK;
774 }
775 
776 
777 static status_t
778 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
779 {
780 	file_cookie* cookie = (file_cookie*)_cookie;
781 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
782 
783 	fifo->Close(cookie->open_mode, cookie);
784 
785 	return B_OK;
786 }
787 
788 
789 static status_t
790 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
791 {
792 	file_cookie* cookie = (file_cookie*)_cookie;
793 
794 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
795 
796 	free(cookie);
797 
798 	return B_OK;
799 }
800 
801 
/*!	fsync() hook: does nothing and reports success. */
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_OK;
}
807 
808 
809 static status_t
810 fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
811 	off_t /*pos*/, void* buffer, size_t* _length)
812 {
813 	file_cookie* cookie = (file_cookie*)_cookie;
814 	Inode* inode = (Inode*)_node->private_node;
815 
816 	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
817 		inode, cookie, *_length, cookie->open_mode);
818 
819 	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
820 		return B_NOT_ALLOWED;
821 
822 	MutexLocker locker(inode->RequestLock());
823 
824 	if (inode->IsActive() && inode->WriterCount() == 0) {
825 		// as long there is no writer, and the pipe is empty,
826 		// we always just return 0 to indicate end of file
827 		if (inode->BytesAvailable() == 0) {
828 			*_length = 0;
829 			return B_OK;
830 		}
831 	}
832 
833 	// issue read request
834 
835 	ReadRequest request(cookie);
836 	inode->AddReadRequest(request);
837 
838 	TRACE("  issue read request %p\n", &request);
839 
840 	size_t length = *_length;
841 	status_t status = inode->ReadDataFromBuffer(buffer, &length,
842 		(cookie->open_mode & O_NONBLOCK) != 0, request);
843 
844 	inode->RemoveReadRequest(request);
845 	inode->NotifyReadDone();
846 
847 	TRACE("  done reading request %p, length %zu\n", &request, length);
848 
849 	if (length > 0)
850 		status = B_OK;
851 
852 	*_length = length;
853 	return status;
854 }
855 
856 
857 static status_t
858 fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
859 	off_t /*pos*/, const void* buffer, size_t* _length)
860 {
861 	file_cookie* cookie = (file_cookie*)_cookie;
862 	Inode* inode = (Inode*)_node->private_node;
863 
864 	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
865 		_node, cookie, *_length);
866 
867 	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
868 		return B_NOT_ALLOWED;
869 
870 	MutexLocker locker(inode->RequestLock());
871 
872 	size_t length = *_length;
873 	if (length == 0)
874 		return B_OK;
875 
876 	// copy data into ring buffer
877 	status_t status = inode->WriteDataToBuffer(buffer, &length,
878 		(cookie->open_mode & O_NONBLOCK) != 0);
879 
880 	if (length > 0)
881 		status = B_OK;
882 
883 	*_length = length;
884 	return status;
885 }
886 
887 
888 static status_t
889 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
890 {
891 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
892 	fs_vnode* superVnode = fifo->SuperVnode();
893 
894 	if (superVnode->ops->read_stat == NULL)
895 		return B_BAD_VALUE;
896 
897 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
898 	if (error != B_OK)
899 		return error;
900 
901 
902 	MutexLocker locker(fifo->RequestLock());
903 
904 	st->st_size = fifo->BytesAvailable();
905 
906 	st->st_blksize = 4096;
907 
908 	// TODO: Just pass the changes to our modification time on to the super node.
909 	st->st_atim.tv_sec = time(NULL);
910 	st->st_atim.tv_nsec = 0;
911 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
912 
913 	return B_OK;
914 }
915 
916 
917 static status_t
918 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
919 	uint32 statMask)
920 {
921 	// we cannot change the size of anything
922 	if ((statMask & B_STAT_SIZE) != 0)
923 		return B_BAD_VALUE;
924 
925 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
926 	fs_vnode* superVnode = fifo->SuperVnode();
927 
928 	if (superVnode->ops->write_stat == NULL)
929 		return B_BAD_VALUE;
930 
931 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
932 		statMask);
933 	if (error != B_OK)
934 		return error;
935 
936 	return B_OK;
937 }
938 
939 
/*!	ioctl() hook: no operations are supported, so this always returns
	\c EINVAL.
*/
static status_t
fifo_ioctl(fs_volume* _volume, fs_vnode* _vnode, void* _cookie, uint32 op,
	void* buffer, size_t length)
{
	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
		_vnode, _cookie, op, buffer, length);

	return EINVAL;
}
949 
950 
951 static status_t
952 fifo_set_flags(fs_volume* _volume, fs_vnode* _vnode, void* _cookie,
953 	int flags)
954 {
955 	file_cookie* cookie = (file_cookie*)_cookie;
956 
957 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
958 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
959 	return B_OK;
960 }
961 
962 
963 static status_t
964 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
965 	uint8 event, selectsync* sync)
966 {
967 	file_cookie* cookie = (file_cookie*)_cookie;
968 
969 	TRACE("fifo_select(vnode = %p)\n", _node);
970 	Inode* inode = (Inode*)_node->private_node;
971 	if (!inode)
972 		return B_ERROR;
973 
974 	MutexLocker locker(inode->RequestLock());
975 	return inode->Select(event, sync, cookie->open_mode);
976 }
977 
978 
979 static status_t
980 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
981 	uint8 event, selectsync* sync)
982 {
983 	file_cookie* cookie = (file_cookie*)_cookie;
984 
985 	TRACE("fifo_deselect(vnode = %p)\n", _node);
986 	Inode* inode = (Inode*)_node->private_node;
987 	if (inode == NULL)
988 		return B_ERROR;
989 
990 	MutexLocker locker(inode->RequestLock());
991 	return inode->Deselect(event, sync, cookie->open_mode);
992 }
993 
994 
/*!	can_page() hook: always returns \c false. */
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1000 
1001 
/*!	read_pages() hook: not supported, always returns \c B_NOT_ALLOWED. */
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1008 
1009 
/*!	write_pages() hook: not supported, always returns \c B_NOT_ALLOWED. */
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1016 
1017 
1018 static status_t
1019 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1020 	fs_vnode* _superVnode)
1021 {
1022 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1023 	fs_vnode* superVnode = fifo->SuperVnode();
1024 
1025 	if (superVnode->ops->get_super_vnode != NULL) {
1026 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1027 			_superVnode);
1028 	}
1029 
1030 	*_superVnode = *superVnode;
1031 
1032 	return B_OK;
1033 }
1034 
1035 
// The vnode operations installed by create_fifo_vnode(). The initializers
// are positional, so their order must match the member order of
// fs_vnode_ops. Hooks that are NULL are not provided by the FIFO layer.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,	// NOTE(review): presumably preallocate() -- confirm against the
			// fs_vnode_ops declaration

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1113 
1114 
1115 }	// namespace fifo
1116 
1117 
1118 using namespace fifo;
1119 
1120 
1121 // #pragma mark -
1122 
1123 
1124 status_t
1125 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1126 {
1127 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1128 	if (fifo == NULL)
1129 		return B_NO_MEMORY;
1130 
1131 	status_t status = fifo->InitCheck();
1132 	if (status != B_OK) {
1133 		delete fifo;
1134 		return status;
1135 	}
1136 
1137 	vnode->private_node = fifo;
1138 	vnode->ops = &sFIFOVnodeOps;
1139 
1140 	return B_OK;
1141 }
1142