xref: /haiku/src/kits/network/libnetservices2/HttpSession.cpp (revision b70a0efa9e95a3cfdf9e629ab00f2610fc64e162)
1 /*
2  * Copyright 2022 Haiku Inc. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Niels Sascha Reedijk, niels.reedijk@gmail.com
7  */
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <deque>
12 #include <list>
13 #include <map>
14 #include <optional>
15 #include <vector>
16 
17 #include <AutoLocker.h>
18 #include <DataIO.h>
19 #include <ErrorsExt.h>
20 #include <HttpFields.h>
21 #include <HttpRequest.h>
22 #include <HttpResult.h>
23 #include <HttpSession.h>
24 #include <Locker.h>
25 #include <Messenger.h>
26 #include <NetBuffer.h>
27 #include <NetServicesDefs.h>
28 #include <NetworkAddress.h>
29 #include <OS.h>
30 #include <SecureSocket.h>
31 #include <Socket.h>
32 #include <ZlibCompressionAlgorithm.h>
33 
34 #include "HttpBuffer.h"
35 #include "HttpParser.h"
36 #include "HttpResultPrivate.h"
37 #include "HttpSerializer.h"
38 #include "NetServicesPrivate.h"
39 
40 using namespace std::literals;
41 using namespace BPrivate::Network;
42 
43 
/*!
	\brief Maximum size of the HTTP Header lines of the message.

	In the RFC there is no maximum, but we need to prevent the situation where we keep growing the
	internal buffer waiting for the end of line ('\r\n') characters to occur.
*/
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
51 
52 
53 struct CounterDeleter {
operator ()CounterDeleter54 	void operator()(int32* counter) const noexcept { atomic_add(counter, -1); }
55 };
56 
57 
/*!
	\brief Internal bookkeeping for one HTTP request while it is handled by the session.

	The object bundles the request itself, the shared result state, the socket and the
	serializer/parser that are used to send the request and to receive the response.
*/
class BHttpSession::Request
{
public:
	// Wrap a new request; creates the shared result state
	Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);

	// Build the follow-up request for a redirect; takes over the request data and result of
	// \a original
	Request(Request& original, const Redirect& redirect);

	// States
	enum RequestState { InitialState, Connected, RequestSent, ContentReceived };
	RequestState State() const noexcept { return fRequestStatus; }

	// Result Helpers
	std::shared_ptr<HttpResultPrivate> Result() { return fResult; }
	void SetError(std::exception_ptr e);

	// Helpers for maintaining the connection count
	std::pair<BString, int> GetHost() const;
	void SetCounter(int32* counter) noexcept;

	// Operational methods
	void ResolveHostName();
	void OpenConnection();
	void TransferRequest();
	bool ReceiveResult();
	void Disconnect() noexcept;

	// Object information
	// Note: Socket() dereferences fSocket, which is only created in OpenConnection()
	int Socket() const noexcept { return fSocket->Socket(); }
	int32 Id() const noexcept { return fResult->id; }
	bool CanCancel() const noexcept { return fResult->CanCancel(); }

	// Message helper
	void SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc = nullptr) const;

private:
	BHttpRequest fRequest;

	// Request state/events
	RequestState fRequestStatus = InitialState;

	// Communication
	BMessenger fObserver;
	std::shared_ptr<HttpResultPrivate> fResult;

	// Connection
	BNetworkAddress fRemoteAddress;
	std::unique_ptr<BSocket> fSocket;

	// Sending and receiving
	HttpBuffer fBuffer;
	HttpSerializer fSerializer;
	HttpParser fParser;

	// Receive state
	BHttpStatus fStatus;
	BHttpFields fFields;

	// Redirection
	bool fMightRedirect = false;
	// No default value here; both constructors assign it
	int8 fRemainingRedirects;

	// Connection counter
	// Points at the per-host counter; CounterDeleter decrements it when the handle is released
	std::unique_ptr<int32, CounterDeleter> fConnectionCounter;
};
122 
123 
/*!
	\brief Shared implementation behind BHttpSession.

	It owns two semaphores and two threads. New requests are put on the control queue; the
	control thread resolves the host and opens the connection and then hands the request to the
	data queue, from which the data thread picks it up to send the request and receive the
	response.
*/
class BHttpSession::Impl
{
public:
	Impl();
	~Impl() noexcept;

	BHttpResult Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
	void Cancel(int32 identifier);
	void SetMaxConnectionsPerHost(size_t maxConnections);
	void SetMaxHosts(size_t maxConnections);

private:
	// Thread functions
	static status_t ControlThreadFunc(void* arg);
	static status_t DataThreadFunc(void* arg);

