1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "UnixStreamEndpoint.h" 8 9 #include <stdio.h> 10 #include <sys/stat.h> 11 12 #include <AutoDeleter.h> 13 14 #include <vfs.h> 15 16 #include "UnixAddressManager.h" 17 #include "UnixFifo.h" 18 19 20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0 21 #define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 22 #include "UnixDebug.h" 23 24 25 // Note on locking order (outermost -> innermost): 26 // UnixStreamEndpoint: connecting -> listening -> child 27 // -> UnixFifo (never lock more than one at a time) 28 // -> UnixAddressManager 29 30 31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket) 32 : 33 UnixEndpoint(socket), 34 fPeerEndpoint(NULL), 35 fReceiveFifo(NULL), 36 fState(unix_stream_endpoint_state::Closed), 37 fAcceptSemaphore(-1), 38 fIsChild(false), 39 fWasConnected(false) 40 { 41 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n", 42 find_thread(NULL), this); 43 } 44 45 46 UnixStreamEndpoint::~UnixStreamEndpoint() 47 { 48 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n", 49 find_thread(NULL), this); 50 } 51 52 53 status_t 54 UnixStreamEndpoint::Init() 55 { 56 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL), 57 this); 58 59 RETURN_ERROR(B_OK); 60 } 61 62 63 void 64 UnixStreamEndpoint::Uninit() 65 { 66 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL), 67 this); 68 69 // check whether we're closed 70 UnixStreamEndpointLocker locker(this); 71 bool closed = (fState == unix_stream_endpoint_state::Closed); 72 locker.Unlock(); 73 74 if (!closed) { 75 // That probably means, we're a child endpoint of a listener and 76 // have been fully connected, but not yet accepted. Our Close() 77 // hook isn't called in this case. Do it manually. 78 Close(); 79 } 80 81 ReleaseReference(); 82 } 83 84 85 status_t 86 UnixStreamEndpoint::Open() 87 { 88 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL), 89 this); 90 91 status_t error = ProtocolSocket::Open(); 92 if (error != B_OK) 93 RETURN_ERROR(error); 94 95 fState = unix_stream_endpoint_state::NotConnected; 96 97 RETURN_ERROR(B_OK); 98 } 99 100 101 status_t 102 UnixStreamEndpoint::Close() 103 { 104 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL), 105 this); 106 107 UnixStreamEndpointLocker locker(this); 108 109 if (fState == unix_stream_endpoint_state::Connected) { 110 UnixStreamEndpointLocker peerLocker; 111 if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) { 112 // We're still connected. Disconnect both endpoints! 113 fPeerEndpoint->_Disconnect(); 114 _Disconnect(); 115 } 116 } 117 118 if (fState == unix_stream_endpoint_state::Listening) 119 _StopListening(); 120 121 _Unbind(); 122 123 fState = unix_stream_endpoint_state::Closed; 124 RETURN_ERROR(B_OK); 125 } 126 127 128 status_t 129 UnixStreamEndpoint::Free() 130 { 131 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL), 132 this); 133 134 UnixStreamEndpointLocker locker(this); 135 136 _UnsetReceiveFifo(); 137 138 RETURN_ERROR(B_OK); 139 } 140 141 142 status_t 143 UnixStreamEndpoint::Bind(const struct sockaddr* _address) 144 { 145 if (_address->sa_family != AF_UNIX) 146 RETURN_ERROR(EAFNOSUPPORT); 147 148 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n", 149 find_thread(NULL), this, 150 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 151 152 const sockaddr_un* address = (const sockaddr_un*)_address; 153 154 UnixStreamEndpointLocker endpointLocker(this); 155 156 if (fState != unix_stream_endpoint_state::NotConnected || IsBound()) 157 RETURN_ERROR(B_BAD_VALUE); 158 159 RETURN_ERROR(_Bind(address)); 160 } 161 162 163 status_t 164 UnixStreamEndpoint::Unbind() 165 { 166 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL), 167 this); 168 169 UnixStreamEndpointLocker endpointLocker(this); 170 171 RETURN_ERROR(_Unbind()); 172 } 173 174 175 status_t 176 UnixStreamEndpoint::Listen(int backlog) 177 { 178 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL), 179 this, backlog); 180 181 UnixStreamEndpointLocker endpointLocker(this); 182 183 if (!IsBound()) 184 RETURN_ERROR(EDESTADDRREQ); 185 if (fState != unix_stream_endpoint_state::NotConnected 186 && fState != unix_stream_endpoint_state::Listening) 187 RETURN_ERROR(EINVAL); 188 189 gSocketModule->set_max_backlog(socket, backlog); 190 191 if (fState == unix_stream_endpoint_state::NotConnected) { 192 fAcceptSemaphore = create_sem(0, "unix accept"); 193 if (fAcceptSemaphore < 0) 194 RETURN_ERROR(ENOBUFS); 195 196 _UnsetReceiveFifo(); 197 198 fCredentials.pid = getpid(); 199 fCredentials.uid = geteuid(); 200 fCredentials.gid = getegid(); 201 202 fState = unix_stream_endpoint_state::Listening; 203 } 204 205 RETURN_ERROR(B_OK); 206 } 207 208 209 status_t 210 UnixStreamEndpoint::Connect(const struct sockaddr* _address) 211 { 212 if (_address->sa_family != AF_UNIX) 213 RETURN_ERROR(EAFNOSUPPORT); 214 215 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n", 216 find_thread(NULL), this, 217 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 218 219 const sockaddr_un* address = (const sockaddr_un*)_address; 220 221 UnixStreamEndpointLocker endpointLocker(this); 222 223 if (fState == unix_stream_endpoint_state::Connected) 224 RETURN_ERROR(EISCONN); 225 226 if (fState != unix_stream_endpoint_state::NotConnected) 227 RETURN_ERROR(B_BAD_VALUE); 228 // TODO: If listening, we could set the backlog to 0 and connect. 229 230 // check the address first 231 UnixAddress unixAddress; 232 233 if (address->sun_path[0] == '\0') { 234 // internal address space (or empty address) 235 int32 internalID; 236 if (UnixAddress::IsEmptyAddress(*address)) 237 RETURN_ERROR(B_BAD_VALUE); 238 239 internalID = UnixAddress::InternalID(*address); 240 if (internalID < 0) 241 RETURN_ERROR(internalID); 242 243 unixAddress.SetTo(internalID); 244 } else { 245 // FS address space 246 size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path)); 247 if (pathLen == 0 || pathLen == sizeof(address->sun_path)) 248 RETURN_ERROR(B_BAD_VALUE); 249 250 struct stat st; 251 status_t error = vfs_read_stat(-1, address->sun_path, true, &st, 252 !gStackModule->is_syscall()); 253 if (error != B_OK) 254 RETURN_ERROR(error); 255 256 if (!S_ISSOCK(st.st_mode)) 257 RETURN_ERROR(B_BAD_VALUE); 258 259 unixAddress.SetTo(st.st_dev, st.st_ino, NULL); 260 } 261 262 // get the peer endpoint 263 UnixAddressManagerLocker addressLocker(gAddressManager); 264 UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress); 265 if (listeningUnixEndpoint == NULL) 266 RETURN_ERROR(ECONNREFUSED); 267 UnixStreamEndpoint* listeningEndpoint 268 = dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint); 269 if (listeningEndpoint == NULL) 270 RETURN_ERROR(EPROTOTYPE); 271 BReference<UnixStreamEndpoint> peerReference(listeningEndpoint); 272 addressLocker.Unlock(); 273 274 UnixStreamEndpointLocker peerLocker(listeningEndpoint); 275 276 if (!listeningEndpoint->IsBound() 277 || listeningEndpoint->fState != unix_stream_endpoint_state::Listening 278 || listeningEndpoint->fAddress != unixAddress) { 279 RETURN_ERROR(ECONNREFUSED); 280 } 281 282 // Allocate FIFOs for us and the socket we're going to spawn. We do that 283 // now, so that the mess we need to cleanup, if allocating them fails, is 284 // harmless. 285 UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 286 UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 287 ObjectDeleter<UnixFifo> fifoDeleter(fifo); 288 ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo); 289 290 status_t error; 291 if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK) 292 return error; 293 294 // spawn new endpoint for accept() 295 net_socket* newSocket; 296 error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket, 297 &newSocket); 298 if (error != B_OK) 299 RETURN_ERROR(error); 300 301 // init connected peer endpoint 302 UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol; 303 304 UnixStreamEndpointLocker connectedLocker(connectedEndpoint); 305 306 connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo); 307 308 // update our attributes 309 _UnsetReceiveFifo(); 310 311 fPeerEndpoint = connectedEndpoint; 312 PeerAddress().SetTo(&connectedEndpoint->socket->address); 313 fPeerEndpoint->AcquireReference(); 314 fReceiveFifo = fifo; 315 316 fCredentials.pid = getpid(); 317 fCredentials.uid = geteuid(); 318 fCredentials.gid = getegid(); 319 320 fifoDeleter.Detach(); 321 peerFifoDeleter.Detach(); 322 323 fState = unix_stream_endpoint_state::Connected; 324 fWasConnected = true; 325 326 gSocketModule->set_connected(Socket()); 327 328 release_sem(listeningEndpoint->fAcceptSemaphore); 329 330 connectedLocker.Unlock(); 331 peerLocker.Unlock(); 332 endpointLocker.Unlock(); 333 334 RETURN_ERROR(B_OK); 335 } 336 337 338 status_t 339 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket) 340 { 341 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL), 342 this); 343 344 bigtime_t timeout = absolute_timeout(socket->receive.timeout); 345 if (gStackModule->is_restarted_syscall()) 346 timeout = gStackModule->restore_syscall_restart_timeout(); 347 else 348 gStackModule->store_syscall_restart_timeout(timeout); 349 350 UnixStreamEndpointLocker locker(this); 351 352 status_t error; 353 do { 354 locker.Unlock(); 355 356 error = acquire_sem_etc(fAcceptSemaphore, 1, 357 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 358 if (error < B_OK) 359 break; 360 361 locker.Lock(); 362 error = gSocketModule->dequeue_connected(socket, _acceptedSocket); 363 } while (error != B_OK); 364 365 if (error == B_TIMED_OUT && timeout == 0) { 366 // translate non-blocking timeouts to the correct error code 367 error = B_WOULD_BLOCK; 368 } 369 370 RETURN_ERROR(error); 371 } 372 373 374 ssize_t 375 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount, 376 ancillary_data_container* ancillaryData, 377 const struct sockaddr* address, socklen_t addressLength, int flags) 378 { 379 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n", 380 find_thread(NULL), this, vecs, vecCount, ancillaryData); 381 382 if ((flags & ~(MSG_DONTWAIT)) != 0) 383 return EOPNOTSUPP; 384 385 bigtime_t timeout = 0; 386 if ((flags & MSG_DONTWAIT) == 0) { 387 timeout = absolute_timeout(socket->send.timeout); 388 if (gStackModule->is_restarted_syscall()) 389 timeout = gStackModule->restore_syscall_restart_timeout(); 390 else 391 gStackModule->store_syscall_restart_timeout(timeout); 392 } 393 394 UnixStreamEndpointLocker locker(this); 395 396 BReference<UnixStreamEndpoint> peerReference; 397 UnixStreamEndpointLocker peerLocker; 398 399 status_t error = _LockConnectedEndpoints(locker, peerLocker); 400 if (error != B_OK) 401 RETURN_ERROR(error); 402 403 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 404 peerReference.SetTo(peerEndpoint); 405 406 // lock the peer's FIFO 407 UnixFifo* peerFifo = peerEndpoint->fReceiveFifo; 408 BReference<UnixFifo> _(peerFifo); 409 UnixFifoLocker fifoLocker(peerFifo); 410 411 // unlock endpoints 412 locker.Unlock(); 413 peerLocker.Unlock(); 414 415 ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout); 416 417 // Notify select()ing readers, if we successfully wrote anything. 418 size_t readable = peerFifo->Readable(); 419 bool notifyRead = (error == B_OK && readable > 0 420 && !peerFifo->IsReadShutdown()); 421 422 // Notify select()ing writers, if we failed to write anything and there's 423 // still room to write. 424 size_t writable = peerFifo->Writable(); 425 bool notifyWrite = (error != B_OK && writable > 0 426 && !peerFifo->IsWriteShutdown()); 427 428 // re-lock our endpoint (unlock FIFO to respect locking order) 429 fifoLocker.Unlock(); 430 locker.Lock(); 431 432 bool peerLocked = (fPeerEndpoint == peerEndpoint 433 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 434 435 // send notifications 436 if (peerLocked && notifyRead) 437 gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable); 438 if (notifyWrite) 439 gSocketModule->notify(socket, B_SELECT_WRITE, writable); 440 441 switch (result) { 442 case UNIX_FIFO_SHUTDOWN: 443 if (fPeerEndpoint == peerEndpoint 444 && fState == unix_stream_endpoint_state::Connected) { 445 // Orderly write shutdown on our side. 446 // Note: Linux and Solaris also send a SIGPIPE, but according 447 // the send() specification that shouldn't be done. 448 result = EPIPE; 449 } else { 450 // The FD has been closed. 451 result = EBADF; 452 } 453 break; 454 case EPIPE: 455 // The socket module will generate SIGPIPE for us, if necessary. 456 break; 457 case B_TIMED_OUT: 458 // Translate non-blocking timeouts to the correct error code. 459 if (timeout == 0) 460 result = B_WOULD_BLOCK; 461 break; 462 } 463 464 RETURN_ERROR(result); 465 } 466 467 468 ssize_t 469 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount, 470 ancillary_data_container** _ancillaryData, struct sockaddr* _address, 471 socklen_t* _addressLength, int flags) 472 { 473 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n", 474 find_thread(NULL), this, vecs, vecCount); 475 476 if ((flags & ~(MSG_DONTWAIT)) != 0) 477 return EOPNOTSUPP; 478 479 bigtime_t timeout = 0; 480 if ((flags & MSG_DONTWAIT) == 0) { 481 timeout = absolute_timeout(socket->receive.timeout); 482 if (gStackModule->is_restarted_syscall()) 483 timeout = gStackModule->restore_syscall_restart_timeout(); 484 else 485 gStackModule->store_syscall_restart_timeout(timeout); 486 } 487 488 UnixStreamEndpointLocker locker(this); 489 490 // We can read as long as we have a FIFO. I.e. we are still connected, or 491 // disconnected and not yet reconnected/listening/closed. 492 if (fReceiveFifo == NULL) 493 RETURN_ERROR(ENOTCONN); 494 495 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 496 BReference<UnixStreamEndpoint> peerReference(peerEndpoint); 497 498 // Copy the peer address upfront. This way, if we read something, we don't 499 // get into a potential race with Close(). 500 if (_address != NULL) { 501 socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len); 502 memcpy(_address, &socket->peer, addrLen); 503 *_addressLength = addrLen; 504 } 505 506 // lock our FIFO 507 UnixFifo* fifo = fReceiveFifo; 508 BReference<UnixFifo> _(fifo); 509 UnixFifoLocker fifoLocker(fifo); 510 511 // unlock endpoint 512 locker.Unlock(); 513 514 ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout); 515 516 // Notify select()ing writers, if we successfully read anything. 517 size_t writable = fifo->Writable(); 518 bool notifyWrite = (result >= 0 && writable > 0 519 && !fifo->IsWriteShutdown()); 520 521 // Notify select()ing readers, if we failed to read anything and there's 522 // still something left to read. 523 size_t readable = fifo->Readable(); 524 bool notifyRead = (result < 0 && readable > 0 525 && !fifo->IsReadShutdown()); 526 527 // re-lock our endpoint (unlock FIFO to respect locking order) 528 fifoLocker.Unlock(); 529 locker.Lock(); 530 531 UnixStreamEndpointLocker peerLocker; 532 bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint 533 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 534 535 // send notifications 536 if (notifyRead) 537 gSocketModule->notify(socket, B_SELECT_READ, readable); 538 if (peerLocked && notifyWrite) 539 gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable); 540 541 switch (result) { 542 case UNIX_FIFO_SHUTDOWN: 543 // Either our socket was closed or read shutdown. 544 if (fState == unix_stream_endpoint_state::Closed) { 545 // The FD has been closed. 546 result = EBADF; 547 } else { 548 // if (fReceiveFifo == fifo) { 549 // Orderly shutdown or the peer closed the connection. 550 // } else { 551 // Weird case: Peer closed connection and we are already 552 // reconnected (or listening). 553 // } 554 result = 0; 555 } 556 break; 557 case B_TIMED_OUT: 558 // translate non-blocking timeouts to the correct error code 559 if (timeout == 0) 560 result = B_WOULD_BLOCK; 561 break; 562 } 563 564 RETURN_ERROR(result); 565 } 566 567 568 ssize_t 569 UnixStreamEndpoint::Sendable() 570 { 571 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL), 572 this); 573 574 UnixStreamEndpointLocker locker(this); 575 UnixStreamEndpointLocker peerLocker; 576 577 status_t error = _LockConnectedEndpoints(locker, peerLocker); 578 if (error != B_OK) 579 RETURN_ERROR(error); 580 581 // lock the peer's FIFO 582 UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo; 583 UnixFifoLocker fifoLocker(peerFifo); 584 585 RETURN_ERROR(peerFifo->Writable()); 586 } 587 588 589 ssize_t 590 UnixStreamEndpoint::Receivable() 591 { 592 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL), 593 this); 594 595 UnixStreamEndpointLocker locker(this); 596 597 if (fState == unix_stream_endpoint_state::Listening) 598 return gSocketModule->count_connected(socket); 599 600 if (fState != unix_stream_endpoint_state::Connected) 601 RETURN_ERROR(ENOTCONN); 602 603 UnixFifoLocker fifoLocker(fReceiveFifo); 604 ssize_t readable = fReceiveFifo->Readable(); 605 if (readable == 0 && (fReceiveFifo->IsWriteShutdown() 606 || fReceiveFifo->IsReadShutdown())) { 607 RETURN_ERROR(ENOTCONN); 608 } 609 RETURN_ERROR(readable); 610 } 611 612 613 status_t 614 UnixStreamEndpoint::SetReceiveBufferSize(size_t size) 615 { 616 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n", 617 find_thread(NULL), this, size); 618 619 UnixStreamEndpointLocker locker(this); 620 621 if (fReceiveFifo == NULL) 622 return B_BAD_VALUE; 623 624 UnixFifoLocker fifoLocker(fReceiveFifo); 625 return fReceiveFifo->SetBufferCapacity(size); 626 } 627 628 629 status_t 630 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials) 631 { 632 UnixStreamEndpointLocker locker(this); 633 UnixStreamEndpointLocker peerLocker; 634 635 status_t error = _LockConnectedEndpoints(locker, peerLocker); 636 if (error != B_OK) 637 RETURN_ERROR(error); 638 639 *credentials = fPeerEndpoint->fCredentials; 640 641 return B_OK; 642 } 643 644 645 status_t 646 UnixStreamEndpoint::Shutdown(int direction) 647 { 648 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n", 649 find_thread(NULL), this, direction); 650 651 uint32 shutdown; 652 uint32 peerShutdown; 653 654 // translate the direction into shutdown flags for our and the peer fifo 655 switch (direction) { 656 case SHUT_RD: 657 shutdown = UNIX_FIFO_SHUTDOWN_READ; 658 peerShutdown = 0; 659 break; 660 case SHUT_WR: 661 shutdown = 0; 662 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 663 break; 664 case SHUT_RDWR: 665 shutdown = UNIX_FIFO_SHUTDOWN_READ; 666 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 667 break; 668 default: 669 RETURN_ERROR(B_BAD_VALUE); 670 } 671 672 // lock endpoints 673 UnixStreamEndpointLocker locker(this); 674 UnixStreamEndpointLocker peerLocker; 675 676 status_t error = _LockConnectedEndpoints(locker, peerLocker); 677 if (error != B_OK) 678 RETURN_ERROR(error); 679 680 // shutdown our FIFO 681 fReceiveFifo->Lock(); 682 fReceiveFifo->Shutdown(shutdown); 683 fReceiveFifo->Unlock(); 684 685 // shutdown peer FIFO 686 fPeerEndpoint->fReceiveFifo->Lock(); 687 fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown); 688 fPeerEndpoint->fReceiveFifo->Unlock(); 689 690 // send select notifications 691 if (direction == SHUT_RD || direction == SHUT_RDWR) { 692 gSocketModule->notify(socket, B_SELECT_READ, EPIPE); 693 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE); 694 } 695 if (direction == SHUT_WR || direction == SHUT_RDWR) { 696 gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE); 697 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE); 698 } 699 700 RETURN_ERROR(B_OK); 701 } 702 703 704 void 705 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint, 706 UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo) 707 { 708 ProtocolSocket::Open(); 709 710 fIsChild = true; 711 fPeerEndpoint = connectingEndpoint; 712 fPeerEndpoint->AcquireReference(); 713 714 fReceiveFifo = fifo; 715 716 PeerAddress().SetTo(&connectingEndpoint->socket->address); 717 718 fCredentials = listeningEndpoint->fCredentials; 719 720 fState = unix_stream_endpoint_state::Connected; 721 722 gSocketModule->set_connected(Socket()); 723 } 724 725 726 void 727 UnixStreamEndpoint::_Disconnect() 728 { 729 // Both endpoints must be locked. 730 731 // Write shutdown the receive FIFO. 732 fReceiveFifo->Lock(); 733 fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE); 734 fReceiveFifo->Unlock(); 735 736 // select() notification. 737 gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET); 738 gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET); 739 740 // Unset the peer endpoint. 741 fPeerEndpoint->ReleaseReference(); 742 fPeerEndpoint = NULL; 743 744 // We're officially disconnected. 745 // TODO: Deal with non accept()ed connections correctly! 746 fIsChild = false; 747 fState = unix_stream_endpoint_state::NotConnected; 748 } 749 750 751 status_t 752 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker, 753 UnixStreamEndpointLocker& peerLocker) 754 { 755 if (fState != unix_stream_endpoint_state::Connected) 756 RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN); 757 758 // We need to lock the peer, too. Get a reference -- we might need to 759 // unlock ourselves to get the locking order right. 760 BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint); 761 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 762 763 if (fIsChild) { 764 // We're the child, but locking order is the other way around. 765 locker.Unlock(); 766 peerLocker.SetTo(peerEndpoint, false); 767 768 locker.Lock(); 769 770 // recheck our state, also whether the peer is still the same 771 if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) 772 RETURN_ERROR(ENOTCONN); 773 } else 774 peerLocker.SetTo(peerEndpoint, false); 775 776 RETURN_ERROR(B_OK); 777 } 778 779 780 status_t 781 UnixStreamEndpoint::_Unbind() 782 { 783 if (fState == unix_stream_endpoint_state::Connected 784 || fState == unix_stream_endpoint_state::Listening) 785 RETURN_ERROR(B_BAD_VALUE); 786 787 if (IsBound()) 788 RETURN_ERROR(UnixEndpoint::_Unbind()); 789 790 RETURN_ERROR(B_OK); 791 } 792 793 794 void 795 UnixStreamEndpoint::_UnsetReceiveFifo() 796 { 797 if (fReceiveFifo) { 798 fReceiveFifo->ReleaseReference(); 799 fReceiveFifo = NULL; 800 } 801 } 802 803 804 void 805 UnixStreamEndpoint::_StopListening() 806 { 807 if (fState == unix_stream_endpoint_state::Listening) { 808 delete_sem(fAcceptSemaphore); 809 fAcceptSemaphore = -1; 810 fState = unix_stream_endpoint_state::NotConnected; 811 } 812 } 813