1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "UnixStreamEndpoint.h" 8 9 #include <stdio.h> 10 #include <sys/stat.h> 11 12 #include <AutoDeleter.h> 13 14 #include <vfs.h> 15 16 #include "UnixAddressManager.h" 17 #include "UnixFifo.h" 18 19 20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0 21 #define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 22 #include "UnixDebug.h" 23 24 25 // Note on locking order (outermost -> innermost): 26 // UnixStreamEndpoint: connecting -> listening -> child 27 // -> UnixFifo (never lock more than one at a time) 28 // -> UnixAddressManager 29 30 31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket) 32 : 33 UnixEndpoint(socket), 34 fPeerEndpoint(NULL), 35 fReceiveFifo(NULL), 36 fState(unix_stream_endpoint_state::Closed), 37 fAcceptSemaphore(-1), 38 fIsChild(false), 39 fWasConnected(false) 40 { 41 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n", 42 find_thread(NULL), this); 43 } 44 45 46 UnixStreamEndpoint::~UnixStreamEndpoint() 47 { 48 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n", 49 find_thread(NULL), this); 50 } 51 52 53 status_t 54 UnixStreamEndpoint::Init() 55 { 56 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL), 57 this); 58 59 RETURN_ERROR(B_OK); 60 } 61 62 63 void 64 UnixStreamEndpoint::Uninit() 65 { 66 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL), 67 this); 68 69 // check whether we're closed 70 UnixStreamEndpointLocker locker(this); 71 bool closed = (fState == unix_stream_endpoint_state::Closed); 72 locker.Unlock(); 73 74 if (!closed) { 75 // That probably means, we're a child endpoint of a listener and 76 // have been fully connected, but not yet accepted. Our Close() 77 // hook isn't called in this case. Do it manually. 78 Close(); 79 } 80 81 ReleaseReference(); 82 } 83 84 85 status_t 86 UnixStreamEndpoint::Open() 87 { 88 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL), 89 this); 90 91 status_t error = ProtocolSocket::Open(); 92 if (error != B_OK) 93 RETURN_ERROR(error); 94 95 fState = unix_stream_endpoint_state::NotConnected; 96 97 RETURN_ERROR(B_OK); 98 } 99 100 101 status_t 102 UnixStreamEndpoint::Close() 103 { 104 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL), 105 this); 106 107 UnixStreamEndpointLocker locker(this); 108 109 if (fState == unix_stream_endpoint_state::Connected) { 110 BReference<UnixStreamEndpoint> peerReference; 111 UnixStreamEndpointLocker peerLocker; 112 if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) { 113 // The peer may be destroyed when we release it, 114 // so grab a reference for the locker's sake. 115 peerReference.SetTo(peerLocker.Get(), false); 116 117 // We're still connected. Disconnect both endpoints! 118 fPeerEndpoint->_Disconnect(); 119 _Disconnect(); 120 } 121 } 122 123 if (fState == unix_stream_endpoint_state::Listening) 124 _StopListening(); 125 126 _Unbind(); 127 128 fState = unix_stream_endpoint_state::Closed; 129 RETURN_ERROR(B_OK); 130 } 131 132 133 status_t 134 UnixStreamEndpoint::Free() 135 { 136 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL), 137 this); 138 139 UnixStreamEndpointLocker locker(this); 140 141 _UnsetReceiveFifo(); 142 143 RETURN_ERROR(B_OK); 144 } 145 146 147 status_t 148 UnixStreamEndpoint::Bind(const struct sockaddr* _address) 149 { 150 if (_address->sa_family != AF_UNIX) 151 RETURN_ERROR(EAFNOSUPPORT); 152 153 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n", 154 find_thread(NULL), this, 155 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 156 157 const sockaddr_un* address = (const sockaddr_un*)_address; 158 159 UnixStreamEndpointLocker endpointLocker(this); 160 161 if (fState != unix_stream_endpoint_state::NotConnected || IsBound()) 162 RETURN_ERROR(B_BAD_VALUE); 163 164 RETURN_ERROR(_Bind(address)); 165 } 166 167 168 status_t 169 UnixStreamEndpoint::Unbind() 170 { 171 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL), 172 this); 173 174 UnixStreamEndpointLocker endpointLocker(this); 175 176 RETURN_ERROR(_Unbind()); 177 } 178 179 180 status_t 181 UnixStreamEndpoint::Listen(int backlog) 182 { 183 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL), 184 this, backlog); 185 186 UnixStreamEndpointLocker endpointLocker(this); 187 188 if (!IsBound()) 189 RETURN_ERROR(EDESTADDRREQ); 190 if (fState != unix_stream_endpoint_state::NotConnected 191 && fState != unix_stream_endpoint_state::Listening) 192 RETURN_ERROR(EINVAL); 193 194 gSocketModule->set_max_backlog(socket, backlog); 195 196 if (fState == unix_stream_endpoint_state::NotConnected) { 197 fAcceptSemaphore = create_sem(0, "unix accept"); 198 if (fAcceptSemaphore < 0) 199 RETURN_ERROR(ENOBUFS); 200 201 _UnsetReceiveFifo(); 202 203 fCredentials.pid = getpid(); 204 fCredentials.uid = geteuid(); 205 fCredentials.gid = getegid(); 206 207 fState = unix_stream_endpoint_state::Listening; 208 } 209 210 RETURN_ERROR(B_OK); 211 } 212 213 214 status_t 215 UnixStreamEndpoint::Connect(const struct sockaddr* _address) 216 { 217 if (_address->sa_family != AF_UNIX) 218 RETURN_ERROR(EAFNOSUPPORT); 219 220 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n", 221 find_thread(NULL), this, 222 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 223 224 const sockaddr_un* address = (const sockaddr_un*)_address; 225 226 UnixStreamEndpointLocker endpointLocker(this); 227 228 if (fState == unix_stream_endpoint_state::Connected) 229 RETURN_ERROR(EISCONN); 230 231 if (fState != unix_stream_endpoint_state::NotConnected) 232 RETURN_ERROR(B_BAD_VALUE); 233 // TODO: If listening, we could set the backlog to 0 and connect. 234 235 // check the address first 236 UnixAddress unixAddress; 237 238 if (address->sun_path[0] == '\0') { 239 // internal address space (or empty address) 240 int32 internalID; 241 if (UnixAddress::IsEmptyAddress(*address)) 242 RETURN_ERROR(B_BAD_VALUE); 243 244 internalID = UnixAddress::InternalID(*address); 245 if (internalID < 0) 246 RETURN_ERROR(internalID); 247 248 unixAddress.SetTo(internalID); 249 } else { 250 // FS address space 251 size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path)); 252 if (pathLen == 0 || pathLen == sizeof(address->sun_path)) 253 RETURN_ERROR(B_BAD_VALUE); 254 255 struct stat st; 256 status_t error = vfs_read_stat(-1, address->sun_path, true, &st, 257 !gStackModule->is_syscall()); 258 if (error != B_OK) 259 RETURN_ERROR(error); 260 261 if (!S_ISSOCK(st.st_mode)) 262 RETURN_ERROR(B_BAD_VALUE); 263 264 unixAddress.SetTo(st.st_dev, st.st_ino, NULL); 265 } 266 267 // get the peer endpoint 268 UnixAddressManagerLocker addressLocker(gAddressManager); 269 UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress); 270 if (listeningUnixEndpoint == NULL) 271 RETURN_ERROR(ECONNREFUSED); 272 UnixStreamEndpoint* listeningEndpoint 273 = dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint); 274 if (listeningEndpoint == NULL) 275 RETURN_ERROR(EPROTOTYPE); 276 BReference<UnixStreamEndpoint> peerReference(listeningEndpoint); 277 addressLocker.Unlock(); 278 279 UnixStreamEndpointLocker peerLocker(listeningEndpoint); 280 281 if (!listeningEndpoint->IsBound() 282 || listeningEndpoint->fState != unix_stream_endpoint_state::Listening 283 || listeningEndpoint->fAddress != unixAddress) { 284 RETURN_ERROR(ECONNREFUSED); 285 } 286 287 // Allocate FIFOs for us and the socket we're going to spawn. We do that 288 // now, so that the mess we need to cleanup, if allocating them fails, is 289 // harmless. 290 UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 291 UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 292 ObjectDeleter<UnixFifo> fifoDeleter(fifo); 293 ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo); 294 295 status_t error; 296 if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK) 297 return error; 298 299 // spawn new endpoint for accept() 300 net_socket* newSocket; 301 error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket, 302 &newSocket); 303 if (error != B_OK) 304 RETURN_ERROR(error); 305 306 // init connected peer endpoint 307 UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol; 308 309 UnixStreamEndpointLocker connectedLocker(connectedEndpoint); 310 311 connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo); 312 313 // update our attributes 314 _UnsetReceiveFifo(); 315 316 fPeerEndpoint = connectedEndpoint; 317 PeerAddress().SetTo(&connectedEndpoint->socket->address); 318 fPeerEndpoint->AcquireReference(); 319 fReceiveFifo = fifo; 320 321 fCredentials.pid = getpid(); 322 fCredentials.uid = geteuid(); 323 fCredentials.gid = getegid(); 324 325 fifoDeleter.Detach(); 326 peerFifoDeleter.Detach(); 327 328 fState = unix_stream_endpoint_state::Connected; 329 fWasConnected = true; 330 331 gSocketModule->set_connected(Socket()); 332 333 release_sem(listeningEndpoint->fAcceptSemaphore); 334 335 connectedLocker.Unlock(); 336 peerLocker.Unlock(); 337 endpointLocker.Unlock(); 338 339 RETURN_ERROR(B_OK); 340 } 341 342 343 status_t 344 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket) 345 { 346 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL), 347 this); 348 349 bigtime_t timeout = absolute_timeout(socket->receive.timeout); 350 if (gStackModule->is_restarted_syscall()) 351 timeout = gStackModule->restore_syscall_restart_timeout(); 352 else 353 gStackModule->store_syscall_restart_timeout(timeout); 354 355 UnixStreamEndpointLocker locker(this); 356 357 status_t error; 358 do { 359 locker.Unlock(); 360 361 error = acquire_sem_etc(fAcceptSemaphore, 1, 362 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 363 if (error < B_OK) 364 break; 365 366 locker.Lock(); 367 error = gSocketModule->dequeue_connected(socket, _acceptedSocket); 368 } while (error != B_OK); 369 370 if (error == B_TIMED_OUT && timeout == 0) { 371 // translate non-blocking timeouts to the correct error code 372 error = B_WOULD_BLOCK; 373 } 374 375 RETURN_ERROR(error); 376 } 377 378 379 ssize_t 380 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount, 381 ancillary_data_container* ancillaryData, 382 const struct sockaddr* address, socklen_t addressLength, int flags) 383 { 384 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n", 385 find_thread(NULL), this, vecs, vecCount, ancillaryData); 386 387 if ((flags & ~(MSG_DONTWAIT)) != 0) 388 return EOPNOTSUPP; 389 390 bigtime_t timeout = 0; 391 if ((flags & MSG_DONTWAIT) == 0) { 392 timeout = absolute_timeout(socket->send.timeout); 393 if (gStackModule->is_restarted_syscall()) 394 timeout = gStackModule->restore_syscall_restart_timeout(); 395 else 396 gStackModule->store_syscall_restart_timeout(timeout); 397 } 398 399 UnixStreamEndpointLocker locker(this); 400 401 BReference<UnixStreamEndpoint> peerReference; 402 UnixStreamEndpointLocker peerLocker; 403 404 status_t error = _LockConnectedEndpoints(locker, peerLocker); 405 if (error != B_OK) 406 RETURN_ERROR(error); 407 408 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 409 peerReference.SetTo(peerEndpoint); 410 411 // lock the peer's FIFO 412 UnixFifo* peerFifo = peerEndpoint->fReceiveFifo; 413 BReference<UnixFifo> _(peerFifo); 414 UnixFifoLocker fifoLocker(peerFifo); 415 416 // unlock endpoints 417 locker.Unlock(); 418 peerLocker.Unlock(); 419 420 ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout); 421 422 // Notify select()ing readers, if we successfully wrote anything. 423 size_t readable = peerFifo->Readable(); 424 bool notifyRead = (error == B_OK && readable > 0 425 && !peerFifo->IsReadShutdown()); 426 427 // Notify select()ing writers, if we failed to write anything and there's 428 // still room to write. 429 size_t writable = peerFifo->Writable(); 430 bool notifyWrite = (error != B_OK && writable > 0 431 && !peerFifo->IsWriteShutdown()); 432 433 // re-lock our endpoint (unlock FIFO to respect locking order) 434 fifoLocker.Unlock(); 435 locker.Lock(); 436 437 bool peerLocked = (fPeerEndpoint == peerEndpoint 438 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 439 440 // send notifications 441 if (peerLocked && notifyRead) 442 gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable); 443 if (notifyWrite) 444 gSocketModule->notify(socket, B_SELECT_WRITE, writable); 445 446 switch (result) { 447 case UNIX_FIFO_SHUTDOWN: 448 if (fPeerEndpoint == peerEndpoint 449 && fState == unix_stream_endpoint_state::Connected) { 450 // Orderly write shutdown on our side. 451 // Note: Linux and Solaris also send a SIGPIPE, but according 452 // the send() specification that shouldn't be done. 453 result = EPIPE; 454 } else { 455 // The FD has been closed. 456 result = EBADF; 457 } 458 break; 459 case EPIPE: 460 // The socket module will generate SIGPIPE for us, if necessary. 461 break; 462 case B_TIMED_OUT: 463 // Translate non-blocking timeouts to the correct error code. 464 if (timeout == 0) 465 result = B_WOULD_BLOCK; 466 break; 467 } 468 469 RETURN_ERROR(result); 470 } 471 472 473 ssize_t 474 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount, 475 ancillary_data_container** _ancillaryData, struct sockaddr* _address, 476 socklen_t* _addressLength, int flags) 477 { 478 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n", 479 find_thread(NULL), this, vecs, vecCount); 480 481 if ((flags & ~(MSG_DONTWAIT)) != 0) 482 return EOPNOTSUPP; 483 484 bigtime_t timeout = 0; 485 if ((flags & MSG_DONTWAIT) == 0) { 486 timeout = absolute_timeout(socket->receive.timeout); 487 if (gStackModule->is_restarted_syscall()) 488 timeout = gStackModule->restore_syscall_restart_timeout(); 489 else 490 gStackModule->store_syscall_restart_timeout(timeout); 491 } 492 493 UnixStreamEndpointLocker locker(this); 494 495 // We can read as long as we have a FIFO. I.e. we are still connected, or 496 // disconnected and not yet reconnected/listening/closed. 497 if (fReceiveFifo == NULL) 498 RETURN_ERROR(ENOTCONN); 499 500 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 501 BReference<UnixStreamEndpoint> peerReference(peerEndpoint); 502 503 // Copy the peer address upfront. This way, if we read something, we don't 504 // get into a potential race with Close(). 505 if (_address != NULL) { 506 socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len); 507 memcpy(_address, &socket->peer, addrLen); 508 *_addressLength = addrLen; 509 } 510 511 // lock our FIFO 512 UnixFifo* fifo = fReceiveFifo; 513 BReference<UnixFifo> _(fifo); 514 UnixFifoLocker fifoLocker(fifo); 515 516 // unlock endpoint 517 locker.Unlock(); 518 519 ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout); 520 521 // Notify select()ing writers, if we successfully read anything. 522 size_t writable = fifo->Writable(); 523 bool notifyWrite = (result >= 0 && writable > 0 524 && !fifo->IsWriteShutdown()); 525 526 // Notify select()ing readers, if we failed to read anything and there's 527 // still something left to read. 528 size_t readable = fifo->Readable(); 529 bool notifyRead = (result < 0 && readable > 0 530 && !fifo->IsReadShutdown()); 531 532 // re-lock our endpoint (unlock FIFO to respect locking order) 533 fifoLocker.Unlock(); 534 locker.Lock(); 535 536 UnixStreamEndpointLocker peerLocker; 537 bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint 538 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 539 540 // send notifications 541 if (notifyRead) 542 gSocketModule->notify(socket, B_SELECT_READ, readable); 543 if (peerLocked && notifyWrite) 544 gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable); 545 546 switch (result) { 547 case UNIX_FIFO_SHUTDOWN: 548 // Either our socket was closed or read shutdown. 549 if (fState == unix_stream_endpoint_state::Closed) { 550 // The FD has been closed. 551 result = EBADF; 552 } else { 553 // if (fReceiveFifo == fifo) { 554 // Orderly shutdown or the peer closed the connection. 555 // } else { 556 // Weird case: Peer closed connection and we are already 557 // reconnected (or listening). 558 // } 559 result = 0; 560 } 561 break; 562 case B_TIMED_OUT: 563 // translate non-blocking timeouts to the correct error code 564 if (timeout == 0) 565 result = B_WOULD_BLOCK; 566 break; 567 } 568 569 RETURN_ERROR(result); 570 } 571 572 573 ssize_t 574 UnixStreamEndpoint::Sendable() 575 { 576 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL), 577 this); 578 579 UnixStreamEndpointLocker locker(this); 580 UnixStreamEndpointLocker peerLocker; 581 582 status_t error = _LockConnectedEndpoints(locker, peerLocker); 583 if (error != B_OK) 584 RETURN_ERROR(error); 585 586 // lock the peer's FIFO 587 UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo; 588 UnixFifoLocker fifoLocker(peerFifo); 589 590 RETURN_ERROR(peerFifo->Writable()); 591 } 592 593 594 ssize_t 595 UnixStreamEndpoint::Receivable() 596 { 597 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL), 598 this); 599 600 UnixStreamEndpointLocker locker(this); 601 602 if (fState == unix_stream_endpoint_state::Listening) 603 return gSocketModule->count_connected(socket); 604 605 if (fState != unix_stream_endpoint_state::Connected) 606 RETURN_ERROR(ENOTCONN); 607 608 UnixFifoLocker fifoLocker(fReceiveFifo); 609 ssize_t readable = fReceiveFifo->Readable(); 610 if (readable == 0 && (fReceiveFifo->IsWriteShutdown() 611 || fReceiveFifo->IsReadShutdown())) { 612 RETURN_ERROR(ENOTCONN); 613 } 614 RETURN_ERROR(readable); 615 } 616 617 618 status_t 619 UnixStreamEndpoint::SetReceiveBufferSize(size_t size) 620 { 621 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n", 622 find_thread(NULL), this, size); 623 624 UnixStreamEndpointLocker locker(this); 625 626 if (fReceiveFifo == NULL) 627 return B_BAD_VALUE; 628 629 UnixFifoLocker fifoLocker(fReceiveFifo); 630 return fReceiveFifo->SetBufferCapacity(size); 631 } 632 633 634 status_t 635 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials) 636 { 637 UnixStreamEndpointLocker locker(this); 638 UnixStreamEndpointLocker peerLocker; 639 640 status_t error = _LockConnectedEndpoints(locker, peerLocker); 641 if (error != B_OK) 642 RETURN_ERROR(error); 643 644 *credentials = fPeerEndpoint->fCredentials; 645 646 return B_OK; 647 } 648 649 650 status_t 651 UnixStreamEndpoint::Shutdown(int direction) 652 { 653 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n", 654 find_thread(NULL), this, direction); 655 656 uint32 shutdown; 657 uint32 peerShutdown; 658 659 // translate the direction into shutdown flags for our and the peer fifo 660 switch (direction) { 661 case SHUT_RD: 662 shutdown = UNIX_FIFO_SHUTDOWN_READ; 663 peerShutdown = 0; 664 break; 665 case SHUT_WR: 666 shutdown = 0; 667 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 668 break; 669 case SHUT_RDWR: 670 shutdown = UNIX_FIFO_SHUTDOWN_READ; 671 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 672 break; 673 default: 674 RETURN_ERROR(B_BAD_VALUE); 675 } 676 677 // lock endpoints 678 UnixStreamEndpointLocker locker(this); 679 UnixStreamEndpointLocker peerLocker; 680 681 status_t error = _LockConnectedEndpoints(locker, peerLocker); 682 if (error != B_OK) 683 RETURN_ERROR(error); 684 685 // shutdown our FIFO 686 fReceiveFifo->Lock(); 687 fReceiveFifo->Shutdown(shutdown); 688 fReceiveFifo->Unlock(); 689 690 // shutdown peer FIFO 691 fPeerEndpoint->fReceiveFifo->Lock(); 692 fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown); 693 fPeerEndpoint->fReceiveFifo->Unlock(); 694 695 // send select notifications 696 if (direction == SHUT_RD || direction == SHUT_RDWR) { 697 gSocketModule->notify(socket, B_SELECT_READ, EPIPE); 698 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE); 699 } 700 if (direction == SHUT_WR || direction == SHUT_RDWR) { 701 gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE); 702 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE); 703 } 704 705 RETURN_ERROR(B_OK); 706 } 707 708 709 void 710 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint, 711 UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo) 712 { 713 ProtocolSocket::Open(); 714 715 fIsChild = true; 716 fPeerEndpoint = connectingEndpoint; 717 fPeerEndpoint->AcquireReference(); 718 719 fReceiveFifo = fifo; 720 721 PeerAddress().SetTo(&connectingEndpoint->socket->address); 722 723 fCredentials = listeningEndpoint->fCredentials; 724 725 fState = unix_stream_endpoint_state::Connected; 726 727 gSocketModule->set_connected(Socket()); 728 } 729 730 731 void 732 UnixStreamEndpoint::_Disconnect() 733 { 734 // Both endpoints must be locked. 735 736 // Write shutdown the receive FIFO. 737 fReceiveFifo->Lock(); 738 fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE); 739 fReceiveFifo->Unlock(); 740 741 // select() notification. 742 gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET); 743 gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET); 744 745 // Unset the peer endpoint. 746 fPeerEndpoint->ReleaseReference(); 747 fPeerEndpoint = NULL; 748 749 // We're officially disconnected. 750 // TODO: Deal with non accept()ed connections correctly! 751 fIsChild = false; 752 fState = unix_stream_endpoint_state::NotConnected; 753 } 754 755 756 status_t 757 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker, 758 UnixStreamEndpointLocker& peerLocker) 759 { 760 if (fState != unix_stream_endpoint_state::Connected) 761 RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN); 762 763 // We need to lock the peer, too. Get a reference -- we might need to 764 // unlock ourselves to get the locking order right. 765 BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint); 766 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 767 768 if (fIsChild) { 769 // We're the child, but locking order is the other way around. 770 locker.Unlock(); 771 peerLocker.SetTo(peerEndpoint, false); 772 locker.Lock(); 773 774 // recheck our state, also whether the peer is still the same 775 if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) { 776 peerLocker.Unset(); 777 RETURN_ERROR(ENOTCONN); 778 } 779 } else 780 peerLocker.SetTo(peerEndpoint, false); 781 782 RETURN_ERROR(B_OK); 783 } 784 785 786 status_t 787 UnixStreamEndpoint::_Unbind() 788 { 789 if (fState == unix_stream_endpoint_state::Connected 790 || fState == unix_stream_endpoint_state::Listening) 791 RETURN_ERROR(B_BAD_VALUE); 792 793 if (IsBound()) 794 RETURN_ERROR(UnixEndpoint::_Unbind()); 795 796 RETURN_ERROR(B_OK); 797 } 798 799 800 void 801 UnixStreamEndpoint::_UnsetReceiveFifo() 802 { 803 if (fReceiveFifo) { 804 fReceiveFifo->ReleaseReference(); 805 fReceiveFifo = NULL; 806 } 807 } 808 809 810 void 811 UnixStreamEndpoint::_StopListening() 812 { 813 if (fState == unix_stream_endpoint_state::Listening) { 814 delete_sem(fAcceptSemaphore); 815 fAcceptSemaphore = -1; 816 fState = unix_stream_endpoint_state::NotConnected; 817 } 818 } 819