xref: /haiku/src/add-ons/kernel/file_systems/nfs4/Connection.cpp (revision 1a3518cf757c2da8006753f83962da5935bbc82b)
1 /*
2  * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "Connection.h"
11 
12 #include <arpa/inet.h>
13 #include <errno.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <unistd.h>
18 
19 #include <AutoDeleter.h>
20 #include <net/dns_resolver.h>
21 #include <util/kernel_cpp.h>
22 #include <util/Random.h>
23 
24 
25 #define NFS4_PORT		2049
26 
27 #define	LAST_FRAGMENT	0x80000000
28 #define MAX_PACKET_SIZE	65535
29 
30 #define NFS_MIN_PORT	665
31 
32 
33 bool
operator ==(const PeerAddress & address)34 PeerAddress::operator==(const PeerAddress& address)
35 {
36 	return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0
37 		&& fProtocol == address.fProtocol;
38 }
39 
40 
41 bool
operator <(const PeerAddress & address)42 PeerAddress::operator<(const PeerAddress& address)
43 {
44 	int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress));
45 	return compare < 0 || (compare == 0 && fProtocol < address.fProtocol);
46 }
47 
48 
49 PeerAddress&
operator =(const PeerAddress & address)50 PeerAddress::operator=(const PeerAddress& address)
51 {
52 	fAddress = address.fAddress;
53 	fProtocol = address.fProtocol;
54 	return *this;
55 }
56 
57 
PeerAddress()58 PeerAddress::PeerAddress()
59 	:
60 	fProtocol(0)
61 {
62 	memset(&fAddress, 0, sizeof(fAddress));
63 }
64 
65 
PeerAddress(int networkFamily)66 PeerAddress::PeerAddress(int networkFamily)
67 	:
68 	fProtocol(0)
69 {
70 	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
71 
72 	memset(&fAddress, 0, sizeof(fAddress));
73 
74 	fAddress.ss_family = networkFamily;
75 	switch (networkFamily) {
76 		case AF_INET:
77 			fAddress.ss_len = sizeof(sockaddr_in);
78 			break;
79 		case AF_INET6:
80 			fAddress.ss_len = sizeof(sockaddr_in6);
81 			break;
82 	}
83 }
84 
85 
86 const char*
ProtocolString() const87 PeerAddress::ProtocolString() const
88 {
89 	static const char* tcpName = "tcp";
90 	static const char* udpName = "udp";
91 	static const char* unknown = "";
92 
93 	switch (fProtocol) {
94 		case IPPROTO_TCP:
95 			return tcpName;
96 		case IPPROTO_UDP:
97 			return udpName;
98 		default:
99 			return unknown;
100 	}
101 }
102 
103 
104 void
SetProtocol(const char * protocol)105 PeerAddress::SetProtocol(const char* protocol)
106 {
107 	ASSERT(protocol != NULL);
108 
109 	if (strcmp(protocol, "tcp") == 0)
110 		fProtocol = IPPROTO_TCP;
111 	else if (strcmp(protocol, "udp") == 0)
112 		fProtocol = IPPROTO_UDP;
113 }
114 
115 
116 char*
UniversalAddress() const117 PeerAddress::UniversalAddress() const
118 {
119 	char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16));
120 	if (uAddr == NULL)
121 		return NULL;
122 
123 	if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL)
124 		return NULL;
125 
126 	char port[16];
127 	sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff);
128 	strcat(uAddr, port);
129 
130 	return uAddr;
131 }
132 
133 
134 socklen_t
AddressSize() const135 PeerAddress::AddressSize() const
136 {
137 	switch (Family()) {
138 		case AF_INET:
139 			return sizeof(sockaddr_in);
140 		case AF_INET6:
141 			return sizeof(sockaddr_in6);
142 		default:
143 			return 0;
144 	}
145 }
146 
147 
148 uint16
Port() const149 PeerAddress::Port() const
150 {
151 	uint16 port;
152 
153 	switch (Family()) {
154 		case AF_INET:
155 			port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port;
156 			break;
157 		case AF_INET6:
158 			port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port;
159 			break;
160 		default:
161 			port = 0;
162 	}
163 
164 	return ntohs(port);
165 }
166 
167 
168 void
SetPort(uint16 port)169 PeerAddress::SetPort(uint16 port)
170 {
171 	port = htons(port);
172 
173 	switch (Family()) {
174 		case AF_INET:
175 			reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port;
176 			break;
177 		case AF_INET6:
178 			reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port;
179 			break;
180 	}
181 }
182 
183 const void*
InAddr() const184 PeerAddress::InAddr() const
185 {
186 	switch (Family()) {
187 		case AF_INET:
188 			return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr;
189 		case AF_INET6:
190 			return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr;
191 		default:
192 			return NULL;
193 	}
194 }
195 
196 
197 size_t
InAddrSize() const198 PeerAddress::InAddrSize() const
199 {
200 	switch (Family()) {
201 		case AF_INET:
202 			return sizeof(in_addr);
203 		case AF_INET6:
204 			return sizeof(in6_addr);
205 		default:
206 			return 0;
207 	}
208 }
209 
210 
AddressResolver(const char * name)211 AddressResolver::AddressResolver(const char* name)
212 	:
213 	fHead(NULL),
214 	fCurrent(NULL),
215 	fForcedPort(htons(NFS4_PORT)),
216 	fForcedProtocol(IPPROTO_TCP)
217 {
218 	fStatus = ResolveAddress(name);
219 }
220 
221 
~AddressResolver()222 AddressResolver::~AddressResolver()
223 {
224 	freeaddrinfo(fHead);
225 }
226 
227 
228 status_t
ResolveAddress(const char * name)229 AddressResolver::ResolveAddress(const char* name)
230 {
231 	ASSERT(name != NULL);
232 
233 	if (fHead != NULL) {
234 		freeaddrinfo(fHead);
235 		fHead = NULL;
236 		fCurrent = NULL;
237 	}
238 
239 	// getaddrinfo() is very expensive when called from kernel, so we do not
240 	// want to call it unless there is no other choice.
241 	struct sockaddr_in addr;
242 	memset(&addr, 0, sizeof(addr));
243 	if (inet_aton(name, &addr.sin_addr) == 1) {
244 		addr.sin_len = sizeof(addr);
245 		addr.sin_family = AF_INET;
246 		addr.sin_port = htons(NFS4_PORT);
247 
248 		memcpy(&fAddress.fAddress, &addr, sizeof(addr));
249 		fAddress.fProtocol = IPPROTO_TCP;
250 		return B_OK;
251 	}
252 
253 	status_t result = getaddrinfo(name, NULL, NULL, &fHead);
254 	fCurrent = fHead;
255 
256 	return result;
257 }
258 
259 
260 void
ForceProtocol(const char * protocol)261 AddressResolver::ForceProtocol(const char* protocol)
262 {
263 	ASSERT(protocol != NULL);
264 
265 	if (strcmp(protocol, "tcp") == 0)
266 		fForcedProtocol = IPPROTO_TCP;
267 	else if (strcmp(protocol, "udp") == 0)
268 		fForcedProtocol = IPPROTO_UDP;
269 
270 	fAddress.SetProtocol(protocol);
271 }
272 
273 
274 void
ForcePort(uint16 port)275 AddressResolver::ForcePort(uint16 port)
276 {
277 	fForcedPort = htons(port);
278 	fAddress.SetPort(port);
279 }
280 
281 
282 status_t
GetNextAddress(PeerAddress * address)283 AddressResolver::GetNextAddress(PeerAddress* address)
284 {
285 	ASSERT(address != NULL);
286 
287 	if (fStatus != B_OK)
288 		return fStatus;
289 
290 	if (fHead == NULL) {
291 		*address = fAddress;
292 		fStatus = B_NAME_NOT_FOUND;
293 		return B_OK;
294 	}
295 
296 	address->fProtocol = fForcedProtocol;
297 
298 	while (fCurrent != NULL) {
299 		if (fCurrent->ai_family == AF_INET) {
300 			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in));
301 			reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port
302 				= fForcedPort;
303 		} else if (fCurrent->ai_family == AF_INET6) {
304 			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6));
305 			reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port
306 				= fForcedPort;
307 		} else {
308 			fCurrent = fCurrent->ai_next;
309 			continue;
310 		}
311 
312 		fCurrent = fCurrent->ai_next;
313 		return B_OK;
314 	}
315 
316 	return B_NAME_NOT_FOUND;
317 }
318 
319 
Connection(const PeerAddress & address)320 Connection::Connection(const PeerAddress& address)
321 	:
322 	ConnectionBase(address)
323 {
324 }
325 
326 
ConnectionListener(const PeerAddress & address)327 ConnectionListener::ConnectionListener(const PeerAddress& address)
328 	:
329 	ConnectionBase(address)
330 {
331 }
332 
333 
ConnectionBase(const PeerAddress & address)334 ConnectionBase::ConnectionBase(const PeerAddress& address)
335 	:
336 	fWaitCancel(create_sem(0, NULL)),
337 	fSocket(-1),
338 	fPeerAddress(address)
339 {
340 	mutex_init(&fSocketLock, NULL);
341 }
342 
343 
ConnectionStream(const PeerAddress & address)344 ConnectionStream::ConnectionStream(const PeerAddress& address)
345 	:
346 	Connection(address)
347 {
348 }
349 
350 
ConnectionPacket(const PeerAddress & address)351 ConnectionPacket::ConnectionPacket(const PeerAddress& address)
352 	:
353 	Connection(address)
354 {
355 }
356 
357 
~ConnectionBase()358 ConnectionBase::~ConnectionBase()
359 {
360 	if (fSocket != -1)
361 		close(fSocket);
362 	mutex_destroy(&fSocketLock);
363 	delete_sem(fWaitCancel);
364 }
365 
366 
367 status_t
GetLocalAddress(PeerAddress * address)368 ConnectionBase::GetLocalAddress(PeerAddress* address)
369 {
370 	ASSERT(address != NULL);
371 
372 	address->fProtocol = fPeerAddress.fProtocol;
373 
374 	socklen_t addressSize = sizeof(address->fAddress);
375 	return getsockname(fSocket,	(struct sockaddr*)&address->fAddress,
376 		&addressSize);
377 }
378 
379 
380 status_t
Send(const void * buffer,uint32 size)381 ConnectionStream::Send(const void* buffer, uint32 size)
382 {
383 	ASSERT(buffer != NULL);
384 
385 	status_t result;
386 
387 	uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32)));
388 	if (buf == NULL)
389 		return B_NO_MEMORY;
390 	MemoryDeleter _(buf);
391 
392 	buf[0] = htonl(size | LAST_FRAGMENT);
393 	memcpy(buf + 1, buffer, size);
394 
395 	// More than one threads may send data and ksend is allowed to send partial
396 	// data. Need a lock here.
397 	uint32 sent = 0;
398 	mutex_lock(&fSocketLock);
399 	do {
400 		result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0);
401 		sent += result;
402 	} while (result > 0 && sent < size + sizeof(uint32));
403 	mutex_unlock(&fSocketLock);
404 	if (result < 0) {
405 		result = errno;
406 		return result;
407 	} else if (result == 0)
408 		return B_IO_ERROR;
409 
410 	return B_OK;
411 }
412 
413 
414 status_t
Send(const void * buffer,uint32 size)415 ConnectionPacket::Send(const void* buffer, uint32 size)
416 {
417 	ASSERT(buffer != NULL);
418 	ASSERT(size < 65535);
419 
420 	// send on DGRAM sockets is atomic. No need to lock.
421 	status_t result = send(fSocket, buffer,  size, 0);
422 	if (result < 0)
423 		return errno;
424 	return B_OK;
425 }
426 
427 
428 status_t
Receive(void ** _buffer,uint32 * _size)429 ConnectionStream::Receive(void** _buffer, uint32* _size)
430 {
431 	ASSERT(_buffer != NULL);
432 	ASSERT(_size != NULL);
433 
434 	status_t result;
435 
436 	uint32 size = 0;
437 	void* buffer = NULL;
438 
439 	uint32 record_size;
440 	bool last_one = false;
441 
442 	object_wait_info object[2];
443 	object[0].object = fWaitCancel;
444 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
445 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
446 
447 	object[1].object = fSocket;
448 	object[1].type = B_OBJECT_TYPE_FD;
449 	object[1].events = B_EVENT_READ;
450 
451 	do {
452 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
453 		object[1].events = B_EVENT_READ;
454 
455 		result = wait_for_objects(object, 2);
456 		if (result < B_OK
457 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
458 			free(buffer);
459 			return ECONNABORTED;
460 		} else if ((object[1].events & B_EVENT_READ) == 0)
461 			continue;
462 
463 		// There is only one listener thread per connection. No need to lock.
464 		uint32 received = 0;
465 		do {
466 			result = recv(fSocket, ((uint8*)&record_size) + received,
467 							sizeof(record_size) - received, 0);
468 			received += result;
469 		} while (result > 0 && received < sizeof(record_size));
470 		if (result < 0) {
471 			result = errno;
472 			free(buffer);
473 			return result;
474 		} else if (result == 0) {
475 			free(buffer);
476 			return ECONNABORTED;
477 		}
478 
479 		record_size = ntohl(record_size);
480 		ASSERT(record_size > 0);
481 
482 		last_one = (record_size & LAST_FRAGMENT) != 0;
483 		record_size &= LAST_FRAGMENT - 1;
484 
485 		void* ptr = realloc(buffer, size + record_size);
486 		if (ptr == NULL) {
487 			free(buffer);
488 			return B_NO_MEMORY;
489 		} else
490 			buffer = ptr;
491 		MemoryDeleter bufferDeleter(buffer);
492 
493 		received = 0;
494 		do {
495 			result = recv(fSocket, (uint8*)buffer + size + received,
496 							record_size - received, 0);
497 			received += result;
498 		} while (result > 0 && received < record_size);
499 		if (result < 0)
500 			return errno;
501 		else if (result == 0)
502 			return ECONNABORTED;
503 
504 		bufferDeleter.Detach();
505 		size += record_size;
506 	} while (!last_one);
507 
508 
509 	*_buffer = buffer;
510 	*_size = size;
511 
512 	return B_OK;
513 }
514 
515 
516 status_t
Receive(void ** _buffer,uint32 * _size)517 ConnectionPacket::Receive(void** _buffer, uint32* _size)
518 {
519 	ASSERT(_buffer != NULL);
520 	ASSERT(_size != NULL);
521 
522 	status_t result;
523 	int32 size = MAX_PACKET_SIZE;
524 	void* buffer = malloc(size);
525 
526 	if (buffer == NULL)
527 		return B_NO_MEMORY;
528 
529 	object_wait_info object[2];
530 	object[0].object = fWaitCancel;
531 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
532 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
533 
534 	object[1].object = fSocket;
535 	object[1].type = B_OBJECT_TYPE_FD;
536 	object[1].events = B_EVENT_READ;
537 
538 	do {
539 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
540 		object[1].events = B_EVENT_READ;
541 
542 		result = wait_for_objects(object, 2);
543 		if (result < B_OK
544 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
545 			free(buffer);
546 			return ECONNABORTED;
547 		} else if ((object[1].events & B_EVENT_READ) == 0)
548 			continue;
549 		break;
550 	} while (true);
551 
552 	// There is only one listener thread per connection. No need to lock.
553 	size = recv(fSocket, buffer, size, 0);
554 	if (size < 0) {
555 		result = errno;
556 		free(buffer);
557 		return result;
558 	} else if (size == 0) {
559 		free(buffer);
560 		return ECONNABORTED;
561 	}
562 
563 	*_buffer = buffer;
564 	*_size = size;
565 
566 	return B_OK;
567 }
568 
569 
570 Connection*
CreateObject(const PeerAddress & address)571 Connection::CreateObject(const PeerAddress& address)
572 {
573 	switch (address.fProtocol) {
574 		case IPPROTO_TCP:
575 			return new(std::nothrow) ConnectionStream(address);
576 		case IPPROTO_UDP:
577 			return new(std::nothrow) ConnectionPacket(address);
578 		default:
579 			return NULL;
580 	}
581 }
582 
583 
584 status_t
Connect(Connection ** _connection,const PeerAddress & address)585 Connection::Connect(Connection **_connection, const PeerAddress& address)
586 {
587 	ASSERT(_connection != NULL);
588 
589 	Connection* conn = CreateObject(address);
590 	if (conn == NULL)
591 		return B_NO_MEMORY;
592 
593 	status_t result;
594 	if (conn->fWaitCancel < B_OK) {
595 		result = conn->fWaitCancel;
596 		delete conn;
597 		return result;
598 	}
599 
600 	result = conn->Connect();
601 	if (result != B_OK) {
602 		delete conn;
603 		return result;
604 	}
605 
606 	*_connection = conn;
607 
608 	return B_OK;
609 }
610 
611 
612 status_t
SetTo(Connection ** _connection,int socket,const PeerAddress & address)613 Connection::SetTo(Connection **_connection, int socket,
614 	const PeerAddress& address)
615 {
616 	ASSERT(_connection != NULL);
617 	ASSERT(socket != -1);
618 
619 	Connection* conn = CreateObject(address);
620 	if (conn == NULL)
621 		return B_NO_MEMORY;
622 
623 	status_t result;
624 	if (conn->fWaitCancel < B_OK) {
625 		result = conn->fWaitCancel;
626 		delete conn;
627 		return result;
628 	}
629 
630 	conn->fSocket = socket;
631 
632 	*_connection = conn;
633 
634 	return B_OK;
635 }
636 
637 
638 status_t
Connect()639 Connection::Connect()
640 {
641 	switch (fPeerAddress.fProtocol) {
642 		case IPPROTO_TCP:
643 			fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP);
644 			break;
645 		case IPPROTO_UDP:
646 			fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP);
647 			break;
648 		default:
649 			return B_BAD_VALUE;
650 	}
651 	if (fSocket < 0)
652 		return errno;
653 
654 	status_t result;
655 	uint16 port, attempt = 0;
656 
657 	PeerAddress address(fPeerAddress.Family());
658 
659 	do {
660 		port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT);
661 		port += NFS_MIN_PORT;
662 
663 		if (attempt == 9)
664 			port = 0;
665 		attempt++;
666 
667 		address.SetPort(port);
668 		result = bind(fSocket, (sockaddr*)&address.fAddress,
669 			address.AddressSize());
670 	} while (attempt <= 10 && result != B_OK);
671 
672 	if (attempt > 10) {
673 		close(fSocket);
674 		return result;
675 	}
676 
677 	result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress,
678 		fPeerAddress.AddressSize());
679 	if (result != 0) {
680 		result = errno;
681 		close(fSocket);
682 		return result;
683 	}
684 
685 	return B_OK;
686 }
687 
688 
689 status_t
Reconnect()690 Connection::Reconnect()
691 {
692 	release_sem(fWaitCancel);
693 	close(fSocket);
694 	acquire_sem(fWaitCancel);
695 	return Connect();
696 }
697 
698 
699 void
Disconnect()700 ConnectionBase::Disconnect()
701 {
702 	release_sem(fWaitCancel);
703 
704 	close(fSocket);
705 	fSocket = -1;
706 }
707 
708 
709 status_t
Listen(ConnectionListener ** listener,int networkFamily,uint16 port)710 ConnectionListener::Listen(ConnectionListener** listener, int networkFamily,
711 	uint16 port)
712 {
713 	ASSERT(listener != NULL);
714 	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
715 
716 	int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP);
717 	if (sock < 0)
718 		return errno;
719 
720 	PeerAddress address(networkFamily);
721 	address.SetPort(port);
722 	address.fProtocol = IPPROTO_TCP;
723 
724 	status_t result = bind(sock, (sockaddr*)&address.fAddress,
725 		address.AddressSize());
726 	if (result != B_OK) {
727 		close(sock);
728 		return errno;
729 	}
730 
731 	if (listen(sock, 5) != B_OK) {
732 		close(sock);
733 		return errno;
734 	}
735 
736 	*listener = new(std::nothrow) ConnectionListener(address);
737 	if (*listener == NULL) {
738 		close(sock);
739 		return B_NO_MEMORY;
740 	}
741 
742 	if ((*listener)->fWaitCancel < B_OK) {
743 		result = (*listener)->fWaitCancel;
744 		close(sock);
745 		delete *listener;
746 		return result;
747 	}
748 
749 	(*listener)->fSocket = sock;
750 
751 	return B_OK;
752 }
753 
754 
755 status_t
AcceptConnection(Connection ** connection)756 ConnectionListener::AcceptConnection(Connection** connection)
757 {
758 	ASSERT(connection != NULL);
759 
760 	object_wait_info object[2];
761 	object[0].object = fWaitCancel;
762 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
763 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
764 
765 	object[1].object = fSocket;
766 	object[1].type = B_OBJECT_TYPE_FD;
767 	object[1].events = B_EVENT_READ;
768 
769 	do {
770 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
771 		object[1].events = B_EVENT_READ;
772 
773 		status_t result = wait_for_objects(object, 2);
774 		if (result < B_OK
775 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
776 			return ECONNABORTED;
777 		} else if ((object[1].events & B_EVENT_READ) == 0)
778 			continue;
779 		break;
780 	} while (true);
781 
782 	sockaddr_storage addr;
783 	socklen_t length = sizeof(addr);
784 	int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length);
785 	if (sock < 0)
786 		return errno;
787 
788 	PeerAddress address;
789 	address.fProtocol = IPPROTO_TCP;
790 	address.fAddress = addr;
791 
792 	status_t result = Connection::SetTo(connection, sock, address);
793 	if (result != B_OK) {
794 		close(sock);
795 		return result;
796 	}
797 
798 	return B_OK;
799 }
800 
801