xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixStreamEndpoint.cpp (revision 9a6a20d4689307142a7ed26a1437ba47e244e73f)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "UnixStreamEndpoint.h"
8 
9 #include <stdio.h>
10 #include <sys/stat.h>
11 
12 #include <AutoDeleter.h>
13 
14 #include <vfs.h>
15 
16 #include "UnixAddressManager.h"
17 #include "UnixFifo.h"
18 
19 
20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL	0
21 #define UNIX_DEBUG_LEVEL					UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
22 #include "UnixDebug.h"
23 
24 
25 // Note on locking order (outermost -> innermost):
26 // UnixStreamEndpoint: connecting -> listening -> child
27 // -> UnixFifo (never lock more than one at a time)
28 // -> UnixAddressManager
29 
30 
/*!	Constructs an endpoint for \a socket.
	The endpoint starts out in the Closed state, without a peer, without a
	receive FIFO, and without an accept semaphore (-1).
*/
UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket)
	:
	UnixEndpoint(socket),
	fPeerEndpoint(NULL),
	fReceiveFifo(NULL),
	fState(unix_stream_endpoint_state::Closed),
	fAcceptSemaphore(-1),
	fIsChild(false),
	fWasConnected(false)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
		find_thread(NULL), this);
}
44 
45 
/*!	Destructor. Only traces the call; no cleanup is performed here. */
UnixStreamEndpoint::~UnixStreamEndpoint()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
		find_thread(NULL), this);
}
51 
52 
53 status_t
54 UnixStreamEndpoint::Init()
55 {
56 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
57 		this);
58 
59 	RETURN_ERROR(B_OK);
60 }
61 
62 
63 void
64 UnixStreamEndpoint::Uninit()
65 {
66 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
67 		this);
68 
69 	// check whether we're closed
70 	UnixStreamEndpointLocker locker(this);
71 	bool closed = (fState == unix_stream_endpoint_state::Closed);
72 	locker.Unlock();
73 
74 	if (!closed) {
75 		// That probably means, we're a child endpoint of a listener and
76 		// have been fully connected, but not yet accepted. Our Close()
77 		// hook isn't called in this case. Do it manually.
78 		Close();
79 	}
80 
81 	ReleaseReference();
82 }
83 
84 
85 status_t
86 UnixStreamEndpoint::Open()
87 {
88 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
89 		this);
90 
91 	status_t error = ProtocolSocket::Open();
92 	if (error != B_OK)
93 		RETURN_ERROR(error);
94 
95 	fState = unix_stream_endpoint_state::NotConnected;
96 
97 	RETURN_ERROR(B_OK);
98 }
99 
100 
101 status_t
102 UnixStreamEndpoint::Close()
103 {
104 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
105 		this);
106 
107 	UnixStreamEndpointLocker locker(this);
108 
109 	if (fState == unix_stream_endpoint_state::Connected) {
110 		BReference<UnixStreamEndpoint> peerReference;
111 		UnixStreamEndpointLocker peerLocker;
112 		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
113 			// The peer may be destroyed when we release it,
114 			// so grab a reference for the locker's sake.
115 			peerReference.SetTo(peerLocker.Get(), false);
116 
117 			// We're still connected. Disconnect both endpoints!
118 			fPeerEndpoint->_Disconnect();
119 			_Disconnect();
120 		}
121 	}
122 
123 	if (fState == unix_stream_endpoint_state::Listening)
124 		_StopListening();
125 
126 	_Unbind();
127 
128 	fState = unix_stream_endpoint_state::Closed;
129 	RETURN_ERROR(B_OK);
130 }
131 
132 
133 status_t
134 UnixStreamEndpoint::Free()
135 {
136 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
137 		this);
138 
139 	UnixStreamEndpointLocker locker(this);
140 
141 	_UnsetReceiveFifo();
142 
143 	RETURN_ERROR(B_OK);
144 }
145 
146 
147 status_t
148 UnixStreamEndpoint::Bind(const struct sockaddr* _address)
149 {
150 	if (_address->sa_family != AF_UNIX)
151 		RETURN_ERROR(EAFNOSUPPORT);
152 
153 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
154 		find_thread(NULL), this,
155 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
156 
157 	const sockaddr_un* address = (const sockaddr_un*)_address;
158 
159 	UnixStreamEndpointLocker endpointLocker(this);
160 
161 	if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
162 		RETURN_ERROR(B_BAD_VALUE);
163 
164 	RETURN_ERROR(_Bind(address));
165 }
166 
167 
168 status_t
169 UnixStreamEndpoint::Unbind()
170 {
171 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
172 		this);
173 
174 	UnixStreamEndpointLocker endpointLocker(this);
175 
176 	RETURN_ERROR(_Unbind());
177 }
178 
179 
180 status_t
181 UnixStreamEndpoint::Listen(int backlog)
182 {
183 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
184 		this, backlog);
185 
186 	UnixStreamEndpointLocker endpointLocker(this);
187 
188 	if (!IsBound())
189 		RETURN_ERROR(EDESTADDRREQ);
190 	if (fState != unix_stream_endpoint_state::NotConnected
191 		&& fState != unix_stream_endpoint_state::Listening)
192 		RETURN_ERROR(EINVAL);
193 
194 	gSocketModule->set_max_backlog(socket, backlog);
195 
196 	if (fState == unix_stream_endpoint_state::NotConnected) {
197 		fAcceptSemaphore = create_sem(0, "unix accept");
198 		if (fAcceptSemaphore < 0)
199 			RETURN_ERROR(ENOBUFS);
200 
201 		_UnsetReceiveFifo();
202 
203 		fCredentials.pid = getpid();
204 		fCredentials.uid = geteuid();
205 		fCredentials.gid = getegid();
206 
207 		fState = unix_stream_endpoint_state::Listening;
208 	}
209 
210 	RETURN_ERROR(B_OK);
211 }
212 
213 
214 status_t
215 UnixStreamEndpoint::Connect(const struct sockaddr* _address)
216 {
217 	if (_address->sa_family != AF_UNIX)
218 		RETURN_ERROR(EAFNOSUPPORT);
219 
220 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
221 		find_thread(NULL), this,
222 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
223 
224 	const sockaddr_un* address = (const sockaddr_un*)_address;
225 
226 	UnixStreamEndpointLocker endpointLocker(this);
227 
228 	if (fState == unix_stream_endpoint_state::Connected)
229 		RETURN_ERROR(EISCONN);
230 
231 	if (fState != unix_stream_endpoint_state::NotConnected)
232 		RETURN_ERROR(B_BAD_VALUE);
233 // TODO: If listening, we could set the backlog to 0 and connect.
234 
235 	// check the address first
236 	UnixAddress unixAddress;
237 
238 	if (address->sun_path[0] == '\0') {
239 		// internal address space (or empty address)
240 		int32 internalID;
241 		if (UnixAddress::IsEmptyAddress(*address))
242 			RETURN_ERROR(B_BAD_VALUE);
243 
244 		internalID = UnixAddress::InternalID(*address);
245 		if (internalID < 0)
246 			RETURN_ERROR(internalID);
247 
248 		unixAddress.SetTo(internalID);
249 	} else {
250 		// FS address space
251 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
252 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
253 			RETURN_ERROR(B_BAD_VALUE);
254 
255 		struct stat st;
256 		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
257 			!gStackModule->is_syscall());
258 		if (error != B_OK)
259 			RETURN_ERROR(error);
260 
261 		if (!S_ISSOCK(st.st_mode))
262 			RETURN_ERROR(B_BAD_VALUE);
263 
264 		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
265 	}
266 
267 	// get the peer endpoint
268 	UnixAddressManagerLocker addressLocker(gAddressManager);
269 	UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
270 	if (listeningUnixEndpoint == NULL)
271 		RETURN_ERROR(ECONNREFUSED);
272 	UnixStreamEndpoint* listeningEndpoint
273 		= dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
274 	if (listeningEndpoint == NULL)
275 		RETURN_ERROR(EPROTOTYPE);
276 	BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
277 	addressLocker.Unlock();
278 
279 	UnixStreamEndpointLocker peerLocker(listeningEndpoint);
280 
281 	if (!listeningEndpoint->IsBound()
282 		|| listeningEndpoint->fState != unix_stream_endpoint_state::Listening
283 		|| listeningEndpoint->fAddress != unixAddress) {
284 		RETURN_ERROR(ECONNREFUSED);
285 	}
286 
287 	// Allocate FIFOs for us and the socket we're going to spawn. We do that
288 	// now, so that the mess we need to cleanup, if allocating them fails, is
289 	// harmless.
290 	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
291 	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
292 	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
293 	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
294 
295 	status_t error;
296 	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
297 		return error;
298 
299 	// spawn new endpoint for accept()
300 	net_socket* newSocket;
301 	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
302 		&newSocket);
303 	if (error != B_OK)
304 		RETURN_ERROR(error);
305 
306 	// init connected peer endpoint
307 	UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;
308 
309 	UnixStreamEndpointLocker connectedLocker(connectedEndpoint);
310 
311 	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
312 
313 	// update our attributes
314 	_UnsetReceiveFifo();
315 
316 	fPeerEndpoint = connectedEndpoint;
317 	PeerAddress().SetTo(&connectedEndpoint->socket->address);
318 	fPeerEndpoint->AcquireReference();
319 	fReceiveFifo = fifo;
320 
321 	fCredentials.pid = getpid();
322 	fCredentials.uid = geteuid();
323 	fCredentials.gid = getegid();
324 
325 	fifoDeleter.Detach();
326 	peerFifoDeleter.Detach();
327 
328 	fState = unix_stream_endpoint_state::Connected;
329 	fWasConnected = true;
330 
331 	gSocketModule->set_connected(Socket());
332 
333 	release_sem(listeningEndpoint->fAcceptSemaphore);
334 
335 	connectedLocker.Unlock();
336 	peerLocker.Unlock();
337 	endpointLocker.Unlock();
338 
339 	RETURN_ERROR(B_OK);
340 }
341 
342 
343 status_t
344 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
345 {
346 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
347 		this);
348 
349 	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
350 	if (gStackModule->is_restarted_syscall())
351 		timeout = gStackModule->restore_syscall_restart_timeout();
352 	else
353 		gStackModule->store_syscall_restart_timeout(timeout);
354 
355 	UnixStreamEndpointLocker locker(this);
356 
357 	status_t error;
358 	do {
359 		locker.Unlock();
360 
361 		error = acquire_sem_etc(fAcceptSemaphore, 1,
362 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
363 		if (error < B_OK)
364 			break;
365 
366 		locker.Lock();
367 		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
368 	} while (error != B_OK);
369 
370 	if (error == B_TIMED_OUT && timeout == 0) {
371 		// translate non-blocking timeouts to the correct error code
372 		error = B_WOULD_BLOCK;
373 	}
374 
375 	RETURN_ERROR(error);
376 }
377 
378 
379 ssize_t
380 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
381 	ancillary_data_container* ancillaryData,
382 	const struct sockaddr* address, socklen_t addressLength, int flags)
383 {
384 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
385 		find_thread(NULL), this, vecs, vecCount, ancillaryData);
386 
387 	if ((flags & ~(MSG_DONTWAIT)) != 0)
388 		return EOPNOTSUPP;
389 
390 	bigtime_t timeout = 0;
391 	if ((flags & MSG_DONTWAIT) == 0) {
392 		timeout = absolute_timeout(socket->send.timeout);
393 		if (gStackModule->is_restarted_syscall())
394 			timeout = gStackModule->restore_syscall_restart_timeout();
395 		else
396 			gStackModule->store_syscall_restart_timeout(timeout);
397 	}
398 
399 	UnixStreamEndpointLocker locker(this);
400 
401 	BReference<UnixStreamEndpoint> peerReference;
402 	UnixStreamEndpointLocker peerLocker;
403 
404 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
405 	if (error != B_OK)
406 		RETURN_ERROR(error);
407 
408 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
409 	peerReference.SetTo(peerEndpoint);
410 
411 	// lock the peer's FIFO
412 	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
413 	BReference<UnixFifo> _(peerFifo);
414 	UnixFifoLocker fifoLocker(peerFifo);
415 
416 	// unlock endpoints
417 	locker.Unlock();
418 	peerLocker.Unlock();
419 
420 	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);
421 
422 	// Notify select()ing readers, if we successfully wrote anything.
423 	size_t readable = peerFifo->Readable();
424 	bool notifyRead = (error == B_OK && readable > 0
425 		&& !peerFifo->IsReadShutdown());
426 
427 	// Notify select()ing writers, if we failed to write anything and there's
428 	// still room to write.
429 	size_t writable = peerFifo->Writable();
430 	bool notifyWrite = (error != B_OK && writable > 0
431 		&& !peerFifo->IsWriteShutdown());
432 
433 	// re-lock our endpoint (unlock FIFO to respect locking order)
434 	fifoLocker.Unlock();
435 	locker.Lock();
436 
437 	bool peerLocked = (fPeerEndpoint == peerEndpoint
438 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
439 
440 	// send notifications
441 	if (peerLocked && notifyRead)
442 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
443 	if (notifyWrite)
444 		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
445 
446 	switch (result) {
447 		case UNIX_FIFO_SHUTDOWN:
448 			if (fPeerEndpoint == peerEndpoint
449 					&& fState == unix_stream_endpoint_state::Connected) {
450 				// Orderly write shutdown on our side.
451 				// Note: Linux and Solaris also send a SIGPIPE, but according
452 				// the send() specification that shouldn't be done.
453 				result = EPIPE;
454 			} else {
455 				// The FD has been closed.
456 				result = EBADF;
457 			}
458 			break;
459 		case EPIPE:
460 			// The socket module will generate SIGPIPE for us, if necessary.
461 			break;
462 		case B_TIMED_OUT:
463 			// Translate non-blocking timeouts to the correct error code.
464 			if (timeout == 0)
465 				result = B_WOULD_BLOCK;
466 			break;
467 	}
468 
469 	RETURN_ERROR(result);
470 }
471 
472 
473 ssize_t
474 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
475 	ancillary_data_container** _ancillaryData, struct sockaddr* _address,
476 	socklen_t* _addressLength, int flags)
477 {
478 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
479 		find_thread(NULL), this, vecs, vecCount);
480 
481 	if ((flags & ~(MSG_DONTWAIT)) != 0)
482 		return EOPNOTSUPP;
483 
484 	bigtime_t timeout = 0;
485 	if ((flags & MSG_DONTWAIT) == 0) {
486 		timeout = absolute_timeout(socket->receive.timeout);
487 		if (gStackModule->is_restarted_syscall())
488 			timeout = gStackModule->restore_syscall_restart_timeout();
489 		else
490 			gStackModule->store_syscall_restart_timeout(timeout);
491 	}
492 
493 	UnixStreamEndpointLocker locker(this);
494 
495 	// We can read as long as we have a FIFO. I.e. we are still connected, or
496 	// disconnected and not yet reconnected/listening/closed.
497 	if (fReceiveFifo == NULL)
498 		RETURN_ERROR(ENOTCONN);
499 
500 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
501 	BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
502 
503 	// Copy the peer address upfront. This way, if we read something, we don't
504 	// get into a potential race with Close().
505 	if (_address != NULL) {
506 		socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
507 		memcpy(_address, &socket->peer, addrLen);
508 		*_addressLength = addrLen;
509 	}
510 
511 	// lock our FIFO
512 	UnixFifo* fifo = fReceiveFifo;
513 	BReference<UnixFifo> _(fifo);
514 	UnixFifoLocker fifoLocker(fifo);
515 
516 	// unlock endpoint
517 	locker.Unlock();
518 
519 	ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout);
520 
521 	// Notify select()ing writers, if we successfully read anything.
522 	size_t writable = fifo->Writable();
523 	bool notifyWrite = (result >= 0 && writable > 0
524 		&& !fifo->IsWriteShutdown());
525 
526 	// Notify select()ing readers, if we failed to read anything and there's
527 	// still something left to read.
528 	size_t readable = fifo->Readable();
529 	bool notifyRead = (result < 0 && readable > 0
530 		&& !fifo->IsReadShutdown());
531 
532 	// re-lock our endpoint (unlock FIFO to respect locking order)
533 	fifoLocker.Unlock();
534 	locker.Lock();
535 
536 	UnixStreamEndpointLocker peerLocker;
537 	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
538 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
539 
540 	// send notifications
541 	if (notifyRead)
542 		gSocketModule->notify(socket, B_SELECT_READ, readable);
543 	if (peerLocked && notifyWrite)
544 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
545 
546 	switch (result) {
547 		case UNIX_FIFO_SHUTDOWN:
548 			// Either our socket was closed or read shutdown.
549 			if (fState == unix_stream_endpoint_state::Closed) {
550 				// The FD has been closed.
551 				result = EBADF;
552 			} else {
553 				// if (fReceiveFifo == fifo) {
554 				// 		Orderly shutdown or the peer closed the connection.
555 				// } else {
556 				//		Weird case: Peer closed connection and we are already
557 				// 		reconnected (or listening).
558 				// }
559 				result = 0;
560 			}
561 			break;
562 		case B_TIMED_OUT:
563 			// translate non-blocking timeouts to the correct error code
564 			if (timeout == 0)
565 				result = B_WOULD_BLOCK;
566 			break;
567 	}
568 
569 	RETURN_ERROR(result);
570 }
571 
572 
573 ssize_t
574 UnixStreamEndpoint::Sendable()
575 {
576 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
577 		this);
578 
579 	UnixStreamEndpointLocker locker(this);
580 	UnixStreamEndpointLocker peerLocker;
581 
582 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
583 	if (error != B_OK)
584 		RETURN_ERROR(error);
585 
586 	// lock the peer's FIFO
587 	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
588 	UnixFifoLocker fifoLocker(peerFifo);
589 
590 	RETURN_ERROR(peerFifo->Writable());
591 }
592 
593 
594 ssize_t
595 UnixStreamEndpoint::Receivable()
596 {
597 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
598 		this);
599 
600 	UnixStreamEndpointLocker locker(this);
601 
602 	if (fState == unix_stream_endpoint_state::Listening)
603 		return gSocketModule->count_connected(socket);
604 
605 	if (fState != unix_stream_endpoint_state::Connected)
606 		RETURN_ERROR(ENOTCONN);
607 
608 	UnixFifoLocker fifoLocker(fReceiveFifo);
609 	ssize_t readable = fReceiveFifo->Readable();
610 	if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
611 			|| fReceiveFifo->IsReadShutdown())) {
612 		RETURN_ERROR(ENOTCONN);
613 	}
614 	RETURN_ERROR(readable);
615 }
616 
617 
618 status_t
619 UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
620 {
621 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
622 		find_thread(NULL), this, size);
623 
624 	UnixStreamEndpointLocker locker(this);
625 
626 	if (fReceiveFifo == NULL)
627 		return B_BAD_VALUE;
628 
629 	UnixFifoLocker fifoLocker(fReceiveFifo);
630 	return fReceiveFifo->SetBufferCapacity(size);
631 }
632 
633 
634 status_t
635 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
636 {
637 	UnixStreamEndpointLocker locker(this);
638 	UnixStreamEndpointLocker peerLocker;
639 
640 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
641 	if (error != B_OK)
642 		RETURN_ERROR(error);
643 
644 	*credentials = fPeerEndpoint->fCredentials;
645 
646 	return B_OK;
647 }
648 
649 
650 status_t
651 UnixStreamEndpoint::Shutdown(int direction)
652 {
653 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
654 		find_thread(NULL), this, direction);
655 
656 	uint32 shutdown;
657 	uint32 peerShutdown;
658 
659 	// translate the direction into shutdown flags for our and the peer fifo
660 	switch (direction) {
661 		case SHUT_RD:
662 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
663 			peerShutdown = 0;
664 			break;
665 		case SHUT_WR:
666 			shutdown = 0;
667 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
668 			break;
669 		case SHUT_RDWR:
670 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
671 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
672 			break;
673 		default:
674 			RETURN_ERROR(B_BAD_VALUE);
675 	}
676 
677 	// lock endpoints
678 	UnixStreamEndpointLocker locker(this);
679 	UnixStreamEndpointLocker peerLocker;
680 
681 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
682 	if (error != B_OK)
683 		RETURN_ERROR(error);
684 
685 	// shutdown our FIFO
686 	fReceiveFifo->Lock();
687 	fReceiveFifo->Shutdown(shutdown);
688 	fReceiveFifo->Unlock();
689 
690 	// shutdown peer FIFO
691 	fPeerEndpoint->fReceiveFifo->Lock();
692 	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
693 	fPeerEndpoint->fReceiveFifo->Unlock();
694 
695 	// send select notifications
696 	if (direction == SHUT_RD || direction == SHUT_RDWR) {
697 		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
698 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
699 	}
700 	if (direction == SHUT_WR || direction == SHUT_RDWR) {
701 		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
702 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
703 	}
704 
705 	RETURN_ERROR(B_OK);
706 }
707 
708 
709 void
710 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
711 	UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
712 {
713 	ProtocolSocket::Open();
714 
715 	fIsChild = true;
716 	fPeerEndpoint = connectingEndpoint;
717 	fPeerEndpoint->AcquireReference();
718 
719 	fReceiveFifo = fifo;
720 
721 	PeerAddress().SetTo(&connectingEndpoint->socket->address);
722 
723 	fCredentials = listeningEndpoint->fCredentials;
724 
725 	fState = unix_stream_endpoint_state::Connected;
726 
727 	gSocketModule->set_connected(Socket());
728 }
729 
730 
731 void
732 UnixStreamEndpoint::_Disconnect()
733 {
734 	// Both endpoints must be locked.
735 
736 	// Write shutdown the receive FIFO.
737 	fReceiveFifo->Lock();
738 	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
739 	fReceiveFifo->Unlock();
740 
741 	// select() notification.
742 	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
743 	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
744 
745 	// Unset the peer endpoint.
746 	fPeerEndpoint->ReleaseReference();
747 	fPeerEndpoint = NULL;
748 
749 	// We're officially disconnected.
750 // TODO: Deal with non accept()ed connections correctly!
751 	fIsChild = false;
752 	fState = unix_stream_endpoint_state::NotConnected;
753 }
754 
755 
756 status_t
757 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
758 	UnixStreamEndpointLocker& peerLocker)
759 {
760 	if (fState != unix_stream_endpoint_state::Connected)
761 		RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
762 
763 	// We need to lock the peer, too. Get a reference -- we might need to
764 	// unlock ourselves to get the locking order right.
765 	BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
766 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
767 
768 	if (fIsChild) {
769 		// We're the child, but locking order is the other way around.
770 		locker.Unlock();
771 		peerLocker.SetTo(peerEndpoint, false);
772 		locker.Lock();
773 
774 		// recheck our state, also whether the peer is still the same
775 		if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) {
776 			peerLocker.Unset();
777 			RETURN_ERROR(ENOTCONN);
778 		}
779 	} else
780 		peerLocker.SetTo(peerEndpoint, false);
781 
782 	RETURN_ERROR(B_OK);
783 }
784 
785 
786 status_t
787 UnixStreamEndpoint::_Unbind()
788 {
789 	if (fState == unix_stream_endpoint_state::Connected
790 		|| fState == unix_stream_endpoint_state::Listening)
791 		RETURN_ERROR(B_BAD_VALUE);
792 
793 	if (IsBound())
794 		RETURN_ERROR(UnixEndpoint::_Unbind());
795 
796 	RETURN_ERROR(B_OK);
797 }
798 
799 
800 void
801 UnixStreamEndpoint::_UnsetReceiveFifo()
802 {
803 	if (fReceiveFifo) {
804 		fReceiveFifo->ReleaseReference();
805 		fReceiveFifo = NULL;
806 	}
807 }
808 
809 
810 void
811 UnixStreamEndpoint::_StopListening()
812 {
813 	if (fState == unix_stream_endpoint_state::Listening) {
814 		delete_sem(fAcceptSemaphore);
815 		fAcceptSemaphore = -1;
816 		fState = unix_stream_endpoint_state::NotConnected;
817 	}
818 }
819