1 /*
2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7 #include "UnixStreamEndpoint.h"
8
9 #include <stdio.h>
10 #include <sys/stat.h>
11
12 #include <AutoDeleter.h>
13
14 #include <vfs.h>
15
16 #include "UnixAddressManager.h"
17 #include "UnixFifo.h"
18
19
20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0
21 #define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
22 #include "UnixDebug.h"
23
24
25 // Note on locking order (outermost -> innermost):
26 // UnixStreamEndpoint: connecting -> listening -> child
27 // -> UnixFifo (never lock more than one at a time)
28 // -> UnixAddressManager
29
30
UnixStreamEndpoint(net_socket * socket)31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket)
32 :
33 UnixEndpoint(socket),
34 fPeerEndpoint(NULL),
35 fReceiveFifo(NULL),
36 fState(unix_stream_endpoint_state::Closed),
37 fAcceptSemaphore(-1),
38 fIsChild(false),
39 fWasConnected(false)
40 {
41 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
42 find_thread(NULL), this);
43 }
44
45
~UnixStreamEndpoint()46 UnixStreamEndpoint::~UnixStreamEndpoint()
47 {
48 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
49 find_thread(NULL), this);
50 }
51
52
53 status_t
Init()54 UnixStreamEndpoint::Init()
55 {
56 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
57 this);
58
59 RETURN_ERROR(B_OK);
60 }
61
62
63 void
Uninit()64 UnixStreamEndpoint::Uninit()
65 {
66 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
67 this);
68
69 // check whether we're closed
70 UnixStreamEndpointLocker locker(this);
71 bool closed = (fState == unix_stream_endpoint_state::Closed);
72 locker.Unlock();
73
74 if (!closed) {
75 // That probably means, we're a child endpoint of a listener and
76 // have been fully connected, but not yet accepted. Our Close()
77 // hook isn't called in this case. Do it manually.
78 Close();
79 }
80
81 ReleaseReference();
82 }
83
84
85 status_t
Open()86 UnixStreamEndpoint::Open()
87 {
88 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
89 this);
90
91 status_t error = ProtocolSocket::Open();
92 if (error != B_OK)
93 RETURN_ERROR(error);
94
95 fState = unix_stream_endpoint_state::NotConnected;
96
97 RETURN_ERROR(B_OK);
98 }
99
100
101 status_t
Close()102 UnixStreamEndpoint::Close()
103 {
104 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
105 this);
106
107 UnixStreamEndpointLocker locker(this);
108
109 if (fState == unix_stream_endpoint_state::Connected) {
110 BReference<UnixStreamEndpoint> peerReference;
111 UnixStreamEndpointLocker peerLocker;
112 if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
113 // The peer may be destroyed when we release it,
114 // so grab a reference for the locker's sake.
115 peerReference.SetTo(peerLocker.Get(), false);
116
117 // We're still connected. Disconnect both endpoints!
118 fPeerEndpoint->_Disconnect();
119 _Disconnect();
120 }
121 }
122
123 if (fState == unix_stream_endpoint_state::Listening)
124 _StopListening();
125
126 _Unbind();
127
128 fState = unix_stream_endpoint_state::Closed;
129 RETURN_ERROR(B_OK);
130 }
131
132
133 status_t
Free()134 UnixStreamEndpoint::Free()
135 {
136 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
137 this);
138
139 UnixStreamEndpointLocker locker(this);
140
141 _UnsetReceiveFifo();
142
143 RETURN_ERROR(B_OK);
144 }
145
146
147 status_t
Bind(const struct sockaddr * _address)148 UnixStreamEndpoint::Bind(const struct sockaddr* _address)
149 {
150 if (_address->sa_family != AF_UNIX)
151 RETURN_ERROR(EAFNOSUPPORT);
152
153 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
154 find_thread(NULL), this,
155 ConstSocketAddress(&gAddressModule, _address).AsString().Data());
156
157 const sockaddr_un* address = (const sockaddr_un*)_address;
158
159 UnixStreamEndpointLocker endpointLocker(this);
160
161 if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
162 RETURN_ERROR(B_BAD_VALUE);
163
164 RETURN_ERROR(_Bind(address));
165 }
166
167
168 status_t
Unbind()169 UnixStreamEndpoint::Unbind()
170 {
171 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
172 this);
173
174 UnixStreamEndpointLocker endpointLocker(this);
175
176 RETURN_ERROR(_Unbind());
177 }
178
179
180 status_t
Listen(int backlog)181 UnixStreamEndpoint::Listen(int backlog)
182 {
183 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
184 this, backlog);
185
186 UnixStreamEndpointLocker endpointLocker(this);
187
188 if (!IsBound())
189 RETURN_ERROR(EDESTADDRREQ);
190 if (fState != unix_stream_endpoint_state::NotConnected
191 && fState != unix_stream_endpoint_state::Listening)
192 RETURN_ERROR(EINVAL);
193
194 gSocketModule->set_max_backlog(socket, backlog);
195
196 if (fState == unix_stream_endpoint_state::NotConnected) {
197 fAcceptSemaphore = create_sem(0, "unix accept");
198 if (fAcceptSemaphore < 0)
199 RETURN_ERROR(ENOBUFS);
200
201 _UnsetReceiveFifo();
202
203 fCredentials.pid = getpid();
204 fCredentials.uid = geteuid();
205 fCredentials.gid = getegid();
206
207 fState = unix_stream_endpoint_state::Listening;
208 }
209
210 RETURN_ERROR(B_OK);
211 }
212
213
214 status_t
Connect(const struct sockaddr * _address)215 UnixStreamEndpoint::Connect(const struct sockaddr* _address)
216 {
217 if (_address->sa_family != AF_UNIX)
218 RETURN_ERROR(EAFNOSUPPORT);
219
220 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
221 find_thread(NULL), this,
222 ConstSocketAddress(&gAddressModule, _address).AsString().Data());
223
224 const sockaddr_un* address = (const sockaddr_un*)_address;
225
226 UnixStreamEndpointLocker endpointLocker(this);
227
228 if (fState == unix_stream_endpoint_state::Connected)
229 RETURN_ERROR(EISCONN);
230
231 if (fState != unix_stream_endpoint_state::NotConnected)
232 RETURN_ERROR(B_BAD_VALUE);
233 // TODO: If listening, we could set the backlog to 0 and connect.
234
235 // check the address first
236 UnixAddress unixAddress;
237
238 if (address->sun_path[0] == '\0') {
239 // internal address space (or empty address)
240 int32 internalID;
241 if (UnixAddress::IsEmptyAddress(*address))
242 RETURN_ERROR(B_BAD_VALUE);
243
244 internalID = UnixAddress::InternalID(*address);
245 if (internalID < 0)
246 RETURN_ERROR(internalID);
247
248 unixAddress.SetTo(internalID);
249 } else {
250 // FS address space
251 size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
252 if (pathLen == 0 || pathLen == sizeof(address->sun_path))
253 RETURN_ERROR(B_BAD_VALUE);
254
255 struct stat st;
256 status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
257 !gStackModule->is_syscall());
258 if (error != B_OK)
259 RETURN_ERROR(error);
260
261 if (!S_ISSOCK(st.st_mode))
262 RETURN_ERROR(B_BAD_VALUE);
263
264 unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
265 }
266
267 // get the peer endpoint
268 UnixAddressManagerLocker addressLocker(gAddressManager);
269 UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
270 if (listeningUnixEndpoint == NULL)
271 RETURN_ERROR(ECONNREFUSED);
272 UnixStreamEndpoint* listeningEndpoint
273 = dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
274 if (listeningEndpoint == NULL)
275 RETURN_ERROR(EPROTOTYPE);
276 BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
277 addressLocker.Unlock();
278
279 UnixStreamEndpointLocker peerLocker(listeningEndpoint);
280
281 if (!listeningEndpoint->IsBound()
282 || listeningEndpoint->fState != unix_stream_endpoint_state::Listening
283 || listeningEndpoint->fAddress != unixAddress) {
284 RETURN_ERROR(ECONNREFUSED);
285 }
286
287 // Allocate FIFOs for us and the socket we're going to spawn. We do that
288 // now, so that the mess we need to cleanup, if allocating them fails, is
289 // harmless.
290 UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
291 UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
292 ObjectDeleter<UnixFifo> fifoDeleter(fifo);
293 ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
294
295 status_t error;
296 if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
297 return error;
298
299 // spawn new endpoint for accept()
300 net_socket* newSocket;
301 error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
302 &newSocket);
303 if (error != B_OK)
304 RETURN_ERROR(error);
305
306 // init connected peer endpoint
307 UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;
308
309 UnixStreamEndpointLocker connectedLocker(connectedEndpoint);
310
311 connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
312
313 // update our attributes
314 _UnsetReceiveFifo();
315
316 fPeerEndpoint = connectedEndpoint;
317 PeerAddress().SetTo(&connectedEndpoint->socket->address);
318 fPeerEndpoint->AcquireReference();
319 fReceiveFifo = fifo;
320
321 fCredentials.pid = getpid();
322 fCredentials.uid = geteuid();
323 fCredentials.gid = getegid();
324
325 fifoDeleter.Detach();
326 peerFifoDeleter.Detach();
327
328 fState = unix_stream_endpoint_state::Connected;
329 fWasConnected = true;
330
331 gSocketModule->set_connected(Socket());
332
333 release_sem(listeningEndpoint->fAcceptSemaphore);
334
335 connectedLocker.Unlock();
336 peerLocker.Unlock();
337 endpointLocker.Unlock();
338
339 RETURN_ERROR(B_OK);
340 }
341
342
343 status_t
Accept(net_socket ** _acceptedSocket)344 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
345 {
346 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
347 this);
348
349 bigtime_t timeout = absolute_timeout(socket->receive.timeout);
350 if (gStackModule->is_restarted_syscall())
351 timeout = gStackModule->restore_syscall_restart_timeout();
352 else
353 gStackModule->store_syscall_restart_timeout(timeout);
354
355 UnixStreamEndpointLocker locker(this);
356
357 status_t error;
358 do {
359 locker.Unlock();
360
361 error = acquire_sem_etc(fAcceptSemaphore, 1,
362 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
363 if (error < B_OK)
364 break;
365
366 locker.Lock();
367 error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
368 } while (error != B_OK);
369
370 if (error == B_TIMED_OUT && timeout == 0) {
371 // translate non-blocking timeouts to the correct error code
372 error = B_WOULD_BLOCK;
373 }
374
375 RETURN_ERROR(error);
376 }
377
378
379 ssize_t
Send(const iovec * vecs,size_t vecCount,ancillary_data_container * ancillaryData,const struct sockaddr * address,socklen_t addressLength,int flags)380 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
381 ancillary_data_container* ancillaryData,
382 const struct sockaddr* address, socklen_t addressLength, int flags)
383 {
384 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
385 find_thread(NULL), this, vecs, vecCount, ancillaryData);
386
387 if ((flags & ~(MSG_DONTWAIT)) != 0)
388 return EOPNOTSUPP;
389
390 bigtime_t timeout = 0;
391 if ((flags & MSG_DONTWAIT) == 0) {
392 timeout = absolute_timeout(socket->send.timeout);
393 if (gStackModule->is_restarted_syscall())
394 timeout = gStackModule->restore_syscall_restart_timeout();
395 else
396 gStackModule->store_syscall_restart_timeout(timeout);
397 }
398
399 UnixStreamEndpointLocker locker(this);
400
401 BReference<UnixStreamEndpoint> peerReference;
402 UnixStreamEndpointLocker peerLocker;
403
404 status_t error = _LockConnectedEndpoints(locker, peerLocker);
405 if (error != B_OK)
406 RETURN_ERROR(error);
407
408 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
409 peerReference.SetTo(peerEndpoint);
410
411 // lock the peer's FIFO
412 UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
413 BReference<UnixFifo> _(peerFifo);
414 UnixFifoLocker fifoLocker(peerFifo);
415
416 // unlock endpoints
417 locker.Unlock();
418 peerLocker.Unlock();
419
420 ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);
421
422 // Notify select()ing readers, if we successfully wrote anything.
423 size_t readable = peerFifo->Readable();
424 bool notifyRead = (error == B_OK && readable > 0
425 && !peerFifo->IsReadShutdown());
426
427 // Notify select()ing writers, if we failed to write anything and there's
428 // still room to write.
429 size_t writable = peerFifo->Writable();
430 bool notifyWrite = (error != B_OK && writable > 0
431 && !peerFifo->IsWriteShutdown());
432
433 // re-lock our endpoint (unlock FIFO to respect locking order)
434 fifoLocker.Unlock();
435 locker.Lock();
436
437 bool peerLocked = (fPeerEndpoint == peerEndpoint
438 && _LockConnectedEndpoints(locker, peerLocker) == B_OK);
439
440 // send notifications
441 if (peerLocked && notifyRead)
442 gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
443 if (notifyWrite)
444 gSocketModule->notify(socket, B_SELECT_WRITE, writable);
445
446 switch (result) {
447 case UNIX_FIFO_SHUTDOWN:
448 if (fPeerEndpoint == peerEndpoint
449 && fState == unix_stream_endpoint_state::Connected) {
450 // Orderly write shutdown on our side.
451 // Note: Linux and Solaris also send a SIGPIPE, but according
452 // the send() specification that shouldn't be done.
453 result = EPIPE;
454 } else {
455 // The FD has been closed.
456 result = EBADF;
457 }
458 break;
459 case EPIPE:
460 // The socket module will generate SIGPIPE for us, if necessary.
461 break;
462 case B_TIMED_OUT:
463 // Translate non-blocking timeouts to the correct error code.
464 if (timeout == 0)
465 result = B_WOULD_BLOCK;
466 break;
467 }
468
469 RETURN_ERROR(result);
470 }
471
472
473 ssize_t
Receive(const iovec * vecs,size_t vecCount,ancillary_data_container ** _ancillaryData,struct sockaddr * _address,socklen_t * _addressLength,int flags)474 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
475 ancillary_data_container** _ancillaryData, struct sockaddr* _address,
476 socklen_t* _addressLength, int flags)
477 {
478 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
479 find_thread(NULL), this, vecs, vecCount);
480
481 if ((flags & ~(MSG_DONTWAIT)) != 0)
482 return EOPNOTSUPP;
483
484 bigtime_t timeout = 0;
485 if ((flags & MSG_DONTWAIT) == 0) {
486 timeout = absolute_timeout(socket->receive.timeout);
487 if (gStackModule->is_restarted_syscall())
488 timeout = gStackModule->restore_syscall_restart_timeout();
489 else
490 gStackModule->store_syscall_restart_timeout(timeout);
491 }
492
493 UnixStreamEndpointLocker locker(this);
494
495 // We can read as long as we have a FIFO. I.e. we are still connected, or
496 // disconnected and not yet reconnected/listening/closed.
497 if (fReceiveFifo == NULL)
498 RETURN_ERROR(ENOTCONN);
499
500 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
501 BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
502
503 // Copy the peer address upfront. This way, if we read something, we don't
504 // get into a potential race with Close().
505 if (_address != NULL) {
506 socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
507 memcpy(_address, &socket->peer, addrLen);
508 *_addressLength = addrLen;
509 }
510
511 // lock our FIFO
512 UnixFifo* fifo = fReceiveFifo;
513 BReference<UnixFifo> _(fifo);
514 UnixFifoLocker fifoLocker(fifo);
515
516 // unlock endpoint
517 locker.Unlock();
518
519 ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout);
520
521 // Notify select()ing writers, if we successfully read anything.
522 size_t writable = fifo->Writable();
523 bool notifyWrite = (result >= 0 && writable > 0
524 && !fifo->IsWriteShutdown());
525
526 // Notify select()ing readers, if we failed to read anything and there's
527 // still something left to read.
528 size_t readable = fifo->Readable();
529 bool notifyRead = (result < 0 && readable > 0
530 && !fifo->IsReadShutdown());
531
532 // re-lock our endpoint (unlock FIFO to respect locking order)
533 fifoLocker.Unlock();
534 locker.Lock();
535
536 UnixStreamEndpointLocker peerLocker;
537 bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
538 && _LockConnectedEndpoints(locker, peerLocker) == B_OK);
539
540 // send notifications
541 if (notifyRead)
542 gSocketModule->notify(socket, B_SELECT_READ, readable);
543 if (peerLocked && notifyWrite)
544 gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
545
546 switch (result) {
547 case UNIX_FIFO_SHUTDOWN:
548 // Either our socket was closed or read shutdown.
549 if (fState == unix_stream_endpoint_state::Closed) {
550 // The FD has been closed.
551 result = EBADF;
552 } else {
553 // if (fReceiveFifo == fifo) {
554 // Orderly shutdown or the peer closed the connection.
555 // } else {
556 // Weird case: Peer closed connection and we are already
557 // reconnected (or listening).
558 // }
559 result = 0;
560 }
561 break;
562 case B_TIMED_OUT:
563 // translate non-blocking timeouts to the correct error code
564 if (timeout == 0)
565 result = B_WOULD_BLOCK;
566 break;
567 }
568
569 RETURN_ERROR(result);
570 }
571
572
573 ssize_t
Sendable()574 UnixStreamEndpoint::Sendable()
575 {
576 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
577 this);
578
579 UnixStreamEndpointLocker locker(this);
580 UnixStreamEndpointLocker peerLocker;
581
582 status_t error = _LockConnectedEndpoints(locker, peerLocker);
583 if (error != B_OK)
584 RETURN_ERROR(error);
585
586 // lock the peer's FIFO
587 UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
588 UnixFifoLocker fifoLocker(peerFifo);
589
590 RETURN_ERROR(peerFifo->Writable());
591 }
592
593
594 ssize_t
Receivable()595 UnixStreamEndpoint::Receivable()
596 {
597 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
598 this);
599
600 UnixStreamEndpointLocker locker(this);
601
602 if (fState == unix_stream_endpoint_state::Listening)
603 return gSocketModule->count_connected(socket);
604
605 if (fState != unix_stream_endpoint_state::Connected)
606 RETURN_ERROR(ENOTCONN);
607
608 UnixFifoLocker fifoLocker(fReceiveFifo);
609 ssize_t readable = fReceiveFifo->Readable();
610 if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
611 || fReceiveFifo->IsReadShutdown())) {
612 RETURN_ERROR(ENOTCONN);
613 }
614 RETURN_ERROR(readable);
615 }
616
617
618 status_t
SetReceiveBufferSize(size_t size)619 UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
620 {
621 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
622 find_thread(NULL), this, size);
623
624 UnixStreamEndpointLocker locker(this);
625
626 if (fReceiveFifo == NULL)
627 return B_BAD_VALUE;
628
629 UnixFifoLocker fifoLocker(fReceiveFifo);
630 return fReceiveFifo->SetBufferCapacity(size);
631 }
632
633
634 status_t
GetPeerCredentials(ucred * credentials)635 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
636 {
637 UnixStreamEndpointLocker locker(this);
638 UnixStreamEndpointLocker peerLocker;
639
640 status_t error = _LockConnectedEndpoints(locker, peerLocker);
641 if (error != B_OK)
642 RETURN_ERROR(error);
643
644 *credentials = fPeerEndpoint->fCredentials;
645
646 return B_OK;
647 }
648
649
650 status_t
Shutdown(int direction)651 UnixStreamEndpoint::Shutdown(int direction)
652 {
653 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
654 find_thread(NULL), this, direction);
655
656 uint32 shutdown;
657 uint32 peerShutdown;
658
659 // translate the direction into shutdown flags for our and the peer fifo
660 switch (direction) {
661 case SHUT_RD:
662 shutdown = UNIX_FIFO_SHUTDOWN_READ;
663 peerShutdown = 0;
664 break;
665 case SHUT_WR:
666 shutdown = 0;
667 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
668 break;
669 case SHUT_RDWR:
670 shutdown = UNIX_FIFO_SHUTDOWN_READ;
671 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
672 break;
673 default:
674 RETURN_ERROR(B_BAD_VALUE);
675 }
676
677 // lock endpoints
678 UnixStreamEndpointLocker locker(this);
679 UnixStreamEndpointLocker peerLocker;
680
681 status_t error = _LockConnectedEndpoints(locker, peerLocker);
682 if (error != B_OK)
683 RETURN_ERROR(error);
684
685 // shutdown our FIFO
686 fReceiveFifo->Lock();
687 fReceiveFifo->Shutdown(shutdown);
688 fReceiveFifo->Unlock();
689
690 // shutdown peer FIFO
691 fPeerEndpoint->fReceiveFifo->Lock();
692 fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
693 fPeerEndpoint->fReceiveFifo->Unlock();
694
695 // send select notifications
696 if (direction == SHUT_RD || direction == SHUT_RDWR) {
697 gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
698 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
699 }
700 if (direction == SHUT_WR || direction == SHUT_RDWR) {
701 gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
702 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
703 }
704
705 RETURN_ERROR(B_OK);
706 }
707
708
709 void
_Spawn(UnixStreamEndpoint * connectingEndpoint,UnixStreamEndpoint * listeningEndpoint,UnixFifo * fifo)710 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
711 UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
712 {
713 ProtocolSocket::Open();
714
715 fIsChild = true;
716 fPeerEndpoint = connectingEndpoint;
717 fPeerEndpoint->AcquireReference();
718
719 fReceiveFifo = fifo;
720
721 PeerAddress().SetTo(&connectingEndpoint->socket->address);
722
723 fCredentials = listeningEndpoint->fCredentials;
724
725 fState = unix_stream_endpoint_state::Connected;
726
727 gSocketModule->set_connected(Socket());
728 }
729
730
731 void
_Disconnect()732 UnixStreamEndpoint::_Disconnect()
733 {
734 // Both endpoints must be locked.
735
736 // Write shutdown the receive FIFO.
737 fReceiveFifo->Lock();
738 fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
739 fReceiveFifo->Unlock();
740
741 // select() notification.
742 gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
743 gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
744
745 // Unset the peer endpoint.
746 fPeerEndpoint->ReleaseReference();
747 fPeerEndpoint = NULL;
748
749 // We're officially disconnected.
750 // TODO: Deal with non accept()ed connections correctly!
751 fIsChild = false;
752 fState = unix_stream_endpoint_state::NotConnected;
753 }
754
755
756 status_t
_LockConnectedEndpoints(UnixStreamEndpointLocker & locker,UnixStreamEndpointLocker & peerLocker)757 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
758 UnixStreamEndpointLocker& peerLocker)
759 {
760 if (fState != unix_stream_endpoint_state::Connected)
761 RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
762
763 // We need to lock the peer, too. Get a reference -- we might need to
764 // unlock ourselves to get the locking order right.
765 BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
766 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
767
768 if (fIsChild) {
769 // We're the child, but locking order is the other way around.
770 locker.Unlock();
771 peerLocker.SetTo(peerEndpoint, false);
772 locker.Lock();
773
774 // recheck our state, also whether the peer is still the same
775 if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) {
776 peerLocker.Unset();
777 RETURN_ERROR(ENOTCONN);
778 }
779 } else
780 peerLocker.SetTo(peerEndpoint, false);
781
782 RETURN_ERROR(B_OK);
783 }
784
785
786 status_t
_Unbind()787 UnixStreamEndpoint::_Unbind()
788 {
789 if (fState == unix_stream_endpoint_state::Connected
790 || fState == unix_stream_endpoint_state::Listening)
791 RETURN_ERROR(B_BAD_VALUE);
792
793 if (IsBound())
794 RETURN_ERROR(UnixEndpoint::_Unbind());
795
796 RETURN_ERROR(B_OK);
797 }
798
799
800 void
_UnsetReceiveFifo()801 UnixStreamEndpoint::_UnsetReceiveFifo()
802 {
803 if (fReceiveFifo) {
804 fReceiveFifo->ReleaseReference();
805 fReceiveFifo = NULL;
806 }
807 }
808
809
810 void
_StopListening()811 UnixStreamEndpoint::_StopListening()
812 {
813 if (fState == unix_stream_endpoint_state::Listening) {
814 delete_sem(fAcceptSemaphore);
815 fAcceptSemaphore = -1;
816 fState = unix_stream_endpoint_state::NotConnected;
817 }
818 }
819