xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixEndpoint.cpp (revision cda5b8808fd0262f0fac472f6cfa809f846a83cf)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include "UnixEndpoint.h"
7 
8 #include <stdio.h>
9 #include <sys/stat.h>
10 
11 #include <AutoDeleter.h>
12 
13 #include <vfs.h>
14 
15 #include "UnixAddressManager.h"
16 #include "UnixFifo.h"
17 
18 
19 #define UNIX_ENDPOINT_DEBUG_LEVEL	2
20 #define UNIX_DEBUG_LEVEL			UNIX_ENDPOINT_DEBUG_LEVEL
21 #include "UnixDebug.h"
22 
23 
24 // Note on locking order (outermost -> innermost):
25 // UnixEndpoint: connecting -> listening -> child
26 // -> UnixFifo (never lock more than one at a time)
27 // -> UnixAddressManager
28 
29 
30 static inline bigtime_t
31 absolute_timeout(bigtime_t timeout)
32 {
33 	if (timeout == 0 || timeout == B_INFINITE_TIMEOUT)
34 		return timeout;
35 
36 // TODO: Make overflow safe!
37 	return timeout + system_time();
38 }
39 
40 
41 UnixEndpoint::UnixEndpoint(net_socket* socket)
42 	:
43 	ProtocolSocket(socket),
44 	fAddress(),
45 	fAddressHashLink(),
46 	fPeerEndpoint(NULL),
47 	fReceiveFifo(NULL),
48 	fState(UNIX_ENDPOINT_CLOSED),
49 	fAcceptSemaphore(-1),
50 	fIsChild(false)
51 {
52 	TRACE("[%ld] %p->UnixEndpoint::UnixEndpoint()\n", find_thread(NULL), this);
53 
54 	fLock.sem = -1;
55 }
56 
57 
58 UnixEndpoint::~UnixEndpoint()
59 {
60 	TRACE("[%ld] %p->UnixEndpoint::~UnixEndpoint()\n", find_thread(NULL), this);
61 
62 	if (fLock.sem >= 0)
63 		benaphore_destroy(&fLock);
64 }
65 
66 
67 status_t
68 UnixEndpoint::Init()
69 {
70 	TRACE("[%ld] %p->UnixEndpoint::Init()\n", find_thread(NULL), this);
71 
72 	status_t error = benaphore_init(&fLock, "unix endpoint");
73 	if (error != B_OK)
74 		RETURN_ERROR(ENOBUFS);
75 
76 	RETURN_ERROR(B_OK);
77 }
78 
79 
80 void
81 UnixEndpoint::Uninit()
82 {
83 	TRACE("[%ld] %p->UnixEndpoint::Uninit()\n", find_thread(NULL), this);
84 
85 	// check whether we're closed
86 	UnixEndpointLocker locker(this);
87 	bool closed = (fState == UNIX_ENDPOINT_CLOSED);
88 	locker.Unlock();
89 
90 	if (!closed) {
91 		// That probably means, we're a child endpoint of a listener and
92 		// have been fully connected, but not yet accepted. Our Close()
93 		// hook isn't called in this case. Do it manually.
94 		Close();
95 	}
96 
97 	RemoveReference();
98 }
99 
100 
101 status_t
102 UnixEndpoint::Open()
103 {
104 	TRACE("[%ld] %p->UnixEndpoint::Open()\n", find_thread(NULL), this);
105 
106 	status_t error = ProtocolSocket::Open();
107 	if (error != B_OK)
108 		RETURN_ERROR(error);
109 
110 	fState = UNIX_ENDPOINT_NOT_CONNECTED;
111 
112 	RETURN_ERROR(B_OK);
113 }
114 
115 
116 status_t
117 UnixEndpoint::Close()
118 {
119 	TRACE("[%ld] %p->UnixEndpoint::Close()\n", find_thread(NULL), this);
120 
121 	UnixEndpointLocker locker(this);
122 
123 	if (fState == UNIX_ENDPOINT_CONNECTED) {
124 		UnixEndpointLocker peerLocker;
125 		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
126 			// We're still connected. Disconnect both endpoints!
127 			fPeerEndpoint->_Disconnect();
128 			_Disconnect();
129 		}
130 	}
131 
132 	if (fState == UNIX_ENDPOINT_LISTENING)
133 		_StopListening();
134 
135 	_Unbind();
136 
137 	fState = UNIX_ENDPOINT_CLOSED;
138 	RETURN_ERROR(B_OK);
139 }
140 
141 
142 status_t
143 UnixEndpoint::Free()
144 {
145 	TRACE("[%ld] %p->UnixEndpoint::Free()\n", find_thread(NULL), this);
146 
147 	UnixEndpointLocker locker(this);
148 
149 	_UnsetReceiveFifo();
150 
151 	RETURN_ERROR(B_OK);
152 }
153 
154 
155 status_t
156 UnixEndpoint::Bind(const struct sockaddr *_address)
157 {
158 	if (_address->sa_family != AF_UNIX)
159 		RETURN_ERROR(EAFNOSUPPORT);
160 
161 	TRACE("[%ld] %p->UnixEndpoint::Bind(\"%s\")\n", find_thread(NULL), this,
162 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
163 
164 	const sockaddr_un* address = (const sockaddr_un*)_address;
165 
166 	UnixEndpointLocker endpointLocker(this);
167 
168 	if (fState != UNIX_ENDPOINT_NOT_CONNECTED || IsBound())
169 		RETURN_ERROR(B_BAD_VALUE);
170 
171 	if (address->sun_path[0] == '\0') {
172 		UnixAddressManagerLocker addressLocker(gAddressManager);
173 
174 		// internal address space (or empty address)
175 		int32 internalID;
176 		if (UnixAddress::IsEmptyAddress(*address))
177 			internalID = gAddressManager.NextUnusedInternalID();
178 		else
179 			internalID = UnixAddress::InternalID(*address);
180 		if (internalID < 0)
181 			RETURN_ERROR(internalID);
182 
183 		status_t error = _Bind(internalID);
184 		if (error != B_OK)
185 			RETURN_ERROR(error);
186 
187 		sockaddr_un* outAddress = (sockaddr_un*)&socket->address;
188 		outAddress->sun_path[0] = '\0';
189 		sprintf(outAddress->sun_path + 1, "%05lx", internalID);
190 		outAddress->sun_len = INTERNAL_UNIX_ADDRESS_LEN;
191 			// null-byte + 5 hex digits
192 
193 		gAddressManager.Add(this);
194 	} else {
195 		// FS address space
196 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
197 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
198 			RETURN_ERROR(B_BAD_VALUE);
199 
200 		struct vnode* vnode;
201 		status_t error = vfs_create_special_node(address->sun_path,
202 			NULL, S_IFSOCK | 0644, 0, !gStackModule->is_syscall(), NULL,
203 			&vnode);
204 		if (error != B_OK)
205 			RETURN_ERROR(error == B_FILE_EXISTS ? EADDRINUSE : error);
206 
207 		error = _Bind(vnode);
208 		if (error != B_OK) {
209 			vfs_put_vnode(vnode);
210 			RETURN_ERROR(error);
211 		}
212 
213 		size_t addressLen = address->sun_path + pathLen + 1 - (char*)address;
214 		memcpy(&socket->address, address, addressLen);
215 		socket->address.ss_len = addressLen;
216 
217 		UnixAddressManagerLocker addressLocker(gAddressManager);
218 		gAddressManager.Add(this);
219 	}
220 
221 	RETURN_ERROR(B_OK);
222 }
223 
224 
225 status_t
226 UnixEndpoint::Unbind()
227 {
228 	TRACE("[%ld] %p->UnixEndpoint::Unbind()\n", find_thread(NULL), this);
229 
230 	UnixEndpointLocker endpointLocker(this);
231 
232 	RETURN_ERROR(_Unbind());
233 }
234 
235 
236 status_t
237 UnixEndpoint::Listen(int backlog)
238 {
239 	TRACE("[%ld] %p->UnixEndpoint::Listen(%d)\n", find_thread(NULL), this,
240 		backlog);
241 
242 	UnixEndpointLocker endpointLocker(this);
243 
244 	if (!IsBound())
245 		RETURN_ERROR(EDESTADDRREQ);
246 	if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
247 		RETURN_ERROR(EINVAL);
248 
249 	gSocketModule->set_max_backlog(socket, backlog);
250 
251 	fAcceptSemaphore = create_sem(0, "unix accept");
252 	if (fAcceptSemaphore < 0)
253 		RETURN_ERROR(ENOBUFS);
254 
255 	_UnsetReceiveFifo();
256 
257 	fState = UNIX_ENDPOINT_LISTENING;
258 
259 	RETURN_ERROR(B_OK);
260 }
261 
262 
263 status_t
264 UnixEndpoint::Connect(const struct sockaddr *_address)
265 {
266 	if (_address->sa_family != AF_UNIX)
267 		RETURN_ERROR(EAFNOSUPPORT);
268 
269 	TRACE("[%ld] %p->UnixEndpoint::Connect(\"%s\")\n", find_thread(NULL), this,
270 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
271 
272 	const sockaddr_un* address = (const sockaddr_un*)_address;
273 
274 	UnixEndpointLocker endpointLocker(this);
275 
276 	if (fState == UNIX_ENDPOINT_CONNECTED)
277 		RETURN_ERROR(EISCONN);
278 
279 	if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
280 		RETURN_ERROR(B_BAD_VALUE);
281 // TODO: If listening, we could set the backlog to 0 and connect.
282 
283 	// check the address first
284 	UnixAddress unixAddress;
285 
286 	if (address->sun_path[0] == '\0') {
287 		// internal address space (or empty address)
288 		int32 internalID;
289 		if (UnixAddress::IsEmptyAddress(*address))
290 			RETURN_ERROR(B_BAD_VALUE);
291 
292 		internalID = UnixAddress::InternalID(*address);
293 		if (internalID < 0)
294 			RETURN_ERROR(internalID);
295 
296 		unixAddress.SetTo(internalID);
297 	} else {
298 		// FS address space
299 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
300 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
301 			RETURN_ERROR(B_BAD_VALUE);
302 
303 		struct stat st;
304 		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
305 			!gStackModule->is_syscall());
306 		if (error != B_OK)
307 			RETURN_ERROR(error);
308 
309 		if (!S_ISSOCK(st.st_mode))
310 			RETURN_ERROR(B_BAD_VALUE);
311 
312 		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
313 	}
314 
315 	// get the peer endpoint
316 	UnixAddressManagerLocker addressLocker(gAddressManager);
317 	UnixEndpoint* listeningEndpoint = gAddressManager.Lookup(unixAddress);
318 	if (listeningEndpoint == NULL)
319 		RETURN_ERROR(ECONNREFUSED);
320 	Reference<UnixEndpoint> peerReference(listeningEndpoint);
321 	addressLocker.Unlock();
322 
323 	UnixEndpointLocker peerLocker(listeningEndpoint);
324 
325 	if (!listeningEndpoint->IsBound()
326 		|| listeningEndpoint->fState != UNIX_ENDPOINT_LISTENING
327 		|| listeningEndpoint->fAddress != unixAddress) {
328 		RETURN_ERROR(ECONNREFUSED);
329 	}
330 
331 	// Allocate FIFOs for us and the socket we're going to spawn. We do that
332 	// now, so that the mess we need to cleanup, if allocating them fails, is
333 	// harmless.
334 	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
335 	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
336 	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
337 	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
338 
339 	status_t error;
340 	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
341 		return error;
342 
343 	// spawn new endpoint for accept()
344 	net_socket* newSocket;
345 	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
346 		&newSocket);
347 	if (error != B_OK)
348 		RETURN_ERROR(error);
349 
350 	// init connected peer endpoint
351 	UnixEndpoint* connectedEndpoint = (UnixEndpoint*)newSocket->first_protocol;
352 
353 	UnixEndpointLocker connectedLocker(connectedEndpoint);
354 
355 	connectedEndpoint->_Spawn(this, peerFifo);
356 
357 	// update our attributes
358 	_UnsetReceiveFifo();
359 
360 	fPeerEndpoint = connectedEndpoint;
361 	PeerAddress().SetTo(&connectedEndpoint->socket->address);
362 	fPeerEndpoint->AddReference();
363 	fReceiveFifo = fifo;
364 
365 	fifoDeleter.Detach();
366 	peerFifoDeleter.Detach();
367 
368 	fState = UNIX_ENDPOINT_CONNECTED;
369 
370 	gSocketModule->set_connected(newSocket);
371 
372 	release_sem(listeningEndpoint->fAcceptSemaphore);
373 
374 	connectedLocker.Unlock();
375 	peerLocker.Unlock();
376 	endpointLocker.Unlock();
377 
378 	RETURN_ERROR(B_OK);
379 }
380 
381 
382 status_t
383 UnixEndpoint::Accept(net_socket **_acceptedSocket)
384 {
385 	TRACE("[%ld] %p->UnixEndpoint::Accept()\n", find_thread(NULL), this);
386 
387 	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
388 	if (gStackModule->is_restarted_syscall())
389 		timeout = gStackModule->restore_syscall_restart_timeout();
390 	else
391 		gStackModule->store_syscall_restart_timeout(timeout);
392 
393 	UnixEndpointLocker locker(this);
394 
395 	status_t error;
396 	do {
397 		locker.Unlock();
398 
399 		error = acquire_sem_etc(fAcceptSemaphore, 1,
400 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
401 		if (error < B_OK)
402 			RETURN_ERROR(error);
403 
404 		locker.Lock();
405 		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
406 	} while (error != B_OK);
407 
408 	if (error == B_TIMED_OUT && timeout == 0) {
409 		// translate non-blocking timeouts to the correct error code
410 		error = B_WOULD_BLOCK;
411 	}
412 
413 	RETURN_ERROR(error);
414 }
415 
416 
417 status_t
418 UnixEndpoint::Send(net_buffer *buffer)
419 {
420 	TRACE("[%ld] %p->UnixEndpoint::Send(%p)\n", find_thread(NULL), this,
421 		buffer);
422 
423 	bigtime_t timeout = absolute_timeout(socket->send.timeout);
424 	if (gStackModule->is_restarted_syscall())
425 		timeout = gStackModule->restore_syscall_restart_timeout();
426 	else
427 		gStackModule->store_syscall_restart_timeout(timeout);
428 
429 	UnixEndpointLocker locker(this);
430 	UnixEndpointLocker peerLocker;
431 
432 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
433 	if (error != B_OK)
434 		RETURN_ERROR(error);
435 
436 	UnixEndpoint* peerEndpoint = fPeerEndpoint;
437 	Reference<UnixEndpoint> peerReference(peerEndpoint);
438 
439 	// lock the peer's FIFO
440 	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
441 	Reference<UnixFifo> _(peerFifo);
442 	UnixFifoLocker fifoLocker(peerFifo);
443 
444 	// unlock endpoints
445 	locker.Unlock();
446 	peerLocker.Unlock();
447 
448 	error = peerFifo->Write(buffer, timeout);
449 
450 	// Notify select()ing readers, if we successfully wrote anything.
451 	size_t readable = peerFifo->Readable();
452 	bool notifyRead = (error == B_OK && readable > 0
453 		&& !peerFifo->IsReadShutdown());
454 
455 	// Notify select()ing writers, if we failed to write anything and there's
456 	// still room to write.
457 	size_t writable = peerFifo->Writable();
458 	bool notifyWrite = (error != B_OK && writable > 0
459 		&& !peerFifo->IsWriteShutdown());
460 
461 	// re-lock our endpoint (unlock FIFO to respect locking order)
462 	fifoLocker.Unlock();
463 	locker.Lock();
464 
465 	bool peerLocked = (fPeerEndpoint == peerEndpoint
466 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
467 
468 	// send notifications
469 	if (peerLocked && notifyRead)
470 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
471 	if (notifyWrite)
472 		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
473 
474 	switch (error) {
475 		case UNIX_FIFO_SHUTDOWN:
476 			if (fPeerEndpoint == peerEndpoint
477 					&& fState == UNIX_ENDPOINT_CONNECTED) {
478 				// Orderly write shutdown on our side.
479 				// Note: Linux and Solaris also send a SIGPIPE, but according
480 				// the send() specification that shouldn't be done.
481 				error = EPIPE;
482 			} else {
483 				// The FD has been closed.
484 				error = EBADF;
485 			}
486 			break;
487 		case EPIPE:
488 			// The peer closed connection or shutdown its read side. Reward
489 			// the caller with a SIGPIPE.
490 			if (gStackModule->is_syscall())
491 				send_signal(find_thread(NULL), SIGPIPE);
492 			break;
493 		case B_TIMED_OUT:
494 			// Translate non-blocking timeouts to the correct error code.
495 			if (timeout == 0)
496 				error = B_WOULD_BLOCK;
497 			break;
498 	}
499 
500 	RETURN_ERROR(error);
501 }
502 
503 
504 status_t
505 UnixEndpoint::Receive(size_t numBytes, uint32 flags, net_buffer **_buffer)
506 {
507 	TRACE("[%ld] %p->UnixEndpoint::Receive(%ld, 0x%lx)\n", find_thread(NULL),
508 		this, numBytes, flags);
509 
510 	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
511 	if (gStackModule->is_restarted_syscall())
512 		timeout = gStackModule->restore_syscall_restart_timeout();
513 	else
514 		gStackModule->store_syscall_restart_timeout(timeout);
515 
516 	UnixEndpointLocker locker(this);
517 
518 	// We can read as long as we have a FIFO. I.e. we are still connected, or
519 	// disconnected and not yet reconnected/listening/closed.
520 	if (fReceiveFifo == NULL)
521 		RETURN_ERROR(ENOTCONN);
522 
523 	UnixEndpoint* peerEndpoint = fPeerEndpoint;
524 	Reference<UnixEndpoint> peerReference(peerEndpoint);
525 
526 	// lock our FIFO
527 	UnixFifo* fifo = fReceiveFifo;
528 	Reference<UnixFifo> _(fifo);
529 	UnixFifoLocker fifoLocker(fifo);
530 
531 	// unlock endpoint
532 	locker.Unlock();
533 
534 	status_t error = fifo->Read(numBytes, timeout, _buffer);
535 
536 	// Notify select()ing writers, if we successfully read anything.
537 	size_t writable = fifo->Writable();
538 	bool notifyWrite = (error == B_OK && writable > 0
539 		&& !fifo->IsWriteShutdown());
540 
541 	// Notify select()ing readers, if we failed to read anything and there's
542 	// still something left to read.
543 	size_t readable = fifo->Readable();
544 	bool notifyRead = (error != B_OK && readable > 0
545 		&& !fifo->IsReadShutdown());
546 
547 	// re-lock our endpoint (unlock FIFO to respect locking order)
548 	fifoLocker.Unlock();
549 	locker.Lock();
550 
551 	UnixEndpointLocker peerLocker;
552 	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
553 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
554 
555 	// send notifications
556 	if (notifyRead)
557 		gSocketModule->notify(socket, B_SELECT_READ, readable);
558 	if (peerLocked && notifyWrite)
559 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
560 
561 	switch (error) {
562 		case UNIX_FIFO_SHUTDOWN:
563 			// Either our socket was closed or read shutdown.
564 			if (fState == UNIX_ENDPOINT_CLOSED) {
565 				// The FD has been closed.
566 				error = EBADF;
567 			} else {
568 				// if (fReceiveFifo == fifo) {
569 				// 		Orderly shutdown or the peer closed the connection.
570 				// } else {
571 				//		Weird case: Peer closed connection and we are already
572 				// 		reconnected (or listening).
573 				// }
574 				error = B_OK;
575 				*_buffer = NULL;
576 			}
577 			break;
578 		case B_TIMED_OUT:
579 			// translate non-blocking timeouts to the correct error code
580 			if (timeout == 0)
581 				error = B_WOULD_BLOCK;
582 			break;
583 	}
584 
585 	RETURN_ERROR(error);
586 }
587 
588 
589 ssize_t
590 UnixEndpoint::Sendable()
591 {
592 	TRACE("[%ld] %p->UnixEndpoint::Sendable()\n", find_thread(NULL), this);
593 
594 	UnixEndpointLocker locker(this);
595 	UnixEndpointLocker peerLocker;
596 
597 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
598 	if (error != B_OK)
599 		RETURN_ERROR(error);
600 
601 	// lock the peer's FIFO
602 	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
603 	UnixFifoLocker fifoLocker(peerFifo);
604 
605 	RETURN_ERROR(peerFifo->Writable());
606 }
607 
608 
609 ssize_t
610 UnixEndpoint::Receivable()
611 {
612 	TRACE("[%ld] %p->UnixEndpoint::Receivable()\n", find_thread(NULL), this);
613 
614 	UnixEndpointLocker locker(this);
615 
616 	if (fState == UNIX_ENDPOINT_LISTENING)
617 		return gSocketModule->count_connected(socket);
618 
619 	if (fState != UNIX_ENDPOINT_CONNECTED)
620 		RETURN_ERROR(ENOTCONN);
621 
622 	UnixFifoLocker fifoLocker(fReceiveFifo);
623 	RETURN_ERROR(fReceiveFifo->Readable());
624 }
625 
626 
627 void
628 UnixEndpoint::SetReceiveBufferSize(size_t size)
629 {
630 	TRACE("[%ld] %p->UnixEndpoint::SetReceiveBufferSize(%lu)\n",
631 		find_thread(NULL), this, size);
632 
633 	UnixEndpointLocker locker(this);
634 
635 	if (fState != UNIX_ENDPOINT_CONNECTED)
636 		return;
637 
638 	UnixFifoLocker fifoLocker(fReceiveFifo);
639 	fReceiveFifo->SetBufferCapacity(size);
640 }
641 
642 
643 status_t
644 UnixEndpoint::Shutdown(int direction)
645 {
646 	TRACE("[%ld] %p->UnixEndpoint::Shutdown(%d)\n",
647 		find_thread(NULL), this, direction);
648 
649 	uint32 shutdown;
650 	uint32 peerShutdown;
651 
652 	// translate the direction into shutdown flags for our and the peer fifo
653 	switch (direction) {
654 		case SHUT_RD:
655 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
656 			peerShutdown = 0;
657 			break;
658 		case SHUT_WR:
659 			shutdown = 0;
660 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
661 			break;
662 		case SHUT_RDWR:
663 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
664 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
665 			break;
666 		default:
667 			RETURN_ERROR(B_BAD_VALUE);
668 	}
669 
670 	// lock endpoints
671 	UnixEndpointLocker locker(this);
672 	UnixEndpointLocker peerLocker;
673 
674 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
675 	if (error != B_OK)
676 		RETURN_ERROR(error);
677 
678 	// shutdown our FIFO
679 	fReceiveFifo->Lock();
680 	fReceiveFifo->Shutdown(shutdown);
681 	fReceiveFifo->Unlock();
682 
683 	// shutdown peer FIFO
684 	fPeerEndpoint->fReceiveFifo->Lock();
685 	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
686 	fPeerEndpoint->fReceiveFifo->Unlock();
687 
688 	// send select notifications
689 	if (direction == SHUT_RD || direction == SHUT_RDWR) {
690 		gSocketModule->notify(socket, B_SELECT_READ, B_OK);
691 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, B_OK);
692 	}
693 	if (direction == SHUT_WR || direction == SHUT_RDWR) {
694 		gSocketModule->notify(socket, B_SELECT_WRITE, B_OK);
695 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, B_OK);
696 	}
697 
698 	RETURN_ERROR(B_OK);
699 }
700 
701 
702 void
703 UnixEndpoint::_Spawn(UnixEndpoint* connectingEndpoint, UnixFifo* fifo)
704 {
705 	ProtocolSocket::Open();
706 
707 	fIsChild = true;
708 	fPeerEndpoint = connectingEndpoint;
709 	fPeerEndpoint->AddReference();
710 
711 	fReceiveFifo = fifo;
712 
713 	PeerAddress().SetTo(&connectingEndpoint->socket->address);
714 
715 	fState = UNIX_ENDPOINT_CONNECTED;
716 }
717 
718 
719 void
720 UnixEndpoint::_Disconnect()
721 {
722 	// Both endpoints must be locked.
723 
724 	// Write shutdown the receive FIFO.
725 	fReceiveFifo->Lock();
726 	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
727 	fReceiveFifo->Unlock();
728 
729 	// select() notification.
730 	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
731 	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
732 
733 	// Unset the peer endpoint.
734 	fPeerEndpoint->RemoveReference();
735 	fPeerEndpoint = NULL;
736 
737 	// We're officially disconnected.
738 // TODO: Deal with non accept()ed connections correctly!
739 	fIsChild = false;
740 	fState = UNIX_ENDPOINT_NOT_CONNECTED;
741 }
742 
743 
744 status_t
745 UnixEndpoint::_LockConnectedEndpoints(UnixEndpointLocker& locker,
746 	UnixEndpointLocker& peerLocker)
747 {
748 	if (fState != UNIX_ENDPOINT_CONNECTED)
749 		RETURN_ERROR(ENOTCONN);
750 
751 	// We need to lock the peer, too. Get a reference -- we might need to
752 	// unlock ourselves to get the locking order right.
753 	Reference<UnixEndpoint> peerReference(fPeerEndpoint);
754 	UnixEndpoint* peerEndpoint = fPeerEndpoint;
755 
756 	if (fIsChild) {
757 		// We're the child, but locking order is the other way around.
758 		locker.Unlock();
759 		peerLocker.SetTo(peerEndpoint, false);
760 
761 		locker.Lock();
762 
763 		// recheck our state, also whether the peer is still the same
764 		if (fState != UNIX_ENDPOINT_CONNECTED || peerEndpoint != fPeerEndpoint)
765 			RETURN_ERROR(ENOTCONN);
766 	} else
767 		peerLocker.SetTo(peerEndpoint, false);
768 
769 	RETURN_ERROR(B_OK);
770 }
771 
772 
773 status_t
774 UnixEndpoint::_Bind(struct vnode* vnode)
775 {
776 	struct stat st;
777 	status_t error = vfs_stat_vnode(vnode, &st);
778 	if (error != B_OK)
779 		RETURN_ERROR(error);
780 
781 	fAddress.SetTo(st.st_dev, st.st_ino, vnode);
782 	RETURN_ERROR(B_OK);
783 }
784 
785 
786 status_t
787 UnixEndpoint::_Bind(int32 internalID)
788 {
789 	fAddress.SetTo(internalID);
790 	RETURN_ERROR(B_OK);
791 }
792 
793 
794 status_t
795 UnixEndpoint::_Unbind()
796 {
797 	if (fState == UNIX_ENDPOINT_CONNECTED || fState == UNIX_ENDPOINT_LISTENING)
798 		RETURN_ERROR(B_BAD_VALUE);
799 
800 	if (IsBound()) {
801 		UnixAddressManagerLocker addressLocker(gAddressManager);
802 		gAddressManager.Remove(this);
803 		if (struct vnode* vnode = fAddress.Vnode())
804 			vfs_put_vnode(vnode);
805 
806 		fAddress.Unset();
807 	}
808 
809 	RETURN_ERROR(B_OK);
810 }
811 
812 
813 void
814 UnixEndpoint::_UnsetReceiveFifo()
815 {
816 	if (fReceiveFifo) {
817 		fReceiveFifo->RemoveReference();
818 		fReceiveFifo = NULL;
819 	}
820 }
821 
822 
823 void
824 UnixEndpoint::_StopListening()
825 {
826 	if (fState == UNIX_ENDPOINT_LISTENING) {
827 		delete_sem(fAcceptSemaphore);
828 		fAcceptSemaphore = -1;
829 		fState = UNIX_ENDPOINT_NOT_CONNECTED;
830 	}
831 }
832