	// Helper functions
	std::vector<BHttpSession::Request> GetRequestsForControlThread();

private:
	// constants (can be accessed unlocked)
	// All four are created in the initializer list of the constructor
	const sem_id fControlQueueSem;
	const sem_id fDataQueueSem;
	const thread_id fControlThread;
	const thread_id fDataThread;

	// locking mechanism
	BLocker fLock;
	// Set by the destructor before the semaphores are deleted
	std::atomic<bool> fQuitting = false;

	// queues & shared data
	// fControlQueue: requests that still need a connection
	// fDataQueue: connected requests waiting to be picked up by the data thread
	// fCancelList: identifiers of requests that the data thread should cancel
	std::list<BHttpSession::Request> fControlQueue;
	std::deque<BHttpSession::Request> fDataQueue;
	std::vector<int32> fCancelList;

	// data owned by the controlThread
	// The counters stored in the map are read and modified with atomic_get()/atomic_add()
	using Host = std::pair<BString, int>;
	std::map<Host, int32> fConnectionCount;

	// data that can only be accessed atomically
	std::atomic<size_t> fMaxConnectionsPerHost = 2;
	std::atomic<size_t> fMaxHosts = 10;

	// data owned by the dataThread
	// connectionMap is keyed by the socket of the request; objectList is the list passed to
	// wait_for_objects() and always has the data queue semaphore as its first entry
	std::map<int, BHttpSession::Request> connectionMap;
	std::vector<object_wait_info> objectList;
};
171 
172 
/*!
	\brief Description of a redirect that has to be followed.

	An object of this type is thrown by Request::ReceiveResult() and caught in the data thread,
	which uses it to construct the follow-up request.
*/
struct BHttpSession::Redirect {
	// Location to send the follow-up request to
	BUrl url;
	// When true, the follow-up request is changed into a GET without body, unless the request
	// already is a GET or HEAD
	bool redirectToGet;
};
177 
178 
179 // #pragma mark -- BHttpSession::Impl
180 
181 
Impl()182 BHttpSession::Impl::Impl()
183 	:
184 	fControlQueueSem(create_sem(0, "http:control")),
185 	fDataQueueSem(create_sem(0, "http:data")),
186 	fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)),
187 	fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this))
188 {
189 	// check initialization of semaphores
190 	if (fControlQueueSem < 0)
191 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control queue semaphore");
192 	if (fDataQueueSem < 0)
193 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data queue semaphore");
194 
195 	// set up internal threads
196 	if (fControlThread < 0)
197 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control thread");
198 	if (resume_thread(fControlThread) != B_OK)
199 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume control thread");
200 
201 	if (fDataThread < 0)
202 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data thread");
203 	if (resume_thread(fDataThread) != B_OK)
204 		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume data thread");
205 }
206 
207 
~Impl()208 BHttpSession::Impl::~Impl() noexcept
209 {
210 	fQuitting.store(true);
211 	delete_sem(fControlQueueSem);
212 	delete_sem(fDataQueueSem);
213 	status_t threadResult;
214 	wait_for_thread(fControlThread, &threadResult);
215 		// The control thread waits for the data thread
216 }
217 
218 
219 BHttpResult
Execute(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)220 BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
221 {
222 	auto wRequest = Request(std::move(request), std::move(target), observer);
223 
224 	auto retval = BHttpResult(wRequest.Result());
225 	auto lock = AutoLocker<BLocker>(fLock);
226 	fControlQueue.push_back(std::move(wRequest));
227 	release_sem(fControlQueueSem);
228 	return retval;
229 }
230 
231 
232 void
Cancel(int32 identifier)233 BHttpSession::Impl::Cancel(int32 identifier)
234 {
235 	auto lock = AutoLocker<BLocker>(fLock);
236 	// Check if the item is on the control queue
237 	fControlQueue.remove_if([&identifier](auto& request) {
238 		if (request.Id() == identifier) {
239 			try {
240 				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
241 			} catch (...) {
242 				request.SetError(std::current_exception());
243 			}
244 			return true;
245 		}
246 		return false;
247 	});
248 
249 	// Get it on the list for deletion in the data queue
250 	fCancelList.push_back(identifier);
251 	release_sem(fDataQueueSem);
252 }
253 
254 
255 void
SetMaxConnectionsPerHost(size_t maxConnections)256 BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections)
257 {
258 	if (maxConnections <= 0 || maxConnections >= INT32_MAX) {
259 		throw BRuntimeError(
260 			__PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX");
261 	}
262 	fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed);
263 }
264 
265 
266 void
SetMaxHosts(size_t maxConnections)267 BHttpSession::Impl::SetMaxHosts(size_t maxConnections)
268 {
269 	if (maxConnections <= 0)
270 		throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more");
271 	fMaxHosts.store(maxConnections, std::memory_order_relaxed);
272 }
273 
274 
275 /*static*/ status_t
ControlThreadFunc(void * arg)276 BHttpSession::Impl::ControlThreadFunc(void* arg)
277 {
278 	BHttpSession::Impl* impl = static_cast<BHttpSession::Impl*>(arg);
279 
280 	// Outer loop to use the fControlQueueSem when new items have entered the queue
281 	while (true) {
282 		if (auto status = acquire_sem(impl->fControlQueueSem); status == B_INTERRUPTED)
283 			continue;
284 		else if (status != B_OK) {
285 			// Most likely B_BAD_SEM_ID indicating that the sem was deleted; go to cleanup
286 			break;
287 		}
288 
289 		// Check if we have woken up because we are quitting
290 		if (impl->fQuitting.load())
291 			break;
292 
293 		// Get items to process (locking done by the helper)
294 		auto requests = impl->GetRequestsForControlThread();
295 		if (requests.size() == 0)
296 			continue;
297 
298 		for (auto& request: requests) {
299 			bool hasError = false;
300 			try {
301 				request.ResolveHostName();
302 				request.OpenConnection();
303 			} catch (...) {
304 				request.SetError(std::current_exception());
305 				hasError = true;
306 			}
307 
308 			if (hasError) {
309 				// Do not add the request back to the queue; release the sem to do another round
310 				// in case there is another item waiting because the limits of concurrent requests
311 				// were reached
312 				release_sem(impl->fControlQueueSem);
313 				continue;
314 			}
315 
316 			impl->fLock.Lock();
317 			impl->fDataQueue.push_back(std::move(request));
318 			impl->fLock.Unlock();
319 			release_sem(impl->fDataQueueSem);
320 		}
321 	}
322 
323 	// Clean up and make sure we are quitting
324 	if (impl->fQuitting.load()) {
325 		// First wait for the data thread to complete
326 		status_t threadResult;
327 		wait_for_thread(impl->fDataThread, &threadResult);
328 		// Cancel all requests
329 		for (auto& request: impl->fControlQueue) {
330 			try {
331 				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
332 			} catch (...) {
333 				request.SetError(std::current_exception());
334 			}
335 		}
336 	} else {
337 		throw BRuntimeError(
338 			__PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted");
339 	}
340 
341 	// Cleanup: wait for data thread
342 	return B_OK;
343 }
344 
345 
346 static constexpr uint16 EVENT_CANCELLED = 0x4000;
347 
348 
349 /*static*/ status_t
DataThreadFunc(void * arg)350 BHttpSession::Impl::DataThreadFunc(void* arg)
351 {
352 	BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);
353 
354 	// initial initialization of wait list
355 	data->objectList.push_back(
356 		object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});
357 
358 	while (true) {
359 		if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
360 			status == B_INTERRUPTED)
361 			continue;
362 		else if (status < 0) {
363 			// Something went inexplicably wrong
364 			throw BSystemError("wait_for_objects()", status);
365 		}
366 
367 		// First check if the change is in acquiring the sem, meaning that
368 		// there are new requests to be scheduled
369 		if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
370 			if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
371 				continue;
372 			else if (status != B_OK) {
373 				// Most likely B_BAD_SEM_ID indicating that the sem was deleted
374 				break;
375 			}
376 
377 			// Process the cancelList and dataQueue. Note that there might
378 			// be a situation where a request is cancelled and added in the
379 			// same iteration, but that is taken care by this algorithm.
380 			data->fLock.Lock();
381 			while (!data->fDataQueue.empty()) {
382 				auto request = std::move(data->fDataQueue.front());
383 				data->fDataQueue.pop_front();
384 				auto socket = request.Socket();
385 
386 				data->connectionMap.insert(std::make_pair(socket, std::move(request)));
387 
388 				// Add to objectList
389 				data->objectList.push_back(
390 					object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE});
391 			}
392 
393 			for (auto id: data->fCancelList) {
394 				// To cancel, we set a special event status on the
395 				// object_wait_info list so that we can handle it below.
396 				// Also: the first item in the waitlist is always the semaphore
397 				// so the fun starts at offset 1.
398 				size_t offset = 0;
399 				for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend();
400 					it++) {
401 					offset++;
402 					if (it->second.Id() == id) {
403 						data->objectList[offset].events = EVENT_CANCELLED;
404 						break;
405 					}
406 				}
407 			}
408 			data->fCancelList.clear();
409 			data->fLock.Unlock();
410 		} else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
411 			// The semaphore has been deleted. Start the cleanup
412 			break;
413 		}
414 
415 		// Process all objects that are ready
416 		bool resizeObjectList = false;
417 		for (auto& item: data->objectList) {
418 			if (item.type != B_OBJECT_TYPE_FD)
419 				continue;
420 			if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
421 				auto& request = data->connectionMap.find(item.object)->second;
422 				auto error = false;
423 				try {
424 					request.TransferRequest();
425 				} catch (...) {
426 					request.SetError(std::current_exception());
427 					error = true;
428 				}
429 
430 				// End failed writes
431 				if (error) {
432 					request.Disconnect();
433 					data->connectionMap.erase(item.object);
434 					release_sem(data->fControlQueueSem);
435 						// wake up control thread; there may queued requests unblocked.
436 					resizeObjectList = true;
437 				}
438 			} else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
439 				auto& request = data->connectionMap.find(item.object)->second;
440 				auto finished = false;
441 				try {
442 					if (request.CanCancel())
443 						finished = true;
444 					else
445 						finished = request.ReceiveResult();
446 				} catch (const Redirect& r) {
447 					// Request is redirected, send back to the controlThread
448 					// Move existing request into a new request and hand over to the control queue
449 					auto lock = AutoLocker<BLocker>(data->fLock);
450 					data->fControlQueue.emplace_back(request, r);
451 					release_sem(data->fControlQueueSem);
452 
453 					finished = true;
454 				} catch (...) {
455 					request.SetError(std::current_exception());
456 					finished = true;
457 				}
458 
459 				if (finished) {
460 					// Clean up finished requests; including redirected requests
461 					request.Disconnect();
462 					data->connectionMap.erase(item.object);
463 					release_sem(data->fControlQueueSem);
464 						// wake up control thread; there may queued requests unblocked.
465 					resizeObjectList = true;
466 				}
467 			} else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
468 				auto& request = data->connectionMap.find(item.object)->second;
469 				try {
470 					throw BNetworkRequestError(
471 						__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
472 				} catch (...) {
473 					request.SetError(std::current_exception());
474 				}
475 				data->connectionMap.erase(item.object);
476 				resizeObjectList = true;
477 			} else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
478 				auto& request = data->connectionMap.find(item.object)->second;
479 				request.Disconnect();
480 				try {
481 					throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
482 				} catch (...) {
483 					request.SetError(std::current_exception());
484 				}
485 				data->connectionMap.erase(item.object);
486 				release_sem(data->fControlQueueSem);
487 					// wake up control thread; there may queued requests unblocked.
488 				resizeObjectList = true;
489 			} else if (item.events == 0) {
490 				// No events for this item, skip
491 				continue;
492 			} else {
493 				// Likely to be B_EVENT_INVALID. This should not happen
494 				auto& request = data->connectionMap.find(item.object)->second;
495 				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
496 					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
497 					msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?");
498 				});
499 				throw BRuntimeError(
500 					__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
501 			}
502 		}
503 
504 		// Reset objectList
505 		data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
506 		if (resizeObjectList)
507 			data->objectList.resize(data->connectionMap.size() + 1);
508 
509 		auto i = 1;
510 		for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
511 			data->objectList[i].object = it->first;
512 			if (it->second.State() == Request::InitialState)
513 				throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
514 			else if (it->second.State() == Request::Connected)
515 				data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
516 			else
517 				data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
518 			i++;
519 		}
520 	}
521 	// Clean up and make sure we are quitting
522 	if (data->fQuitting.load()) {
523 		// Cancel all requests
524 		for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) {
525 			try {
526 				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
527 			} catch (...) {
528 				it->second.SetError(std::current_exception());
529 			}
530 		}
531 	} else {
532 		throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted");
533 	}
534 
535 	return B_OK;
536 }
537 
538 
539 /*!
540 	\brief Internal helper that filters the lists of requests to guard against the concurrent
541 		requests limit.
542 
543 	This method will do the locking of the internal structure.
544 */
545 std::vector<BHttpSession::Request>
GetRequestsForControlThread()546 BHttpSession::Impl::GetRequestsForControlThread()
547 {
548 	std::vector<BHttpSession::Request> requests;
549 
550 	// Clean up connection list if it is at the max number of hosts
551 	if (fConnectionCount.size() >= fMaxHosts.load()) {
552 		for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) {
553 			if (atomic_get(std::addressof(it->second)) == 0) {
554 				it = fConnectionCount.erase(it);
555 			} else {
556 				it++;
557 			}
558 		}
559 	}
560 
561 	// Process the list of pending requests and review if they can be started.
562 	auto lock = AutoLocker<BLocker>(fLock);
563 	fControlQueue.remove_if([this, &requests](auto& request) {
564 		auto host = request.GetHost();
565 		auto it = fConnectionCount.find(host);
566 		if (it != fConnectionCount.end()) {
567 			if (static_cast<size_t>(atomic_get(std::addressof(it->second)))
568 				>= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) {
569 				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
570 					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
571 					msg.AddString(UrlEventData::DebugMessage,
572 						"Request is queued: too many active connections for host");
573 				});
574 				return false;
575 			} else {
576 				atomic_add(std::addressof(it->second), 1);
577 				request.SetCounter(std::addressof(it->second));
578 			}
579 		} else {
580 			if (fConnectionCount.size() == fMaxHosts.load()) {
581 				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
582 					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
583 					msg.AddString(UrlEventData::DebugMessage,
584 						"Request is queued: maximum number of concurrent hosts");
585 				});
586 				return false;
587 			}
588 			auto [newIt, success] = fConnectionCount.insert({host, 1});
589 			if (!success) {
590 				throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount");
591 			}
592 			request.SetCounter(std::addressof(newIt->second));
593 		}
594 		requests.emplace_back(std::move(request));
595 		return true;
596 	});
597 	return requests;
598 }
599 
600 
601 // #pragma mark -- BHttpSession (public interface)
602 
603 
BHttpSession()604 BHttpSession::BHttpSession()
605 {
606 	fImpl = std::make_shared<BHttpSession::Impl>();
607 }
608 
609 
// The destructor and the copy operations are the compiler-generated ones, which act
// member-wise on the data members of BHttpSession.
BHttpSession::~BHttpSession() = default;


