xref: /haiku/src/system/kernel/fs/fifo.cpp (revision 984f843b917a1c4e077915c5961a6ef1cf8dabc7)
1 /*
2  * Copyright 2007-2013, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 
8 #include "fifo.h"
9 
10 #include <limits.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <sys/ioctl.h>
15 #include <sys/stat.h>
16 
17 #include <new>
18 
19 #include <KernelExport.h>
20 #include <NodeMonitor.h>
21 #include <Select.h>
22 
23 #include <condition_variable.h>
24 #include <debug_hex_dump.h>
25 #include <lock.h>
26 #include <select_sync_pool.h>
27 #include <syscall_restart.h>
28 #include <team.h>
29 #include <thread.h>
30 #include <util/DoublyLinkedList.h>
31 #include <util/AutoLock.h>
32 #include <util/ring_buffer.h>
33 #include <vfs.h>
34 #include <vfs_defs.h>
35 #include <vm/vm.h>
36 
37 
38 //#define TRACE_FIFO
39 #ifdef TRACE_FIFO
40 #	define TRACE(x...) dprintf(x)
41 #else
42 #	define TRACE(x...)
43 #endif
44 
45 
46 namespace fifo {
47 
48 
49 struct file_cookie;
50 class Inode;
51 
52 
// Thin wrapper around the kernel's ring_buffer with lazy allocation: the
// underlying buffer only exists between CreateBuffer() and DeleteBuffer(),
// i.e. while the FIFO has both ends open. All accessors tolerate a missing
// buffer (Readable()/Writable() report 0, transfers fail with B_NO_MEMORY).
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			// Allocates the buffer (capacity VFS_FIFO_BUFFER_CAPACITY);
			// a no-op returning B_OK if it already exists.
			status_t			CreateBuffer();
			void				DeleteBuffer();

			// Transfer up to length bytes; isUser marks buffer as a
			// userland address that is validated and copied safely.
			ssize_t				Write(const void* buffer, size_t length,
									bool isUser);
			ssize_t				Read(void* buffer, size_t length, bool isUser);
			// Non-destructive read at offset (kernel buffers only).
			ssize_t				Peek(size_t offset, void* buffer,
									size_t length) const;

			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
};
73 
74 
// A blocked reader waiting for data. Requests are queued on the Inode so
// concurrent readers are served in arrival order. The spinlock guards
// fNotified, since Notify() may run on another thread/CPU while the owner
// is about to block or has just woken up.
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
	{
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	// Marks whether the thread has (or no longer needs) a wake-up. Set to
	// false right before blocking, back to true after waking — see
	// Inode::WaitForReadRequest().
	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Unblocks the waiting thread with the given status — at most once per
	// SetNotified(false), so the thread is never unblocked spuriously.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			thread_unblock(fThread, status);
			fNotified = true;
		}
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;
	Thread*			fThread;
	file_cookie*	fCookie;
	volatile bool	fNotified;
};
119 
120 
// A blocked writer. Unlike readers, writers wait collectively on the
// Inode's condition variable; this record only tracks how much space the
// writer needs before it can make progress (used by NotifyBytesRead() to
// decide whether waking writers is worthwhile).
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
public:
	WriteRequest(Thread* thread, size_t minimalWriteCount)
		:
		fThread(thread),
		fMinimalWriteCount(minimalWriteCount)
	{
	}

	Thread* GetThread() const
	{
		return fThread;
	}

	// Minimum number of writable bytes required before this writer can
	// proceed (the atomic-write guarantee for small writes).
	size_t MinimalWriteCount() const
	{
		return fMinimalWriteCount;
	}

private:
	Thread*	fThread;
	size_t	fMinimalWriteCount;
};
144 
145 
146 typedef DoublyLinkedList<ReadRequest> ReadRequestList;
147 typedef DoublyLinkedList<WriteRequest> WriteRequestList;
148 
149 
// The core FIFO state: a ring buffer plus reader/writer bookkeeping.
// All mutable state is guarded by fRequestLock. Blocked readers queue as
// ReadRequests (served in order); blocked writers wait collectively on
// fWriteCondition. The FIFO is "active" while both ends are open.
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			// Must be held for all of the methods below.
			mutex*				RequestLock() { return &fRequestLock; }

			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking,
									bool isUser);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, bool isUser,
									ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

			// KDL support ("fifo" debugger command).
			void				Dump(bool dumpData) const;
	static	int					Dump(int argc, char** argv);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;
									// readers and writers attached

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
221 
222 
// An Inode that wraps ("covers") another file system's vnode. The original
// vnode is kept so that operations not handled here (stat, etc.) can be
// forwarded to the underlying file system.
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;
};
237 
238 
// Per-open() state: one cookie per file descriptor referring to the FIFO.
struct file_cookie {
	int	open_mode;
			// guarded by Inode::fRequestLock

	// Sets or clears O_NONBLOCK in open_mode, leaving all other flags
	// untouched.
	void SetNonBlocking(bool nonBlocking)
	{
		const int otherFlags = open_mode & ~(int)O_NONBLOCK;
		open_mode = nonBlocking ? (otherFlags | O_NONBLOCK) : otherFlags;
	}
};
251 
252 
253 // #pragma mark -
254 
255 
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
	// The actual buffer is allocated lazily by CreateBuffer().
}
261 
262 
RingBuffer::~RingBuffer()
{
	// Releases the buffer, if it still exists.
	DeleteBuffer();
}
267 
268 
269 status_t
270 RingBuffer::CreateBuffer()
271 {
272 	if (fBuffer != NULL)
273 		return B_OK;
274 
275 	fBuffer = create_ring_buffer(VFS_FIFO_BUFFER_CAPACITY);
276 	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
277 }
278 
279 
280 void
281 RingBuffer::DeleteBuffer()
282 {
283 	if (fBuffer != NULL) {
284 		delete_ring_buffer(fBuffer);
285 		fBuffer = NULL;
286 	}
287 }
288 
289 
290 inline ssize_t
291 RingBuffer::Write(const void* buffer, size_t length, bool isUser)
292 {
293 	if (fBuffer == NULL)
294 		return B_NO_MEMORY;
295 	if (isUser && !IS_USER_ADDRESS(buffer))
296 		return B_BAD_ADDRESS;
297 
298 	return isUser
299 		? ring_buffer_user_write(fBuffer, (const uint8*)buffer, length)
300 		: ring_buffer_write(fBuffer, (const uint8*)buffer, length);
301 }
302 
303 
304 inline ssize_t
305 RingBuffer::Read(void* buffer, size_t length, bool isUser)
306 {
307 	if (fBuffer == NULL)
308 		return B_NO_MEMORY;
309 	if (isUser && !IS_USER_ADDRESS(buffer))
310 		return B_BAD_ADDRESS;
311 
312 	return isUser
313 		? ring_buffer_user_read(fBuffer, (uint8*)buffer, length)
314 		: ring_buffer_read(fBuffer, (uint8*)buffer, length);
315 }
316 
317 
/*!	Copies up to \a length bytes starting at \a offset out of the buffer
	without consuming them. Kernel target buffers only (no user-address
	handling here — used by the KDL hex dump).
*/
inline ssize_t
RingBuffer::Peek(size_t offset, void* buffer, size_t length) const
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_peek(fBuffer, offset, (uint8*)buffer, length);
}
326 
327 
328 inline size_t
329 RingBuffer::Readable() const
330 {
331 	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
332 }
333 
334 
335 inline size_t
336 RingBuffer::Writable() const
337 {
338 	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
339 }
340 
341 
342 //	#pragma mark -
343 
344 
345 Inode::Inode()
346 	:
347 	fReadRequests(),
348 	fWriteRequests(),
349 	fReaderCount(0),
350 	fWriterCount(0),
351 	fActive(false),
352 	fReadSelectSyncPool(NULL),
353 	fWriteSelectSyncPool(NULL)
354 {
355 	fWriteCondition.Publish(this, "pipe");
356 	mutex_init(&fRequestLock, "pipe request");
357 
358 	bigtime_t time = real_time_clock();
359 	fModificationTime.tv_sec = time / 1000000;
360 	fModificationTime.tv_nsec = (time % 1000000) * 1000;
361 	fCreationTime = fModificationTime;
362 }
363 
364 
Inode::~Inode()
{
	// Tear down the synchronization primitives; by now no thread may be
	// waiting on either of them anymore.
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
370 
371 
// Post-construction check; nothing in the constructor can currently fail.
status_t
Inode::InitCheck()
{
	return B_OK;
}
377 
378 
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method.
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
	On return \c *_length contains the number of bytes actually written.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking,
	bool isUser)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// A request up to VFS_FIFO_ATOMIC_WRITE_SIZE bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= VFS_FIFO_ATOMIC_WRITE_SIZE)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available.
		// Also blocks while the FIFO is not yet active (no reader has
		// opened it yet).
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			ConditionVariableEntry entry;
			entry.Add(this);

			WriteRequest request(thread_get_current_thread(), minToWrite);
			fWriteRequests.Add(&request);

			// Drop the request lock while blocked; readers need it to
			// drain the buffer and wake us.
			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// POSIX: writing to a pipe with no readers raises SIGPIPE and
			// fails with EPIPE — but only if nothing was written yet.
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0) {
			ssize_t bytesWritten = fBuffer.Write(data, toWrite, isUser);
			if (bytesWritten < 0)
				return bytesWritten;
		}

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
455 
456 
/*!	Reads up to \c *_length bytes from the ring buffer into \a data. The
	request lock must be held, and \a request must already have been queued
	via AddReadRequest(). Blocks (unless \a nonBlocking) first until the
	request is the head of the queue, then until data arrive. Returns B_OK
	with \c *_length == 0 to signal EOF when the FIFO is active but all
	writers are gone. On return \c *_length contains the bytes read.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	bool isUser, ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// an active FIFO without writers means end-of-file
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	ssize_t bytesRead = fBuffer.Read(data, toRead, isUser);
	if (bytesRead < 0)
		return bytesRead;

	// wake writers that may have been waiting for space
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
509 
510 
// Queues \a request at the tail of the reader queue (FIFO order).
// The request lock must be held.
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
516 
517 
// Removes \a request from the reader queue. The request lock must be held.
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
523 
524 
/*!	Blocks the calling thread until \a request is notified or the wait is
	interrupted. Temporarily drops the request lock while blocked and
	re-acquires it before returning. Returns the unblock status.
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// allow exactly one Notify() to unblock us
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
546 
547 
/*!	Called after \a bytes have been consumed from the buffer. Wakes write
	select()ors and blocked writers that can now make progress. The request
	lock must be held.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		// (writable == bytes means this read freed the very first space)
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
575 
576 
577 void
578 Inode::NotifyReadDone()
579 {
580 	// notify next reader, if there's still something to be read
581 	if (fBuffer.Readable() > 0) {
582 		if (ReadRequest* request = fReadRequests.First())
583 			request->Notify();
584 	}
585 }
586 
587 
588 void
589 Inode::NotifyBytesWritten(size_t bytes)
590 {
591 	// notify reader, if something can be read now
592 	if (bytes > 0 && fBuffer.Readable() == bytes) {
593 		if (fReadSelectSyncPool)
594 			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
595 
596 		if (ReadRequest* request = fReadRequests.First())
597 			request->Notify();
598 	}
599 }
600 
601 
/*!	Called when the last writer (\a writer == \c true) or the last reader
	(\a writer == \c false) has closed its end. Wakes the respective
	opposite parties so they can observe EOF/EPIPE. The request lock must
	be held.
*/
void
Inode::NotifyEndClosed(bool writer)
{
	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
		writer ? "writer" : "reader");

	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		// (if data remain, readers drain it first and see EOF afterwards)
		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
		if (fBuffer.Readable() == 0) {
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while (ReadRequest* request = iterator.Next())
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_DISCONNECTED);

		}
	} else {
		// Last reader is gone. Wake up all writers.
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
	}
}
629 
630 
631 void
632 Inode::Open(int openMode)
633 {
634 	MutexLocker locker(RequestLock());
635 
636 	if ((openMode & O_ACCMODE) == O_WRONLY || (openMode & O_ACCMODE) == O_RDWR)
637 		fWriterCount++;
638 
639 	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
640 		fReaderCount++;
641 
642 	if (fReaderCount > 0 && fWriterCount > 0) {
643 		TRACE("Inode %p::Open(): fifo becomes active\n", this);
644 		fBuffer.CreateBuffer();
645 		fActive = true;
646 
647 		// notify all waiting writers that they can start
648 		if (fWriteSelectSyncPool)
649 			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
650 		fWriteCondition.NotifyAll();
651 	}
652 }
653 
654 
655 void
656 Inode::Close(file_cookie* cookie)
657 {
658 
659 	MutexLocker locker(RequestLock());
660 
661 	int openMode = cookie->open_mode;
662 	TRACE("Inode %p::Close(openMode = %" B_PRId32 ")\n", this, openMode);
663 
664 	// Notify all currently reading file descriptors
665 	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
666 	while (ReadRequest* request = iterator.Next()) {
667 		if (request->Cookie() == cookie)
668 			request->Notify(B_FILE_ERROR);
669 	}
670 
671 	if ((openMode & O_ACCMODE) == O_WRONLY || (openMode & O_ACCMODE) == O_RDWR) {
672 		if (--fWriterCount == 0)
673 			NotifyEndClosed(true);
674 	}
675 
676 	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
677 		if (--fReaderCount == 0)
678 			NotifyEndClosed(false);
679 	}
680 
681 	if (fWriterCount == 0) {
682 		// Notify any still reading writers to stop
683 		// TODO: This only works reliable if there is only one writer - we could
684 		// do the same thing done for the read requests.
685 		fWriteCondition.NotifyAll(B_FILE_ERROR);
686 	}
687 
688 	if (fReaderCount == 0 && fWriterCount == 0) {
689 		fActive = false;
690 		fBuffer.DeleteBuffer();
691 	}
692 }
693 
694 
/*!	Registers \a sync for \a event, choosing the read or write select pool
	from the event and the cookie's \a openMode. If the selected condition
	already holds, the event is delivered immediately. The request lock
	must be held. Returns B_NOT_ALLOWED for event/mode combinations that
	match neither pool.
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	// B_SELECT_READ can happen on write-only opened fds, so restrain
	// B_SELECT_READ to O_RDWR
	if ((event == B_SELECT_READ && (openMode & O_RWMASK) == O_RDWR)
		|| (openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_RDWR || (openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		if ((event == B_SELECT_WRITE && fBuffer.Writable() > 0)
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		if ((event == B_SELECT_READ && fBuffer.Readable() > 0)
			|| (event == B_SELECT_DISCONNECTED && fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
728 
729 
/*!	Removes \a sync for \a event from the pool it was registered in —
	the pool choice mirrors Select() exactly. The request lock must be
	held.
*/
status_t
Inode::Deselect(uint8 event, selectsync* sync, int openMode)
{
	select_sync_pool** pool;
	if ((event == B_SELECT_READ && (openMode & O_RWMASK) == O_RDWR)
		|| (openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
	} else if ((openMode & O_RWMASK) == O_RDWR || (openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	remove_select_sync_pool_entry(pool, sync, event);
	return B_OK;
}
745 
746 
/*!	KDL helper: prints this FIFO's state (counts, pending requests,
	buffered byte count) and, if \a dumpData is set, a hex dump of the
	buffered data. Runs in the kernel debugger — no locking.
*/
void
Inode::Dump(bool dumpData) const
{
	kprintf("FIFO %p\n", this);
	kprintf("  active:        %s\n", fActive ? "true" : "false");
	kprintf("  readers:       %" B_PRId32 "\n", fReaderCount);
	kprintf("  writers:       %" B_PRId32 "\n", fWriterCount);

	if (!fReadRequests.IsEmpty()) {
		kprintf(" pending readers:\n");
		for (ReadRequestList::ConstIterator it = fReadRequests.GetIterator();
			ReadRequest* request = it.Next();) {
			kprintf("    %p: thread %" B_PRId32 ", cookie: %p\n", request,
				request->GetThread()->id, request->Cookie());
		}
	}

	if (!fWriteRequests.IsEmpty()) {
		kprintf(" pending writers:\n");
		for (WriteRequestList::ConstIterator it = fWriteRequests.GetIterator();
			WriteRequest* request = it.Next();) {
			kprintf("    %p:  thread %" B_PRId32 ", min count: %zu\n", request,
				request->GetThread()->id, request->MinimalWriteCount());
		}
	}

	kprintf("  %zu bytes buffered\n", fBuffer.Readable());

	if (dumpData && fBuffer.Readable() > 0) {
		// Adapter feeding the buffered bytes to the kernel hex dumper via
		// non-destructive Peek()s.
		struct DataProvider : BKernel::HexDumpDataProvider {
			DataProvider(const RingBuffer& buffer)
				:
				fBuffer(buffer),
				fOffset(0)
			{
			}

			virtual bool HasMoreData() const
			{
				return fOffset < fBuffer.Readable();
			}

			virtual uint8 NextByte()
			{
				uint8 byte = '\0';
				if (fOffset < fBuffer.Readable()) {
					fBuffer.Peek(fOffset, &byte, 1);
					fOffset++;
				}
				return byte;
			}

			virtual bool GetAddressString(char* buffer, size_t bufferSize) const
			{
				snprintf(buffer, bufferSize, "    %4zx", fOffset);
				return true;
			}

		private:
			const RingBuffer&	fBuffer;
			size_t				fOffset;
		};

		DataProvider dataProvider(fBuffer);
		BKernel::print_hex_dump(dataProvider, fBuffer.Readable());
	}
}
814 
815 
816 /*static*/ int
817 Inode::Dump(int argc, char** argv)
818 {
819 	bool dumpData = false;
820 	int argi = 1;
821 	if (argi < argc && strcmp(argv[argi], "-d") == 0) {
822 		dumpData = true;
823 		argi++;
824 	}
825 
826 	if (argi >= argc || argi + 2 < argc) {
827 		print_debugger_command_usage(argv[0]);
828 		return 0;
829 	}
830 
831 	Inode* node = (Inode*)parse_expression(argv[argi]);
832 	if (IS_USER_ADDRESS(node)) {
833 		kprintf("invalid FIFO address\n");
834 		return 0;
835 	}
836 
837 	node->Dump(dumpData);
838 	return 0;
839 }
840 
841 
842 //	#pragma mark - vnode API
843 
844 
845 static status_t
846 fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
847 {
848 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
849 	fs_vnode* superVnode = fifo->SuperVnode();
850 
851 	status_t error = B_OK;
852 	if (superVnode->ops->put_vnode != NULL)
853 		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
854 
855 	delete fifo;
856 
857 	return error;
858 }
859 
860 
861 static status_t
862 fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
863 {
864 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
865 	fs_vnode* superVnode = fifo->SuperVnode();
866 
867 	status_t error = B_OK;
868 	if (superVnode->ops->remove_vnode != NULL)
869 		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
870 
871 	delete fifo;
872 
873 	return error;
874 }
875 
876 
877 static status_t
878 fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
879 	void** _cookie)
880 {
881 	Inode* inode = (Inode*)_node->private_node;
882 
883 	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
884 
885 	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
886 	if (cookie == NULL)
887 		return B_NO_MEMORY;
888 
889 	TRACE("  open cookie = %p\n", cookie);
890 	cookie->open_mode = openMode;
891 	inode->Open(openMode);
892 
893 	*_cookie = (void*)cookie;
894 
895 	return B_OK;
896 }
897 
898 
899 static status_t
900 fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
901 {
902 	file_cookie* cookie = (file_cookie*)_cookie;
903 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
904 
905 	fifo->Close(cookie);
906 
907 	return B_OK;
908 }
909 
910 
911 static status_t
912 fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
913 {
914 	file_cookie* cookie = (file_cookie*)_cookie;
915 
916 	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
917 
918 	free(cookie);
919 
920 	return B_OK;
921 }
922 
923 
// A FIFO has no backing store, so there is nothing to sync.
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_BAD_VALUE;
}
929 
930 
/*!	Reads from the FIFO into \a buffer. Returns 0 bytes (EOF) when the
	buffer is empty and no writers remain; otherwise queues a ReadRequest
	and delegates to Inode::ReadDataFromBuffer(). A partial read overrides
	any error status.
*/
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	MutexLocker locker(inode->RequestLock());

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall(),
		request);

	inode->RemoveReadRequest(request);
	// let the next queued reader proceed, if data remain
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	// a partial read counts as success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
975 
976 
/*!	Writes \a buffer to the FIFO by delegating to
	Inode::WriteDataToBuffer(). A zero-length write succeeds immediately;
	a partial write overrides any error status.
*/
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	MutexLocker locker(inode->RequestLock());

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, is_called_via_syscall());

	// a partial write counts as success
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
1003 
1004 
1005 static status_t
1006 fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
1007 {
1008 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1009 	fs_vnode* superVnode = fifo->SuperVnode();
1010 
1011 	if (superVnode->ops->read_stat == NULL)
1012 		return B_BAD_VALUE;
1013 
1014 	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
1015 	if (error != B_OK)
1016 		return error;
1017 
1018 
1019 	MutexLocker locker(fifo->RequestLock());
1020 
1021 	st->st_size = fifo->BytesAvailable();
1022 
1023 	st->st_blksize = 4096;
1024 
1025 	// TODO: Just pass the changes to our modification time on to the super node.
1026 	st->st_atim.tv_sec = time(NULL);
1027 	st->st_atim.tv_nsec = 0;
1028 	st->st_mtim = st->st_ctim = fifo->ModificationTime();
1029 
1030 	return B_OK;
1031 }
1032 
1033 
1034 static status_t
1035 fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
1036 	uint32 statMask)
1037 {
1038 	// we cannot change the size of anything
1039 	if ((statMask & B_STAT_SIZE) != 0)
1040 		return B_BAD_VALUE;
1041 
1042 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1043 	fs_vnode* superVnode = fifo->SuperVnode();
1044 
1045 	if (superVnode->ops->write_stat == NULL)
1046 		return B_BAD_VALUE;
1047 
1048 	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
1049 		statMask);
1050 	if (error != B_OK)
1051 		return error;
1052 
1053 	return B_OK;
1054 }
1055 
1056 
/*!	Handles FIFO ioctls: FIONREAD (number of bytes available for reading,
	copied out user-safely when invoked via syscall) and
	B_SET_(NON)BLOCKING_IO (toggles O_NONBLOCK on the cookie). Anything
	else fails with EINVAL.
*/
static status_t
fifo_ioctl(fs_volume* _volume, fs_vnode* _node, void* _cookie, uint32 op,
	void* buffer, size_t length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_ioctl: vnode %p, cookie %p, op %" B_PRId32 ", buf %p, len %ld\n",
		_node, _cookie, op, buffer, length);

	switch (op) {
		case FIONREAD:
		{
			if (buffer == NULL)
				return B_BAD_VALUE;

			MutexLocker locker(inode->RequestLock());
			int available = (int)inode->BytesAvailable();
			locker.Unlock();

			// copy out user-safely when called from userland
			if (is_called_via_syscall()) {
				if (!IS_USER_ADDRESS(buffer)
					|| user_memcpy(buffer, &available, sizeof(available))
						!= B_OK) {
					return B_BAD_ADDRESS;
				}
			} else
				*(int*)buffer = available;

			return B_OK;
		}

		case B_SET_BLOCKING_IO:
		case B_SET_NONBLOCKING_IO:
		{
			MutexLocker locker(inode->RequestLock());
			cookie->SetNonBlocking(op == B_SET_NONBLOCKING_IO);
			return B_OK;
		}
	}

	return EINVAL;
}
1100 
1101 
1102 static status_t
1103 fifo_set_flags(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1104 	int flags)
1105 {
1106 	Inode* inode = (Inode*)_node->private_node;
1107 	file_cookie* cookie = (file_cookie*)_cookie;
1108 
1109 	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _node, flags);
1110 
1111 	MutexLocker locker(inode->RequestLock());
1112 	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
1113 	return B_OK;
1114 }
1115 
1116 
1117 static status_t
1118 fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1119 	uint8 event, selectsync* sync)
1120 {
1121 	file_cookie* cookie = (file_cookie*)_cookie;
1122 
1123 	TRACE("fifo_select(vnode = %p)\n", _node);
1124 	Inode* inode = (Inode*)_node->private_node;
1125 	if (!inode)
1126 		return B_ERROR;
1127 
1128 	MutexLocker locker(inode->RequestLock());
1129 	return inode->Select(event, sync, cookie->open_mode);
1130 }
1131 
1132 
1133 static status_t
1134 fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
1135 	uint8 event, selectsync* sync)
1136 {
1137 	file_cookie* cookie = (file_cookie*)_cookie;
1138 
1139 	TRACE("fifo_deselect(vnode = %p)\n", _node);
1140 	Inode* inode = (Inode*)_node->private_node;
1141 	if (inode == NULL)
1142 		return B_ERROR;
1143 
1144 	MutexLocker locker(inode->RequestLock());
1145 	return inode->Deselect(event, sync, cookie->open_mode);
1146 }
1147 
1148 
// FIFOs are not pageable.
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1154 
1155 
// Page I/O is not supported on FIFOs.
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1162 
1163 
// Page I/O is not supported on FIFOs.
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1170 
1171 
1172 static status_t
1173 fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1174 	fs_vnode* _superVnode)
1175 {
1176 	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1177 	fs_vnode* superVnode = fifo->SuperVnode();
1178 
1179 	if (superVnode->ops->get_super_vnode != NULL) {
1180 		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1181 			_superVnode);
1182 	}
1183 
1184 	*_superVnode = *superVnode;
1185 
1186 	return B_OK;
1187 }
1188 
1189 
// The vnode operations vector installed on FIFO nodes. Only the FIFO-
// relevant hooks are implemented; stat calls additionally forward to the
// wrapped node, and everything directory/attribute-related is unsupported.
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1267 
1268 
1269 }	// namespace fifo
1270 
1271 
1272 using namespace fifo;
1273 
1274 
1275 // #pragma mark -
1276 
1277 
1278 status_t
1279 create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1280 {
1281 	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1282 	if (fifo == NULL)
1283 		return B_NO_MEMORY;
1284 
1285 	status_t status = fifo->InitCheck();
1286 	if (status != B_OK) {
1287 		delete fifo;
1288 		return status;
1289 	}
1290 
1291 	vnode->private_node = fifo;
1292 	vnode->ops = &sFIFOVnodeOps;
1293 
1294 	return B_OK;
1295 }
1296 
1297 
1298 void
1299 fifo_init()
1300 {
1301 	add_debugger_command_etc("fifo", &Inode::Dump,
1302 		"Print info about the specified FIFO node",
1303 		"[ \"-d\" ] <address>\n"
1304 		"Prints information about the FIFO node specified by address\n"
1305 		"<address>. If \"-d\" is given, the data in the FIFO's ring buffer\n"
1306 		"hexdumped as well.\n",
1307 		0);
1308 }
1309