1 /* 2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "UnixStreamEndpoint.h" 8 9 #include <stdio.h> 10 #include <sys/stat.h> 11 12 #include <AutoDeleter.h> 13 14 #include <vfs.h> 15 16 #include "UnixAddressManager.h" 17 #include "UnixFifo.h" 18 19 20 #define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0 21 #define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 22 #include "UnixDebug.h" 23 24 25 // Note on locking order (outermost -> innermost): 26 // UnixStreamEndpoint: connecting -> listening -> child 27 // -> UnixFifo (never lock more than one at a time) 28 // -> UnixAddressManager 29 30 31 UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket) 32 : 33 UnixEndpoint(socket), 34 fPeerEndpoint(NULL), 35 fReceiveFifo(NULL), 36 fState(unix_stream_endpoint_state::Closed), 37 fAcceptSemaphore(-1), 38 fIsChild(false), 39 fWasConnected(false) 40 { 41 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n", 42 find_thread(NULL), this); 43 } 44 45 46 UnixStreamEndpoint::~UnixStreamEndpoint() 47 { 48 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n", 49 find_thread(NULL), this); 50 } 51 52 53 status_t 54 UnixStreamEndpoint::Init() 55 { 56 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL), 57 this); 58 59 RETURN_ERROR(B_OK); 60 } 61 62 63 void 64 UnixStreamEndpoint::Uninit() 65 { 66 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL), 67 this); 68 69 // check whether we're closed 70 UnixStreamEndpointLocker locker(this); 71 bool closed = (fState == unix_stream_endpoint_state::Closed); 72 locker.Unlock(); 73 74 if (!closed) { 75 // That probably means, we're a child endpoint of a listener and 76 // have been fully connected, but not yet accepted. Our Close() 77 // hook isn't called in this case. Do it manually. 78 Close(); 79 } 80 81 ReleaseReference(); 82 } 83 84 85 status_t 86 UnixStreamEndpoint::Open() 87 { 88 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL), 89 this); 90 91 status_t error = ProtocolSocket::Open(); 92 if (error != B_OK) 93 RETURN_ERROR(error); 94 95 fState = unix_stream_endpoint_state::NotConnected; 96 97 RETURN_ERROR(B_OK); 98 } 99 100 101 status_t 102 UnixStreamEndpoint::Close() 103 { 104 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL), 105 this); 106 107 UnixStreamEndpointLocker locker(this); 108 109 if (fState == unix_stream_endpoint_state::Connected) { 110 UnixStreamEndpointLocker peerLocker; 111 if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) { 112 // We're still connected. Disconnect both endpoints! 113 fPeerEndpoint->_Disconnect(); 114 _Disconnect(); 115 } 116 } 117 118 if (fState == unix_stream_endpoint_state::Listening) 119 _StopListening(); 120 121 _Unbind(); 122 123 fState = unix_stream_endpoint_state::Closed; 124 RETURN_ERROR(B_OK); 125 } 126 127 128 status_t 129 UnixStreamEndpoint::Free() 130 { 131 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL), 132 this); 133 134 UnixStreamEndpointLocker locker(this); 135 136 _UnsetReceiveFifo(); 137 138 RETURN_ERROR(B_OK); 139 } 140 141 142 status_t 143 UnixStreamEndpoint::Bind(const struct sockaddr* _address) 144 { 145 if (_address->sa_family != AF_UNIX) 146 RETURN_ERROR(EAFNOSUPPORT); 147 148 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n", 149 find_thread(NULL), this, 150 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 151 152 const sockaddr_un* address = (const sockaddr_un*)_address; 153 154 UnixStreamEndpointLocker endpointLocker(this); 155 156 if (fState != unix_stream_endpoint_state::NotConnected || IsBound()) 157 RETURN_ERROR(B_BAD_VALUE); 158 159 RETURN_ERROR(_Bind(address)); 160 } 161 162 163 status_t 164 UnixStreamEndpoint::Unbind() 165 { 166 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL), 167 this); 168 169 UnixStreamEndpointLocker endpointLocker(this); 170 171 RETURN_ERROR(_Unbind()); 172 } 173 174 175 status_t 176 UnixStreamEndpoint::Listen(int backlog) 177 { 178 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL), 179 this, backlog); 180 181 UnixStreamEndpointLocker endpointLocker(this); 182 183 if (!IsBound()) 184 RETURN_ERROR(EDESTADDRREQ); 185 if (fState != unix_stream_endpoint_state::NotConnected 186 && fState != unix_stream_endpoint_state::Listening) 187 RETURN_ERROR(EINVAL); 188 189 gSocketModule->set_max_backlog(socket, backlog); 190 191 if (fState == unix_stream_endpoint_state::NotConnected) { 192 fAcceptSemaphore = create_sem(0, "unix accept"); 193 if (fAcceptSemaphore < 0) 194 RETURN_ERROR(ENOBUFS); 195 196 _UnsetReceiveFifo(); 197 198 fCredentials.pid = getpid(); 199 fCredentials.uid = geteuid(); 200 fCredentials.gid = getegid(); 201 202 fState = unix_stream_endpoint_state::Listening; 203 } 204 205 RETURN_ERROR(B_OK); 206 } 207 208 209 status_t 210 UnixStreamEndpoint::Connect(const struct sockaddr* _address) 211 { 212 if (_address->sa_family != AF_UNIX) 213 RETURN_ERROR(EAFNOSUPPORT); 214 215 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n", 216 find_thread(NULL), this, 217 ConstSocketAddress(&gAddressModule, _address).AsString().Data()); 218 219 const sockaddr_un* address = (const sockaddr_un*)_address; 220 221 UnixStreamEndpointLocker endpointLocker(this); 222 223 if (fState == unix_stream_endpoint_state::Connected) 224 RETURN_ERROR(EISCONN); 225 226 if (fState != unix_stream_endpoint_state::NotConnected) 227 RETURN_ERROR(B_BAD_VALUE); 228 // TODO: If listening, we could set the backlog to 0 and connect. 229 230 // check the address first 231 UnixAddress unixAddress; 232 233 if (address->sun_path[0] == '\0') { 234 // internal address space (or empty address) 235 int32 internalID; 236 if (UnixAddress::IsEmptyAddress(*address)) 237 RETURN_ERROR(B_BAD_VALUE); 238 239 internalID = UnixAddress::InternalID(*address); 240 if (internalID < 0) 241 RETURN_ERROR(internalID); 242 243 unixAddress.SetTo(internalID); 244 } else { 245 // FS address space 246 size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path)); 247 if (pathLen == 0 || pathLen == sizeof(address->sun_path)) 248 RETURN_ERROR(B_BAD_VALUE); 249 250 struct stat st; 251 status_t error = vfs_read_stat(-1, address->sun_path, true, &st, 252 !gStackModule->is_syscall()); 253 if (error != B_OK) 254 RETURN_ERROR(error); 255 256 if (!S_ISSOCK(st.st_mode)) 257 RETURN_ERROR(B_BAD_VALUE); 258 259 unixAddress.SetTo(st.st_dev, st.st_ino, NULL); 260 } 261 262 // get the peer endpoint 263 UnixAddressManagerLocker addressLocker(gAddressManager); 264 UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress); 265 if (listeningUnixEndpoint == NULL) 266 RETURN_ERROR(ECONNREFUSED); 267 UnixStreamEndpoint* listeningEndpoint 268 = dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint); 269 if (listeningEndpoint == NULL) 270 RETURN_ERROR(EPROTOTYPE); 271 BReference<UnixStreamEndpoint> peerReference(listeningEndpoint); 272 addressLocker.Unlock(); 273 274 UnixStreamEndpointLocker peerLocker(listeningEndpoint); 275 276 if (!listeningEndpoint->IsBound() 277 || listeningEndpoint->fState != unix_stream_endpoint_state::Listening 278 || listeningEndpoint->fAddress != unixAddress) { 279 RETURN_ERROR(ECONNREFUSED); 280 } 281 282 // Allocate FIFOs for us and the socket we're going to spawn. We do that 283 // now, so that the mess we need to cleanup, if allocating them fails, is 284 // harmless. 285 UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 286 UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream); 287 ObjectDeleter<UnixFifo> fifoDeleter(fifo); 288 ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo); 289 290 status_t error; 291 if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK) 292 return error; 293 294 // spawn new endpoint for accept() 295 net_socket* newSocket; 296 error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket, 297 &newSocket); 298 if (error != B_OK) 299 RETURN_ERROR(error); 300 301 // init connected peer endpoint 302 UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol; 303 304 UnixStreamEndpointLocker connectedLocker(connectedEndpoint); 305 306 connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo); 307 308 // update our attributes 309 _UnsetReceiveFifo(); 310 311 fPeerEndpoint = connectedEndpoint; 312 PeerAddress().SetTo(&connectedEndpoint->socket->address); 313 fPeerEndpoint->AcquireReference(); 314 fReceiveFifo = fifo; 315 316 fCredentials.pid = getpid(); 317 fCredentials.uid = geteuid(); 318 fCredentials.gid = getegid(); 319 320 fifoDeleter.Detach(); 321 peerFifoDeleter.Detach(); 322 323 fState = unix_stream_endpoint_state::Connected; 324 fWasConnected = true; 325 326 gSocketModule->set_connected(Socket()); 327 328 release_sem(listeningEndpoint->fAcceptSemaphore); 329 330 connectedLocker.Unlock(); 331 peerLocker.Unlock(); 332 endpointLocker.Unlock(); 333 334 RETURN_ERROR(B_OK); 335 } 336 337 338 status_t 339 UnixStreamEndpoint::Accept(net_socket** _acceptedSocket) 340 { 341 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL), 342 this); 343 344 bigtime_t timeout = absolute_timeout(socket->receive.timeout); 345 if (gStackModule->is_restarted_syscall()) 346 timeout = gStackModule->restore_syscall_restart_timeout(); 347 else 348 gStackModule->store_syscall_restart_timeout(timeout); 349 350 UnixStreamEndpointLocker locker(this); 351 352 status_t error; 353 do { 354 locker.Unlock(); 355 356 error = acquire_sem_etc(fAcceptSemaphore, 1, 357 B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout); 358 if (error < B_OK) 359 break; 360 361 locker.Lock(); 362 error = gSocketModule->dequeue_connected(socket, _acceptedSocket); 363 } while (error != B_OK); 364 365 if (error == B_TIMED_OUT && timeout == 0) { 366 // translate non-blocking timeouts to the correct error code 367 error = B_WOULD_BLOCK; 368 } 369 370 RETURN_ERROR(error); 371 } 372 373 374 ssize_t 375 UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount, 376 ancillary_data_container* ancillaryData, 377 const struct sockaddr* address, socklen_t addressLength, int flags) 378 { 379 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n", 380 find_thread(NULL), this, vecs, vecCount, ancillaryData); 381 382 bigtime_t timeout = 0; 383 if ((flags & MSG_DONTWAIT) == 0) { 384 timeout = absolute_timeout(socket->send.timeout); 385 if (gStackModule->is_restarted_syscall()) 386 timeout = gStackModule->restore_syscall_restart_timeout(); 387 else 388 gStackModule->store_syscall_restart_timeout(timeout); 389 } 390 391 UnixStreamEndpointLocker locker(this); 392 393 BReference<UnixStreamEndpoint> peerReference; 394 UnixStreamEndpointLocker peerLocker; 395 396 status_t error = _LockConnectedEndpoints(locker, peerLocker); 397 if (error != B_OK) 398 RETURN_ERROR(error); 399 400 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 401 peerReference.SetTo(peerEndpoint); 402 403 // lock the peer's FIFO 404 UnixFifo* peerFifo = peerEndpoint->fReceiveFifo; 405 BReference<UnixFifo> _(peerFifo); 406 UnixFifoLocker fifoLocker(peerFifo); 407 408 // unlock endpoints 409 locker.Unlock(); 410 peerLocker.Unlock(); 411 412 ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout); 413 414 // Notify select()ing readers, if we successfully wrote anything. 415 size_t readable = peerFifo->Readable(); 416 bool notifyRead = (error == B_OK && readable > 0 417 && !peerFifo->IsReadShutdown()); 418 419 // Notify select()ing writers, if we failed to write anything and there's 420 // still room to write. 421 size_t writable = peerFifo->Writable(); 422 bool notifyWrite = (error != B_OK && writable > 0 423 && !peerFifo->IsWriteShutdown()); 424 425 // re-lock our endpoint (unlock FIFO to respect locking order) 426 fifoLocker.Unlock(); 427 locker.Lock(); 428 429 bool peerLocked = (fPeerEndpoint == peerEndpoint 430 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 431 432 // send notifications 433 if (peerLocked && notifyRead) 434 gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable); 435 if (notifyWrite) 436 gSocketModule->notify(socket, B_SELECT_WRITE, writable); 437 438 switch (result) { 439 case UNIX_FIFO_SHUTDOWN: 440 if (fPeerEndpoint == peerEndpoint 441 && fState == unix_stream_endpoint_state::Connected) { 442 // Orderly write shutdown on our side. 443 // Note: Linux and Solaris also send a SIGPIPE, but according 444 // the send() specification that shouldn't be done. 445 result = EPIPE; 446 } else { 447 // The FD has been closed. 448 result = EBADF; 449 } 450 break; 451 case EPIPE: 452 // The peer closed connection or shutdown its read side. Reward 453 // the caller with a SIGPIPE. 454 if (gStackModule->is_syscall()) 455 send_signal(find_thread(NULL), SIGPIPE); 456 break; 457 case B_TIMED_OUT: 458 // Translate non-blocking timeouts to the correct error code. 459 if (timeout == 0) 460 result = B_WOULD_BLOCK; 461 break; 462 } 463 464 RETURN_ERROR(result); 465 } 466 467 468 ssize_t 469 UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount, 470 ancillary_data_container** _ancillaryData, struct sockaddr* _address, 471 socklen_t* _addressLength, int flags) 472 { 473 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n", 474 find_thread(NULL), this, vecs, vecCount); 475 476 bigtime_t timeout = 0; 477 if ((flags & MSG_DONTWAIT) == 0) { 478 timeout = absolute_timeout(socket->receive.timeout); 479 if (gStackModule->is_restarted_syscall()) 480 timeout = gStackModule->restore_syscall_restart_timeout(); 481 else 482 gStackModule->store_syscall_restart_timeout(timeout); 483 } 484 485 UnixStreamEndpointLocker locker(this); 486 487 // We can read as long as we have a FIFO. I.e. we are still connected, or 488 // disconnected and not yet reconnected/listening/closed. 489 if (fReceiveFifo == NULL) 490 RETURN_ERROR(ENOTCONN); 491 492 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 493 BReference<UnixStreamEndpoint> peerReference(peerEndpoint); 494 495 // Copy the peer address upfront. This way, if we read something, we don't 496 // get into a potential race with Close(). 497 if (_address != NULL) { 498 socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len); 499 memcpy(_address, &socket->peer, addrLen); 500 *_addressLength = addrLen; 501 } 502 503 // lock our FIFO 504 UnixFifo* fifo = fReceiveFifo; 505 BReference<UnixFifo> _(fifo); 506 UnixFifoLocker fifoLocker(fifo); 507 508 // unlock endpoint 509 locker.Unlock(); 510 511 ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout); 512 513 // Notify select()ing writers, if we successfully read anything. 514 size_t writable = fifo->Writable(); 515 bool notifyWrite = (result >= 0 && writable > 0 516 && !fifo->IsWriteShutdown()); 517 518 // Notify select()ing readers, if we failed to read anything and there's 519 // still something left to read. 520 size_t readable = fifo->Readable(); 521 bool notifyRead = (result < 0 && readable > 0 522 && !fifo->IsReadShutdown()); 523 524 // re-lock our endpoint (unlock FIFO to respect locking order) 525 fifoLocker.Unlock(); 526 locker.Lock(); 527 528 UnixStreamEndpointLocker peerLocker; 529 bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint 530 && _LockConnectedEndpoints(locker, peerLocker) == B_OK); 531 532 // send notifications 533 if (notifyRead) 534 gSocketModule->notify(socket, B_SELECT_READ, readable); 535 if (peerLocked && notifyWrite) 536 gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable); 537 538 switch (result) { 539 case UNIX_FIFO_SHUTDOWN: 540 // Either our socket was closed or read shutdown. 541 if (fState == unix_stream_endpoint_state::Closed) { 542 // The FD has been closed. 543 result = EBADF; 544 } else { 545 // if (fReceiveFifo == fifo) { 546 // Orderly shutdown or the peer closed the connection. 547 // } else { 548 // Weird case: Peer closed connection and we are already 549 // reconnected (or listening). 550 // } 551 result = 0; 552 } 553 break; 554 case B_TIMED_OUT: 555 // translate non-blocking timeouts to the correct error code 556 if (timeout == 0) 557 result = B_WOULD_BLOCK; 558 break; 559 } 560 561 RETURN_ERROR(result); 562 } 563 564 565 ssize_t 566 UnixStreamEndpoint::Sendable() 567 { 568 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL), 569 this); 570 571 UnixStreamEndpointLocker locker(this); 572 UnixStreamEndpointLocker peerLocker; 573 574 status_t error = _LockConnectedEndpoints(locker, peerLocker); 575 if (error != B_OK) 576 RETURN_ERROR(error); 577 578 // lock the peer's FIFO 579 UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo; 580 UnixFifoLocker fifoLocker(peerFifo); 581 582 RETURN_ERROR(peerFifo->Writable()); 583 } 584 585 586 ssize_t 587 UnixStreamEndpoint::Receivable() 588 { 589 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL), 590 this); 591 592 UnixStreamEndpointLocker locker(this); 593 594 if (fState == unix_stream_endpoint_state::Listening) 595 return gSocketModule->count_connected(socket); 596 597 if (fState != unix_stream_endpoint_state::Connected) 598 RETURN_ERROR(ENOTCONN); 599 600 UnixFifoLocker fifoLocker(fReceiveFifo); 601 ssize_t readable = fReceiveFifo->Readable(); 602 if (readable == 0 && (fReceiveFifo->IsWriteShutdown() 603 || fReceiveFifo->IsReadShutdown())) { 604 RETURN_ERROR(ENOTCONN); 605 } 606 RETURN_ERROR(readable); 607 } 608 609 610 status_t 611 UnixStreamEndpoint::SetReceiveBufferSize(size_t size) 612 { 613 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n", 614 find_thread(NULL), this, size); 615 616 UnixStreamEndpointLocker locker(this); 617 618 if (fReceiveFifo == NULL) 619 return B_BAD_VALUE; 620 621 UnixFifoLocker fifoLocker(fReceiveFifo); 622 return fReceiveFifo->SetBufferCapacity(size); 623 } 624 625 626 status_t 627 UnixStreamEndpoint::GetPeerCredentials(ucred* credentials) 628 { 629 UnixStreamEndpointLocker locker(this); 630 UnixStreamEndpointLocker peerLocker; 631 632 status_t error = _LockConnectedEndpoints(locker, peerLocker); 633 if (error != B_OK) 634 RETURN_ERROR(error); 635 636 *credentials = fPeerEndpoint->fCredentials; 637 638 return B_OK; 639 } 640 641 642 status_t 643 UnixStreamEndpoint::Shutdown(int direction) 644 { 645 TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n", 646 find_thread(NULL), this, direction); 647 648 uint32 shutdown; 649 uint32 peerShutdown; 650 651 // translate the direction into shutdown flags for our and the peer fifo 652 switch (direction) { 653 case SHUT_RD: 654 shutdown = UNIX_FIFO_SHUTDOWN_READ; 655 peerShutdown = 0; 656 break; 657 case SHUT_WR: 658 shutdown = 0; 659 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 660 break; 661 case SHUT_RDWR: 662 shutdown = UNIX_FIFO_SHUTDOWN_READ; 663 peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE; 664 break; 665 default: 666 RETURN_ERROR(B_BAD_VALUE); 667 } 668 669 // lock endpoints 670 UnixStreamEndpointLocker locker(this); 671 UnixStreamEndpointLocker peerLocker; 672 673 status_t error = _LockConnectedEndpoints(locker, peerLocker); 674 if (error != B_OK) 675 RETURN_ERROR(error); 676 677 // shutdown our FIFO 678 fReceiveFifo->Lock(); 679 fReceiveFifo->Shutdown(shutdown); 680 fReceiveFifo->Unlock(); 681 682 // shutdown peer FIFO 683 fPeerEndpoint->fReceiveFifo->Lock(); 684 fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown); 685 fPeerEndpoint->fReceiveFifo->Unlock(); 686 687 // send select notifications 688 if (direction == SHUT_RD || direction == SHUT_RDWR) { 689 gSocketModule->notify(socket, B_SELECT_READ, EPIPE); 690 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE); 691 } 692 if (direction == SHUT_WR || direction == SHUT_RDWR) { 693 gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE); 694 gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE); 695 } 696 697 RETURN_ERROR(B_OK); 698 } 699 700 701 void 702 UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint, 703 UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo) 704 { 705 ProtocolSocket::Open(); 706 707 fIsChild = true; 708 fPeerEndpoint = connectingEndpoint; 709 fPeerEndpoint->AcquireReference(); 710 711 fReceiveFifo = fifo; 712 713 PeerAddress().SetTo(&connectingEndpoint->socket->address); 714 715 fCredentials = listeningEndpoint->fCredentials; 716 717 fState = unix_stream_endpoint_state::Connected; 718 719 gSocketModule->set_connected(Socket()); 720 } 721 722 723 void 724 UnixStreamEndpoint::_Disconnect() 725 { 726 // Both endpoints must be locked. 727 728 // Write shutdown the receive FIFO. 729 fReceiveFifo->Lock(); 730 fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE); 731 fReceiveFifo->Unlock(); 732 733 // select() notification. 734 gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET); 735 gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET); 736 737 // Unset the peer endpoint. 738 fPeerEndpoint->ReleaseReference(); 739 fPeerEndpoint = NULL; 740 741 // We're officially disconnected. 742 // TODO: Deal with non accept()ed connections correctly! 743 fIsChild = false; 744 fState = unix_stream_endpoint_state::NotConnected; 745 } 746 747 748 status_t 749 UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker, 750 UnixStreamEndpointLocker& peerLocker) 751 { 752 if (fState != unix_stream_endpoint_state::Connected) 753 RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN); 754 755 // We need to lock the peer, too. Get a reference -- we might need to 756 // unlock ourselves to get the locking order right. 757 BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint); 758 UnixStreamEndpoint* peerEndpoint = fPeerEndpoint; 759 760 if (fIsChild) { 761 // We're the child, but locking order is the other way around. 762 locker.Unlock(); 763 peerLocker.SetTo(peerEndpoint, false); 764 765 locker.Lock(); 766 767 // recheck our state, also whether the peer is still the same 768 if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) 769 RETURN_ERROR(ENOTCONN); 770 } else 771 peerLocker.SetTo(peerEndpoint, false); 772 773 RETURN_ERROR(B_OK); 774 } 775 776 777 status_t 778 UnixStreamEndpoint::_Unbind() 779 { 780 if (fState == unix_stream_endpoint_state::Connected 781 || fState == unix_stream_endpoint_state::Listening) 782 RETURN_ERROR(B_BAD_VALUE); 783 784 if (IsBound()) 785 RETURN_ERROR(UnixEndpoint::_Unbind()); 786 787 RETURN_ERROR(B_OK); 788 } 789 790 791 void 792 UnixStreamEndpoint::_UnsetReceiveFifo() 793 { 794 if (fReceiveFifo) { 795 fReceiveFifo->ReleaseReference(); 796 fReceiveFifo = NULL; 797 } 798 } 799 800 801 void 802 UnixStreamEndpoint::_StopListening() 803 { 804 if (fState == unix_stream_endpoint_state::Listening) { 805 delete_sem(fAcceptSemaphore); 806 fAcceptSemaphore = -1; 807 fState = unix_stream_endpoint_state::NotConnected; 808 } 809 } 810