xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixEndpoint.cpp (revision 445d4fd926c569e7b9ae28017da86280aaecbae2)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "UnixEndpoint.h"
8 
9 #include <stdio.h>
10 #include <sys/stat.h>
11 
12 #include <AutoDeleter.h>
13 
14 #include <vfs.h>
15 
16 #include "UnixAddressManager.h"
17 #include "UnixFifo.h"
18 
19 
// Debug verbosity for this file. UNIX_DEBUG_LEVEL is defined right before
// UnixDebug.h is included -- presumably that header uses it to configure the
// TRACE() and RETURN_ERROR() macros used below (verify against UnixDebug.h).
#define UNIX_ENDPOINT_DEBUG_LEVEL	0
#define UNIX_DEBUG_LEVEL			UNIX_ENDPOINT_DEBUG_LEVEL
#include "UnixDebug.h"
23 
24 
25 // Note on locking order (outermost -> innermost):
26 // UnixEndpoint: connecting -> listening -> child
27 // -> UnixFifo (never lock more than one at a time)
28 // -> UnixAddressManager
29 
30 
31 static inline bigtime_t
32 absolute_timeout(bigtime_t timeout)
33 {
34 	if (timeout == 0 || timeout == B_INFINITE_TIMEOUT)
35 		return timeout;
36 
37 // TODO: Make overflow safe!
38 	return timeout + system_time();
39 }
40 
41 
// Creates an endpoint for the given socket. The endpoint starts out closed
// (UNIX_ENDPOINT_CLOSED), with no peer, no receive FIFO and no accept
// semaphore; only the endpoint lock is set up here.
UnixEndpoint::UnixEndpoint(net_socket* socket)
	:
	ProtocolSocket(socket),
	fAddress(),
	fAddressHashLink(),
	fPeerEndpoint(NULL),
	fReceiveFifo(NULL),
	fState(UNIX_ENDPOINT_CLOSED),
	fAcceptSemaphore(-1),
	fIsChild(false),
	fWasConnected(false)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::UnixEndpoint()\n",
		find_thread(NULL), this);

	mutex_init(&fLock, "unix endpoint");
}
59 
60 
// Destroys the endpoint lock. No other cleanup happens here; disconnecting,
// unbinding and releasing the FIFO are done by Close() and Free().
UnixEndpoint::~UnixEndpoint()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::~UnixEndpoint()\n",
		find_thread(NULL), this);

	mutex_destroy(&fLock);
}
68 
69 
// Initialization hook. There is nothing to set up beyond what the
// constructor already did, so this only traces the call and returns B_OK.
status_t
UnixEndpoint::Init()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Init()\n", find_thread(NULL),
		this);

	RETURN_ERROR(B_OK);
}
78 
79 
80 void
81 UnixEndpoint::Uninit()
82 {
83 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Uninit()\n", find_thread(NULL),
84 		this);
85 
86 	// check whether we're closed
87 	UnixEndpointLocker locker(this);
88 	bool closed = (fState == UNIX_ENDPOINT_CLOSED);
89 	locker.Unlock();
90 
91 	if (!closed) {
92 		// That probably means, we're a child endpoint of a listener and
93 		// have been fully connected, but not yet accepted. Our Close()
94 		// hook isn't called in this case. Do it manually.
95 		Close();
96 	}
97 
98 	ReleaseReference();
99 }
100 
101 
102 status_t
103 UnixEndpoint::Open()
104 {
105 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Open()\n", find_thread(NULL),
106 		this);
107 
108 	status_t error = ProtocolSocket::Open();
109 	if (error != B_OK)
110 		RETURN_ERROR(error);
111 
112 	fState = UNIX_ENDPOINT_NOT_CONNECTED;
113 
114 	RETURN_ERROR(B_OK);
115 }
116 
117 
// Closes the endpoint: disconnects it from its peer (if still connected),
// stops listening (if listening), removes the binding and finally marks the
// endpoint closed. Always returns B_OK; the result of _Unbind() is ignored.
status_t
UnixEndpoint::Close()
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Close()\n", find_thread(NULL),
		this);

	UnixEndpointLocker locker(this);

	if (fState == UNIX_ENDPOINT_CONNECTED) {
		// Both endpoints have to be locked for _Disconnect(). Locking may
		// temporarily drop our own lock, so the connection can be gone by
		// the time it returns -- in that case there is nothing left to do.
		UnixEndpointLocker peerLocker;
		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
			// We're still connected. Disconnect both endpoints!
			fPeerEndpoint->_Disconnect();
			_Disconnect();
		}
	}

	if (fState == UNIX_ENDPOINT_LISTENING)
		_StopListening();

	// By now the state is neither connected nor listening, which is what
	// _Unbind() requires.
	_Unbind();

	fState = UNIX_ENDPOINT_CLOSED;
	RETURN_ERROR(B_OK);
}
143 
144 
145 status_t
146 UnixEndpoint::Free()
147 {
148 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Free()\n", find_thread(NULL),
149 		this);
150 
151 	UnixEndpointLocker locker(this);
152 
153 	_UnsetReceiveFifo();
154 
155 	RETURN_ERROR(B_OK);
156 }
157 
158 
// Binds the endpoint to the given AF_UNIX address.
//
// Two address spaces are supported:
// - internal addresses (sun_path starts with a null byte): either an
//   explicit internal ID or, for an empty address, the next unused ID,
// - file system addresses: a socket node is created at the given path.
//
// Returns EAFNOSUPPORT for a non-AF_UNIX address, B_BAD_VALUE if the endpoint
// is not in the "not connected" state, is already bound, or the path is
// empty/unterminated, EADDRINUSE if the path already exists, or any error
// reported by the VFS respectively the internal ID lookup.
status_t
UnixEndpoint::Bind(const struct sockaddr *_address)
{
	if (_address->sa_family != AF_UNIX)
		RETURN_ERROR(EAFNOSUPPORT);

	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Bind(\"%s\")\n",
		find_thread(NULL), this,
		ConstSocketAddress(&gAddressModule, _address).AsString().Data());

	const sockaddr_un* address = (const sockaddr_un*)_address;

	UnixEndpointLocker endpointLocker(this);

	if (fState != UNIX_ENDPOINT_NOT_CONNECTED || IsBound())
		RETURN_ERROR(B_BAD_VALUE);

	if (address->sun_path[0] == '\0') {
		// The address manager stays locked for the whole branch, so picking
		// the ID and registering the endpoint happen atomically.
		UnixAddressManagerLocker addressLocker(gAddressManager);

		// internal address space (or empty address)
		int32 internalID;
		if (UnixAddress::IsEmptyAddress(*address))
			internalID = gAddressManager.NextUnusedInternalID();
		else
			internalID = UnixAddress::InternalID(*address);
		if (internalID < 0)
			RETURN_ERROR(internalID);

		status_t error = _Bind(internalID);
		if (error != B_OK)
			RETURN_ERROR(error);

		// Publish the address we actually bound to in the socket.
		sockaddr_un* outAddress = (sockaddr_un*)&socket->address;
		outAddress->sun_path[0] = '\0';
		sprintf(outAddress->sun_path + 1, "%05" B_PRIx32, internalID);
		outAddress->sun_len = INTERNAL_UNIX_ADDRESS_LEN;
			// null-byte + 5 hex digits

		gAddressManager.Add(this);
	} else {
		// FS address space
		// The path must be non-empty and null-terminated within sun_path.
		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
			RETURN_ERROR(B_BAD_VALUE);

		struct vnode* vnode;
		status_t error = vfs_create_special_node(address->sun_path,
			NULL, S_IFSOCK | 0644, 0, !gStackModule->is_syscall(), NULL,
			&vnode);
		if (error != B_OK)
			RETURN_ERROR(error == B_FILE_EXISTS ? EADDRINUSE : error);

		error = _Bind(vnode);
		if (error != B_OK) {
			// Binding failed, so give back the vnode reference we got from
			// vfs_create_special_node().
			vfs_put_vnode(vnode);
			RETURN_ERROR(error);
		}

		// Copy the address up to and including the path's terminating null.
		size_t addressLen = address->sun_path + pathLen + 1 - (char*)address;
		memcpy(&socket->address, address, addressLen);
		socket->address.ss_len = addressLen;

		UnixAddressManagerLocker addressLocker(gAddressManager);
		gAddressManager.Add(this);
	}

	RETURN_ERROR(B_OK);
}
228 
229 
230 status_t
231 UnixEndpoint::Unbind()
232 {
233 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Unbind()\n", find_thread(NULL),
234 		this);
235 
236 	UnixEndpointLocker endpointLocker(this);
237 
238 	RETURN_ERROR(_Unbind());
239 }
240 
241 
242 status_t
243 UnixEndpoint::Listen(int backlog)
244 {
245 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Listen(%d)\n", find_thread(NULL),
246 		this, backlog);
247 
248 	UnixEndpointLocker endpointLocker(this);
249 
250 	if (!IsBound())
251 		RETURN_ERROR(EDESTADDRREQ);
252 	if (fState != UNIX_ENDPOINT_NOT_CONNECTED
253 		&& fState != UNIX_ENDPOINT_LISTENING)
254 		RETURN_ERROR(EINVAL);
255 
256 	gSocketModule->set_max_backlog(socket, backlog);
257 
258 	if (fState == UNIX_ENDPOINT_NOT_CONNECTED) {
259 		fAcceptSemaphore = create_sem(0, "unix accept");
260 		if (fAcceptSemaphore < 0)
261 			RETURN_ERROR(ENOBUFS);
262 
263 		_UnsetReceiveFifo();
264 
265 		fCredentials.pid = getpid();
266 		fCredentials.uid = geteuid();
267 		fCredentials.gid = getegid();
268 
269 		fState = UNIX_ENDPOINT_LISTENING;
270 	}
271 
272 	RETURN_ERROR(B_OK);
273 }
274 
275 
276 status_t
277 UnixEndpoint::Connect(const struct sockaddr *_address)
278 {
279 	if (_address->sa_family != AF_UNIX)
280 		RETURN_ERROR(EAFNOSUPPORT);
281 
282 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Connect(\"%s\")\n",
283 		find_thread(NULL), this,
284 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
285 
286 	const sockaddr_un* address = (const sockaddr_un*)_address;
287 
288 	UnixEndpointLocker endpointLocker(this);
289 
290 	if (fState == UNIX_ENDPOINT_CONNECTED)
291 		RETURN_ERROR(EISCONN);
292 
293 	if (fState != UNIX_ENDPOINT_NOT_CONNECTED)
294 		RETURN_ERROR(B_BAD_VALUE);
295 // TODO: If listening, we could set the backlog to 0 and connect.
296 
297 	// check the address first
298 	UnixAddress unixAddress;
299 
300 	if (address->sun_path[0] == '\0') {
301 		// internal address space (or empty address)
302 		int32 internalID;
303 		if (UnixAddress::IsEmptyAddress(*address))
304 			RETURN_ERROR(B_BAD_VALUE);
305 
306 		internalID = UnixAddress::InternalID(*address);
307 		if (internalID < 0)
308 			RETURN_ERROR(internalID);
309 
310 		unixAddress.SetTo(internalID);
311 	} else {
312 		// FS address space
313 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
314 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
315 			RETURN_ERROR(B_BAD_VALUE);
316 
317 		struct stat st;
318 		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
319 			!gStackModule->is_syscall());
320 		if (error != B_OK)
321 			RETURN_ERROR(error);
322 
323 		if (!S_ISSOCK(st.st_mode))
324 			RETURN_ERROR(B_BAD_VALUE);
325 
326 		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
327 	}
328 
329 	// get the peer endpoint
330 	UnixAddressManagerLocker addressLocker(gAddressManager);
331 	UnixEndpoint* listeningEndpoint = gAddressManager.Lookup(unixAddress);
332 	if (listeningEndpoint == NULL)
333 		RETURN_ERROR(ECONNREFUSED);
334 	BReference<UnixEndpoint> peerReference(listeningEndpoint);
335 	addressLocker.Unlock();
336 
337 	UnixEndpointLocker peerLocker(listeningEndpoint);
338 
339 	if (!listeningEndpoint->IsBound()
340 		|| listeningEndpoint->fState != UNIX_ENDPOINT_LISTENING
341 		|| listeningEndpoint->fAddress != unixAddress) {
342 		RETURN_ERROR(ECONNREFUSED);
343 	}
344 
345 	// Allocate FIFOs for us and the socket we're going to spawn. We do that
346 	// now, so that the mess we need to cleanup, if allocating them fails, is
347 	// harmless.
348 	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
349 	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT);
350 	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
351 	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
352 
353 	status_t error;
354 	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
355 		return error;
356 
357 	// spawn new endpoint for accept()
358 	net_socket* newSocket;
359 	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
360 		&newSocket);
361 	if (error != B_OK)
362 		RETURN_ERROR(error);
363 
364 	// init connected peer endpoint
365 	UnixEndpoint* connectedEndpoint = (UnixEndpoint*)newSocket->first_protocol;
366 
367 	UnixEndpointLocker connectedLocker(connectedEndpoint);
368 
369 	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
370 
371 	// update our attributes
372 	_UnsetReceiveFifo();
373 
374 	fPeerEndpoint = connectedEndpoint;
375 	PeerAddress().SetTo(&connectedEndpoint->socket->address);
376 	fPeerEndpoint->AcquireReference();
377 	fReceiveFifo = fifo;
378 
379 	fCredentials.pid = getpid();
380 	fCredentials.uid = geteuid();
381 	fCredentials.gid = getegid();
382 
383 	fifoDeleter.Detach();
384 	peerFifoDeleter.Detach();
385 
386 	fState = UNIX_ENDPOINT_CONNECTED;
387 	fWasConnected = true;
388 
389 	gSocketModule->set_connected(newSocket);
390 
391 	release_sem(listeningEndpoint->fAcceptSemaphore);
392 
393 	connectedLocker.Unlock();
394 	peerLocker.Unlock();
395 	endpointLocker.Unlock();
396 
397 	RETURN_ERROR(B_OK);
398 }
399 
400 
// Waits for an incoming connection and returns the socket spawned for it in
// _acceptedSocket.
//
// The wait is bounded by the socket's receive timeout (converted to an
// absolute timeout, and preserved across syscall restarts). A timeout of 0
// means non-blocking: B_WOULD_BLOCK is returned instead of B_TIMED_OUT.
// Other errors of acquire_sem_etc() are returned as is.
status_t
UnixEndpoint::Accept(net_socket **_acceptedSocket)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Accept()\n", find_thread(NULL),
		this);

	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
	if (gStackModule->is_restarted_syscall())
		timeout = gStackModule->restore_syscall_restart_timeout();
	else
		gStackModule->store_syscall_restart_timeout(timeout);

	UnixEndpointLocker locker(this);

	status_t error;
	do {
		// Never wait with the endpoint locked -- Connect() needs to lock us
		// to queue a connection and release the semaphore.
		locker.Unlock();

		error = acquire_sem_etc(fAcceptSemaphore, 1,
			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
		if (error < B_OK)
			break;

		// The semaphore was released, so a connection should be pending. If
		// dequeuing fails nevertheless, go back to waiting.
		locker.Lock();
		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
	} while (error != B_OK);

	if (error == B_TIMED_OUT && timeout == 0) {
		// translate non-blocking timeouts to the correct error code
		error = B_WOULD_BLOCK;
	}

	RETURN_ERROR(error);
}
435 
436 
437 ssize_t
438 UnixEndpoint::Send(const iovec *vecs, size_t vecCount,
439 	ancillary_data_container *ancillaryData)
440 {
441 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Send(%p, %ld, %p)\n",
442 		find_thread(NULL), this, vecs, vecCount, ancillaryData);
443 
444 	bigtime_t timeout = absolute_timeout(socket->send.timeout);
445 	if (gStackModule->is_restarted_syscall())
446 		timeout = gStackModule->restore_syscall_restart_timeout();
447 	else
448 		gStackModule->store_syscall_restart_timeout(timeout);
449 
450 	UnixEndpointLocker locker(this);
451 
452 	BReference<UnixEndpoint> peerReference;
453 	UnixEndpointLocker peerLocker;
454 
455 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
456 	if (error != B_OK)
457 		RETURN_ERROR(error);
458 
459 	UnixEndpoint* peerEndpoint = fPeerEndpoint;
460 	peerReference.SetTo(peerEndpoint);
461 
462 	// lock the peer's FIFO
463 	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
464 	BReference<UnixFifo> _(peerFifo);
465 	UnixFifoLocker fifoLocker(peerFifo);
466 
467 	// unlock endpoints
468 	locker.Unlock();
469 	peerLocker.Unlock();
470 
471 	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, timeout);
472 
473 	// Notify select()ing readers, if we successfully wrote anything.
474 	size_t readable = peerFifo->Readable();
475 	bool notifyRead = (error == B_OK && readable > 0
476 		&& !peerFifo->IsReadShutdown());
477 
478 	// Notify select()ing writers, if we failed to write anything and there's
479 	// still room to write.
480 	size_t writable = peerFifo->Writable();
481 	bool notifyWrite = (error != B_OK && writable > 0
482 		&& !peerFifo->IsWriteShutdown());
483 
484 	// re-lock our endpoint (unlock FIFO to respect locking order)
485 	fifoLocker.Unlock();
486 	locker.Lock();
487 
488 	bool peerLocked = (fPeerEndpoint == peerEndpoint
489 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
490 
491 	// send notifications
492 	if (peerLocked && notifyRead)
493 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
494 	if (notifyWrite)
495 		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
496 
497 	switch (result) {
498 		case UNIX_FIFO_SHUTDOWN:
499 			if (fPeerEndpoint == peerEndpoint
500 					&& fState == UNIX_ENDPOINT_CONNECTED) {
501 				// Orderly write shutdown on our side.
502 				// Note: Linux and Solaris also send a SIGPIPE, but according
503 				// the send() specification that shouldn't be done.
504 				result = EPIPE;
505 			} else {
506 				// The FD has been closed.
507 				result = EBADF;
508 			}
509 			break;
510 		case EPIPE:
511 			// The peer closed connection or shutdown its read side. Reward
512 			// the caller with a SIGPIPE.
513 			if (gStackModule->is_syscall())
514 				send_signal(find_thread(NULL), SIGPIPE);
515 			break;
516 		case B_TIMED_OUT:
517 			// Translate non-blocking timeouts to the correct error code.
518 			if (timeout == 0)
519 				result = B_WOULD_BLOCK;
520 			break;
521 	}
522 
523 	RETURN_ERROR(result);
524 }
525 
526 
// Reads data (and optional ancillary data) from our receive FIFO into the
// given vectors. If _address is given, the peer address is copied to it,
// truncated to *_addressLength, which is updated to the copied length.
//
// The read is bounded by the socket's receive timeout (converted to an
// absolute timeout, and preserved across syscall restarts). Returns the
// result of UnixFifo::Read(), with the following translations:
// - UNIX_FIFO_SHUTDOWN becomes EBADF if the endpoint has been closed, and 0
//   (end of stream) otherwise,
// - B_TIMED_OUT becomes B_WOULD_BLOCK for non-blocking reads (timeout 0).
// Returns ENOTCONN if the endpoint has no receive FIFO.
ssize_t
UnixEndpoint::Receive(const iovec *vecs, size_t vecCount,
	ancillary_data_container **_ancillaryData, struct sockaddr *_address,
	socklen_t *_addressLength)
{
	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Receive(%p, %ld)\n",
		find_thread(NULL), this, vecs, vecCount);

	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
	if (gStackModule->is_restarted_syscall())
		timeout = gStackModule->restore_syscall_restart_timeout();
	else
		gStackModule->store_syscall_restart_timeout(timeout);

	UnixEndpointLocker locker(this);

	// We can read as long as we have a FIFO. I.e. we are still connected, or
	// disconnected and not yet reconnected/listening/closed.
	if (fReceiveFifo == NULL)
		RETURN_ERROR(ENOTCONN);

	// Remember (and reference) the current peer, so we can tell later on
	// whether it is still the same one. It may be NULL, if we are already
	// disconnected.
	UnixEndpoint* peerEndpoint = fPeerEndpoint;
	BReference<UnixEndpoint> peerReference(peerEndpoint);

	// Copy the peer address upfront. This way, if we read something, we don't
	// get into a potential race with Close().
	if (_address != NULL) {
		socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
		memcpy(_address, &socket->peer, addrLen);
		*_addressLength = addrLen;
	}

	// lock our FIFO
	// The reference keeps the FIFO alive while the endpoint is unlocked.
	UnixFifo* fifo = fReceiveFifo;
	BReference<UnixFifo> _(fifo);
	UnixFifoLocker fifoLocker(fifo);

	// unlock endpoint
	locker.Unlock();

	ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, timeout);

	// Notify select()ing writers, if we successfully read anything.
	size_t writable = fifo->Writable();
	bool notifyWrite = (result >= 0 && writable > 0
		&& !fifo->IsWriteShutdown());

	// Notify select()ing readers, if we failed to read anything and there's
	// still something left to read.
	size_t readable = fifo->Readable();
	bool notifyRead = (result < 0 && readable > 0
		&& !fifo->IsReadShutdown());

	// re-lock our endpoint (unlock FIFO to respect locking order)
	fifoLocker.Unlock();
	locker.Lock();

	// The peer is only notified, if it is still our peer and could be locked.
	UnixEndpointLocker peerLocker;
	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);

	// send notifications
	if (notifyRead)
		gSocketModule->notify(socket, B_SELECT_READ, readable);
	if (peerLocked && notifyWrite)
		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);

	switch (result) {
		case UNIX_FIFO_SHUTDOWN:
			// Either our socket was closed or read shutdown.
			if (fState == UNIX_ENDPOINT_CLOSED) {
				// The FD has been closed.
				result = EBADF;
			} else {
				// if (fReceiveFifo == fifo) {
				// 		Orderly shutdown or the peer closed the connection.
				// } else {
				//		Weird case: Peer closed connection and we are already
				// 		reconnected (or listening).
				// }
				result = 0;
			}
			break;
		case B_TIMED_OUT:
			// translate non-blocking timeouts to the correct error code
			if (timeout == 0)
				result = B_WOULD_BLOCK;
			break;
	}

	RETURN_ERROR(result);
}
619 
620 
621 ssize_t
622 UnixEndpoint::Sendable()
623 {
624 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Sendable()\n", find_thread(NULL),
625 		this);
626 
627 	UnixEndpointLocker locker(this);
628 	UnixEndpointLocker peerLocker;
629 
630 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
631 	if (error != B_OK)
632 		RETURN_ERROR(error);
633 
634 	// lock the peer's FIFO
635 	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
636 	UnixFifoLocker fifoLocker(peerFifo);
637 
638 	RETURN_ERROR(peerFifo->Writable());
639 }
640 
641 
642 ssize_t
643 UnixEndpoint::Receivable()
644 {
645 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Receivable()\n", find_thread(NULL),
646 		this);
647 
648 	UnixEndpointLocker locker(this);
649 
650 	if (fState == UNIX_ENDPOINT_LISTENING)
651 		return gSocketModule->count_connected(socket);
652 
653 	if (fState != UNIX_ENDPOINT_CONNECTED)
654 		RETURN_ERROR(ENOTCONN);
655 
656 	UnixFifoLocker fifoLocker(fReceiveFifo);
657 	ssize_t readable = fReceiveFifo->Readable();
658 	if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
659 			|| fReceiveFifo->IsReadShutdown())) {
660 		RETURN_ERROR(ENOTCONN);
661 	}
662 	RETURN_ERROR(readable);
663 }
664 
665 
666 status_t
667 UnixEndpoint::SetReceiveBufferSize(size_t size)
668 {
669 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::SetReceiveBufferSize(%lu)\n",
670 		find_thread(NULL), this, size);
671 
672 	UnixEndpointLocker locker(this);
673 
674 	if (fReceiveFifo == NULL)
675 		return B_BAD_VALUE;
676 
677 	UnixFifoLocker fifoLocker(fReceiveFifo);
678 	return fReceiveFifo->SetBufferCapacity(size);
679 }
680 
681 
682 status_t
683 UnixEndpoint::GetPeerCredentials(ucred* credentials)
684 {
685 	UnixEndpointLocker locker(this);
686 	UnixEndpointLocker peerLocker;
687 
688 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
689 	if (error != B_OK)
690 		RETURN_ERROR(error);
691 
692 	*credentials = fPeerEndpoint->fCredentials;
693 
694 	return B_OK;
695 }
696 
697 
698 status_t
699 UnixEndpoint::Shutdown(int direction)
700 {
701 	TRACE("[%" B_PRId32 "] %p->UnixEndpoint::Shutdown(%d)\n",
702 		find_thread(NULL), this, direction);
703 
704 	uint32 shutdown;
705 	uint32 peerShutdown;
706 
707 	// translate the direction into shutdown flags for our and the peer fifo
708 	switch (direction) {
709 		case SHUT_RD:
710 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
711 			peerShutdown = 0;
712 			break;
713 		case SHUT_WR:
714 			shutdown = 0;
715 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
716 			break;
717 		case SHUT_RDWR:
718 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
719 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
720 			break;
721 		default:
722 			RETURN_ERROR(B_BAD_VALUE);
723 	}
724 
725 	// lock endpoints
726 	UnixEndpointLocker locker(this);
727 	UnixEndpointLocker peerLocker;
728 
729 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
730 	if (error != B_OK)
731 		RETURN_ERROR(error);
732 
733 	// shutdown our FIFO
734 	fReceiveFifo->Lock();
735 	fReceiveFifo->Shutdown(shutdown);
736 	fReceiveFifo->Unlock();
737 
738 	// shutdown peer FIFO
739 	fPeerEndpoint->fReceiveFifo->Lock();
740 	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
741 	fPeerEndpoint->fReceiveFifo->Unlock();
742 
743 	// send select notifications
744 	if (direction == SHUT_RD || direction == SHUT_RDWR) {
745 		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
746 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
747 	}
748 	if (direction == SHUT_WR || direction == SHUT_RDWR) {
749 		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
750 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
751 	}
752 
753 	RETURN_ERROR(B_OK);
754 }
755 
756 
// Sets up this freshly spawned endpoint as the listener-side end of a new
// connection: it becomes a connected child whose peer is connectingEndpoint
// and which receives through the given FIFO. Called by Connect() with this
// endpoint locked.
void
UnixEndpoint::_Spawn(UnixEndpoint* connectingEndpoint,
	UnixEndpoint* listeningEndpoint, UnixFifo* fifo)
{
	ProtocolSocket::Open();

	// fIsChild makes _LockConnectedEndpoints() lock the peer first.
	fIsChild = true;
	fPeerEndpoint = connectingEndpoint;
	fPeerEndpoint->AcquireReference();

	fReceiveFifo = fifo;

	PeerAddress().SetTo(&connectingEndpoint->socket->address);

	// The credentials are those recorded by the listener in Listen().
	fCredentials = listeningEndpoint->fCredentials;

	fState = UNIX_ENDPOINT_CONNECTED;
}
775 
776 
// Disconnects this endpoint from its peer: write-shuts down our receive
// FIFO, notifies select()ing threads with ECONNRESET, drops the peer
// reference and returns to the "not connected" state. The receive FIFO
// itself is kept. Only this side is handled -- the caller disconnects the
// peer separately (see Close()).
void
UnixEndpoint::_Disconnect()
{
	// Both endpoints must be locked.

	// Write shutdown the receive FIFO.
	fReceiveFifo->Lock();
	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
	fReceiveFifo->Unlock();

	// select() notification.
	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);

	// Unset the peer endpoint.
	fPeerEndpoint->ReleaseReference();
	fPeerEndpoint = NULL;

	// We're officially disconnected.
