1 /*
2 * Copyright 2022 Haiku Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 * Niels Sascha Reedijk, niels.reedijk@gmail.com
7 */
8
9 #include <algorithm>
10 #include <atomic>
11 #include <deque>
12 #include <list>
13 #include <map>
14 #include <optional>
15 #include <vector>
16
17 #include <AutoLocker.h>
18 #include <DataIO.h>
19 #include <ErrorsExt.h>
20 #include <HttpFields.h>
21 #include <HttpRequest.h>
22 #include <HttpResult.h>
23 #include <HttpSession.h>
24 #include <Locker.h>
25 #include <Messenger.h>
26 #include <NetBuffer.h>
27 #include <NetServicesDefs.h>
28 #include <NetworkAddress.h>
29 #include <OS.h>
30 #include <SecureSocket.h>
31 #include <Socket.h>
32 #include <ZlibCompressionAlgorithm.h>
33
34 #include "HttpBuffer.h"
35 #include "HttpParser.h"
36 #include "HttpResultPrivate.h"
37 #include "HttpSerializer.h"
38 #include "NetServicesPrivate.h"
39
40 using namespace std::literals;
41 using namespace BPrivate::Network;
42
43
/*!
	\brief Upper bound, in bytes, for a single header line of an HTTP message.

	The RFC does not define a maximum, but a limit is needed to prevent the situation where the
	internal buffer keeps growing while waiting for the end of line ("\r\n") characters to occur.
*/
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
51
52
53 struct CounterDeleter {
operator ()CounterDeleter54 void operator()(int32* counter) const noexcept { atomic_add(counter, -1); }
55 };
56
57
58 class BHttpSession::Request
59 {
60 public:
61 Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
62
63 Request(Request& original, const Redirect& redirect);
64
65 // States
66 enum RequestState { InitialState, Connected, RequestSent, ContentReceived };
State() const67 RequestState State() const noexcept { return fRequestStatus; }
68
69 // Result Helpers
Result()70 std::shared_ptr<HttpResultPrivate> Result() { return fResult; }
71 void SetError(std::exception_ptr e);
72
73 // Helpers for maintaining the connection count
74 std::pair<BString, int> GetHost() const;
75 void SetCounter(int32* counter) noexcept;
76
77 // Operational methods
78 void ResolveHostName();
79 void OpenConnection();
80 void TransferRequest();
81 bool ReceiveResult();
82 void Disconnect() noexcept;
83
84 // Object information
Socket() const85 int Socket() const noexcept { return fSocket->Socket(); }
Id() const86 int32 Id() const noexcept { return fResult->id; }
CanCancel() const87 bool CanCancel() const noexcept { return fResult->CanCancel(); }
88
89 // Message helper
90 void SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc = nullptr) const;
91
92 private:
93 BHttpRequest fRequest;
94
95 // Request state/events
96 RequestState fRequestStatus = InitialState;
97
98 // Communication
99 BMessenger fObserver;
100 std::shared_ptr<HttpResultPrivate> fResult;
101
102 // Connection
103 BNetworkAddress fRemoteAddress;
104 std::unique_ptr<BSocket> fSocket;
105
106 // Sending and receiving
107 HttpBuffer fBuffer;
108 HttpSerializer fSerializer;
109 HttpParser fParser;
110
111 // Receive state
112 BHttpStatus fStatus;
113 BHttpFields fFields;
114
115 // Redirection
116 bool fMightRedirect = false;
117 int8 fRemainingRedirects;
118
119 // Connection counter
120 std::unique_ptr<int32, CounterDeleter> fConnectionCounter;
121 };
122
123
124 class BHttpSession::Impl
125 {
126 public:
127 Impl();
128 ~Impl() noexcept;
129
130 BHttpResult Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
131 void Cancel(int32 identifier);
132 void SetMaxConnectionsPerHost(size_t maxConnections);
133 void SetMaxHosts(size_t maxConnections);
134
135 private:
136 // Thread functions
137 static status_t ControlThreadFunc(void* arg);
138 static status_t DataThreadFunc(void* arg);
139
140 // Helper functions
141 std::vector<BHttpSession::Request> GetRequestsForControlThread();
142
143 private:
144 // constants (can be accessed unlocked)
145 const sem_id fControlQueueSem;
146 const sem_id fDataQueueSem;
147 const thread_id fControlThread;
148 const thread_id fDataThread;
149
150 // locking mechanism
151 BLocker fLock;
152 std::atomic<bool> fQuitting = false;
153
154 // queues & shared data
155 std::list<BHttpSession::Request> fControlQueue;
156 std::deque<BHttpSession::Request> fDataQueue;
157 std::vector<int32> fCancelList;
158
159 // data owned by the controlThread
160 using Host = std::pair<BString, int>;
161 std::map<Host, int32> fConnectionCount;
162
163 // data that can only be accessed atomically
164 std::atomic<size_t> fMaxConnectionsPerHost = 2;
165 std::atomic<size_t> fMaxHosts = 10;
166
167 // data owned by the dataThread
168 std::map<int, BHttpSession::Request> connectionMap;
169 std::vector<object_wait_info> objectList;
170 };
171
172
173 struct BHttpSession::Redirect {
174 BUrl url;
175 bool redirectToGet;
176 };
177
178
179 // #pragma mark -- BHttpSession::Impl
180
181
Impl()182 BHttpSession::Impl::Impl()
183 :
184 fControlQueueSem(create_sem(0, "http:control")),
185 fDataQueueSem(create_sem(0, "http:data")),
186 fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)),
187 fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this))
188 {
189 // check initialization of semaphores
190 if (fControlQueueSem < 0)
191 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control queue semaphore");
192 if (fDataQueueSem < 0)
193 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data queue semaphore");
194
195 // set up internal threads
196 if (fControlThread < 0)
197 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control thread");
198 if (resume_thread(fControlThread) != B_OK)
199 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume control thread");
200
201 if (fDataThread < 0)
202 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data thread");
203 if (resume_thread(fDataThread) != B_OK)
204 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume data thread");
205 }
206
207
~Impl()208 BHttpSession::Impl::~Impl() noexcept
209 {
210 fQuitting.store(true);
211 delete_sem(fControlQueueSem);
212 delete_sem(fDataQueueSem);
213 status_t threadResult;
214 wait_for_thread(fControlThread, &threadResult);
215 // The control thread waits for the data thread
216 }
217
218
219 BHttpResult
Execute(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)220 BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
221 {
222 auto wRequest = Request(std::move(request), std::move(target), observer);
223
224 auto retval = BHttpResult(wRequest.Result());
225 auto lock = AutoLocker<BLocker>(fLock);
226 fControlQueue.push_back(std::move(wRequest));
227 release_sem(fControlQueueSem);
228 return retval;
229 }
230
231
232 void
Cancel(int32 identifier)233 BHttpSession::Impl::Cancel(int32 identifier)
234 {
235 auto lock = AutoLocker<BLocker>(fLock);
236 // Check if the item is on the control queue
237 fControlQueue.remove_if([&identifier](auto& request) {
238 if (request.Id() == identifier) {
239 try {
240 throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
241 } catch (...) {
242 request.SetError(std::current_exception());
243 }
244 return true;
245 }
246 return false;
247 });
248
249 // Get it on the list for deletion in the data queue
250 fCancelList.push_back(identifier);
251 release_sem(fDataQueueSem);
252 }
253
254
255 void
SetMaxConnectionsPerHost(size_t maxConnections)256 BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections)
257 {
258 if (maxConnections <= 0 || maxConnections >= INT32_MAX) {
259 throw BRuntimeError(
260 __PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX");
261 }
262 fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed);
263 }
264
265
266 void
SetMaxHosts(size_t maxConnections)267 BHttpSession::Impl::SetMaxHosts(size_t maxConnections)
268 {
269 if (maxConnections <= 0)
270 throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more");
271 fMaxHosts.store(maxConnections, std::memory_order_relaxed);
272 }
273
274
275 /*static*/ status_t
ControlThreadFunc(void * arg)276 BHttpSession::Impl::ControlThreadFunc(void* arg)
277 {
278 BHttpSession::Impl* impl = static_cast<BHttpSession::Impl*>(arg);
279
280 // Outer loop to use the fControlQueueSem when new items have entered the queue
281 while (true) {
282 if (auto status = acquire_sem(impl->fControlQueueSem); status == B_INTERRUPTED)
283 continue;
284 else if (status != B_OK) {
285 // Most likely B_BAD_SEM_ID indicating that the sem was deleted; go to cleanup
286 break;
287 }
288
289 // Check if we have woken up because we are quitting
290 if (impl->fQuitting.load())
291 break;
292
293 // Get items to process (locking done by the helper)
294 auto requests = impl->GetRequestsForControlThread();
295 if (requests.size() == 0)
296 continue;
297
298 for (auto& request: requests) {
299 bool hasError = false;
300 try {
301 request.ResolveHostName();
302 request.OpenConnection();
303 } catch (...) {
304 request.SetError(std::current_exception());
305 hasError = true;
306 }
307
308 if (hasError) {
309 // Do not add the request back to the queue; release the sem to do another round
310 // in case there is another item waiting because the limits of concurrent requests
311 // were reached
312 release_sem(impl->fControlQueueSem);
313 continue;
314 }
315
316 impl->fLock.Lock();
317 impl->fDataQueue.push_back(std::move(request));
318 impl->fLock.Unlock();
319 release_sem(impl->fDataQueueSem);
320 }
321 }
322
323 // Clean up and make sure we are quitting
324 if (impl->fQuitting.load()) {
325 // First wait for the data thread to complete
326 status_t threadResult;
327 wait_for_thread(impl->fDataThread, &threadResult);
328 // Cancel all requests
329 for (auto& request: impl->fControlQueue) {
330 try {
331 throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
332 } catch (...) {
333 request.SetError(std::current_exception());
334 }
335 }
336 } else {
337 throw BRuntimeError(
338 __PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted");
339 }
340
341 // Cleanup: wait for data thread
342 return B_OK;
343 }
344
345
346 static constexpr uint16 EVENT_CANCELLED = 0x4000;
347
348
349 /*static*/ status_t
DataThreadFunc(void * arg)350 BHttpSession::Impl::DataThreadFunc(void* arg)
351 {
352 BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);
353
354 // initial initialization of wait list
355 data->objectList.push_back(
356 object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});
357
358 while (true) {
359 if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
360 status == B_INTERRUPTED)
361 continue;
362 else if (status < 0) {
363 // Something went inexplicably wrong
364 throw BSystemError("wait_for_objects()", status);
365 }
366
367 // First check if the change is in acquiring the sem, meaning that
368 // there are new requests to be scheduled
369 if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
370 if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
371 continue;
372 else if (status != B_OK) {
373 // Most likely B_BAD_SEM_ID indicating that the sem was deleted
374 break;
375 }
376
377 // Process the cancelList and dataQueue. Note that there might
378 // be a situation where a request is cancelled and added in the
379 // same iteration, but that is taken care by this algorithm.
380 data->fLock.Lock();
381 while (!data->fDataQueue.empty()) {
382 auto request = std::move(data->fDataQueue.front());
383 data->fDataQueue.pop_front();
384 auto socket = request.Socket();
385
386 data->connectionMap.insert(std::make_pair(socket, std::move(request)));
387
388 // Add to objectList
389 data->objectList.push_back(
390 object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE});
391 }
392
393 for (auto id: data->fCancelList) {
394 // To cancel, we set a special event status on the
395 // object_wait_info list so that we can handle it below.
396 // Also: the first item in the waitlist is always the semaphore
397 // so the fun starts at offset 1.
398 size_t offset = 0;
399 for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend();
400 it++) {
401 offset++;
402 if (it->second.Id() == id) {
403 data->objectList[offset].events = EVENT_CANCELLED;
404 break;
405 }
406 }
407 }
408 data->fCancelList.clear();
409 data->fLock.Unlock();
410 } else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
411 // The semaphore has been deleted. Start the cleanup
412 break;
413 }
414
415 // Process all objects that are ready
416 bool resizeObjectList = false;
417 for (auto& item: data->objectList) {
418 if (item.type != B_OBJECT_TYPE_FD)
419 continue;
420 if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
421 auto& request = data->connectionMap.find(item.object)->second;
422 auto error = false;
423 try {
424 request.TransferRequest();
425 } catch (...) {
426 request.SetError(std::current_exception());
427 error = true;
428 }
429
430 // End failed writes
431 if (error) {
432 request.Disconnect();
433 data->connectionMap.erase(item.object);
434 release_sem(data->fControlQueueSem);
435 // wake up control thread; there may queued requests unblocked.
436 resizeObjectList = true;
437 }
438 } else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
439 auto& request = data->connectionMap.find(item.object)->second;
440 auto finished = false;
441 try {
442 if (request.CanCancel())
443 finished = true;
444 else
445 finished = request.ReceiveResult();
446 } catch (const Redirect& r) {
447 // Request is redirected, send back to the controlThread
448 // Move existing request into a new request and hand over to the control queue
449 auto lock = AutoLocker<BLocker>(data->fLock);
450 data->fControlQueue.emplace_back(request, r);
451 release_sem(data->fControlQueueSem);
452
453 finished = true;
454 } catch (...) {
455 request.SetError(std::current_exception());
456 finished = true;
457 }
458
459 if (finished) {
460 // Clean up finished requests; including redirected requests
461 request.Disconnect();
462 data->connectionMap.erase(item.object);
463 release_sem(data->fControlQueueSem);
464 // wake up control thread; there may queued requests unblocked.
465 resizeObjectList = true;
466 }
467 } else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
468 auto& request = data->connectionMap.find(item.object)->second;
469 try {
470 throw BNetworkRequestError(
471 __PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
472 } catch (...) {
473 request.SetError(std::current_exception());
474 }
475 data->connectionMap.erase(item.object);
476 resizeObjectList = true;
477 } else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
478 auto& request = data->connectionMap.find(item.object)->second;
479 request.Disconnect();
480 try {
481 throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
482 } catch (...) {
483 request.SetError(std::current_exception());
484 }
485 data->connectionMap.erase(item.object);
486 release_sem(data->fControlQueueSem);
487 // wake up control thread; there may queued requests unblocked.
488 resizeObjectList = true;
489 } else if (item.events == 0) {
490 // No events for this item, skip
491 continue;
492 } else {
493 // Likely to be B_EVENT_INVALID. This should not happen
494 auto& request = data->connectionMap.find(item.object)->second;
495 request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
496 msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
497 msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?");
498 });
499 throw BRuntimeError(
500 __PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
501 }
502 }
503
504 // Reset objectList
505 data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
506 if (resizeObjectList)
507 data->objectList.resize(data->connectionMap.size() + 1);
508
509 auto i = 1;
510 for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
511 data->objectList[i].object = it->first;
512 if (it->second.State() == Request::InitialState)
513 throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
514 else if (it->second.State() == Request::Connected)
515 data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
516 else
517 data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
518 i++;
519 }
520 }
521 // Clean up and make sure we are quitting
522 if (data->fQuitting.load()) {
523 // Cancel all requests
524 for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) {
525 try {
526 throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
527 } catch (...) {
528 it->second.SetError(std::current_exception());
529 }
530 }
531 } else {
532 throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted");
533 }
534
535 return B_OK;
536 }
537
538
539 /*!
540 \brief Internal helper that filters the lists of requests to guard against the concurrent
541 requests limit.
542
543 This method will do the locking of the internal structure.
544 */
545 std::vector<BHttpSession::Request>
GetRequestsForControlThread()546 BHttpSession::Impl::GetRequestsForControlThread()
547 {
548 std::vector<BHttpSession::Request> requests;
549
550 // Clean up connection list if it is at the max number of hosts
551 if (fConnectionCount.size() >= fMaxHosts.load()) {
552 for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) {
553 if (atomic_get(std::addressof(it->second)) == 0) {
554 it = fConnectionCount.erase(it);
555 } else {
556 it++;
557 }
558 }
559 }
560
561 // Process the list of pending requests and review if they can be started.
562 auto lock = AutoLocker<BLocker>(fLock);
563 fControlQueue.remove_if([this, &requests](auto& request) {
564 auto host = request.GetHost();
565 auto it = fConnectionCount.find(host);
566 if (it != fConnectionCount.end()) {
567 if (static_cast<size_t>(atomic_get(std::addressof(it->second)))
568 >= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) {
569 request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
570 msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
571 msg.AddString(UrlEventData::DebugMessage,
572 "Request is queued: too many active connections for host");
573 });
574 return false;
575 } else {
576 atomic_add(std::addressof(it->second), 1);
577 request.SetCounter(std::addressof(it->second));
578 }
579 } else {
580 if (fConnectionCount.size() == fMaxHosts.load()) {
581 request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
582 msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
583 msg.AddString(UrlEventData::DebugMessage,
584 "Request is queued: maximum number of concurrent hosts");
585 });
586 return false;
587 }
588 auto [newIt, success] = fConnectionCount.insert({host, 1});
589 if (!success) {
590 throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount");
591 }
592 request.SetCounter(std::addressof(newIt->second));
593 }
594 requests.emplace_back(std::move(request));
595 return true;
596 });
597 return requests;
598 }
599
600
601 // #pragma mark -- BHttpSession (public interface)
602
603
BHttpSession()604 BHttpSession::BHttpSession()
605 {
606 fImpl = std::make_shared<BHttpSession::Impl>();
607 }
608
609
610 BHttpSession::~BHttpSession() = default;
611
612
613 BHttpSession::BHttpSession(const BHttpSession&) noexcept = default;
614
615
616 BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default;
617
618
619 BHttpResult
Execute(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)620 BHttpSession::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
621 {
622 return fImpl->Execute(std::move(request), std::move(target), observer);
623 }
624
625
626 void
Cancel(int32 identifier)627 BHttpSession::Cancel(int32 identifier)
628 {
629 fImpl->Cancel(identifier);
630 }
631
632
633 void
Cancel(const BHttpResult & request)634 BHttpSession::Cancel(const BHttpResult& request)
635 {
636 fImpl->Cancel(request.Identity());
637 }
638
639
640 void
SetMaxConnectionsPerHost(size_t maxConnections)641 BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections)
642 {
643 fImpl->SetMaxConnectionsPerHost(maxConnections);
644 }
645
646
647 void
SetMaxHosts(size_t maxConnections)648 BHttpSession::SetMaxHosts(size_t maxConnections)
649 {
650 fImpl->SetMaxHosts(maxConnections);
651 }
652
653
654 // #pragma mark -- BHttpSession::Request (helpers)
Request(BHttpRequest && request,BBorrow<BDataIO> target,BMessenger observer)655 BHttpSession::Request::Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
656 :
657 fRequest(std::move(request)),
658 fObserver(observer)
659 {
660 auto identifier = get_netservices_request_identifier();
661
662 // interpret the remaining redirects
663 fRemainingRedirects = fRequest.MaxRedirections();
664
665 // create shared data
666 fResult = std::make_shared<HttpResultPrivate>(identifier);
667
668 // check if there is a target
669 if (target.HasValue())
670 fResult->bodyTarget = std::move(target);
671
672 // inform the parser when we do a HEAD request, so not to expect content
673 if (fRequest.Method() == BHttpMethod::Head)
674 fParser.SetNoContent();
675 }
676
677
Request(Request & original,const BHttpSession::Redirect & redirect)678 BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect)
679 :
680 fRequest(std::move(original.fRequest)),
681 fObserver(original.fObserver),
682 fResult(original.fResult)
683 {
684 // update the original request with the new location
685 fRequest.SetUrl(redirect.url);
686
687 if (redirect.redirectToGet
688 && (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) {
689 fRequest.SetMethod(BHttpMethod::Get);
690 fRequest.ClearRequestBody();
691 }
692
693 fRemainingRedirects = original.fRemainingRedirects--;
694
695 // inform the parser when we do a HEAD request, so not to expect content
696 if (fRequest.Method() == BHttpMethod::Head)
697 fParser.SetNoContent();
698 }
699
700
701 /*!
702 \brief Helper that sets the error in the result to \a e and notifies the listeners.
703 */
704 void
SetError(std::exception_ptr e)705 BHttpSession::Request::SetError(std::exception_ptr e)
706 {
707 fResult->SetError(e);
708 SendMessage(UrlEvent::DebugMessage, [&e](BMessage& msg) {
709 msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
710 try {
711 std::rethrow_exception(e);
712 } catch (BError& error) {
713 msg.AddString(UrlEventData::DebugMessage, error.DebugMessage());
714 } catch (std::exception& error) {
715 msg.AddString(UrlEventData::DebugMessage, error.what());
716 } catch (...) {
717 msg.AddString(UrlEventData::DebugMessage, "Unknown exception");
718 }
719 });
720 SendMessage(UrlEvent::RequestCompleted,
721 [](BMessage& msg) { msg.AddBool(UrlEventData::Success, false); });
722 }
723
724
725 std::pair<BString, int>
GetHost() const726 BHttpSession::Request::GetHost() const
727 {
728 return {fRequest.Url().Host(), fRequest.Url().Port()};
729 }
730
731
732 void
SetCounter(int32 * counter)733 BHttpSession::Request::SetCounter(int32* counter) noexcept
734 {
735 fConnectionCounter = std::unique_ptr<int32, CounterDeleter>(counter);
736 }
737
738
739 /*!
740 \brief Resolve the hostname for a request
741 */
742 void
ResolveHostName()743 BHttpSession::Request::ResolveHostName()
744 {
745 int port;
746 if (fRequest.Url().HasPort())
747 port = fRequest.Url().Port();
748 else if (fRequest.Url().Protocol() == "https")
749 port = 443;
750 else
751 port = 80;
752
753 // TODO: proxy
754 if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) {
755 throw BNetworkRequestError(
756 "BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status);
757 }
758
759 SendMessage(UrlEvent::HostNameResolved,
760 [this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); });
761 }
762
763
764 /*!
765 \brief Open the connection and make the socket non-blocking after opening it
766 */
767 void
OpenConnection()768 BHttpSession::Request::OpenConnection()
769 {
770 // Set up the socket
771 if (fRequest.Url().Protocol() == "https") {
772 // To do: secure socket with callbacks to check certificates
773 fSocket = std::make_unique<BSecureSocket>();
774 } else {
775 fSocket = std::make_unique<BSocket>();
776 }
777
778 // Set timeout
779 fSocket->SetTimeout(fRequest.Timeout());
780
781 // Open connection
782 if (auto status = fSocket->Connect(fRemoteAddress); status != B_OK) {
783 // TODO: inform listeners that the connection failed
784 throw BNetworkRequestError(
785 "BSocket::Connect()", BNetworkRequestError::NetworkError, status);
786 }
787
788 // Make the rest of the interaction non-blocking
789 auto flags = fcntl(fSocket->Socket(), F_GETFL, 0);
790 if (flags == -1)
791 throw BRuntimeError("fcntl()", "Error getting socket flags");
792 if (fcntl(fSocket->Socket(), F_SETFL, flags | O_NONBLOCK) != 0)
793 throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket");
794
795 SendMessage(UrlEvent::ConnectionOpened);
796
797 fRequestStatus = Connected;
798 }
799
800
801 /*!
802 \brief Transfer data from the request to the socket.
803
804 \returns \c true if the request is complete, or false if there is more.
805 */
806 void
TransferRequest()807 BHttpSession::Request::TransferRequest()
808 {
809 // Assert that we are in the right state
810 if (fRequestStatus != Connected)
811 throw BRuntimeError(
812 __PRETTY_FUNCTION__, "Write request for object that is not in the Connected state");
813
814 if (!fSerializer.IsInitialized())
815 fSerializer.SetTo(fBuffer, fRequest);
816
817 auto currentBytesWritten = fSerializer.Serialize(fBuffer, fSocket.get());
818
819 if (currentBytesWritten > 0) {
820 SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) {
821 msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred());
822 if (auto totalSize = fSerializer.BodyBytesTotal())
823 msg.AddInt64(UrlEventData::TotalBytes, totalSize.value());
824 });
825 }
826
827 if (fSerializer.Complete())
828 fRequestStatus = RequestSent;
829 }
830
831
832 /*!
833 \brief Transfer data from the socket and parse the result.
834
835 \returns \c true if the request is complete, or false if there is more.
836 */
837 bool
ReceiveResult()838 BHttpSession::Request::ReceiveResult()
839 {
840 // First: stream data from the socket
841 auto bytesRead = fBuffer.ReadFrom(fSocket.get());
842
843 if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED)
844 return false;
845
846 auto readEnd = bytesRead == 0;
847
848 // Parse the content in the buffer
849 switch (fParser.State()) {
850 case HttpInputStreamState::StatusLine:
851 {
852 if (fBuffer.RemainingBytes() == static_cast<size_t>(bytesRead)) {
853 // In the initial run, the bytes in the buffer will match the bytes read to indicate
854 // the response has started.
855 SendMessage(UrlEvent::ResponseStarted);
856 }
857
858 if (fParser.ParseStatus(fBuffer, fStatus)) {
859 // the status headers are now received, decide what to do next
860
861 // Determine if we can handle redirects; else notify of receiving status
862 if (fRemainingRedirects > 0) {
863 switch (fStatus.StatusCode()) {
864 case BHttpStatusCode::MovedPermanently:
865 case BHttpStatusCode::TemporaryRedirect:
866 case BHttpStatusCode::PermanentRedirect:
867 // These redirects require the request body to be sent again. It this is
868 // possible, BHttpRequest::RewindBody() will return true in which case
869 // we can handle the redirect.
870 if (!fRequest.RewindBody())
871 break;
872 [[fallthrough]];
873 case BHttpStatusCode::Found:
874 case BHttpStatusCode::SeeOther:
875 // These redirects redirect to GET, so we don't care if we can rewind
876 // the body; in this case redirect
877 fMightRedirect = true;
878 break;
879 default:
880 break;
881 }
882 }
883
884 if ((fStatus.StatusClass() == BHttpStatusClass::ClientError
885 || fStatus.StatusClass() == BHttpStatusClass::ServerError)
886 && fRequest.StopOnError()) {
887 fRequestStatus = ContentReceived;
888 fResult->SetStatus(std::move(fStatus));
889 fResult->SetFields(BHttpFields());
890 fResult->SetBody();
891 SendMessage(UrlEvent::RequestCompleted,
892 [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
893 return true;
894 }
895
896 if (!fMightRedirect) {
897 // we are not redirecting and there is no error, so inform listeners
898 SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
899 msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
900 });
901 fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
902 }
903 } else {
904 // We do not have enough data for the status line yet
905 if (readEnd) {
906 throw BNetworkRequestError(__PRETTY_FUNCTION__,
907 BNetworkRequestError::ProtocolError,
908 "Response did not include a complete status line");
909 }
910 return false;
911 }
912 [[fallthrough]];
913 }
914 case HttpInputStreamState::Fields:
915 {
916 if (!fParser.ParseFields(fBuffer, fFields)) {
917 // there may be more headers to receive, throw an error if there will be no more
918 if (readEnd) {
919 throw BNetworkRequestError(__PRETTY_FUNCTION__,
920 BNetworkRequestError::ProtocolError,
921 "Response did not include a complete header section");
922 }
923 break;
924 }
925
926 // The headers have been received, now set up the rest of the response handling
927
928 // Handle redirects
929 if (fMightRedirect) {
930 auto redirectToGet = false;
931 switch (fStatus.StatusCode()) {
932 case BHttpStatusCode::Found:
933 case BHttpStatusCode::SeeOther:
934 // 302 and 303 redirections convert all requests to GET request, except for
935 // HEAD
936 redirectToGet = true;
937 [[fallthrough]];
938 case BHttpStatusCode::MovedPermanently:
939 case BHttpStatusCode::TemporaryRedirect:
940 case BHttpStatusCode::PermanentRedirect:
941 {
942 auto locationField = fFields.FindField("Location");
943 if (locationField == fFields.end()) {
944 throw BNetworkRequestError(__PRETTY_FUNCTION__,
945 BNetworkRequestError::ProtocolError,
946 "Redirect; the Location field must be present and cannot be found");
947 }
948 auto locationString = BString(
949 (*locationField).Value().data(), (*locationField).Value().size());
950 auto redirect = BHttpSession::Redirect{
951 BUrl(fRequest.Url(), locationString), redirectToGet};
952 if (!redirect.url.IsValid()) {
953 throw BNetworkRequestError(__PRETTY_FUNCTION__,
954 BNetworkRequestError::ProtocolError,
955 "Redirect; invalid URL in the Location field");
956 }
957
958 // Notify of redirect
959 SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) {
960 msg.AddString(UrlEventData::HttpRedirectUrl, locationString);
961 });
962 throw redirect;
963 }
964 default:
965 // ignore other status codes and continue regular processing
966 SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
967 msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
968 });
969 fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
970 break;
971 }
972 }
973
974 // TODO: Parse received cookies
975
976 // Move headers to the result and inform listener
977 fResult->SetFields(std::move(fFields));
978 SendMessage(UrlEvent::HttpFields);
979
980 if (!fParser.HasContent()) {
981 // Any requests with not content are finished
982 fResult->SetBody();
983 SendMessage(UrlEvent::RequestCompleted,
984 [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
985 fRequestStatus = ContentReceived;
986 return true;
987 }
988 [[fallthrough]];
989 }
990 case HttpInputStreamState::Body:
991 {
992 size_t bytesWrittenToBody;
993 // The bytesWrittenToBody may differ from the bytes parsed from the buffer when
994 // there is compression on the incoming stream.
995 bytesRead = fParser.ParseBody(
996 fBuffer,
997 [this, &bytesWrittenToBody](const std::byte* buffer, size_t size) {
998 bytesWrittenToBody = fResult->WriteToBody(buffer, size);
999 return bytesWrittenToBody;
1000 },
1001 readEnd);
1002
1003 SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) {
1004 msg.AddInt64(UrlEventData::NumBytes, bytesRead);
1005 if (fParser.BodyBytesTotal())
1006 msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value());
1007 });
1008
1009 if (bytesWrittenToBody > 0) {
1010 SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) {
1011 msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody);
1012 });
1013 }
1014
1015 if (fParser.Complete()) {
1016 fResult->SetBody();
1017 SendMessage(UrlEvent::RequestCompleted,
1018 [](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
1019 fRequestStatus = ContentReceived;
1020 return true;
1021 } else if (readEnd) {
1022 // the parsing of the body is not complete but we are at the end of the data
1023 throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError,
1024 "Unexpected end of data: more data was expected");
1025 }
1026
1027 break;
1028 }
1029 default:
1030 throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable");
1031 }
1032
1033 // There is more to receive
1034 return false;
1035 }
1036
1037
1038 /*!
1039 \brief Disconnect the socket. Does not validate if it actually succeeded.
1040 */
1041 void
Disconnect()1042 BHttpSession::Request::Disconnect() noexcept
1043 {
1044 fSocket->Disconnect();
1045 }
1046
1047
1048 /*!
1049 \brief Send a message to the observer, if one is present
1050
1051 \param what The code of the message to be sent
1052 \param dataFunc Optional function that adds additional data to the message.
1053 */
1054 void
SendMessage(uint32 what,std::function<void (BMessage &)> dataFunc) const1055 BHttpSession::Request::SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc) const
1056 {
1057 if (fObserver.IsValid()) {
1058 BMessage msg(what);
1059 msg.AddInt32(UrlEventData::Id, fResult->id);
1060 if (dataFunc)
1061 dataFunc(msg);
1062 fObserver.SendMessage(&msg);
1063 }
1064 }
1065
1066
1067 // #pragma mark -- Message constants
1068
1069
// Names of the data fields that are specific to HTTP in the messages sent to observers
namespace BPrivate::Network::UrlEventData {

// Status code of the response (added as int16 in the HttpStatus message)
const char* HttpStatusCode = "url:httpstatuscode";

// Not used in the visible part of this file
const char* SSLCertificate = "url:sslcertificate";
const char* SSLMessage = "url:sslmessage";

// Value of the Location field of a redirect (added as string in the HttpRedirect message)
const char* HttpRedirectUrl = "url:httpredirecturl";

} // namespace BPrivate::Network::UrlEventData
1076