1 /* 2 * Copyright 2012-2013 Haiku, Inc. All rights reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Paweł Dziepak, pdziepak@quarnos.org 7 */ 8 9 10 #include "Connection.h" 11 12 #include <arpa/inet.h> 13 #include <errno.h> 14 #include <stdio.h> 15 #include <stdlib.h> 16 #include <string.h> 17 #include <unistd.h> 18 19 #include <AutoDeleter.h> 20 #include <net/dns_resolver.h> 21 #include <util/kernel_cpp.h> 22 #include <util/Random.h> 23 24 25 #define NFS4_PORT 2049 26 27 #define LAST_FRAGMENT 0x80000000 28 #define MAX_PACKET_SIZE 65535 29 30 #define NFS_MIN_PORT 665 31 32 33 bool 34 PeerAddress::operator==(const PeerAddress& address) 35 { 36 return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0 37 && fProtocol == address.fProtocol; 38 } 39 40 41 bool 42 PeerAddress::operator<(const PeerAddress& address) 43 { 44 int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress)); 45 return compare < 0 || (compare == 0 && fProtocol < address.fProtocol); 46 } 47 48 49 PeerAddress& 50 PeerAddress::operator=(const PeerAddress& address) 51 { 52 fAddress = address.fAddress; 53 fProtocol = address.fProtocol; 54 return *this; 55 } 56 57 58 PeerAddress::PeerAddress() 59 : 60 fProtocol(0) 61 { 62 memset(&fAddress, 0, sizeof(fAddress)); 63 } 64 65 66 PeerAddress::PeerAddress(int networkFamily) 67 : 68 fProtocol(0) 69 { 70 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 71 72 memset(&fAddress, 0, sizeof(fAddress)); 73 74 fAddress.ss_family = networkFamily; 75 switch (networkFamily) { 76 case AF_INET: 77 fAddress.ss_len = sizeof(sockaddr_in); 78 break; 79 case AF_INET6: 80 fAddress.ss_len = sizeof(sockaddr_in6); 81 break; 82 } 83 } 84 85 86 const char* 87 PeerAddress::ProtocolString() const 88 { 89 static const char* tcpName = "tcp"; 90 static const char* udpName = "udp"; 91 static const char* unknown = ""; 92 93 switch (fProtocol) { 94 case IPPROTO_TCP: 95 return tcpName; 96 case IPPROTO_UDP: 97 return udpName; 98 default: 99 return unknown; 100 } 101 } 102 103 104 void 105 PeerAddress::SetProtocol(const char* protocol) 106 { 107 ASSERT(protocol != NULL); 108 109 if (strcmp(protocol, "tcp") == 0) 110 fProtocol = IPPROTO_TCP; 111 else if (strcmp(protocol, "udp") == 0) 112 fProtocol = IPPROTO_UDP; 113 } 114 115 116 char* 117 PeerAddress::UniversalAddress() const 118 { 119 char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16)); 120 if (uAddr == NULL) 121 return NULL; 122 123 if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL) 124 return NULL; 125 126 char port[16]; 127 sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff); 128 strcat(uAddr, port); 129 130 return uAddr; 131 } 132 133 134 socklen_t 135 PeerAddress::AddressSize() const 136 { 137 switch (Family()) { 138 case AF_INET: 139 return sizeof(sockaddr_in); 140 case AF_INET6: 141 return sizeof(sockaddr_in6); 142 default: 143 return 0; 144 } 145 } 146 147 148 uint16 149 PeerAddress::Port() const 150 { 151 uint16 port; 152 153 switch (Family()) { 154 case AF_INET: 155 port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port; 156 break; 157 case AF_INET6: 158 port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port; 159 break; 160 default: 161 port = 0; 162 } 163 164 return ntohs(port); 165 } 166 167 168 void 169 PeerAddress::SetPort(uint16 port) 170 { 171 port = htons(port); 172 173 switch (Family()) { 174 case AF_INET: 175 reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port; 176 break; 177 case AF_INET6: 178 reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port; 179 break; 180 } 181 } 182 183 const void* 184 PeerAddress::InAddr() const 185 { 186 switch (Family()) { 187 case AF_INET: 188 return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr; 189 case AF_INET6: 190 return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr; 191 default: 192 return NULL; 193 } 194 } 195 196 197 size_t 198 PeerAddress::InAddrSize() const 199 { 200 switch (Family()) { 201 case AF_INET: 202 return sizeof(in_addr); 203 case AF_INET6: 204 return sizeof(in6_addr); 205 default: 206 return 0; 207 } 208 } 209 210 211 AddressResolver::AddressResolver(const char* name) 212 : 213 fHead(NULL), 214 fCurrent(NULL), 215 fForcedPort(htons(NFS4_PORT)), 216 fForcedProtocol(IPPROTO_TCP) 217 { 218 fStatus = ResolveAddress(name); 219 } 220 221 222 AddressResolver::~AddressResolver() 223 { 224 freeaddrinfo(fHead); 225 } 226 227 228 status_t 229 AddressResolver::ResolveAddress(const char* name) 230 { 231 ASSERT(name != NULL); 232 233 if (fHead != NULL) { 234 freeaddrinfo(fHead); 235 fHead = NULL; 236 fCurrent = NULL; 237 } 238 239 // getaddrinfo() is very expensive when called from kernel, so we do not 240 // want to call it unless there is no other choice. 241 struct sockaddr_in addr; 242 memset(&addr, 0, sizeof(addr)); 243 if (inet_aton(name, &addr.sin_addr) == 1) { 244 addr.sin_len = sizeof(addr); 245 addr.sin_family = AF_INET; 246 addr.sin_port = htons(NFS4_PORT); 247 248 memcpy(&fAddress.fAddress, &addr, sizeof(addr)); 249 fAddress.fProtocol = IPPROTO_TCP; 250 return B_OK; 251 } 252 253 status_t result = getaddrinfo(name, NULL, NULL, &fHead); 254 fCurrent = fHead; 255 256 return result; 257 } 258 259 260 void 261 AddressResolver::ForceProtocol(const char* protocol) 262 { 263 ASSERT(protocol != NULL); 264 265 if (strcmp(protocol, "tcp") == 0) 266 fForcedProtocol = IPPROTO_TCP; 267 else if (strcmp(protocol, "udp") == 0) 268 fForcedProtocol = IPPROTO_UDP; 269 270 fAddress.SetProtocol(protocol); 271 } 272 273 274 void 275 AddressResolver::ForcePort(uint16 port) 276 { 277 fForcedPort = htons(port); 278 fAddress.SetPort(port); 279 } 280 281 282 status_t 283 AddressResolver::GetNextAddress(PeerAddress* address) 284 { 285 ASSERT(address != NULL); 286 287 if (fStatus != B_OK) 288 return fStatus; 289 290 if (fHead == NULL) { 291 *address = fAddress; 292 fStatus = B_NAME_NOT_FOUND; 293 return B_OK; 294 } 295 296 address->fProtocol = fForcedProtocol; 297 298 while (fCurrent != NULL) { 299 if (fCurrent->ai_family == AF_INET) { 300 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in)); 301 reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port 302 = fForcedPort; 303 } else if (fCurrent->ai_family == AF_INET6) { 304 memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6)); 305 reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port 306 = fForcedPort; 307 } else { 308 fCurrent = fCurrent->ai_next; 309 continue; 310 } 311 312 fCurrent = fCurrent->ai_next; 313 return B_OK; 314 } 315 316 return B_NAME_NOT_FOUND; 317 } 318 319 320 Connection::Connection(const PeerAddress& address) 321 : 322 ConnectionBase(address) 323 { 324 } 325 326 327 ConnectionListener::ConnectionListener(const PeerAddress& address) 328 : 329 ConnectionBase(address) 330 { 331 } 332 333 334 ConnectionBase::ConnectionBase(const PeerAddress& address) 335 : 336 fWaitCancel(create_sem(0, NULL)), 337 fSocket(-1), 338 fPeerAddress(address) 339 { 340 mutex_init(&fSocketLock, NULL); 341 } 342 343 344 ConnectionStream::ConnectionStream(const PeerAddress& address) 345 : 346 Connection(address) 347 { 348 } 349 350 351 ConnectionPacket::ConnectionPacket(const PeerAddress& address) 352 : 353 Connection(address) 354 { 355 } 356 357 358 ConnectionBase::~ConnectionBase() 359 { 360 if (fSocket != -1) 361 close(fSocket); 362 mutex_destroy(&fSocketLock); 363 delete_sem(fWaitCancel); 364 } 365 366 367 status_t 368 ConnectionBase::GetLocalAddress(PeerAddress* address) 369 { 370 ASSERT(address != NULL); 371 372 address->fProtocol = fPeerAddress.fProtocol; 373 374 socklen_t addressSize = sizeof(address->fAddress); 375 return getsockname(fSocket, (struct sockaddr*)&address->fAddress, 376 &addressSize); 377 } 378 379 380 status_t 381 ConnectionStream::Send(const void* buffer, uint32 size) 382 { 383 ASSERT(buffer != NULL); 384 385 status_t result; 386 387 uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32))); 388 if (buf == NULL) 389 return B_NO_MEMORY; 390 MemoryDeleter _(buf); 391 392 buf[0] = htonl(size | LAST_FRAGMENT); 393 memcpy(buf + 1, buffer, size); 394 395 // More than one threads may send data and ksend is allowed to send partial 396 // data. Need a lock here. 397 uint32 sent = 0; 398 mutex_lock(&fSocketLock); 399 do { 400 result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0); 401 sent += result; 402 } while (result > 0 && sent < size + sizeof(uint32)); 403 mutex_unlock(&fSocketLock); 404 if (result < 0) { 405 result = errno; 406 return result; 407 } else if (result == 0) 408 return B_IO_ERROR; 409 410 return B_OK; 411 } 412 413 414 status_t 415 ConnectionPacket::Send(const void* buffer, uint32 size) 416 { 417 ASSERT(buffer != NULL); 418 ASSERT(size < 65535); 419 420 // send on DGRAM sockets is atomic. No need to lock. 421 status_t result = send(fSocket, buffer, size, 0); 422 if (result < 0) 423 return errno; 424 return B_OK; 425 } 426 427 428 status_t 429 ConnectionStream::Receive(void** _buffer, uint32* _size) 430 { 431 ASSERT(_buffer != NULL); 432 ASSERT(_size != NULL); 433 434 status_t result; 435 436 uint32 size = 0; 437 void* buffer = NULL; 438 439 uint32 record_size; 440 bool last_one = false; 441 442 object_wait_info object[2]; 443 object[0].object = fWaitCancel; 444 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 445 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 446 447 object[1].object = fSocket; 448 object[1].type = B_OBJECT_TYPE_FD; 449 object[1].events = B_EVENT_READ; 450 451 do { 452 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 453 object[1].events = B_EVENT_READ; 454 455 result = wait_for_objects(object, 2); 456 if (result < B_OK 457 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 458 free(buffer); 459 return ECONNABORTED; 460 } else if ((object[1].events & B_EVENT_READ) == 0) 461 continue; 462 463 // There is only one listener thread per connection. No need to lock. 464 uint32 received = 0; 465 do { 466 result = recv(fSocket, ((uint8*)&record_size) + received, 467 sizeof(record_size) - received, 0); 468 received += result; 469 } while (result > 0 && received < sizeof(record_size)); 470 if (result < 0) { 471 result = errno; 472 free(buffer); 473 return result; 474 } else if (result == 0) { 475 free(buffer); 476 return ECONNABORTED; 477 } 478 479 record_size = ntohl(record_size); 480 ASSERT(record_size > 0); 481 482 last_one = (record_size & LAST_FRAGMENT) != 0; 483 record_size &= LAST_FRAGMENT - 1; 484 485 void* ptr = realloc(buffer, size + record_size); 486 if (ptr == NULL) { 487 free(buffer); 488 return B_NO_MEMORY; 489 } else 490 buffer = ptr; 491 MemoryDeleter bufferDeleter(buffer); 492 493 received = 0; 494 do { 495 result = recv(fSocket, (uint8*)buffer + size + received, 496 record_size - received, 0); 497 received += result; 498 } while (result > 0 && received < record_size); 499 if (result < 0) 500 return errno; 501 else if (result == 0) 502 return ECONNABORTED; 503 504 bufferDeleter.Detach(); 505 size += record_size; 506 } while (!last_one); 507 508 509 *_buffer = buffer; 510 *_size = size; 511 512 return B_OK; 513 } 514 515 516 status_t 517 ConnectionPacket::Receive(void** _buffer, uint32* _size) 518 { 519 ASSERT(_buffer != NULL); 520 ASSERT(_size != NULL); 521 522 status_t result; 523 int32 size = MAX_PACKET_SIZE; 524 void* buffer = malloc(size); 525 526 if (buffer == NULL) 527 return B_NO_MEMORY; 528 529 object_wait_info object[2]; 530 object[0].object = fWaitCancel; 531 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 532 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 533 534 object[1].object = fSocket; 535 object[1].type = B_OBJECT_TYPE_FD; 536 object[1].events = B_EVENT_READ; 537 538 do { 539 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 540 object[1].events = B_EVENT_READ; 541 542 result = wait_for_objects(object, 2); 543 if (result < B_OK 544 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 545 free(buffer); 546 return ECONNABORTED; 547 } else if ((object[1].events & B_EVENT_READ) == 0) 548 continue; 549 break; 550 } while (true); 551 552 // There is only one listener thread per connection. No need to lock. 553 size = recv(fSocket, buffer, size, 0); 554 if (size < 0) { 555 result = errno; 556 free(buffer); 557 return result; 558 } else if (size == 0) { 559 free(buffer); 560 return ECONNABORTED; 561 } 562 563 *_buffer = buffer; 564 *_size = size; 565 566 return B_OK; 567 } 568 569 570 Connection* 571 Connection::CreateObject(const PeerAddress& address) 572 { 573 switch (address.fProtocol) { 574 case IPPROTO_TCP: 575 return new(std::nothrow) ConnectionStream(address); 576 case IPPROTO_UDP: 577 return new(std::nothrow) ConnectionPacket(address); 578 default: 579 return NULL; 580 } 581 } 582 583 584 status_t 585 Connection::Connect(Connection **_connection, const PeerAddress& address) 586 { 587 ASSERT(_connection != NULL); 588 589 Connection* conn = CreateObject(address); 590 if (conn == NULL) 591 return B_NO_MEMORY; 592 593 status_t result; 594 if (conn->fWaitCancel < B_OK) { 595 result = conn->fWaitCancel; 596 delete conn; 597 return result; 598 } 599 600 result = conn->Connect(); 601 if (result != B_OK) { 602 delete conn; 603 return result; 604 } 605 606 *_connection = conn; 607 608 return B_OK; 609 } 610 611 612 status_t 613 Connection::SetTo(Connection **_connection, int socket, 614 const PeerAddress& address) 615 { 616 ASSERT(_connection != NULL); 617 ASSERT(socket != -1); 618 619 Connection* conn = CreateObject(address); 620 if (conn == NULL) 621 return B_NO_MEMORY; 622 623 status_t result; 624 if (conn->fWaitCancel < B_OK) { 625 result = conn->fWaitCancel; 626 delete conn; 627 return result; 628 } 629 630 conn->fSocket = socket; 631 632 *_connection = conn; 633 634 return B_OK; 635 } 636 637 638 status_t 639 Connection::Connect() 640 { 641 switch (fPeerAddress.fProtocol) { 642 case IPPROTO_TCP: 643 fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP); 644 break; 645 case IPPROTO_UDP: 646 fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP); 647 break; 648 default: 649 return B_BAD_VALUE; 650 } 651 if (fSocket < 0) 652 return errno; 653 654 status_t result; 655 uint16 port, attempt = 0; 656 657 PeerAddress address(fPeerAddress.Family()); 658 659 do { 660 port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT); 661 port += NFS_MIN_PORT; 662 663 if (attempt == 9) 664 port = 0; 665 attempt++; 666 667 address.SetPort(port); 668 result = bind(fSocket, (sockaddr*)&address.fAddress, 669 address.AddressSize()); 670 } while (attempt <= 10 && result != B_OK); 671 672 if (attempt > 10) { 673 close(fSocket); 674 return result; 675 } 676 677 result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress, 678 fPeerAddress.AddressSize()); 679 if (result != 0) { 680 result = errno; 681 close(fSocket); 682 return result; 683 } 684 685 return B_OK; 686 } 687 688 689 status_t 690 Connection::Reconnect() 691 { 692 release_sem(fWaitCancel); 693 close(fSocket); 694 acquire_sem(fWaitCancel); 695 return Connect(); 696 } 697 698 699 void 700 ConnectionBase::Disconnect() 701 { 702 release_sem(fWaitCancel); 703 704 close(fSocket); 705 fSocket = -1; 706 } 707 708 709 status_t 710 ConnectionListener::Listen(ConnectionListener** listener, int networkFamily, 711 uint16 port) 712 { 713 ASSERT(listener != NULL); 714 ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6); 715 716 int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP); 717 if (sock < 0) 718 return errno; 719 720 PeerAddress address(networkFamily); 721 address.SetPort(port); 722 address.fProtocol = IPPROTO_TCP; 723 724 status_t result = bind(sock, (sockaddr*)&address.fAddress, 725 address.AddressSize()); 726 if (result != B_OK) { 727 close(sock); 728 return errno; 729 } 730 731 if (listen(sock, 5) != B_OK) { 732 close(sock); 733 return errno; 734 } 735 736 *listener = new(std::nothrow) ConnectionListener(address); 737 if (*listener == NULL) { 738 close(sock); 739 return B_NO_MEMORY; 740 } 741 742 if ((*listener)->fWaitCancel < B_OK) { 743 result = (*listener)->fWaitCancel; 744 close(sock); 745 delete *listener; 746 return result; 747 } 748 749 (*listener)->fSocket = sock; 750 751 return B_OK; 752 } 753 754 755 status_t 756 ConnectionListener::AcceptConnection(Connection** connection) 757 { 758 ASSERT(connection != NULL); 759 760 object_wait_info object[2]; 761 object[0].object = fWaitCancel; 762 object[0].type = B_OBJECT_TYPE_SEMAPHORE; 763 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 764 765 object[1].object = fSocket; 766 object[1].type = B_OBJECT_TYPE_FD; 767 object[1].events = B_EVENT_READ; 768 769 do { 770 object[0].events = B_EVENT_ACQUIRE_SEMAPHORE; 771 object[1].events = B_EVENT_READ; 772 773 status_t result = wait_for_objects(object, 2); 774 if (result < B_OK 775 || (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) { 776 return ECONNABORTED; 777 } else if ((object[1].events & B_EVENT_READ) == 0) 778 continue; 779 break; 780 } while (true); 781 782 sockaddr_storage addr; 783 socklen_t length = sizeof(addr); 784 int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length); 785 if (sock < 0) 786 return errno; 787 788 PeerAddress address; 789 address.fProtocol = IPPROTO_TCP; 790 address.fAddress = addr; 791 792 status_t result = Connection::SetTo(connection, sock, address); 793 if (result != B_OK) { 794 close(sock); 795 return result; 796 } 797 798 return B_OK; 799 } 800 801