1 /* 2 * Copyright 2012-2013 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "Connection.h" 11 12 #include <arpa/inet.h> 13 #include <errno.h> 14 #include <stdlib.h> 15 #include <string.h> 16 #include <unistd.h> 17 18 #include <AutoDeleter.h> 19 #include <net/dns_resolver.h> 20 #include <util/kernel_cpp.h> 21 #include <util/Random.h> 22 23 24 #define NFS4_PORT 2049 25 26 #define LAST_FRAGMENT 0x80000000 27 #define MAX_PACKET_SIZE 65535 28 29 #define NFS_MIN_PORT 665 30 31 32 bool 33 PeerAddress::operator==(const PeerAddress& address) 34 { 35 return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0 36 && fProtocol == address.fProtocol; 37 } 38 39 40 bool 41 PeerAddress::operator<(const PeerAddress& address) 42 { 43 int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress)); 44 return compare < 0 || (compare == 0 && fProtocol < address.fProtocol); 45 } 46 47 48 PeerAddress& 49 PeerAddress::operator=(const PeerAddress& address) 50 { 51 fAddress = address.fAddress; 52 fProtocol = address.fProtocol; 53 return *this; 54 } 55 56 57 PeerAddress::PeerAddress() 58 : 59 fProtocol(0) 60 { 61 memset(&fAddress, 0, sizeof(fAddress)); 62 } 63 64 65 PeerAddress::PeerAddress(int networkFamily) 66 : 67 fProtocol(0) 68 { 69 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 70 71 memset(&fAddress, 0, sizeof(fAddress)); 72 73 fAddress.ss_family = networkFamily; 74 switch (networkFamily) { 75 case AF_INET: 76 fAddress.ss_len = sizeof(sockaddr_in); 77 break; 78 case AF_INET6: 79 fAddress.ss_len = sizeof(sockaddr_in6); 80 break; 81 } 82 } 83 84 85 const char* 86 PeerAddress::ProtocolString() const 87 { 88 static const char* tcpName = "tcp"; 89 static const char* udpName = "udp"; 90 static const char* unknown = ""; 91 92 switch (fProtocol) { 93 case IPPROTO_TCP: 94 return tcpName; 95 case IPPROTO_UDP: 96 return udpName; 97 default: 98 return unknown; 99 } 100 } 101 102 103 void 104 PeerAddress::SetProtocol(const char* protocol) 105 { 106 ASSERT(protocol != NULL); 107 108 if (strcmp(protocol, "tcp") == 0) 109 fProtocol = IPPROTO_TCP; 110 else if (strcmp(protocol, "udp") == 0) 111 fProtocol = IPPROTO_UDP; 112 } 113 114 115 char* 116 PeerAddress::UniversalAddress() const 117 { 118 char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16)); 119 if (uAddr == NULL) 120 return NULL; 121 122 if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL) 123 return NULL; 124 125 char port[16]; 126 sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff); 127 strcat(uAddr, port); 128 129 return uAddr; 130 } 131 132 133 socklen_t 134 PeerAddress::AddressSize() const 135 { 136 switch (Family()) { 137 case AF_INET: 138 return sizeof(sockaddr_in); 139 case AF_INET6: 140 return sizeof(sockaddr_in6); 141 default: 142 return 0; 143 } 144 } 145 146 147 uint16 148 PeerAddress::Port() const 149 { 150 uint16 port; 151 152 switch (Family()) { 153 case AF_INET: 154 port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port; 155 break; 156 case AF_INET6: 157 port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port; 158 break; 159 default: 160 port = 0; 161 } 162 163 return ntohs(port); 164 } 165 166 167 void 168 PeerAddress::SetPort(uint16 port) 169 { 170 port = htons(port); 171 172 switch (Family()) { 173 case AF_INET: 174 reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port; 175 break; 176 case AF_INET6: 177 reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port; 178 break; 179 } 180 } 181 182 const void* 183 PeerAddress::InAddr() const 184 { 185 switch (Family()) { 186 case AF_INET: 187 return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr; 188 case AF_INET6: 189 return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr; 190 default: 191 return NULL; 192 } 193 } 194 195 196 size_t 197 PeerAddress::InAddrSize() const 198 { 199 switch (Family()) { 200 case AF_INET: 201 return sizeof(in_addr); 202 case AF_INET6: 203 return sizeof(in6_addr); 204 default: 205 return 0; 206 } 207 } 208 209 210 AddressResolver::AddressResolver(const char* name) 211 : 212 fHead(NULL), 213 fCurrent(NULL), 214 fForcedPort(htons(NFS4_PORT)), 215 fForcedProtocol(IPPROTO_TCP) 216 { 217 fStatus = ResolveAddress(name); 218 } 219 220 221 AddressResolver::~AddressResolver() 222 { 223 freeaddrinfo(fHead); 224 } 225 226 227 status_t 228 AddressResolver::ResolveAddress(const char* name) 229 { 230 ASSERT(name != NULL); 231 232 if (fHead != NULL) { 233 freeaddrinfo(fHead); 234 fHead = NULL; 235 fCurrent = NULL; 236 } 237 238 // getaddrinfo() is very expensive when called from kernel, so we do not 239 // want to call it unless there is no other choice. 240 struct sockaddr_in addr; 241 memset(&addr, 0, sizeof(addr)); 242 if (inet_aton(name, &addr.sin_addr) == 1) { 243 addr.sin_len = sizeof(addr); 244 addr.sin_family = AF_INET; 245 addr.sin_port = htons(NFS4_PORT); 246 247 memcpy(&fAddress.fAddress, &addr, sizeof(addr)); 248 fAddress.fProtocol = IPPROTO_TCP; 249 return B_OK; 250 } 251 252 status_t result = getaddrinfo(name, NULL, NULL, &fHead); 253 fCurrent = fHead; 254 255 return result; 256 } 257 258 259 void 260 AddressResolver::ForceProtocol(const char* protocol) 261 { 262 ASSERT(protocol != NULL); 263 264 if (strcmp(protocol, "tcp") == 0) 265 fForcedProtocol = IPPROTO_TCP; 266 else if (strcmp(protocol, "udp") == 0) 267 fForcedProtocol = IPPROTO_UDP; 268 269 fAddress.SetProtocol(protocol); 270 } 271 272 273 void 274 AddressResolver::ForcePort(uint16 port) 275 { 276 fForcedPort = htons(port); 277 fAddress.SetPort(port); 278 } 279 280 281 status_t 282 AddressResolver::GetNextAddress(PeerAddress* address) 283 { 284 ASSERT(address != NULL); 285 286 if (fStatus != B_OK) 287 return fStatus; 288 289 if (fHead == NULL) { 290 *address = fAddress; 291 fStatus = B_NAME_NOT_FOUND; 292 return B_OK; 293 } 294 295 address->fProtocol = fForcedProtocol; 296 297 while (fCurrent != NULL) { 298 if (fCurrent->ai_family == AF_INET) { 299 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in)); 300 reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port 301 = fForcedPort; 302 } else if (fCurrent->ai_family == AF_INET6) { 303 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6)); 304 reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port 305 = fForcedPort; 306 } else { 307 fCurrent = fCurrent->ai_next; 308 continue; 309 } 310 311 fCurrent = fCurrent->ai_next; 312 return B_OK; 313 } 314 315 return B_NAME_NOT_FOUND; 316 } 317 318 319 Connection::Connection(const PeerAddress& address) 320 : 321 ConnectionBase(address) 322 { 323 } 324 325 326 ConnectionListener::ConnectionListener(const PeerAddress& address) 327 : 328 ConnectionBase(address) 329 { 330 } 331 332 333 ConnectionBase::ConnectionBase(const PeerAddress& address) 334 : 335 fWaitCancel(create_sem(0, NULL)), 336 fSocket(-1), 337 fPeerAddress(address) 338 { 339 mutex_init(&fSocketLock, NULL); 340 } 341 342 343 ConnectionStream::ConnectionStream(const PeerAddress& address) 344 : 345 Connection(address) 346 { 347 } 348 349 350 ConnectionPacket::ConnectionPacket(const PeerAddress& address) 351 : 352 Connection(address) 353 { 354 } 355 356 357 ConnectionBase::~ConnectionBase() 358 { 359 if (fSocket != -1) 360 close(fSocket); 361 mutex_destroy(&fSocketLock); 362 delete_sem(fWaitCancel); 363 } 364 365 366 status_t 367 ConnectionBase::GetLocalAddress(PeerAddress* address) 368 { 369 ASSERT(address != NULL); 370 371 address->fProtocol = fPeerAddress.fProtocol; 372 373 socklen_t addressSize = sizeof(address->fAddress); 374 return getsockname(fSocket, (struct sockaddr*)&address->fAddress, 375 &addressSize); 376 } 377 378 379 status_t 380 ConnectionStream::Send(const void* buffer, uint32 size) 381 { 382 ASSERT(buffer != NULL); 383 384 status_t result; 385 386 uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32))); 387 if (buf == NULL) 388 return B_NO_MEMORY; 389 MemoryDeleter _(buf); 390 391 buf[0] = htonl(size | LAST_FRAGMENT); 392 memcpy(buf + 1, buffer, size); 393 394 // More than one threads may send data and ksend is allowed to send partial 395 // data. Need a lock here. 396 uint32 sent = 0; 397 mutex_lock(&fSocketLock); 398 do { 399 result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0); 400 sent += result; 401 } while (result > 0 && sent < size + sizeof(uint32)); 402 mutex_unlock(&fSocketLock); 403 if (result < 0) { 404 result = errno; 405 return result; 406 } else if (result == 0) 407 return B_IO_ERROR; 408 409 return B_OK; 410 } 411 412 413 status_t 414 ConnectionPacket::Send(const void* buffer, uint32 size) 415 { 416 ASSERT(buffer != NULL); 417 ASSERT(size < 65535); 418 419 // send on DGRAM sockets is atomic. No need to lock. 420 status_t result = send(fSocket, buffer, size, 0); 421 if (result < 0) 422 return errno; 423 return B_OK; 424 } 425 426 427 status_t 428 ConnectionStream::Receive(void** _buffer, uint32* _size) 429 { 430 ASSERT(_buffer != NULL); 431 ASSERT(_size != NULL); 432 433 status_t result; 434 435 uint32 size = 0; 436 void* buffer = NULL; 437 438 uint32 record_size; 439 bool last_one = false; 440 441 object_wait_info object[2]; 442 object[0].object = fWaitCancel; 443 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 444 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 445 446 object[1].object = fSocket; 447 object[1].type = B_OBJECT_TYPE_FD; 448 object[1].events = B_EVENT_READ; 449 450 do { 451 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 452 object[1].events = B_EVENT_READ; 453 454 result = wait_for_objects(object, 2); 455 if (result < B_OK 456 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 457 free(buffer); 458 return ECONNABORTED; 459 } else if ((object[1].events & B_EVENT_READ) == 0) 460 continue; 461 462 // There is only one listener thread per connection. No need to lock. 463 uint32 received = 0; 464 do { 465 result = recv(fSocket, ((uint8*)&record_size) + received, 466 sizeof(record_size) - received, 0); 467 received += result; 468 } while (result > 0 && received < sizeof(record_size)); 469 if (result < 0) { 470 result = errno; 471 free(buffer); 472 return result; 473 } else if (result == 0) { 474 free(buffer); 475 return ECONNABORTED; 476 } 477 478 record_size = ntohl(record_size); 479 ASSERT(record_size > 0); 480 481 last_one = (record_size & LAST_FRAGMENT) != 0; 482 record_size &= LAST_FRAGMENT - 1; 483 484 void* ptr = realloc(buffer, size + record_size); 485 if (ptr == NULL) { 486 free(buffer); 487 return B_NO_MEMORY; 488 } else 489 buffer = ptr; 490 MemoryDeleter bufferDeleter(buffer); 491 492 received = 0; 493 do { 494 result = recv(fSocket, (uint8*)buffer + size + received, 495 record_size - received, 0); 496 received += result; 497 } while (result > 0 && received < record_size); 498 if (result < 0) 499 return errno; 500 else if (result == 0) 501 return ECONNABORTED; 502 503 bufferDeleter.Detach(); 504 size += record_size; 505 } while (!last_one); 506 507 508 *_buffer = buffer; 509 *_size = size; 510 511 return B_OK; 512 } 513 514 515 status_t 516 ConnectionPacket::Receive(void** _buffer, uint32* _size) 517 { 518 ASSERT(_buffer != NULL); 519 ASSERT(_size != NULL); 520 521 status_t result; 522 int32 size = MAX_PACKET_SIZE; 523 void* buffer = malloc(size); 524 525 if (buffer == NULL) 526 return B_NO_MEMORY; 527 528 object_wait_info object[2]; 529 object[0].object = fWaitCancel; 530 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 531 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 532 533 object[1].object = fSocket; 534 object[1].type = B_OBJECT_TYPE_FD; 535 object[1].events = B_EVENT_READ; 536 537 do { 538 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 539 object[1].events = B_EVENT_READ; 540 541 result = wait_for_objects(object, 2); 542 if (result < B_OK 543 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 544 free(buffer); 545 return ECONNABORTED; 546 } else if ((object[1].events & B_EVENT_READ) == 0) 547 continue; 548 break; 549 } while (true); 550 551 // There is only one listener thread per connection. No need to lock. 552 size = recv(fSocket, buffer, size, 0); 553 if (size < 0) { 554 result = errno; 555 free(buffer); 556 return result; 557 } else if (size == 0) { 558 free(buffer); 559 return ECONNABORTED; 560 } 561 562 *_buffer = buffer; 563 *_size = size; 564 565 return B_OK; 566 } 567 568 569 Connection* 570 Connection::CreateObject(const PeerAddress& address) 571 { 572 switch (address.fProtocol) { 573 case IPPROTO_TCP: 574 return new(std::nothrow) ConnectionStream(address); 575 case IPPROTO_UDP: 576 return new(std::nothrow) ConnectionPacket(address); 577 default: 578 return NULL; 579 } 580 } 581 582 583 status_t 584 Connection::Connect(Connection **_connection, const PeerAddress& address) 585 { 586 ASSERT(_connection != NULL); 587 588 Connection* conn = CreateObject(address); 589 if (conn == NULL) 590 return B_NO_MEMORY; 591 592 status_t result; 593 if (conn->fWaitCancel < B_OK) { 594 result = conn->fWaitCancel; 595 delete conn; 596 return result; 597 } 598 599 result = conn->Connect(); 600 if (result != B_OK) { 601 delete conn; 602 return result; 603 } 604 605 *_connection = conn; 606 607 return B_OK; 608 } 609 610 611 status_t 612 Connection::SetTo(Connection **_connection, int socket, 613 const PeerAddress& address) 614 { 615 ASSERT(_connection != NULL); 616 ASSERT(socket != -1); 617 618 Connection* conn = CreateObject(address); 619 if (conn == NULL) 620 return B_NO_MEMORY; 621 622 status_t result; 623 if (conn->fWaitCancel < B_OK) { 624 result = conn->fWaitCancel; 625 delete conn; 626 return result; 627 } 628 629 conn->fSocket = socket; 630 631 *_connection = conn; 632 633 return B_OK; 634 } 635 636 637 status_t 638 Connection::Connect() 639 { 640 switch (fPeerAddress.fProtocol) { 641 case IPPROTO_TCP: 642 fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP); 643 break; 644 case IPPROTO_UDP: 645 fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP); 646 break; 647 default: 648 return B_BAD_VALUE; 649 } 650 if (fSocket < 0) 651 return errno; 652 653 status_t result; 654 uint16 port, attempt = 0; 655 656 PeerAddress address(fPeerAddress.Family()); 657 658 do { 659 port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT); 660 port += NFS_MIN_PORT; 661 662 if (attempt == 9) 663 port = 0; 664 attempt++; 665 666 address.SetPort(port); 667 result = bind(fSocket, (sockaddr*)&address.fAddress, 668 address.AddressSize()); 669 } while (attempt <= 10 && result != B_OK); 670 671 if (attempt > 10) { 672 close(fSocket); 673 return result; 674 } 675 676 result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress, 677 fPeerAddress.AddressSize()); 678 if (result != 0) { 679 result = errno; 680 close(fSocket); 681 return result; 682 } 683 684 return B_OK; 685 } 686 687 688 status_t 689 Connection::Reconnect() 690 { 691 release_sem(fWaitCancel); 692 close(fSocket); 693 acquire_sem(fWaitCancel); 694 return Connect(); 695 } 696 697 698 void 699 ConnectionBase::Disconnect() 700 { 701 release_sem(fWaitCancel); 702 703 close(fSocket); 704 fSocket = -1; 705 } 706 707 708 status_t 709 ConnectionListener::Listen(ConnectionListener** listener, int networkFamily, 710 uint16 port) 711 { 712 ASSERT(listener != NULL); 713 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 714 715 int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP); 716 if (sock < 0) 717 return errno; 718 719 PeerAddress address(networkFamily); 720 address.SetPort(port); 721 address.fProtocol = IPPROTO_TCP; 722 723 status_t result = bind(sock, (sockaddr*)&address.fAddress, 724 address.AddressSize()); 725 if (result != B_OK) { 726 close(sock); 727 return errno; 728 } 729 730 if (listen(sock, 5) != B_OK) { 731 close(sock); 732 return errno; 733 } 734 735 *listener = new(std::nothrow) ConnectionListener(address); 736 if (*listener == NULL) { 737 close(sock); 738 return B_NO_MEMORY; 739 } 740 741 if ((*listener)->fWaitCancel < B_OK) { 742 result = (*listener)->fWaitCancel; 743 close(sock); 744 delete *listener; 745 return result; 746 } 747 748 (*listener)->fSocket = sock; 749 750 return B_OK; 751 } 752 753 754 status_t 755 ConnectionListener::AcceptConnection(Connection** connection) 756 { 757 ASSERT(connection != NULL); 758 759 object_wait_info object[2]; 760 object[0].object = fWaitCancel; 761 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 762 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 763 764 object[1].object = fSocket; 765 object[1].type = B_OBJECT_TYPE_FD; 766 object[1].events = B_EVENT_READ; 767 768 do { 769 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 770 object[1].events = B_EVENT_READ; 771 772 status_t result = wait_for_objects(object, 2); 773 if (result < B_OK 774 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 775 return ECONNABORTED; 776 } else if ((object[1].events & B_EVENT_READ) == 0) 777 continue; 778 break; 779 } while (true); 780 781 sockaddr_storage addr; 782 socklen_t length = sizeof(addr); 783 int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length); 784 if (sock < 0) 785 return errno; 786 787 PeerAddress address; 788 address.fProtocol = IPPROTO_TCP; 789 address.fAddress = addr; 790 791 status_t result = Connection::SetTo(connection, sock, address); 792 if (result != B_OK) { 793 close(sock); 794 return result; 795 } 796 797 return B_OK; 798 } 799 800