xref: /haiku/src/add-ons/kernel/file_systems/nfs4/Connection.cpp (revision 83b1a68c52ba3e0e8796282759f694b7fdddf06d)
1 /*
2  * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Paweł Dziepak, pdziepak@quarnos.org
7  */
8 
9 
10 #include "Connection.h"
11 
12 #include <arpa/inet.h>
13 #include <errno.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <unistd.h>
17 
18 #include <AutoDeleter.h>
19 #include <net/dns_resolver.h>
20 #include <util/kernel_cpp.h>
21 #include <util/Random.h>
22 
23 
24 #define NFS4_PORT		2049
25 
26 #define	LAST_FRAGMENT	0x80000000
27 #define MAX_PACKET_SIZE	65535
28 
29 #define NFS_MIN_PORT	665
30 
31 
32 bool
33 PeerAddress::operator==(const PeerAddress& address)
34 {
35 	return memcmp(&fAddress, &address.fAddress, sizeof(fAddress)) == 0
36 		&& fProtocol == address.fProtocol;
37 }
38 
39 
40 bool
41 PeerAddress::operator<(const PeerAddress& address)
42 {
43 	int compare = memcmp(&fAddress, &address.fAddress, sizeof(fAddress));
44 	return compare < 0 || (compare == 0 && fProtocol < address.fProtocol);
45 }
46 
47 
48 PeerAddress&
49 PeerAddress::operator=(const PeerAddress& address)
50 {
51 	fAddress = address.fAddress;
52 	fProtocol = address.fProtocol;
53 	return *this;
54 }
55 
56 
57 PeerAddress::PeerAddress()
58 	:
59 	fProtocol(0)
60 {
61 	memset(&fAddress, 0, sizeof(fAddress));
62 }
63 
64 
65 PeerAddress::PeerAddress(int networkFamily)
66 	:
67 	fProtocol(0)
68 {
69 	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
70 
71 	memset(&fAddress, 0, sizeof(fAddress));
72 
73 	fAddress.ss_family = networkFamily;
74 	switch (networkFamily) {
75 		case AF_INET:
76 			fAddress.ss_len = sizeof(sockaddr_in);
77 			break;
78 		case AF_INET6:
79 			fAddress.ss_len = sizeof(sockaddr_in6);
80 			break;
81 	}
82 }
83 
84 
85 const char*
86 PeerAddress::ProtocolString() const
87 {
88 	static const char* tcpName = "tcp";
89 	static const char* udpName = "udp";
90 	static const char* unknown = "";
91 
92 	switch (fProtocol) {
93 		case IPPROTO_TCP:
94 			return tcpName;
95 		case IPPROTO_UDP:
96 			return udpName;
97 		default:
98 			return unknown;
99 	}
100 }
101 
102 
103 void
104 PeerAddress::SetProtocol(const char* protocol)
105 {
106 	ASSERT(protocol != NULL);
107 
108 	if (strcmp(protocol, "tcp") == 0)
109 		fProtocol = IPPROTO_TCP;
110 	else if (strcmp(protocol, "udp") == 0)
111 		fProtocol = IPPROTO_UDP;
112 }
113 
114 
115 char*
116 PeerAddress::UniversalAddress() const
117 {
118 	char* uAddr = reinterpret_cast<char*>(malloc(INET6_ADDRSTRLEN + 16));
119 	if (uAddr == NULL)
120 		return NULL;
121 
122 	if (inet_ntop(fAddress.ss_family, InAddr(), uAddr, AddressSize()) == NULL)
123 		return NULL;
124 
125 	char port[16];
126 	sprintf(port, ".%d.%d", Port() >> 8, Port() & 0xff);
127 	strcat(uAddr, port);
128 
129 	return uAddr;
130 }
131 
132 
133 socklen_t
134 PeerAddress::AddressSize() const
135 {
136 	switch (Family()) {
137 		case AF_INET:
138 			return sizeof(sockaddr_in);
139 		case AF_INET6:
140 			return sizeof(sockaddr_in6);
141 		default:
142 			return 0;
143 	}
144 }
145 
146 
147 uint16
148 PeerAddress::Port() const
149 {
150 	uint16 port;
151 
152 	switch (Family()) {
153 		case AF_INET:
154 			port = reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_port;
155 			break;
156 		case AF_INET6:
157 			port = reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_port;
158 			break;
159 		default:
160 			port = 0;
161 	}
162 
163 	return ntohs(port);
164 }
165 
166 
167 void
168 PeerAddress::SetPort(uint16 port)
169 {
170 	port = htons(port);
171 
172 	switch (Family()) {
173 		case AF_INET:
174 			reinterpret_cast<sockaddr_in*>(&fAddress)->sin_port = port;
175 			break;
176 		case AF_INET6:
177 			reinterpret_cast<sockaddr_in6*>(&fAddress)->sin6_port = port;
178 			break;
179 	}
180 }
181 
182 const void*
183 PeerAddress::InAddr() const
184 {
185 	switch (Family()) {
186 		case AF_INET:
187 			return &reinterpret_cast<const sockaddr_in*>(&fAddress)->sin_addr;
188 		case AF_INET6:
189 			return &reinterpret_cast<const sockaddr_in6*>(&fAddress)->sin6_addr;
190 		default:
191 			return NULL;
192 	}
193 }
194 
195 
196 size_t
197 PeerAddress::InAddrSize() const
198 {
199 	switch (Family()) {
200 		case AF_INET:
201 			return sizeof(in_addr);
202 		case AF_INET6:
203 			return sizeof(in6_addr);
204 		default:
205 			return 0;
206 	}
207 }
208 
209 
210 AddressResolver::AddressResolver(const char* name)
211 	:
212 	fHead(NULL),
213 	fCurrent(NULL),
214 	fForcedPort(htons(NFS4_PORT)),
215 	fForcedProtocol(IPPROTO_TCP)
216 {
217 	fStatus = ResolveAddress(name);
218 }
219 
220 
221 AddressResolver::~AddressResolver()
222 {
223 	freeaddrinfo(fHead);
224 }
225 
226 
227 status_t
228 AddressResolver::ResolveAddress(const char* name)
229 {
230 	ASSERT(name != NULL);
231 
232 	if (fHead != NULL) {
233 		freeaddrinfo(fHead);
234 		fHead = NULL;
235 		fCurrent = NULL;
236 	}
237 
238 	// getaddrinfo() is very expensive when called from kernel, so we do not
239 	// want to call it unless there is no other choice.
240 	struct sockaddr_in addr;
241 	memset(&addr, 0, sizeof(addr));
242 	if (inet_aton(name, &addr.sin_addr) == 1) {
243 		addr.sin_len = sizeof(addr);
244 		addr.sin_family = AF_INET;
245 		addr.sin_port = htons(NFS4_PORT);
246 
247 		memcpy(&fAddress.fAddress, &addr, sizeof(addr));
248 		fAddress.fProtocol = IPPROTO_TCP;
249 		return B_OK;
250 	}
251 
252 	status_t result = getaddrinfo(name, NULL, NULL, &fHead);
253 	fCurrent = fHead;
254 
255 	return result;
256 }
257 
258 
259 void
260 AddressResolver::ForceProtocol(const char* protocol)
261 {
262 	ASSERT(protocol != NULL);
263 
264 	if (strcmp(protocol, "tcp") == 0)
265 		fForcedProtocol = IPPROTO_TCP;
266 	else if (strcmp(protocol, "udp") == 0)
267 		fForcedProtocol = IPPROTO_UDP;
268 
269 	fAddress.SetProtocol(protocol);
270 }
271 
272 
273 void
274 AddressResolver::ForcePort(uint16 port)
275 {
276 	fForcedPort = htons(port);
277 	fAddress.SetPort(port);
278 }
279 
280 
281 status_t
282 AddressResolver::GetNextAddress(PeerAddress* address)
283 {
284 	ASSERT(address != NULL);
285 
286 	if (fStatus != B_OK)
287 		return fStatus;
288 
289 	if (fHead == NULL) {
290 		*address = fAddress;
291 		fStatus = B_NAME_NOT_FOUND;
292 		return B_OK;
293 	}
294 
295 	address->fProtocol = fForcedProtocol;
296 
297 	while (fCurrent != NULL) {
298 		if (fCurrent->ai_family == AF_INET) {
299 			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in));
300 			reinterpret_cast<sockaddr_in*>(&address->fAddress)->sin_port
301 				= fForcedPort;
302 		} else if (fCurrent->ai_family == AF_INET6) {
303 			memcpy(&address->fAddress, fCurrent->ai_addr, sizeof(sockaddr_in6));
304 			reinterpret_cast<sockaddr_in6*>(&address->fAddress)->sin6_port
305 				= fForcedPort;
306 		} else {
307 			fCurrent = fCurrent->ai_next;
308 			continue;
309 		}
310 
311 		fCurrent = fCurrent->ai_next;
312 		return B_OK;
313 	}
314 
315 	return B_NAME_NOT_FOUND;
316 }
317 
318 
319 Connection::Connection(const PeerAddress& address)
320 	:
321 	ConnectionBase(address)
322 {
323 }
324 
325 
326 ConnectionListener::ConnectionListener(const PeerAddress& address)
327 	:
328 	ConnectionBase(address)
329 {
330 }
331 
332 
333 ConnectionBase::ConnectionBase(const PeerAddress& address)
334 	:
335 	fWaitCancel(create_sem(0, NULL)),
336 	fSocket(-1),
337 	fPeerAddress(address)
338 {
339 	mutex_init(&fSocketLock, NULL);
340 }
341 
342 
343 ConnectionStream::ConnectionStream(const PeerAddress& address)
344 	:
345 	Connection(address)
346 {
347 }
348 
349 
350 ConnectionPacket::ConnectionPacket(const PeerAddress& address)
351 	:
352 	Connection(address)
353 {
354 }
355 
356 
357 ConnectionBase::~ConnectionBase()
358 {
359 	if (fSocket != -1)
360 		close(fSocket);
361 	mutex_destroy(&fSocketLock);
362 	delete_sem(fWaitCancel);
363 }
364 
365 
366 status_t
367 ConnectionBase::GetLocalAddress(PeerAddress* address)
368 {
369 	ASSERT(address != NULL);
370 
371 	address->fProtocol = fPeerAddress.fProtocol;
372 
373 	socklen_t addressSize = sizeof(address->fAddress);
374 	return getsockname(fSocket,	(struct sockaddr*)&address->fAddress,
375 		&addressSize);
376 }
377 
378 
379 status_t
380 ConnectionStream::Send(const void* buffer, uint32 size)
381 {
382 	ASSERT(buffer != NULL);
383 
384 	status_t result;
385 
386 	uint32* buf = reinterpret_cast<uint32*>(malloc(size + sizeof(uint32)));
387 	if (buf == NULL)
388 		return B_NO_MEMORY;
389 	MemoryDeleter _(buf);
390 
391 	buf[0] = htonl(size | LAST_FRAGMENT);
392 	memcpy(buf + 1, buffer, size);
393 
394 	// More than one threads may send data and ksend is allowed to send partial
395 	// data. Need a lock here.
396 	uint32 sent = 0;
397 	mutex_lock(&fSocketLock);
398 	do {
399 		result = send(fSocket, buf + sent, size + sizeof(uint32) - sent, 0);
400 		sent += result;
401 	} while (result > 0 && sent < size + sizeof(uint32));
402 	mutex_unlock(&fSocketLock);
403 	if (result < 0) {
404 		result = errno;
405 		return result;
406 	} else if (result == 0)
407 		return B_IO_ERROR;
408 
409 	return B_OK;
410 }
411 
412 
413 status_t
414 ConnectionPacket::Send(const void* buffer, uint32 size)
415 {
416 	ASSERT(buffer != NULL);
417 	ASSERT(size < 65535);
418 
419 	// send on DGRAM sockets is atomic. No need to lock.
420 	status_t result = send(fSocket, buffer,  size, 0);
421 	if (result < 0)
422 		return errno;
423 	return B_OK;
424 }
425 
426 
427 status_t
428 ConnectionStream::Receive(void** _buffer, uint32* _size)
429 {
430 	ASSERT(_buffer != NULL);
431 	ASSERT(_size != NULL);
432 
433 	status_t result;
434 
435 	uint32 size = 0;
436 	void* buffer = NULL;
437 
438 	uint32 record_size;
439 	bool last_one = false;
440 
441 	object_wait_info object[2];
442 	object[0].object = fWaitCancel;
443 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
444 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
445 
446 	object[1].object = fSocket;
447 	object[1].type = B_OBJECT_TYPE_FD;
448 	object[1].events = B_EVENT_READ;
449 
450 	do {
451 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
452 		object[1].events = B_EVENT_READ;
453 
454 		result = wait_for_objects(object, 2);
455 		if (result < B_OK
456 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
457 			free(buffer);
458 			return ECONNABORTED;
459 		} else if ((object[1].events & B_EVENT_READ) == 0)
460 			continue;
461 
462 		// There is only one listener thread per connection. No need to lock.
463 		uint32 received = 0;
464 		do {
465 			result = recv(fSocket, ((uint8*)&record_size) + received,
466 							sizeof(record_size) - received, 0);
467 			received += result;
468 		} while (result > 0 && received < sizeof(record_size));
469 		if (result < 0) {
470 			result = errno;
471 			free(buffer);
472 			return result;
473 		} else if (result == 0) {
474 			free(buffer);
475 			return ECONNABORTED;
476 		}
477 
478 		record_size = ntohl(record_size);
479 		ASSERT(record_size > 0);
480 
481 		last_one = (record_size & LAST_FRAGMENT) != 0;
482 		record_size &= LAST_FRAGMENT - 1;
483 
484 		void* ptr = realloc(buffer, size + record_size);
485 		if (ptr == NULL) {
486 			free(buffer);
487 			return B_NO_MEMORY;
488 		} else
489 			buffer = ptr;
490 		MemoryDeleter bufferDeleter(buffer);
491 
492 		received = 0;
493 		do {
494 			result = recv(fSocket, (uint8*)buffer + size + received,
495 							record_size - received, 0);
496 			received += result;
497 		} while (result > 0 && received < record_size);
498 		if (result < 0)
499 			return errno;
500 		else if (result == 0)
501 			return ECONNABORTED;
502 
503 		bufferDeleter.Detach();
504 		size += record_size;
505 	} while (!last_one);
506 
507 
508 	*_buffer = buffer;
509 	*_size = size;
510 
511 	return B_OK;
512 }
513 
514 
515 status_t
516 ConnectionPacket::Receive(void** _buffer, uint32* _size)
517 {
518 	ASSERT(_buffer != NULL);
519 	ASSERT(_size != NULL);
520 
521 	status_t result;
522 	int32 size = MAX_PACKET_SIZE;
523 	void* buffer = malloc(size);
524 
525 	if (buffer == NULL)
526 		return B_NO_MEMORY;
527 
528 	object_wait_info object[2];
529 	object[0].object = fWaitCancel;
530 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
531 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
532 
533 	object[1].object = fSocket;
534 	object[1].type = B_OBJECT_TYPE_FD;
535 	object[1].events = B_EVENT_READ;
536 
537 	do {
538 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
539 		object[1].events = B_EVENT_READ;
540 
541 		result = wait_for_objects(object, 2);
542 		if (result < B_OK
543 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
544 			free(buffer);
545 			return ECONNABORTED;
546 		} else if ((object[1].events & B_EVENT_READ) == 0)
547 			continue;
548 		break;
549 	} while (true);
550 
551 	// There is only one listener thread per connection. No need to lock.
552 	size = recv(fSocket, buffer, size, 0);
553 	if (size < 0) {
554 		result = errno;
555 		free(buffer);
556 		return result;
557 	} else if (size == 0) {
558 		free(buffer);
559 		return ECONNABORTED;
560 	}
561 
562 	*_buffer = buffer;
563 	*_size = size;
564 
565 	return B_OK;
566 }
567 
568 
569 Connection*
570 Connection::CreateObject(const PeerAddress& address)
571 {
572 	switch (address.fProtocol) {
573 		case IPPROTO_TCP:
574 			return new(std::nothrow) ConnectionStream(address);
575 		case IPPROTO_UDP:
576 			return new(std::nothrow) ConnectionPacket(address);
577 		default:
578 			return NULL;
579 	}
580 }
581 
582 
583 status_t
584 Connection::Connect(Connection **_connection, const PeerAddress& address)
585 {
586 	ASSERT(_connection != NULL);
587 
588 	Connection* conn = CreateObject(address);
589 	if (conn == NULL)
590 		return B_NO_MEMORY;
591 
592 	status_t result;
593 	if (conn->fWaitCancel < B_OK) {
594 		result = conn->fWaitCancel;
595 		delete conn;
596 		return result;
597 	}
598 
599 	result = conn->Connect();
600 	if (result != B_OK) {
601 		delete conn;
602 		return result;
603 	}
604 
605 	*_connection = conn;
606 
607 	return B_OK;
608 }
609 
610 
611 status_t
612 Connection::SetTo(Connection **_connection, int socket,
613 	const PeerAddress& address)
614 {
615 	ASSERT(_connection != NULL);
616 	ASSERT(socket != -1);
617 
618 	Connection* conn = CreateObject(address);
619 	if (conn == NULL)
620 		return B_NO_MEMORY;
621 
622 	status_t result;
623 	if (conn->fWaitCancel < B_OK) {
624 		result = conn->fWaitCancel;
625 		delete conn;
626 		return result;
627 	}
628 
629 	conn->fSocket = socket;
630 
631 	*_connection = conn;
632 
633 	return B_OK;
634 }
635 
636 
637 status_t
638 Connection::Connect()
639 {
640 	switch (fPeerAddress.fProtocol) {
641 		case IPPROTO_TCP:
642 			fSocket = socket(fPeerAddress.Family(), SOCK_STREAM, IPPROTO_TCP);
643 			break;
644 		case IPPROTO_UDP:
645 			fSocket = socket(fPeerAddress.Family(), SOCK_DGRAM, IPPROTO_UDP);
646 			break;
647 		default:
648 			return B_BAD_VALUE;
649 	}
650 	if (fSocket < 0)
651 		return errno;
652 
653 	status_t result;
654 	uint16 port, attempt = 0;
655 
656 	PeerAddress address(fPeerAddress.Family());
657 
658 	do {
659 		port = get_random<uint16>() % (IPPORT_RESERVED - NFS_MIN_PORT);
660 		port += NFS_MIN_PORT;
661 
662 		if (attempt == 9)
663 			port = 0;
664 		attempt++;
665 
666 		address.SetPort(port);
667 		result = bind(fSocket, (sockaddr*)&address.fAddress,
668 			address.AddressSize());
669 	} while (attempt <= 10 && result != B_OK);
670 
671 	if (attempt > 10) {
672 		close(fSocket);
673 		return result;
674 	}
675 
676 	result = connect(fSocket, (sockaddr*)&fPeerAddress.fAddress,
677 		fPeerAddress.AddressSize());
678 	if (result != 0) {
679 		result = errno;
680 		close(fSocket);
681 		return result;
682 	}
683 
684 	return B_OK;
685 }
686 
687 
/*!	Tears down the current socket and connects again via Connect().

	Releasing fWaitCancel makes any thread blocked in wait_for_objects() on
	it (see the Receive() implementations) stop with ECONNABORTED before the
	socket is closed. The semaphore count is then taken back with
	acquire_sem(), presumably so later waits do not see a stale
	cancellation. NOTE(review): if the waiter has not observed the release
	yet, it may miss the wake-up — confirm this ordering is intended.
*/
status_t
Connection::Reconnect()
{
	release_sem(fWaitCancel);
	close(fSocket);
	acquire_sem(fWaitCancel);
	return Connect();
}
696 
697 
/*!	Cancels waiting I/O and closes the socket.

	Releasing fWaitCancel makes threads waiting on it in Receive() or
	AcceptConnection() return ECONNABORTED. Unlike Reconnect(), the
	semaphore count is not taken back, so later waits on this object also
	report cancellation. fSocket is reset to -1 so the destructor does not
	close it again.
*/
void
ConnectionBase::Disconnect()
{
	release_sem(fWaitCancel);

	close(fSocket);
	fSocket = -1;
}
706 
707 
708 status_t
709 ConnectionListener::Listen(ConnectionListener** listener, int networkFamily,
710 	uint16 port)
711 {
712 	ASSERT(listener != NULL);
713 	ASSERT(networkFamily == AF_INET || networkFamily == AF_INET6);
714 
715 	int sock = socket(networkFamily, SOCK_STREAM, IPPROTO_TCP);
716 	if (sock < 0)
717 		return errno;
718 
719 	PeerAddress address(networkFamily);
720 	address.SetPort(port);
721 	address.fProtocol = IPPROTO_TCP;
722 
723 	status_t result = bind(sock, (sockaddr*)&address.fAddress,
724 		address.AddressSize());
725 	if (result != B_OK) {
726 		close(sock);
727 		return errno;
728 	}
729 
730 	if (listen(sock, 5) != B_OK) {
731 		close(sock);
732 		return errno;
733 	}
734 
735 	*listener = new(std::nothrow) ConnectionListener(address);
736 	if (*listener == NULL) {
737 		close(sock);
738 		return B_NO_MEMORY;
739 	}
740 
741 	if ((*listener)->fWaitCancel < B_OK) {
742 		result = (*listener)->fWaitCancel;
743 		close(sock);
744 		delete *listener;
745 		return result;
746 	}
747 
748 	(*listener)->fSocket = sock;
749 
750 	return B_OK;
751 }
752 
753 
754 status_t
755 ConnectionListener::AcceptConnection(Connection** connection)
756 {
757 	ASSERT(connection != NULL);
758 
759 	object_wait_info object[2];
760 	object[0].object = fWaitCancel;
761 	object[0].type = B_OBJECT_TYPE_SEMAPHORE;
762 	object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
763 
764 	object[1].object = fSocket;
765 	object[1].type = B_OBJECT_TYPE_FD;
766 	object[1].events = B_EVENT_READ;
767 
768 	do {
769 		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
770 		object[1].events = B_EVENT_READ;
771 
772 		status_t result = wait_for_objects(object, 2);
773 		if (result < B_OK
774 			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
775 			return ECONNABORTED;
776 		} else if ((object[1].events & B_EVENT_READ) == 0)
777 			continue;
778 		break;
779 	} while (true);
780 
781 	sockaddr_storage addr;
782 	socklen_t length = sizeof(addr);
783 	int sock = accept(fSocket, reinterpret_cast<sockaddr*>(&addr), &length);
784 	if (sock < 0)
785 		return errno;
786 
787 	PeerAddress address;
788 	address.fProtocol = IPPROTO_TCP;
789 	address.fAddress = addr;
790 
791 	status_t result = Connection::SetTo(connection, sock, address);
792 	if (result != B_OK) {
793 		close(sock);
794 		return result;
795 	}
796 
797 	return B_OK;
798 }
799 
800