xref: /haiku/src/add-ons/kernel/network/protocols/unix/UnixStreamEndpoint.cpp (revision e1c4049fed1047bdb957b0529e1921e97ef94770)
1 /*
2  * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "UnixStreamEndpoint.h"
8 
9 #include <stdio.h>
10 #include <sys/stat.h>
11 
12 #include <AutoDeleter.h>
13 
14 #include <vfs.h>
15 
16 #include "UnixAddressManager.h"
17 #include "UnixFifo.h"
18 
19 
20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL	0
21 #define UNIX_DEBUG_LEVEL					UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
22 #include "UnixDebug.h"
23 
24 
25 // Note on locking order (outermost -> innermost):
26 // UnixStreamEndpoint: connecting -> listening -> child
27 // -> UnixFifo (never lock more than one at a time)
28 // -> UnixAddressManager
29 
30 
31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket)
32 	:
33 	UnixEndpoint(socket),
34 	fPeerEndpoint(NULL),
35 	fReceiveFifo(NULL),
36 	fState(unix_stream_endpoint_state::Closed),
37 	fAcceptSemaphore(-1),
38 	fIsChild(false),
39 	fWasConnected(false)
40 {
41 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
42 		find_thread(NULL), this);
43 }
44 
45 
46 UnixStreamEndpoint::~UnixStreamEndpoint()
47 {
48 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
49 		find_thread(NULL), this);
50 }
51 
52 
53 status_t
54 UnixStreamEndpoint::Init()
55 {
56 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
57 		this);
58 
59 	RETURN_ERROR(B_OK);
60 }
61 
62 
63 void
64 UnixStreamEndpoint::Uninit()
65 {
66 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
67 		this);
68 
69 	// check whether we're closed
70 	UnixStreamEndpointLocker locker(this);
71 	bool closed = (fState == unix_stream_endpoint_state::Closed);
72 	locker.Unlock();
73 
74 	if (!closed) {
75 		// That probably means, we're a child endpoint of a listener and
76 		// have been fully connected, but not yet accepted. Our Close()
77 		// hook isn't called in this case. Do it manually.
78 		Close();
79 	}
80 
81 	ReleaseReference();
82 }
83 
84 
85 status_t
86 UnixStreamEndpoint::Open()
87 {
88 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
89 		this);
90 
91 	status_t error = ProtocolSocket::Open();
92 	if (error != B_OK)
93 		RETURN_ERROR(error);
94 
95 	fState = unix_stream_endpoint_state::NotConnected;
96 
97 	RETURN_ERROR(B_OK);
98 }
99 
100 
101 status_t
102 UnixStreamEndpoint::Close()
103 {
104 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
105 		this);
106 
107 	UnixStreamEndpointLocker locker(this);
108 
109 	if (fState == unix_stream_endpoint_state::Connected) {
110 		UnixStreamEndpointLocker peerLocker;
111 		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
112 			// We're still connected. Disconnect both endpoints!
113 			fPeerEndpoint->_Disconnect();
114 			_Disconnect();
115 		}
116 	}
117 
118 	if (fState == unix_stream_endpoint_state::Listening)
119 		_StopListening();
120 
121 	_Unbind();
122 
123 	fState = unix_stream_endpoint_state::Closed;
124 	RETURN_ERROR(B_OK);
125 }
126 
127 
128 status_t
129 UnixStreamEndpoint::Free()
130 {
131 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
132 		this);
133 
134 	UnixStreamEndpointLocker locker(this);
135 
136 	_UnsetReceiveFifo();
137 
138 	RETURN_ERROR(B_OK);
139 }
140 
141 
142 status_t
143 UnixStreamEndpoint::Bind(const struct sockaddr* _address)
144 {
145 	if (_address->sa_family != AF_UNIX)
146 		RETURN_ERROR(EAFNOSUPPORT);
147 
148 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
149 		find_thread(NULL), this,
150 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
151 
152 	const sockaddr_un* address = (const sockaddr_un*)_address;
153 
154 	UnixStreamEndpointLocker endpointLocker(this);
155 
156 	if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
157 		RETURN_ERROR(B_BAD_VALUE);
158 
159 	RETURN_ERROR(_Bind(address));
160 }
161 
162 
163 status_t
164 UnixStreamEndpoint::Unbind()
165 {
166 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
167 		this);
168 
169 	UnixStreamEndpointLocker endpointLocker(this);
170 
171 	RETURN_ERROR(_Unbind());
172 }
173 
174 
175 status_t
176 UnixStreamEndpoint::Listen(int backlog)
177 {
178 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
179 		this, backlog);
180 
181 	UnixStreamEndpointLocker endpointLocker(this);
182 
183 	if (!IsBound())
184 		RETURN_ERROR(EDESTADDRREQ);
185 	if (fState != unix_stream_endpoint_state::NotConnected
186 		&& fState != unix_stream_endpoint_state::Listening)
187 		RETURN_ERROR(EINVAL);
188 
189 	gSocketModule->set_max_backlog(socket, backlog);
190 
191 	if (fState == unix_stream_endpoint_state::NotConnected) {
192 		fAcceptSemaphore = create_sem(0, "unix accept");
193 		if (fAcceptSemaphore < 0)
194 			RETURN_ERROR(ENOBUFS);
195 
196 		_UnsetReceiveFifo();
197 
198 		fCredentials.pid = getpid();
199 		fCredentials.uid = geteuid();
200 		fCredentials.gid = getegid();
201 
202 		fState = unix_stream_endpoint_state::Listening;
203 	}
204 
205 	RETURN_ERROR(B_OK);
206 }
207 
208 
209 status_t
210 UnixStreamEndpoint::Connect(const struct sockaddr* _address)
211 {
212 	if (_address->sa_family != AF_UNIX)
213 		RETURN_ERROR(EAFNOSUPPORT);
214 
215 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
216 		find_thread(NULL), this,
217 		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
218 
219 	const sockaddr_un* address = (const sockaddr_un*)_address;
220 
221 	UnixStreamEndpointLocker endpointLocker(this);
222 
223 	if (fState == unix_stream_endpoint_state::Connected)
224 		RETURN_ERROR(EISCONN);
225 
226 	if (fState != unix_stream_endpoint_state::NotConnected)
227 		RETURN_ERROR(B_BAD_VALUE);
228 // TODO: If listening, we could set the backlog to 0 and connect.
229 
230 	// check the address first
231 	UnixAddress unixAddress;
232 
233 	if (address->sun_path[0] == '\0') {
234 		// internal address space (or empty address)
235 		int32 internalID;
236 		if (UnixAddress::IsEmptyAddress(*address))
237 			RETURN_ERROR(B_BAD_VALUE);
238 
239 		internalID = UnixAddress::InternalID(*address);
240 		if (internalID < 0)
241 			RETURN_ERROR(internalID);
242 
243 		unixAddress.SetTo(internalID);
244 	} else {
245 		// FS address space
246 		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
247 		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
248 			RETURN_ERROR(B_BAD_VALUE);
249 
250 		struct stat st;
251 		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
252 			!gStackModule->is_syscall());
253 		if (error != B_OK)
254 			RETURN_ERROR(error);
255 
256 		if (!S_ISSOCK(st.st_mode))
257 			RETURN_ERROR(B_BAD_VALUE);
258 
259 		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
260 	}
261 
262 	// get the peer endpoint
263 	UnixAddressManagerLocker addressLocker(gAddressManager);
264 	UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
265 	if (listeningUnixEndpoint == NULL)
266 		RETURN_ERROR(ECONNREFUSED);
267 	UnixStreamEndpoint* listeningEndpoint
268 		= dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
269 	if (listeningEndpoint == NULL)
270 		RETURN_ERROR(EPROTOTYPE);
271 	BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
272 	addressLocker.Unlock();
273 
274 	UnixStreamEndpointLocker peerLocker(listeningEndpoint);
275 
276 	if (!listeningEndpoint->IsBound()
277 		|| listeningEndpoint->fState != unix_stream_endpoint_state::Listening
278 		|| listeningEndpoint->fAddress != unixAddress) {
279 		RETURN_ERROR(ECONNREFUSED);
280 	}
281 
282 	// Allocate FIFOs for us and the socket we're going to spawn. We do that
283 	// now, so that the mess we need to cleanup, if allocating them fails, is
284 	// harmless.
285 	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
286 	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
287 	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
288 	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
289 
290 	status_t error;
291 	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
292 		return error;
293 
294 	// spawn new endpoint for accept()
295 	net_socket* newSocket;
296 	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
297 		&newSocket);
298 	if (error != B_OK)
299 		RETURN_ERROR(error);
300 
301 	// init connected peer endpoint
302 	UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;
303 
304 	UnixStreamEndpointLocker connectedLocker(connectedEndpoint);
305 
306 	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
307 
308 	// update our attributes
309 	_UnsetReceiveFifo();
310 
311 	fPeerEndpoint = connectedEndpoint;
312 	PeerAddress().SetTo(&connectedEndpoint->socket->address);
313 	fPeerEndpoint->AcquireReference();
314 	fReceiveFifo = fifo;
315 
316 	fCredentials.pid = getpid();
317 	fCredentials.uid = geteuid();
318 	fCredentials.gid = getegid();
319 
320 	fifoDeleter.Detach();
321 	peerFifoDeleter.Detach();
322 
323 	fState = unix_stream_endpoint_state::Connected;
324 	fWasConnected = true;
325 
326 	gSocketModule->set_connected(Socket());
327 
328 	release_sem(listeningEndpoint->fAcceptSemaphore);
329 
330 	connectedLocker.Unlock();
331 	peerLocker.Unlock();
332 	endpointLocker.Unlock();
333 
334 	RETURN_ERROR(B_OK);
335 }
336 
337 
338 status_t
339 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
340 {
341 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
342 		this);
343 
344 	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
345 	if (gStackModule->is_restarted_syscall())
346 		timeout = gStackModule->restore_syscall_restart_timeout();
347 	else
348 		gStackModule->store_syscall_restart_timeout(timeout);
349 
350 	UnixStreamEndpointLocker locker(this);
351 
352 	status_t error;
353 	do {
354 		locker.Unlock();
355 
356 		error = acquire_sem_etc(fAcceptSemaphore, 1,
357 			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
358 		if (error < B_OK)
359 			break;
360 
361 		locker.Lock();
362 		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
363 	} while (error != B_OK);
364 
365 	if (error == B_TIMED_OUT && timeout == 0) {
366 		// translate non-blocking timeouts to the correct error code
367 		error = B_WOULD_BLOCK;
368 	}
369 
370 	RETURN_ERROR(error);
371 }
372 
373 
374 ssize_t
375 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
376 	ancillary_data_container* ancillaryData,
377 	const struct sockaddr* address, socklen_t addressLength, int flags)
378 {
379 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
380 		find_thread(NULL), this, vecs, vecCount, ancillaryData);
381 
382 	bigtime_t timeout = 0;
383 	if ((flags & MSG_DONTWAIT) == 0) {
384 		timeout = absolute_timeout(socket->send.timeout);
385 		if (gStackModule->is_restarted_syscall())
386 			timeout = gStackModule->restore_syscall_restart_timeout();
387 		else
388 			gStackModule->store_syscall_restart_timeout(timeout);
389 	}
390 
391 	UnixStreamEndpointLocker locker(this);
392 
393 	BReference<UnixStreamEndpoint> peerReference;
394 	UnixStreamEndpointLocker peerLocker;
395 
396 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
397 	if (error != B_OK)
398 		RETURN_ERROR(error);
399 
400 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
401 	peerReference.SetTo(peerEndpoint);
402 
403 	// lock the peer's FIFO
404 	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
405 	BReference<UnixFifo> _(peerFifo);
406 	UnixFifoLocker fifoLocker(peerFifo);
407 
408 	// unlock endpoints
409 	locker.Unlock();
410 	peerLocker.Unlock();
411 
412 	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);
413 
414 	// Notify select()ing readers, if we successfully wrote anything.
415 	size_t readable = peerFifo->Readable();
416 	bool notifyRead = (error == B_OK && readable > 0
417 		&& !peerFifo->IsReadShutdown());
418 
419 	// Notify select()ing writers, if we failed to write anything and there's
420 	// still room to write.
421 	size_t writable = peerFifo->Writable();
422 	bool notifyWrite = (error != B_OK && writable > 0
423 		&& !peerFifo->IsWriteShutdown());
424 
425 	// re-lock our endpoint (unlock FIFO to respect locking order)
426 	fifoLocker.Unlock();
427 	locker.Lock();
428 
429 	bool peerLocked = (fPeerEndpoint == peerEndpoint
430 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
431 
432 	// send notifications
433 	if (peerLocked && notifyRead)
434 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
435 	if (notifyWrite)
436 		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
437 
438 	switch (result) {
439 		case UNIX_FIFO_SHUTDOWN:
440 			if (fPeerEndpoint == peerEndpoint
441 					&& fState == unix_stream_endpoint_state::Connected) {
442 				// Orderly write shutdown on our side.
443 				// Note: Linux and Solaris also send a SIGPIPE, but according
444 				// the send() specification that shouldn't be done.
445 				result = EPIPE;
446 			} else {
447 				// The FD has been closed.
448 				result = EBADF;
449 			}
450 			break;
451 		case EPIPE:
452 			// The peer closed connection or shutdown its read side. Reward
453 			// the caller with a SIGPIPE.
454 			if (gStackModule->is_syscall())
455 				send_signal(find_thread(NULL), SIGPIPE);
456 			break;
457 		case B_TIMED_OUT:
458 			// Translate non-blocking timeouts to the correct error code.
459 			if (timeout == 0)
460 				result = B_WOULD_BLOCK;
461 			break;
462 	}
463 
464 	RETURN_ERROR(result);
465 }
466 
467 
468 ssize_t
469 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
470 	ancillary_data_container** _ancillaryData, struct sockaddr* _address,
471 	socklen_t* _addressLength, int flags)
472 {
473 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
474 		find_thread(NULL), this, vecs, vecCount);
475 
476 	bigtime_t timeout = 0;
477 	if ((flags & MSG_DONTWAIT) == 0) {
478 		timeout = absolute_timeout(socket->receive.timeout);
479 		if (gStackModule->is_restarted_syscall())
480 			timeout = gStackModule->restore_syscall_restart_timeout();
481 		else
482 			gStackModule->store_syscall_restart_timeout(timeout);
483 	}
484 
485 	UnixStreamEndpointLocker locker(this);
486 
487 	// We can read as long as we have a FIFO. I.e. we are still connected, or
488 	// disconnected and not yet reconnected/listening/closed.
489 	if (fReceiveFifo == NULL)
490 		RETURN_ERROR(ENOTCONN);
491 
492 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
493 	BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
494 
495 	// Copy the peer address upfront. This way, if we read something, we don't
496 	// get into a potential race with Close().
497 	if (_address != NULL) {
498 		socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
499 		memcpy(_address, &socket->peer, addrLen);
500 		*_addressLength = addrLen;
501 	}
502 
503 	// lock our FIFO
504 	UnixFifo* fifo = fReceiveFifo;
505 	BReference<UnixFifo> _(fifo);
506 	UnixFifoLocker fifoLocker(fifo);
507 
508 	// unlock endpoint
509 	locker.Unlock();
510 
511 	ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout);
512 
513 	// Notify select()ing writers, if we successfully read anything.
514 	size_t writable = fifo->Writable();
515 	bool notifyWrite = (result >= 0 && writable > 0
516 		&& !fifo->IsWriteShutdown());
517 
518 	// Notify select()ing readers, if we failed to read anything and there's
519 	// still something left to read.
520 	size_t readable = fifo->Readable();
521 	bool notifyRead = (result < 0 && readable > 0
522 		&& !fifo->IsReadShutdown());
523 
524 	// re-lock our endpoint (unlock FIFO to respect locking order)
525 	fifoLocker.Unlock();
526 	locker.Lock();
527 
528 	UnixStreamEndpointLocker peerLocker;
529 	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
530 		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
531 
532 	// send notifications
533 	if (notifyRead)
534 		gSocketModule->notify(socket, B_SELECT_READ, readable);
535 	if (peerLocked && notifyWrite)
536 		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
537 
538 	switch (result) {
539 		case UNIX_FIFO_SHUTDOWN:
540 			// Either our socket was closed or read shutdown.
541 			if (fState == unix_stream_endpoint_state::Closed) {
542 				// The FD has been closed.
543 				result = EBADF;
544 			} else {
545 				// if (fReceiveFifo == fifo) {
546 				// 		Orderly shutdown or the peer closed the connection.
547 				// } else {
548 				//		Weird case: Peer closed connection and we are already
549 				// 		reconnected (or listening).
550 				// }
551 				result = 0;
552 			}
553 			break;
554 		case B_TIMED_OUT:
555 			// translate non-blocking timeouts to the correct error code
556 			if (timeout == 0)
557 				result = B_WOULD_BLOCK;
558 			break;
559 	}
560 
561 	RETURN_ERROR(result);
562 }
563 
564 
565 ssize_t
566 UnixStreamEndpoint::Sendable()
567 {
568 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
569 		this);
570 
571 	UnixStreamEndpointLocker locker(this);
572 	UnixStreamEndpointLocker peerLocker;
573 
574 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
575 	if (error != B_OK)
576 		RETURN_ERROR(error);
577 
578 	// lock the peer's FIFO
579 	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
580 	UnixFifoLocker fifoLocker(peerFifo);
581 
582 	RETURN_ERROR(peerFifo->Writable());
583 }
584 
585 
586 ssize_t
587 UnixStreamEndpoint::Receivable()
588 {
589 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
590 		this);
591 
592 	UnixStreamEndpointLocker locker(this);
593 
594 	if (fState == unix_stream_endpoint_state::Listening)
595 		return gSocketModule->count_connected(socket);
596 
597 	if (fState != unix_stream_endpoint_state::Connected)
598 		RETURN_ERROR(ENOTCONN);
599 
600 	UnixFifoLocker fifoLocker(fReceiveFifo);
601 	ssize_t readable = fReceiveFifo->Readable();
602 	if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
603 			|| fReceiveFifo->IsReadShutdown())) {
604 		RETURN_ERROR(ENOTCONN);
605 	}
606 	RETURN_ERROR(readable);
607 }
608 
609 
610 status_t
611 UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
612 {
613 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
614 		find_thread(NULL), this, size);
615 
616 	UnixStreamEndpointLocker locker(this);
617 
618 	if (fReceiveFifo == NULL)
619 		return B_BAD_VALUE;
620 
621 	UnixFifoLocker fifoLocker(fReceiveFifo);
622 	return fReceiveFifo->SetBufferCapacity(size);
623 }
624 
625 
626 status_t
627 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
628 {
629 	UnixStreamEndpointLocker locker(this);
630 	UnixStreamEndpointLocker peerLocker;
631 
632 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
633 	if (error != B_OK)
634 		RETURN_ERROR(error);
635 
636 	*credentials = fPeerEndpoint->fCredentials;
637 
638 	return B_OK;
639 }
640 
641 
642 status_t
643 UnixStreamEndpoint::Shutdown(int direction)
644 {
645 	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
646 		find_thread(NULL), this, direction);
647 
648 	uint32 shutdown;
649 	uint32 peerShutdown;
650 
651 	// translate the direction into shutdown flags for our and the peer fifo
652 	switch (direction) {
653 		case SHUT_RD:
654 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
655 			peerShutdown = 0;
656 			break;
657 		case SHUT_WR:
658 			shutdown = 0;
659 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
660 			break;
661 		case SHUT_RDWR:
662 			shutdown = UNIX_FIFO_SHUTDOWN_READ;
663 			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
664 			break;
665 		default:
666 			RETURN_ERROR(B_BAD_VALUE);
667 	}
668 
669 	// lock endpoints
670 	UnixStreamEndpointLocker locker(this);
671 	UnixStreamEndpointLocker peerLocker;
672 
673 	status_t error = _LockConnectedEndpoints(locker, peerLocker);
674 	if (error != B_OK)
675 		RETURN_ERROR(error);
676 
677 	// shutdown our FIFO
678 	fReceiveFifo->Lock();
679 	fReceiveFifo->Shutdown(shutdown);
680 	fReceiveFifo->Unlock();
681 
682 	// shutdown peer FIFO
683 	fPeerEndpoint->fReceiveFifo->Lock();
684 	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
685 	fPeerEndpoint->fReceiveFifo->Unlock();
686 
687 	// send select notifications
688 	if (direction == SHUT_RD || direction == SHUT_RDWR) {
689 		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
690 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
691 	}
692 	if (direction == SHUT_WR || direction == SHUT_RDWR) {
693 		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
694 		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
695 	}
696 
697 	RETURN_ERROR(B_OK);
698 }
699 
700 
701 void
702 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
703 	UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
704 {
705 	ProtocolSocket::Open();
706 
707 	fIsChild = true;
708 	fPeerEndpoint = connectingEndpoint;
709 	fPeerEndpoint->AcquireReference();
710 
711 	fReceiveFifo = fifo;
712 
713 	PeerAddress().SetTo(&connectingEndpoint->socket->address);
714 
715 	fCredentials = listeningEndpoint->fCredentials;
716 
717 	fState = unix_stream_endpoint_state::Connected;
718 
719 	gSocketModule->set_connected(Socket());
720 }
721 
722 
723 void
724 UnixStreamEndpoint::_Disconnect()
725 {
726 	// Both endpoints must be locked.
727 
728 	// Write shutdown the receive FIFO.
729 	fReceiveFifo->Lock();
730 	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
731 	fReceiveFifo->Unlock();
732 
733 	// select() notification.
734 	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
735 	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
736 
737 	// Unset the peer endpoint.
738 	fPeerEndpoint->ReleaseReference();
739 	fPeerEndpoint = NULL;
740 
741 	// We're officially disconnected.
742 // TODO: Deal with non accept()ed connections correctly!
743 	fIsChild = false;
744 	fState = unix_stream_endpoint_state::NotConnected;
745 }
746 
747 
748 status_t
749 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
750 	UnixStreamEndpointLocker& peerLocker)
751 {
752 	if (fState != unix_stream_endpoint_state::Connected)
753 		RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
754 
755 	// We need to lock the peer, too. Get a reference -- we might need to
756 	// unlock ourselves to get the locking order right.
757 	BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
758 	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
759 
760 	if (fIsChild) {
761 		// We're the child, but locking order is the other way around.
762 		locker.Unlock();
763 		peerLocker.SetTo(peerEndpoint, false);
764 
765 		locker.Lock();
766 
767 		// recheck our state, also whether the peer is still the same
768 		if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint)
769 			RETURN_ERROR(ENOTCONN);
770 	} else
771 		peerLocker.SetTo(peerEndpoint, false);
772 
773 	RETURN_ERROR(B_OK);
774 }
775 
776 
777 status_t
778 UnixStreamEndpoint::_Unbind()
779 {
780 	if (fState == unix_stream_endpoint_state::Connected
781 		|| fState == unix_stream_endpoint_state::Listening)
782 		RETURN_ERROR(B_BAD_VALUE);
783 
784 	if (IsBound())
785 		RETURN_ERROR(UnixEndpoint::_Unbind());
786 
787 	RETURN_ERROR(B_OK);
788 }
789 
790 
791 void
792 UnixStreamEndpoint::_UnsetReceiveFifo()
793 {
794 	if (fReceiveFifo) {
795 		fReceiveFifo->ReleaseReference();
796 		fReceiveFifo = NULL;
797 	}
798 }
799 
800 
801 void
802 UnixStreamEndpoint::_StopListening()
803 {
804 	if (fState == unix_stream_endpoint_state::Listening) {
805 		delete_sem(fAcceptSemaphore);
806 		fAcceptSemaphore = -1;
807 		fState = unix_stream_endpoint_state::NotConnected;
808 	}
809 }
810