BHttpSession::BHttpSession(const BHttpSession&) noexcept = default;


BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default;
617 
618 
619 BHttpResult
Execute(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)620 BHttpSession::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
621 {
622 	return fImpl->Execute(std::move(request), std::move(target), observer);
623 }
624 
625 
626 void
Cancel(int32 identifier)627 BHttpSession::Cancel(int32 identifier)
628 {
629 	fImpl->Cancel(identifier);
630 }
631 
632 
633 void
Cancel(const BHttpResult & request)634 BHttpSession::Cancel(const BHttpResult& request)
635 {
636 	fImpl->Cancel(request.Identity());
637 }
638 
639 
640 void
SetMaxConnectionsPerHost(size_t maxConnections)641 BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections)
642 {
643 	fImpl->SetMaxConnectionsPerHost(maxConnections);
644 }
645 
646 
647 void
SetMaxHosts(size_t maxConnections)648 BHttpSession::SetMaxHosts(size_t maxConnections)
649 {
650 	fImpl->SetMaxHosts(maxConnections);
651 }
652 
653 
654 // #pragma mark -- BHttpSession::Request (helpers)
Request(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)655 BHttpSession::Request::Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
656 	:
657 	fRequest(std::move(request)),
658 	fObserver(observer)
659 {
660 	auto identifier = get_netservices_request_identifier();
661 
662 	// interpret the remaining redirects
663 	fRemainingRedirects = fRequest.MaxRedirections();
664 
665 	// create shared data
666 	fResult = std::make_shared<HttpResultPrivate>(identifier);
667 
668 	// check if there is a target
669 	if (target.HasValue())
670 		fResult->bodyTarget = std::move(target);
671 
672 	// inform the parser when we do a HEAD request, so not to expect content
673 	if (fRequest.Method() == BHttpMethod::Head)
674 		fParser.SetNoContent();
675 }
676 
677 
Request(Request & original,const BHttpSession::Redirect & redirect)678 BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect)
679 	:
680 	fRequest(std::move(original.fRequest)),
681 	fObserver(original.fObserver),
682 	fResult(original.fResult)
683 {
684 	// update the original request with the new location
685 	fRequest.SetUrl(redirect.url);
686 
687 	if (redirect.redirectToGet
688 		&& (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) {
689 		fRequest.SetMethod(BHttpMethod::Get);
690 		fRequest.ClearRequestBody();
691 	}
692 
693 	fRemainingRedirects = original.fRemainingRedirects--;
694 
695 	// inform the parser when we do a HEAD request, so not to expect content
696 	if (fRequest.Method() == BHttpMethod::Head)
697 		fParser.SetNoContent();
698 }
699 
700 
701 /*!
702 	\brief Helper that sets the error in the result to \a e and notifies the listeners.
703 */
704 void
SetError(std::exception_ptr e)705 BHttpSession::Request::SetError(std::exception_ptr e)
706 {
707 	fResult->SetError(e);
708 	SendMessage(UrlEvent::DebugMessage, [&e](BMessage& msg) {
709 		msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
710 		try {
711 			std::rethrow_exception(e);
712 		} catch (BError& error) {
713 			msg.AddString(UrlEventData::DebugMessage, error.DebugMessage());
714 		} catch (std::exception& error) {
715 			msg.AddString(UrlEventData::DebugMessage, error.what());
716 		} catch (...) {
717 			msg.AddString(UrlEventData::DebugMessage, "Unknown exception");
718 		}
719 	});
720 	SendMessage(UrlEvent::RequestCompleted,
721 		[](BMessage& msg) { msg.AddBool(UrlEventData::Success, false); });
722 }
723 
724 
725 std::pair<BString, int>
GetHost() const726 BHttpSession::Request::GetHost() const
727 {
728 	return {fRequest.Url().Host(), fRequest.Url().Port()};
729 }
730 
731 
732 void
SetCounter(int32 * counter)733 BHttpSession::Request::SetCounter(int32* counter) noexcept
734 {
735 	fConnectionCounter = std::unique_ptr<int32, CounterDeleter>(counter);
736 }
737 
738 
739 /*!
740 	\brief Resolve the hostname for a request
741 */
742 void
ResolveHostName()743 BHttpSession::Request::ResolveHostName()
744 {
745 	int port;
746 	if (fRequest.Url().HasPort())
747 		port = fRequest.Url().Port();
748 	else if (fRequest.Url().Protocol() == "https")
749 		port = 443;
750 	else
751 		port = 80;
752 
753 	// TODO: proxy
754 	if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) {
755 		throw BNetworkRequestError(
756 			"BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status);
757 	}
758 
759 	SendMessage(UrlEvent::HostNameResolved,
760 		[this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); });
761 }
762 
763 
764 /*!
765 	\brief Open the connection and make the socket non-blocking after opening it
766 */
767 void
OpenConnection()768 BHttpSession::Request::OpenConnection()
769 {
770 	// Set up the socket
771 	if (fRequest.Url().Protocol() == "https") {
772 		// To do: secure socket with callbacks to check certificates
773 		fSocket = std::make_unique<BSecureSocket>();
774 	} else {
775 		fSocket = std::make_unique<BSocket>();
776 	}
777 
778 	// Set timeout
779 	fSocket->SetTimeout(fRequest.Timeout());
780 
781 	// Open connection
782 	if (auto status = fSocket->Connect(fRemoteAddress); status != B_OK) {
783 		// TODO: inform listeners that the connection failed
784 		throw BNetworkRequestError(
785 			"BSocket::Connect()", BNetworkRequestError::NetworkError, status);
786 	}
787 
788 	// Make the rest of the interaction non-blocking
789 	auto flags = fcntl(fSocket->Socket(), F_GETFL, 0);
790 	if (flags == -1)
791 		throw BRuntimeError("fcntl()", "Error getting socket flags");
792 	if (fcntl(fSocket->Socket(), F_SETFL, flags | O_NONBLOCK) != 0)
793 		throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket");
794 
795 	SendMessage(UrlEvent::ConnectionOpened);
796 
797 	fRequestStatus = Connected;
798 }
799 
800 
801 /*!
802 	\brief Transfer data from the request to the socket.
803 
804 	\returns \c true if the request is complete, or false if there is more.
805 */
806 void
TransferRequest()807 BHttpSession::Request::TransferRequest()
808 {
809 	// Assert that we are in the right state
810 	if (fRequestStatus != Connected)
811 		throw BRuntimeError(
812 			__PRETTY_FUNCTION__, "Write request for object that is not in the Connected state");
813 
814 	if (!fSerializer.IsInitialized())
815 		fSerializer.SetTo(fBuffer, fRequest);
816 
817 	auto currentBytesWritten = fSerializer.Serialize(fBuffer, fSocket.get());
818 
819 	if (currentBytesWritten > 0) {
820 		SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) {
821 			msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred());
822 			if (auto totalSize = fSerializer.BodyBytesTotal())
823 				msg.AddInt64(UrlEventData::TotalBytes, totalSize.value());
824 		});
825 	}
826 
827 	if (fSerializer.Complete())
828 		fRequestStatus = RequestSent;
829 }
830 
831 
832 /*!
833 	\brief Transfer data from the socket and parse the result.
834 
835 	\returns \c true if the request is complete, or false if there is more.
836 */
837 bool
ReceiveResult()838 BHttpSession::Request::ReceiveResult()
839 {
840 	// First: stream data from the socket
841 	auto bytesRead = fBuffer.ReadFrom(fSocket.get());
842 
843 	if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED)
844 		return false;
845 
846 	auto readEnd = bytesRead == 0;
847 
848 	// Parse the content in the buffer
849 	switch (fParser.State()) {
850 		case HttpInputStreamState::StatusLine:
851 		{
852 			if (fBuffer.RemainingBytes() == static_cast<size_t>(bytesRead)) {
853 				// In the initial run, the bytes in the buffer will match the bytes read to indicate
854 				// the response has started.
855 				SendMessage(UrlEvent::ResponseStarted);
856 			}
857 
858 			if (fParser.ParseStatus(fBuffer, fStatus)) {
859 				// the status headers are now received, decide what to do next
860 
861 				// Determine if we can handle redirects; else notify of receiving status
862 				if (fRemainingRedirects > 0) {
863 					switch (fStatus.StatusCode()) {
864 						case BHttpStatusCode::MovedPermanently:
865 						case BHttpStatusCode::TemporaryRedirect:
866 						case BHttpStatusCode::PermanentRedirect:
867 							// These redirects require the request body to be sent again. It this is
868 							// possible, BHttpRequest::RewindBody() will return true in which case
869 							// we can handle the redirect.
870 							if (!fRequest.RewindBody())
871 								break;
872 							[[fallthrough]];
873 						case BHttpStatusCode::Found:
874 						case BHttpStatusCode::SeeOther:
875 							// These redirects redirect to GET, so we don't care if we can rewind
876 							// the body; in this case redirect
877 							fMightRedirect = true;
878 							break;
879 						default:
880 							break;
881 					}
882 				}
883 
884 				if ((fStatus.StatusClass() == BHttpStatusClass::ClientError
885 						|| fStatus.StatusClass() == BHttpStatusClass::ServerError)
886 					&& fRequest.StopOnError()) {
887 					fRequestStatus = ContentReceived;
888 					fResult->SetStatus(std::move(fStatus));
889 					fResult->SetFields(BHttpFields());
890 					fResult->SetBody();
891 					SendMessage(UrlEvent::RequestCompleted,
892 						[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
893 					return true;
894 				}
895 
896 				if (!fMightRedirect) {
897 					// we are not redirecting and there is no error, so inform listeners
898 					SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
899 						msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
900 					});
901 					fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
902 				}
903 			} else {
904 				// We do not have enough data for the status line yet
905 				if (readEnd) {
906 					throw BNetworkRequestError(__PRETTY_FUNCTION__,
907 						BNetworkRequestError::ProtocolError,
908 						"Response did not include a complete status line");
909 				}
910 				return false;
911 			}
912 			[[fallthrough]];
913 		}
914 		case HttpInputStreamState::Fields:
915 		{
916 			if (!fParser.ParseFields(fBuffer, fFields)) {
917 				// there may be more headers to receive, throw an error if there will be no more
918 				if (readEnd) {
919 					throw BNetworkRequestError(__PRETTY_FUNCTION__,
920 						BNetworkRequestError::ProtocolError,
921 						"Response did not include a complete header section");
922 				}
923 				break;
924 			}
925 
926 			// The headers have been received, now set up the rest of the response handling
927 
928 			// Handle redirects
929 			if (fMightRedirect) {
930 				auto redirectToGet = false;
931 				switch (fStatus.StatusCode()) {
932 					case BHttpStatusCode::Found:
933 					case BHttpStatusCode::SeeOther:
934 						// 302 and 303 redirections convert all requests to GET request, except for
935 						// HEAD
936 						redirectToGet = true;
937 						[[fallthrough]];
938 					case BHttpStatusCode::MovedPermanently:
939 					case BHttpStatusCode::TemporaryRedirect:
940 					case BHttpStatusCode::PermanentRedirect:
941 					{
942 						auto locationField = fFields.FindField("Location");
943 						if (locationField == fFields.end()) {
944 							throw BNetworkRequestError(__PRETTY_FUNCTION__,
945 								BNetworkRequestError::ProtocolError,
946 								"Redirect; the Location field must be present and cannot be found");
947 						}
948 						auto locationString = BString(
949 							(*locationField).Value().data(), (*locationField).Value().size());
950 						auto redirect = BHttpSession::Redirect{
951 							BUrl(fRequest.Url(), locationString), redirectToGet};
952 						if (!redirect.url.IsValid()) {
953 							throw BNetworkRequestError(__PRETTY_FUNCTION__,
954 								BNetworkRequestError::ProtocolError,
955 								"Redirect; invalid URL in the Location field");
956 						}
957 
958 						// Notify of redirect
959 						SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) {
960 							msg.AddString(UrlEventData::HttpRedirectUrl, locationString);
961 						});
962 						throw redirect;
963 					}
964 					default:
965 						// ignore other status codes and continue regular processing
966 						SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
967 							msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
968 						});
969 						fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
970 						break;
971 				}
972 			}
973 
974 			// TODO: Parse received cookies
975 
976 			// Move headers to the result and inform listener
977 			fResult->SetFields(std::move(fFields));
978 			SendMessage(UrlEvent::HttpFields);
979 
980 			if (!fParser.HasContent()) {
981 				// Any requests with no content are finished
982 				fResult->SetBody();
983 				SendMessage(UrlEvent::RequestCompleted,
984 					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
985 				fRequestStatus = ContentReceived;
986 				return true;
987 			}
988 			[[fallthrough]];
989 		}
990 		case HttpInputStreamState::Body:
991 		{
992 			size_t bytesWrittenToBody;
993 				// The bytesWrittenToBody may differ from the bytes parsed from the buffer when
994 				// there is compression on the incoming stream.
995 			bytesRead = fParser.ParseBody(
996 				fBuffer,
997 				[this, &bytesWrittenToBody](const std::byte* buffer, size_t size) {
998 					bytesWrittenToBody = fResult->WriteToBody(buffer, size);
999 					return bytesWrittenToBody;
1000 				},
1001 				readEnd);
1002 
1003 			SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) {
1004 				msg.AddInt64(UrlEventData::NumBytes, bytesRead);
1005 				if (fParser.BodyBytesTotal())
1006 					msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value());
1007 			});
1008 
1009 			if (bytesWrittenToBody > 0) {
1010 				SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) {
1011 					msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody);
1012 				});
1013 			}
1014 
1015 			if (fParser.Complete()) {
1016 				fResult->SetBody();
1017 				SendMessage(UrlEvent::RequestCompleted,
1018 					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
1019 				fRequestStatus = ContentReceived;
1020 				return true;
1021 			} else if (readEnd) {
1022 				// the parsing of the body is not complete but we are at the end of the data
1023 				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError,
1024 					"Unexpected end of data: more data was expected");
1025 			}
1026 
1027 			break;
1028 		}
1029 		default:
1030 			throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable");
1031 	}
1032 
1033 	// There is more to receive
1034 	return false;
1035 }
1036 
1037 
1038 /*!
1039 	\brief Disconnect the socket. Does not validate if it actually succeeded.
1040 */
1041 void
Disconnect()1042 BHttpSession::Request::Disconnect() noexcept
1043 {
1044 	fSocket->Disconnect();
1045 }
1046 
1047 
1048 /*!
1049 	\brief Send a message to the observer, if one is present
1050 
1051 	\param what The code of the message to be sent
1052 	\param dataFunc Optional function that adds additional data to the message.
1053 */
1054 void
SendMessage(uint32 what,std::function<void (BMessage &)> dataFunc) const1055 BHttpSession::Request::SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc) const
1056 {
1057 	if (fObserver.IsValid()) {
1058 		BMessage msg(what);
1059 		msg.AddInt32(UrlEventData::Id, fResult->id);
1060 		if (dataFunc)
1061 			dataFunc(msg);
1062 		fObserver.SendMessage(&msg);
1063 	}
1064 }
1065 
1066 
1067 // #pragma mark -- Message constants
1068 
1069 
namespace BPrivate::Network::UrlEventData {

// Field names used in the messages that are sent to the observer of a request.

// Holds the status code of the response; added as an int16 to the HttpStatus message.
const char* HttpStatusCode = "url:httpstatuscode";

// Holds the contents of the Location header; added as a string to the HttpRedirect message.
const char* HttpRedirectUrl = "url:httpredirecturl";

// NOTE(review): the two SSL field names are not used in this part of the file; presumably
// they are used for certificate related notifications -- confirm where they are used.
const char* SSLCertificate = "url:sslcertificate";
const char* SSLMessage = "url:sslmessage";

} // namespace BPrivate::Network::UrlEventData
1076