xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixStreamEndpoint.cpp (revision 4c07199d8201fcf267e90be0d24b76799d03cea6)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "UnixStreamEndpoint.h"
8 
9 #include <stdio.h>
10 #include <sys/stat.h>
11 
12 #include <AutoDeleter.h>
13 
14 #include <vfs.h>
15 
16 #include "UnixAddressManager.h"
17 #include "UnixFifo.h"
18 
19 
20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL	0
21 #define UNIX_DEBUG_LEVEL					UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
22 #include "UnixDebug.h"
23 
24 
25 // Note on locking order (outermost -> innermost):
26 // UnixStreamEndpoint: connecting -> listening -> child
27 // -> UnixFifo (never lock more than one at a time)
28 // -> UnixAddressManager
29 
30 
31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket)
32 	:
33 	UnixEndpoint(socket),
34 	fPeerEndpoint(NULL),
35 	fReceiveFifo(NULL),
36 	fState(unix_stream_endpoint_state::Closed),
37 	fAcceptSemaphore(-1),
38 	fIsChild(false),
39 	fWasConnected(false)
40 {
41 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
42 		find_thread(NULL), this);
43 }
44 
45 
46 UnixStreamEndpoint::~UnixStreamEndpoint()
47 {
48 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
49 		find_thread(NULL), this);
50 }
51 
52 
53 status_t
54 UnixStreamEndpoint::Init()
55 {
56 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
57 		this);
58 
59 	RETURN_ERROR(B_OK);
60 }
61 
62 
63 void
64 UnixStreamEndpoint::Uninit()
65 {
66 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
67 		this);
68 
69 	// check whether we're closed
70 	UnixStreamEndpointLocker locker(this);
71 	bool closed = (fState == unix_stream_endpoint_state::Closed);
72 	locker.Unlock();
73 
74 	if (!closed) {
75 		// That probably means, we're a child endpoint of a listener and
76 		// have been fully connected, but not yet accepted. Our Close()
77 		// hook isn't called in this case. Do it manually.
78 		Close();
79 	}
80 
81 	ReleaseReference();
82 }
83 
84 
85 status_t
86 UnixStreamEndpoint::Open()
87 {
88 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
89 		this);
90 
91 	status_t error = ProtocolSocket::Open();
92 	if (error != B_OK)
93 		RETURN_ERROR(error);
94 
95 	fState = unix_stream_endpoint_state::NotConnected;
96 
97 	RETURN_ERROR(B_OK);
98 }
99 
100 
101 status_t
102 UnixStreamEndpoint::Close()
103 {
104 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
105 		this);
106 
107 	UnixStreamEndpointLocker locker(this);
108 
109 	if (fState == unix_stream_endpoint_state::Connected) {
110 		UnixStreamEndpointLocker peerLocker;
111 		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
112 			// We're still connected. Disconnect both endpoints!
113 			fPeerEndpoint->_Disconnect();
114 			_Disconnect();
115 		}
116 	}
117 
118 	if (fState == unix_stream_endpoint_state::Listening)
119 		_StopListening();
120 
121 	_Unbind();
122 
123 	fState = unix_stream_endpoint_state::Closed;
124 	RETURN_ERROR(B_OK);
125 }
126 
127 
128 status_t
129 UnixStreamEndpoint::Free()
130 {
131 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
132 		this);
133 
134 	UnixStreamEndpointLocker locker(this);
135 
136 	_UnsetReceiveFifo();
137 
138 	RETURN_ERROR(B_OK);
139 }
140 
141 
142 status_t
143 UnixStreamEndpoint::Bind(const struct sockaddr* _address)
144 {
145 	if (_address->sa_family != AF_UNIX)
146 		RETURN_ERROR(EAFNOSUPPORT);
147 
148 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
149 		find_thread(NULL), this,
150 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
151 
152 	const sockaddr_un* address = (const sockaddr_un*)_address;
153 
154 	UnixStreamEndpointLocker endpointLocker(this);
155 
156 	if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
157 		RETURN_ERROR(B_BAD_VALUE);
158 
159 	RETURN_ERROR(_Bind(address));
160 }
161 
162 
163 status_t
164 UnixStreamEndpoint::Unbind()
165 {
166 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
167 		this);
168 
169 	UnixStreamEndpointLocker endpointLocker(this);
170 
171 	RETURN_ERROR(_Unbind());
172 }
173 
174 
175 status_t
176 UnixStreamEndpoint::Listen(int backlog)
177 {
178 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
179 		this, backlog);
180 
181 	UnixStreamEndpointLocker endpointLocker(this);
182 
183 	if (!IsBound())
184 		RETURN_ERROR(EDESTADDRREQ);
185 	if (fState != unix_stream_endpoint_state::NotConnected
186 		&& fState != unix_stream_endpoint_state::Listening)
187 		RETURN_ERROR(EINVAL);
188 
189 	gSocketModule->set_max_backlog(socket, backlog);
190 
191 	if (fState == unix_stream_endpoint_state::NotConnected) {
192 		fAcceptSemaphore = create_sem(0, "unix accept");
193 		if (fAcceptSemaphore < 0)
194 			RETURN_ERROR(ENOBUFS);
195 
196 		_UnsetReceiveFifo();
197 
198 		fCredentials.pid = getpid();
199 		fCredentials.uid = geteuid();
200 		fCredentials.gid = getegid();
201 
202 		fState = unix_stream_endpoint_state::Listening;
203 	}
204 
205 	RETURN_ERROR(B_OK);
206 }
207 
208 
209 status_t
210 UnixStreamEndpoint::Connect(const struct sockaddr* _address)
211 {
212 	if (_address->sa_family != AF_UNIX)
213 		RETURN_ERROR(EAFNOSUPPORT);
214 
215 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
216 		find_thread(NULL), this,
217 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
218 
219 	const sockaddr_un* address = (const sockaddr_un*)_address;
220 
221 	UnixStreamEndpointLocker endpointLocker(this);
222 
223 	if (fState == unix_stream_endpoint_state::Connected)
224 		RETURN_ERROR(EISCONN);
225 
226 	if (fState != unix_stream_endpoint_state::NotConnected)
227 		RETURN_ERROR(B_BAD_VALUE);
228 // TODO: If listening, we could set the backlog to 0 and connect.
229 
230 	// check the address first
231 	UnixAddress unixAddress;
232 
233 	if (address->sun_path[0] == '\0') {
234 		// internal address space (or empty address)
235 		int32 internalID;
236 		if (UnixAddress::IsEmptyAddress(*address))
237 			RETURN_ERROR(B_BAD_VALUE);
238 
239 		internalID = UnixAddress::InternalID(*address);
240 		if (internalID < 0)
241 			RETURN_ERROR(internalID);
242 
243 		unixAddress.SetTo(internalID);
244 	} else {
245 		// FS address space
246 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
247 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
248 			RETURN_ERROR(B_BAD_VALUE);
249 
250 		struct stat st;
251 		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
252 			!gStackModule->is_syscall());
253 		if (error != B_OK)
254 			RETURN_ERROR(error);
255 
256 		if (!S_ISSOCK(st.st_mode))
257 			RETURN_ERROR(B_BAD_VALUE);
258 
259 		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
260 	}
261 
262 	// get the peer endpoint
263 	UnixAddressManagerLocker addressLocker(gAddressManager);
264 	UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
265 	if (listeningUnixEndpoint == NULL)
266 		RETURN_ERROR(ECONNREFUSED);
267 	UnixStreamEndpoint* listeningEndpoint
268 		= dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
269 	if (listeningEndpoint == NULL)
270 		RETURN_ERROR(EPROTOTYPE);
271 	BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
272 	addressLocker.Unlock();
273 
274 	UnixStreamEndpointLocker peerLocker(listeningEndpoint);
275 
276 	if (!listeningEndpoint->IsBound()
277 		|| listeningEndpoint->fState != unix_stream_endpoint_state::Listening
278 		|| listeningEndpoint->fAddress != unixAddress) {
279 		RETURN_ERROR(ECONNREFUSED);
280 	}
281 
282 	// Allocate FIFOs for us and the socket we're going to spawn. We do that
283 	// now, so that the mess we need to cleanup, if allocating them fails, is
284 	// harmless.
285 	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
286 	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
287 	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
288 	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
289 
290 	status_t error;
291 	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
292 		return error;
293 
294 	// spawn new endpoint for accept()
295 	net_socket* newSocket;
296 	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
297 		&newSocket);
298 	if (error != B_OK)
299 		RETURN_ERROR(error);
300 
301 	// init connected peer endpoint
302 	UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;
303 
304 	UnixStreamEndpointLocker connectedLocker(connectedEndpoint);
305 
306 	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
307 
308 	// update our attributes
309 	_UnsetReceiveFifo();
310 
311 	fPeerEndpoint = connectedEndpoint;
312 	PeerAddress().SetTo(&connectedEndpoint->socket->address);
313 	fPeerEndpoint->AcquireReference();
314 	fReceiveFifo = fifo;
315 
316 	fCredentials.pid = getpid();
317 	fCredentials.uid = geteuid();
318 	fCredentials.gid = getegid();
319 
320 	fifoDeleter.Detach();
321 	peerFifoDeleter.Detach();
322 
323 	fState = unix_stream_endpoint_state::Connected;
324 	fWasConnected = true;
325 
326 	gSocketModule->set_connected(Socket());
327 
328 	release_sem(listeningEndpoint->fAcceptSemaphore);
329 
330 	connectedLocker.Unlock();
331 	peerLocker.Unlock();
332 	endpointLocker.Unlock();
333 
334 	RETURN_ERROR(B_OK);
335 }
336 
337 
338 status_t
339 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
340 {
341 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
342 		this);
343 
344 	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
345 	if (gStackModule->is_restarted_syscall())
346 		timeout = gStackModule->restore_syscall_restart_timeout();
347 	else
348 		gStackModule->store_syscall_restart_timeout(timeout);
349 
350 	UnixStreamEndpointLocker locker(this);
351 
352 	status_t error;
353 	do {
354 		locker.Unlock();
355 
356 		error = acquire_sem_etc(fAcceptSemaphore, 1,
357 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
358 		if (error < B_OK)
359 			break;
360 
361 		locker.Lock();
362 		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
363 	} while (error != B_OK);
364 
365 	if (error == B_TIMED_OUT && timeout == 0) {
366 		// translate non-blocking timeouts to the correct error code
367 		error = B_WOULD_BLOCK;
368 	}
369 
370 	RETURN_ERROR(error);
371 }
372 
373 
374 ssize_t
375 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
376 	ancillary_data_container* ancillaryData,
377 	const struct sockaddr* address, socklen_t addressLength, int flags)
378 {
379 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
380 		find_thread(NULL), this, vecs, vecCount, ancillaryData);
381 
382 	if ((flags & ~(MSG_DONTWAIT)) != 0)
383 		return EOPNOTSUPP;
384 
385 	bigtime_t timeout = 0;
386 	if ((flags & MSG_DONTWAIT) == 0) {
387 		timeout = absolute_timeout(socket->send.timeout);
388 		if (gStackModule->is_restarted_syscall())
389 			timeout = gStackModule->restore_syscall_restart_timeout();
390 		else
391 			gStackModule->store_syscall_restart_timeout(timeout);
392 	}
393 
394 	UnixStreamEndpointLocker locker(this);
395 
396 	BReference<UnixStreamEndpoint> peerReference;
397 	UnixStreamEndpointLocker peerLocker;
398 
399 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
400 	if (error != B_OK)
401 		RETURN_ERROR(error);
402 
403 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
404 	peerReference.SetTo(peerEndpoint);
405 
406 	// lock the peer's FIFO
407 	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
408 	BReference<UnixFifo> _(peerFifo);
409 	UnixFifoLocker fifoLocker(peerFifo);
410 
411 	// unlock endpoints
412 	locker.Unlock();
413 	peerLocker.Unlock();
414 
415 	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);
416 
417 	// Notify select()ing readers, if we successfully wrote anything.
418 	size_t readable = peerFifo->Readable();
419 	bool notifyRead = (error == B_OK && readable > 0
420 		&& !peerFifo->IsReadShutdown());
421 
422 	// Notify select()ing writers, if we failed to write anything and there's
423 	// still room to write.
424 	size_t writable = peerFifo->Writable();
425 	bool notifyWrite = (error != B_OK && writable > 0
426 		&& !peerFifo->IsWriteShutdown());
427 
428 	// re-lock our endpoint (unlock FIFO to respect locking order)
429 	fifoLocker.Unlock();
430 	locker.Lock();
431 
432 	bool peerLocked = (fPeerEndpoint == peerEndpoint
433 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
434 
435 	// send notifications
436 	if (peerLocked && notifyRead)
437 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
438 	if (notifyWrite)
439 		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
440 
441 	switch (result) {
442 		case UNIX_FIFO_SHUTDOWN:
443 			if (fPeerEndpoint == peerEndpoint
444 					&& fState == unix_stream_endpoint_state::Connected) {
445 				// Orderly write shutdown on our side.
446 				// Note: Linux and Solaris also send a SIGPIPE, but according
447 				// the send() specification that shouldn't be done.
448 				result = EPIPE;
449 			} else {
450 				// The FD has been closed.
451 				result = EBADF;
452 			}
453 			break;
454 		case EPIPE:
455 			// The socket module will generate SIGPIPE for us, if necessary.
456 			break;
457 		case B_TIMED_OUT:
458 			// Translate non-blocking timeouts to the correct error code.
459 			if (timeout == 0)
460 				result = B_WOULD_BLOCK;
461 			break;
462 	}
463 
464 	RETURN_ERROR(result);
465 }
466 
467 
468 ssize_t
469 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
470 	ancillary_data_container** _ancillaryData, struct sockaddr* _address,
471 	socklen_t* _addressLength, int flags)
472 {
473 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
474 		find_thread(NULL), this, vecs, vecCount);
475 
476 	if ((flags & ~(MSG_DONTWAIT)) != 0)
477 		return EOPNOTSUPP;
478 
479 	bigtime_t timeout = 0;
480 	if ((flags & MSG_DONTWAIT) == 0) {
481 		timeout = absolute_timeout(socket->receive.timeout);
482 		if (gStackModule->is_restarted_syscall())
483 			timeout = gStackModule->restore_syscall_restart_timeout();
484 		else
485 			gStackModule->store_syscall_restart_timeout(timeout);
486 	}
487 
488 	UnixStreamEndpointLocker locker(this);
489 
490 	// We can read as long as we have a FIFO. I.e. we are still connected, or
491 	// disconnected and not yet reconnected/listening/closed.
492 	if (fReceiveFifo == NULL)
493 		RETURN_ERROR(ENOTCONN);
494 
495 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
496 	BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
497 
498 	// Copy the peer address upfront. This way, if we read something, we don't
499 	// get into a potential race with Close().
500 	if (_address != NULL) {
501 		socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
502 		memcpy(_address, &socket->peer, addrLen);
503 		*_addressLength = addrLen;
504 	}
505 
506 	// lock our FIFO
507 	UnixFifo* fifo = fReceiveFifo;
508 	BReference<UnixFifo> _(fifo);
509 	UnixFifoLocker fifoLocker(fifo);
510 
511 	// unlock endpoint
512 	locker.Unlock();
513 
514 	ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout);
515 
516 	// Notify select()ing writers, if we successfully read anything.
517 	size_t writable = fifo->Writable();
518 	bool notifyWrite = (result >= 0 && writable > 0
519 		&& !fifo->IsWriteShutdown());
520 
521 	// Notify select()ing readers, if we failed to read anything and there's
522 	// still something left to read.
523 	size_t readable = fifo->Readable();
524 	bool notifyRead = (result < 0 && readable > 0
525 		&& !fifo->IsReadShutdown());
526 
527 	// re-lock our endpoint (unlock FIFO to respect locking order)
528 	fifoLocker.Unlock();
529 	locker.Lock();
530 
531 	UnixStreamEndpointLocker peerLocker;
532 	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
533 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
534 
535 	// send notifications
536 	if (notifyRead)
537 		gSocketModule->notify(socket, B_SELECT_READ, readable);
538 	if (peerLocked && notifyWrite)
539 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
540 
541 	switch (result) {
542 		case UNIX_FIFO_SHUTDOWN:
543 			// Either our socket was closed or read shutdown.
544 			if (fState == unix_stream_endpoint_state::Closed) {
545 				// The FD has been closed.
546 				result = EBADF;
547 			} else {
548 				// if (fReceiveFifo == fifo) {
549 				// 		Orderly shutdown or the peer closed the connection.
550 				// } else {
551 				//		Weird case: Peer closed connection and we are already
552 				// 		reconnected (or listening).
553 				// }
554 				result = 0;
555 			}
556 			break;
557 		case B_TIMED_OUT:
558 			// translate non-blocking timeouts to the correct error code
559 			if (timeout == 0)
560 				result = B_WOULD_BLOCK;
561 			break;
562 	}
563 
564 	RETURN_ERROR(result);
565 }
566 
567 
568 ssize_t
569 UnixStreamEndpoint::Sendable()
570 {
571 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
572 		this);
573 
574 	UnixStreamEndpointLocker locker(this);
575 	UnixStreamEndpointLocker peerLocker;
576 
577 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
578 	if (error != B_OK)
579 		RETURN_ERROR(error);
580 
581 	// lock the peer's FIFO
582 	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
583 	UnixFifoLocker fifoLocker(peerFifo);
584 
585 	RETURN_ERROR(peerFifo->Writable());
586 }
587 
588 
589 ssize_t
590 UnixStreamEndpoint::Receivable()
591 {
592 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
593 		this);
594 
595 	UnixStreamEndpointLocker locker(this);
596 
597 	if (fState == unix_stream_endpoint_state::Listening)
598 		return gSocketModule->count_connected(socket);
599 
600 	if (fState != unix_stream_endpoint_state::Connected)
601 		RETURN_ERROR(ENOTCONN);
602 
603 	UnixFifoLocker fifoLocker(fReceiveFifo);
604 	ssize_t readable = fReceiveFifo->Readable();
605 	if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
606 			|| fReceiveFifo->IsReadShutdown())) {
607 		RETURN_ERROR(ENOTCONN);
608 	}
609 	RETURN_ERROR(readable);
610 }
611 
612 
613 status_t
614 UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
615 {
616 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
617 		find_thread(NULL), this, size);
618 
619 	UnixStreamEndpointLocker locker(this);
620 
621 	if (fReceiveFifo == NULL)
622 		return B_BAD_VALUE;
623 
624 	UnixFifoLocker fifoLocker(fReceiveFifo);
625 	return fReceiveFifo->SetBufferCapacity(size);
626 }
627 
628 
629 status_t
630 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
631 {
632 	UnixStreamEndpointLocker locker(this);
633 	UnixStreamEndpointLocker peerLocker;
634 
635 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
636 	if (error != B_OK)
637 		RETURN_ERROR(error);
638 
639 	*credentials = fPeerEndpoint->fCredentials;
640 
641 	return B_OK;
642 }
643 
644 
645 status_t
646 UnixStreamEndpoint::Shutdown(int direction)
647 {
648 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
649 		find_thread(NULL), this, direction);
650 
651 	uint32 shutdown;
652 	uint32 peerShutdown;
653 
654 	// translate the direction into shutdown flags for our and the peer fifo
655 	switch (direction) {
656 		case SHUT_RD:
657 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
658 			peerShutdown = 0;
659 			break;
660 		case SHUT_WR:
661 			shutdown = 0;
662 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
663 			break;
664 		case SHUT_RDWR:
665 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
666 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
667 			break;
668 		default:
669 			RETURN_ERROR(B_BAD_VALUE);
670 	}
671 
672 	// lock endpoints
673 	UnixStreamEndpointLocker locker(this);
674 	UnixStreamEndpointLocker peerLocker;
675 
676 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
677 	if (error != B_OK)
678 		RETURN_ERROR(error);
679 
680 	// shutdown our FIFO
681 	fReceiveFifo->Lock();
682 	fReceiveFifo->Shutdown(shutdown);
683 	fReceiveFifo->Unlock();
684 
685 	// shutdown peer FIFO
686 	fPeerEndpoint->fReceiveFifo->Lock();
687 	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
688 	fPeerEndpoint->fReceiveFifo->Unlock();
689 
690 	// send select notifications
691 	if (direction == SHUT_RD || direction == SHUT_RDWR) {
692 		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
693 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
694 	}
695 	if (direction == SHUT_WR || direction == SHUT_RDWR) {
696 		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
697 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
698 	}
699 
700 	RETURN_ERROR(B_OK);
701 }
702 
703 
704 void
705 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
706 	UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
707 {
708 	ProtocolSocket::Open();
709 
710 	fIsChild = true;
711 	fPeerEndpoint = connectingEndpoint;
712 	fPeerEndpoint->AcquireReference();
713 
714 	fReceiveFifo = fifo;
715 
716 	PeerAddress().SetTo(&connectingEndpoint->socket->address);
717 
718 	fCredentials = listeningEndpoint->fCredentials;
719 
720 	fState = unix_stream_endpoint_state::Connected;
721 
722 	gSocketModule->set_connected(Socket());
723 }
724 
725 
726 void
727 UnixStreamEndpoint::_Disconnect()
728 {
729 	// Both endpoints must be locked.
730 
731 	// Write shutdown the receive FIFO.
732 	fReceiveFifo->Lock();
733 	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
734 	fReceiveFifo->Unlock();
735 
736 	// select() notification.
737 	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
738 	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
739 
740 	// Unset the peer endpoint.
741 	fPeerEndpoint->ReleaseReference();
742 	fPeerEndpoint = NULL;
743 
744 	// We're officially disconnected.
745 // TODO: Deal with non accept()ed connections correctly!
746 	fIsChild = false;
747 	fState = unix_stream_endpoint_state::NotConnected;
748 }
749 
750 
751 status_t
752 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
753 	UnixStreamEndpointLocker& peerLocker)
754 {
755 	if (fState != unix_stream_endpoint_state::Connected)
756 		RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
757 
758 	// We need to lock the peer, too. Get a reference -- we might need to
759 	// unlock ourselves to get the locking order right.
760 	BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
761 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
762 
763 	if (fIsChild) {
764 		// We're the child, but locking order is the other way around.
765 		locker.Unlock();
766 		peerLocker.SetTo(peerEndpoint, false);
767 
768 		locker.Lock();
769 
770 		// recheck our state, also whether the peer is still the same
771 		if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint)
772 			RETURN_ERROR(ENOTCONN);
773 	} else
774 		peerLocker.SetTo(peerEndpoint, false);
775 
776 	RETURN_ERROR(B_OK);
777 }
778 
779 
780 status_t
781 UnixStreamEndpoint::_Unbind()
782 {
783 	if (fState == unix_stream_endpoint_state::Connected
784 		|| fState == unix_stream_endpoint_state::Listening)
785 		RETURN_ERROR(B_BAD_VALUE);
786 
787 	if (IsBound())
788 		RETURN_ERROR(UnixEndpoint::_Unbind());
789 
790 	RETURN_ERROR(B_OK);
791 }
792 
793 
794 void
795 UnixStreamEndpoint::_UnsetReceiveFifo()
796 {
797 	if (fReceiveFifo) {
798 		fReceiveFifo->ReleaseReference();
799 		fReceiveFifo = NULL;
800 	}
801 }
802 
803 
804 void
805 UnixStreamEndpoint::_StopListening()
806 {
807 	if (fState == unix_stream_endpoint_state::Listening) {
808 		delete_sem(fAcceptSemaphore);
809 		fAcceptSemaphore = -1;
810 		fState = unix_stream_endpoint_state::NotConnected;
811 	}
812 }
813