// TODO: Deal with non accept()ed connections correctly!
	fIsChild = false;
	fState = UNIX_ENDPOINT_NOT_CONNECTED;
}
800 
801 
// Locks the peer endpoint in addition to this one, respecting the locking
// order (connecting endpoint before child endpoint).
//
// \param locker The locker holding our own lock. It is expected to be locked
//        on entry; for a child endpoint it is temporarily unlocked.
// \param peerLocker Set to the peer endpoint (presumably locking it -- the
//        second SetTo() argument is assumed to mean "not already locked";
//        verify against AutoLocker).
// \return B_OK, if both endpoints are locked and still connected to each
//         other. If we are not connected on entry: EPIPE, if we have been
//         connected before, ENOTCONN otherwise. ENOTCONN, if the connection
//         went away while our lock was dropped -- in that case peerLocker
//         still refers to the former peer until it is unset or destroyed.
status_t
UnixEndpoint::_LockConnectedEndpoints(UnixEndpointLocker& locker,
	UnixEndpointLocker& peerLocker)
{
	if (fState != UNIX_ENDPOINT_CONNECTED)
		RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);

	// We need to lock the peer, too. Get a reference -- we might need to
	// unlock ourselves to get the locking order right.
	BReference<UnixEndpoint> peerReference(fPeerEndpoint);
	UnixEndpoint* peerEndpoint = fPeerEndpoint;

	if (fIsChild) {
		// We're the child, but locking order is the other way around.
		locker.Unlock();
		peerLocker.SetTo(peerEndpoint, false);

		locker.Lock();

		// recheck our state, also whether the peer is still the same
		if (fState != UNIX_ENDPOINT_CONNECTED || peerEndpoint != fPeerEndpoint)
			RETURN_ERROR(ENOTCONN);
	} else
		peerLocker.SetTo(peerEndpoint, false);

	RETURN_ERROR(B_OK);
}
829 
830 
831 status_t
832 UnixEndpoint::_Bind(struct vnode* vnode)
833 {
834 	struct stat st;
835 	status_t error = vfs_stat_vnode(vnode, &st);
836 	if (error != B_OK)
837 		RETURN_ERROR(error);
838 
839 	fAddress.SetTo(st.st_dev, st.st_ino, vnode);
840 	RETURN_ERROR(B_OK);
841 }
842 
843 
// Binds the endpoint to an address in the internal address space, identified
// by internalID. Only sets fAddress; always returns B_OK.
status_t
UnixEndpoint::_Bind(int32 internalID)
{
	fAddress.SetTo(internalID);
	RETURN_ERROR(B_OK);
}
850 
851 
852 status_t
853 UnixEndpoint::_Unbind()
854 {
855 	if (fState == UNIX_ENDPOINT_CONNECTED || fState == UNIX_ENDPOINT_LISTENING)
856 		RETURN_ERROR(B_BAD_VALUE);
857 
858 	if (IsBound()) {
859 		UnixAddressManagerLocker addressLocker(gAddressManager);
860 		gAddressManager.Remove(this);
861 		if (struct vnode* vnode = fAddress.Vnode())
862 			vfs_put_vnode(vnode);
863 
864 		fAddress.Unset();
865 	}
866 
867 	RETURN_ERROR(B_OK);
868 }
869 
870 
871 void
872 UnixEndpoint::_UnsetReceiveFifo()
873 {
874 	if (fReceiveFifo) {
875 		fReceiveFifo->ReleaseReference();
876 		fReceiveFifo = NULL;
877 	}
878 }
879 
880 
881 void
882 UnixEndpoint::_StopListening()
883 {
884 	if (fState == UNIX_ENDPOINT_LISTENING) {
885 		delete_sem(fAcceptSemaphore);
886 		fAcceptSemaphore = -1;
887 		fState = UNIX_ENDPOINT_NOT_CONNECTED;
888 	}
889 }
890