UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
Loop.inl
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5namespace UE::IoStore::HTTP
6{
7
8// {{{1 work-queue .............................................................
9
10/*
11 * - Activities (requests sent with a loop) are managed in singly-linked lists
12 * - Each activity has an associated host it is talking to.
13 * - Hosts are ephemeral, or represented externally via a FConnectionPool object
14 * - Loop has a group for each host, and each host-group has a bunch of socket-groups
15 * - Host-group has a list of work; pending activities waiting to start
16 * - Socket-groups own up to two activities; one sending, one receiving
17 * - As it recvs, a socket-group will, if possible, fetch more work from the host
18 *
19 * Loop:
20 * FHostGroup[HostPtr]:
21 * Work: Act0 -> Act1 -> Act2 -> Act3 -> ...
22 * FPeerGroup[0...HostMaxConnections]:
23 * Act.Send
24 * Act.Recv
25 */
26
29 : public FActivity
30{
37
38 static FActivityNode* Create(FParams& Params, FAnsiStringView Url={}, FCertRootsRef VerifyCert={});
39 static void Destroy(FActivityNode* Activity);
40 FActivityNode* Next = nullptr;
41 int8 Slot = -1;
43
44private:
45 FActivityNode(const FParams& Params);
46 ~FActivityNode() = default;
47};
48
50FActivityNode::FActivityNode(const FParams& Params)
51: FActivity(Params)
52, bFollow30x(Params.bFollow30x)
53{
54}
55
58{
59 static_assert(alignof(FActivityNode) <= 16);
60 static_assert(alignof(FHost) <= alignof(FActivityNode));
61
62 check(!Params.Method.IsEmpty());
63
64 if (Url.IsEmpty())
65 {
66 uint32 BufferSize = Params.BufferSize;
67 BufferSize = (BufferSize >= 128) ? BufferSize : 128;
68 BufferSize = (BufferSize + 15) & ~15;
69
70 uint32 AllocSize = sizeof(FActivityNode) + BufferSize;
71 auto* Ptr = (FActivityNode*)FMemory::Malloc(AllocSize, alignof(FActivityNode));
72
73 check(Params.Host != nullptr);
74 check(Params.Host->IsPooled());
75
76 Params.Buffer = (char*)(Ptr + 1);
77 Params.BufferSize = uint16(BufferSize);
78 Params.bIsKeepAlive = true;
79 return new (Ptr) FActivityNode(Params);
80 }
81
82 // Parse the URL into its components
84 if (ParseUrl(Url, UrlOffsets) < 0)
85 {
86 return nullptr;
87 }
88
89 FAnsiStringView HostName = UrlOffsets.HostName.Get(Url);
90
91 uint32 Port = 0;
92 if (UrlOffsets.Port)
93 {
94 FAnsiStringView PortView = UrlOffsets.Port.Get(Url);
95 Port = uint32(CrudeToInt(PortView));
96 }
97
98 FAnsiStringView Path;
99 if (UrlOffsets.Path > 0)
100 {
101 Path = Url.Mid(UrlOffsets.Path);
102 }
103
104 uint32 BufferSize = Params.BufferSize;
105 BufferSize = (BufferSize >= 128) ? BufferSize : 128;
106 BufferSize += sizeof(FHost);
107 BufferSize += HostName.Len() + 1;
108 BufferSize = (BufferSize + 15) & ~15;
109
110 uint32 AllocSize = sizeof(FActivityNode) + BufferSize;
111 auto* Ptr = (FActivityNode*)FMemory::Malloc(AllocSize, alignof(FActivityNode));
112
113 // Create an ephemeral host
114 if (UrlOffsets.SchemeLength == 5)
115 {
116 if (VerifyCert == ECertRootsRefType::None)
117 {
118 VerifyCert = FCertRoots::Default();
119 }
120 check(VerifyCert != ECertRootsRefType::None);
121 }
122 else
123 {
124 VerifyCert = FCertRoots::NoTls();
125 }
126
127 FHost* Host = (FHost*)(Ptr + 1);
128
129 char* HostNamePtr = (char*)(Host + 1);
130 uint32 HostNameLength = HostName.Len();
133
134 EHttpVersion Version = Params.HttpVersion;
135 Version = (Version != EHttpVersion::Two) ? EHttpVersion::One : Version;
136
137 new (Host) FHost({
138 .HostName = HostNamePtr,
139 .Port = Port,
140 .HttpVersion= Version,
141 .VerifyCert = VerifyCert,
142 });
143
144 HostNameLength = (HostNameLength + 8) & ~7;
145
146 check(Params.Host == nullptr);
147 Params.Path = Path;
148 Params.Host = Host;
149 Params.Buffer = HostNamePtr + HostNameLength;
150 Params.BufferSize = uint16(AllocSize - ptrdiff_t(Params.Buffer - (char*)Ptr));
151 Params.bIsKeepAlive = false;
152 return new (Ptr) (FActivityNode)(Params);
153}
154
157{
159 Ptr->~FActivityNode();
160 FMemory::Free(Ptr);
161}
162
163
164
167{
168public:
169 FActivityList() = default;
171 FActivityList(FActivityList&& Rhs) { Swap(Head, Rhs.Head); }
172 void operator = (FActivityList&& Rhs) { Swap(Head, Rhs.Head); }
173 FActivityList(const FActivityList&) = delete;
174 void operator = (const FActivityList&) = delete;
175 int32 IsEmpty() const { return Head == nullptr; }
176 FActivityNode* GetHead() const { return Head; }
177 FActivityNode* Detach() { FActivityNode* Ret = Head; Head = nullptr; return Ret; }
178 void Reverse();
179 void Join(FActivityList& Rhs);
180 void Prepend(FActivityNode* Node);
181 void Append(FActivityNode* Node);
185
186 template <typename LAMBDA>
187 FActivityNode* MoveToHead(LAMBDA&& Predicate);
188
189private:
190 FActivityNode* PopImpl();
191 FActivityNode* Head = nullptr;
192};
193
196{
197 check(Head == nullptr);
198}
199
201template <typename LAMBDA>
203{
204 FActivityNode* Node = Head;
205 FActivityNode* Prev = nullptr;
206 for (; Node != nullptr; Prev = Node, Node = Node->Next)
207 {
208 if (!Predicate(Node))
209 {
210 continue;
211 }
212
213 if (Prev == nullptr)
214 {
215 check(Head == Node);
216 return Node;
217 }
218
219 Prev->Next = Node->Next;
220 Node->Next = Head;
221 Head = Node;
222 return Node;
223 }
224
225 return nullptr;
226}
227
230{
231 FActivityNode* Reverse = nullptr;
232 for (FActivityNode* Next; Head != nullptr; Head = Next)
233 {
234 Next = Head->Next;
235 Head->Next = Reverse;
236 Reverse = Head;
237 }
238 Head = Reverse;
239}
240
243{
244 if (Head == nullptr)
245 {
246 Swap(Head, Rhs.Head);
247 return;
248 }
249
250 FActivityNode* Tail = Head;
251 for (; Tail->Next != nullptr; Tail = Tail->Next);
252 Swap(Tail->Next, Rhs.Head);
253}
254
257{
258 check(Node->Next == nullptr);
259 Node->Next = Head;
260 Head = Node;
261}
262
265{
266 check(Node->Next == nullptr);
267 if (Head == nullptr)
268 {
269 Head = Node;
270 return;
271 }
272
273 FActivityNode* Tail = Head;
274 for (; Tail->Next != nullptr; Tail = Tail->Next);
275 Tail->Next = Node;
276}
277
280{
281 if (Head == nullptr)
282 {
283 return Head;
284 }
285 return PopImpl();
286}
287
289FActivityNode* FActivityList::PopImpl()
290{
291 FActivityNode* Node = Head;
292 Head = Node->Next;
293 Node->Next = nullptr;
294 return Node;
295}
296
299{
300 check(Head != nullptr);
301 FActivityNode* Node = PopImpl();
302 ToList.Prepend(Node);
303}
304
307{
308 check(Head != nullptr);
309 FActivityNode* Node = PopImpl();
310 ToList.Append(Node);
311}
312
313
314
326
327
328
331{
332public:
333 FWorkQueue() = default;
334 bool HasWork() const { return !List.IsEmpty(); }
335 void AddActivity(FActivityNode* Activity);
337 void TickCancels(FTickState& State);
338
339private:
340 FActivityList List;
341 uint64 ActiveSlots = 0;
342
344};
345
348{
349 // This prepends and thus reverses order. It is assumed that this is working
350 // in conjunction with the loop, negating the reversal that happens there.
351
352 check(Activity->Next == nullptr);
353 List.Prepend(Activity);
354
355 ActiveSlots |= (1ull << Activity->Slot);
356}
357
360{
361 FActivityNode* Activity = List.Pop();
362 if (Activity == nullptr)
363 {
364 return nullptr;
365 }
366
367 check(ActiveSlots & (1ull << Activity->Slot));
368 ActiveSlots ^= (1ull << Activity->Slot);
369
370 check(Activity->Next == nullptr);
371 return Activity;
372}
373
376{
377 if (State.Cancels == 0 || (State.Cancels & ActiveSlots) == 0)
378 {
379 return;
380 }
381
382 // We are going to rebuild the list of activities to maintain order as the
383 // activity list is singular.
384
385 check(!List.IsEmpty());
386 ActiveSlots = 0;
387
388 List.Reverse();
389 FActivityNode* Activity = List.Detach();
390
391 for (FActivityNode* Next; Activity != nullptr; Activity = Next)
392 {
393 Next = Activity->Next;
394 Activity->Next = nullptr;
395
396 if (uint64 Slot = (1ull << Activity->Slot); (State.Cancels & Slot) == 0)
397 {
398 AddActivity(Activity);
399 continue;
400 }
401
402 Activity->Cancel();
403
404 State.DoneList.Prepend(Activity);
405 }
406}
407
408
409
410// {{{1 peer-group .............................................................
411
414{
415public:
416 FPeerGroup() = default;
418 void Unwait() { check(bWaiting); bWaiting = false; }
419 FWaiter GetWaiter() const;
420 bool Tick(FTickState& State);
421 void TickSend(FTickState& State, FHost& Host, FPoller& Poller);
422 void Fail(FTickState& State, FOutcome Reason);
423
424private:
425 bool MaybeStartNewWork(FTickState& State);
426 void Negotiate(FTickState& State);
427 void RecvInternal(FTickState& State);
428 void SendInternal(FTickState& State);
429 FActivityList Send;
430 FActivityList Recv;
431 FHttpPeer Peer;
432 uint32 LastUseMs = 0;
433 uint8 IsKeepAlive = 0;
434 uint8 InflightNum = 0;
435 uint8 InflightMax = 1;
436 bool bNegotiating = false;
437 bool bWaiting = false;
438
440};
441
444{
445 InflightMax = uint8(FMath::Min(uint32(Maximum), 127u));
446}
447
450{
451 if (!bWaiting)
452 {
453 return FWaiter();
454 }
455
457
460 return Waiter;
461}
462
465{
466 // Any send left at this point is unrecoverable. Send is likely shorter.
467 Send.Join(Recv);
468 Recv = MoveTemp(Send);
469
470 // Failure is quite terminal and we need to abort everything
471 while (FActivityNode* Activity = Recv.Pop())
472 {
473 Activity->Fail(Reason);
474 State.DoneList.Prepend(Activity);
475 }
476
477 Peer = FHttpPeer();
478 Send = FActivityList();
479 Recv = FActivityList();
480 bWaiting = false;
481 IsKeepAlive = 0;
482 InflightNum = 0;
483 bNegotiating = false;
484}
485
487bool FPeerGroup::MaybeStartNewWork(FTickState& State)
488{
489 if (InflightNum >= InflightMax) return false;
490 if (IsKeepAlive == 0) return false;
491 if (!State.Work->HasWork()) return false;
492
493 FActivityNode* Activity = State.Work->PopActivity();
494 do
495 {
496 Trace(Activity, ETrace::StartWork);
497
498 check(Activity->GetHost()->IsPooled());
499
500 Send.Append(Activity);
501 InflightNum++;
502 if (InflightNum == InflightMax)
503 {
504 break;
505 }
506
507 Activity = State.Work->PopActivity();
508 }
509 while (Activity != nullptr);
510
511 return true;
512}
513
515void FPeerGroup::Negotiate(FTickState& State)
516{
517 check(bNegotiating);
518 check(!Send.IsEmpty());
519 check(Peer.IsValid());
520
521 FOutcome Outcome = Peer.Handshake();
522 if (Outcome.IsError())
523 {
524 Fail(State, Outcome);
525 return;
526 }
527
528 if (Outcome.IsWaiting())
529 {
530 bWaiting = true;
531 return;
532 }
533
534 bNegotiating = false;
535 return SendInternal(State);
536}
537
539void FPeerGroup::RecvInternal(FTickState& State)
540{
541 check(bNegotiating == false);
542 check(!Recv.IsEmpty());
543
544 FActivityNode* Activity = nullptr;
545
546 FOutcome Outcome = Peer.GetPendingTransactId();
547 if (Outcome.IsWaiting())
548 {
549 bWaiting |= true;
550 return;
551 }
552
553 while (Outcome.IsOk())
554 {
555 uint32 TransactId = Outcome.GetResult();
556 Activity = Recv.MoveToHead(
557 [TransactId] (FActivityNode* Activity)
558 {
559 return Activity->GetTransactId() == TransactId;
560 }
561 );
562
563 if (Activity == nullptr)
564 {
565 Outcome = FOutcome::Error("could not find work for transact id", TransactId);
566 break;
567 }
568
569 Outcome = Activity->Tick(Peer, &State.RecvAllowance);
570 break;
571 }
572
573 // Any sort of error here is unrecoverable
574 if (Outcome.IsError())
575 {
576 Fail(State, Outcome);
577 return;
578 }
579
580 check(Activity != nullptr);
581
582 IsKeepAlive &= uint8(Activity->GetTransaction()->IsKeepAlive() == true);
583 LastUseMs = State.NowMs;
584 bWaiting |= (Outcome.IsWaiting() && Outcome.IsWaitData());
585
586 // New work may have fallen foul of peer issues
587 if (!Peer.IsValid())
588 {
589 return;
590 }
591
592 // If there was no data available this is as far as receiving can go
593 if (Outcome.IsWaiting())
594 {
595 return;
596 }
597
598 // If an okay result is -1, then message recv is done and content is next
599 // and it is likely that we have it.
600 if (Outcome.GetResult() == uint32(FActivity::EStage::Response))
601 {
602 return RecvInternal(State);
603 }
604
606
607 check(Recv.GetHead() == Activity);
608 Recv.PopPrepend(State.DoneList);
609 InflightNum--;
610
611 if (MaybeStartNewWork(State))
612 {
613 SendInternal(State);
614 }
615
616 // If the server wants the connection closed then we can drop our peer
617 if (IsKeepAlive == 0)
618 {
619 Peer = FHttpPeer();
620 }
621}
622
624void FPeerGroup::SendInternal(FTickState& State)
625{
626 check(bNegotiating == false);
627 check(IsKeepAlive == 1);
628 check(!Send.IsEmpty());
629
630 FActivityNode* Activity = Send.GetHead();
631
632 FOutcome Outcome = Activity->Tick(Peer);
633
634 if (Outcome.IsWaiting())
635 {
636 bWaiting = true;
637 return;
638 }
639
640 if (Outcome.IsError())
641 {
642 Fail(State, Outcome);
643 return;
644 }
645
647
648 Send.PopAppend(Recv);
649
650 if (!Send.IsEmpty())
651 {
652 return SendInternal(State);
653 }
654}
655
658{
659 if (bNegotiating)
660 {
661 Negotiate(State);
662 }
663
664 else if (!Send.IsEmpty())
665 {
666 SendInternal(State);
667 }
668
669 if (!Recv.IsEmpty() && State.RecvAllowance)
670 {
671 RecvInternal(State);
672 }
673
674 return !!IsKeepAlive | !(Send.IsEmpty() & Recv.IsEmpty());
675}
676
678void FPeerGroup::TickSend(FTickState& State, FHost& Host, FPoller& Poller)
679{
680 // This path is only for those that have spare capacity
681 if (InflightNum >= InflightMax)
682 {
683 return;
684 }
685
686 // Failing will try and recover work which we don't want to happen yet
687 FActivityNode* Pending = State.Work->PopActivity();
688 check(Pending != nullptr);
689
690 // Close idle sockets
691 if (Peer.IsValid() && LastUseMs + GIdleMs < State.NowMs)
692 {
693 LastUseMs = State.NowMs;
694 Peer = FHttpPeer();
695 }
696
697 // We don't have a connected socket on first use, or if a keep-alive:close
698 // was received from the server. So we connect here.
699 bool bWillBlock = false;
700 if (!Peer.IsValid())
701 {
703
704 FSocket Socket;
705 if (Socket.Create())
706 {
707 FWaitable Waitable = Socket.GetWaitable();
708 Poller.Register(Waitable);
709
710 Outcome = Host.Connect(Socket);
711 }
712 else
713 {
714 Outcome = FOutcome::Error("Failed to create socket");
715 }
716
717 if (Outcome.IsError())
718 {
719 // We failed to connect, let's bail.
720 Recv.Prepend(Pending);
721 Fail(State, Outcome);
722 return;
723 }
724
725 IsKeepAlive = 1;
726 bNegotiating = true;
727 bWillBlock = Outcome.IsWaiting();
728 InflightNum = 0;
729
730 FHttpPeer::FParams Params = {
731 .Socket = MoveTemp(Socket),
732 .Certs = Host.GetVerifyCert(),
733 .HostName = Host.GetHostName().GetData(),
734 .HttpVersion = Host.GetHttpVersion(),
735 };
736 Peer = FHttpPeer(MoveTemp(Params));
737 }
738
739 Send.Append(Pending);
740 InflightNum++;
741
742 MaybeStartNewWork(State);
743
744 if (!bWillBlock)
745 {
746 if (bNegotiating)
747 {
748 return Negotiate(State);
749 }
750 return SendInternal(State);
751 }
752
753 // Non-blocking connect
754 bWaiting = true;
755}
756
757
758
759// {{{1 host-group .............................................................
760
763{
764public:
766 bool IsBusy() const { return BusyCount != 0; }
767 const FHost& GetHost() const { return Host; }
768 void Tick(FTickState& State);
769 void AddActivity(FActivityNode* Activity);
770
771private:
772 int32 Wait(const FTickState& State);
773 TArray<FPeerGroup> PeerGroups;
774 FWorkQueue Work;
775 FHost& Host;
776 FPoller Poller;
777 uint32 BusyCount = 0;
778 int32 WaitTimeAccum = 0;
779
781};
782
785: Host(InHost)
786{
787 uint32 Num = InHost.GetMaxConnections();
788 PeerGroups.SetNum(Num);
789
790 for (uint32 Inflight = Host.GetMaxInflight(); FPeerGroup& Group : PeerGroups)
791 {
792 Group.SetMaxInflight(Inflight);
793 }
794}
795
797int32 FHostGroup::Wait(const FTickState& State)
798{
799 // Collect groups that are waiting on something
801 for (uint32 i = 0, n = PeerGroups.Num(); i < n; ++i)
802 {
803 FWaiter Waiter = PeerGroups[i].GetWaiter();
804 if (!Waiter.IsValid())
805 {
806 continue;
807 }
808
809 Waiter.SetIndex(i);
810 Waiters.Add(MoveTemp(Waiter));
811 }
812
813 if (Waiters.IsEmpty())
814 {
815 return 0;
816 }
817
818 Trace(ETrace::Wait);
819 ON_SCOPE_EXIT { Trace(ETrace::Unwait); };
820
821 // If the poll timeout is negative then treat that as a fatal timeout
822 check(State.FailTimeoutMs);
823 int32 PollTimeoutMs = State.PollTimeoutMs;
824 if (PollTimeoutMs < 0)
825 {
826 PollTimeoutMs = State.FailTimeoutMs;
827 }
828
829 // Actually do the wait
830 int32 Result = FWaiter::Wait(Waiters, Poller, PollTimeoutMs);
831 if (Result <= 0)
832 {
833 // If the user opts to not block then we don't accumulate wait time and
834 // leave it to them to manage time and a fail timeout
835 WaitTimeAccum += PollTimeoutMs;
836
837 if (State.PollTimeoutMs < 0 || WaitTimeAccum >= State.FailTimeoutMs)
838 {
839 return MIN_int32;
840 }
841
842 return Result;
843 }
844
845 WaitTimeAccum = 0;
846
847 // For each waiter that's ready, find the associated group and "unwait" it.
848 int32 Count = 0;
849 for (int32 i = 0, n = Waiters.Num(); i < n; ++i)
850 {
851 if (!Waiters[i].IsReady())
852 {
853 continue;
854 }
855
856 uint32 Index = Waiters[i].GetIndex();
857 check(Index < uint32(PeerGroups.Num()));
858 PeerGroups[Index].Unwait();
859
860 Waiters.RemoveAtSwap(i, EAllowShrinking::No);
861 --n, --i, ++Count;
862 }
863 check(Count == Result);
864
865 return Result;
866}
867
870{
871 State.Work = &Work;
872
873 if (BusyCount = Work.HasWork(); BusyCount)
874 {
875 Work.TickCancels(State);
876
877 // Get available work out on idle sockets as soon as possible
878 for (FPeerGroup& Group : PeerGroups)
879 {
880 if (!Work.HasWork())
881 {
882 break;
883 }
884
885 Group.TickSend(State, Host, Poller);
886 }
887 }
888
889 // Wait on the groups that are waiting
890 if (int32 Result = Wait(State); Result < 0)
891 {
892 FOutcome Reason = (Result == MIN_int32)
893 ? FOutcome::Error("FailTimeout hit")
894 : FOutcome::Error("poll() returned an unexpected error");
895
896 for (FPeerGroup& Group : PeerGroups)
897 {
898 Group.Fail(State, Reason);
899 }
900
901 return;
902 }
903
904 // Tick everything, starting with groups that are maybe closest to finishing
905 for (FPeerGroup& Group : PeerGroups)
906 {
907 BusyCount += (Group.Tick(State) == true);
908 }
909}
910
913{
914 Work.AddActivity(Activity);
915}
916
917
918
919// {{{1 event-loop .............................................................
920
922static const FEventLoop::FRequestParams GDefaultParams;
923
926{
927public:
928 ~FImpl();
929 uint32 Tick(int32 PollTimeoutMs=0);
930 bool IsIdle() const;
932 void SetFailTimeout(int32 TimeoutMs);
933 void Cancel(FTicket Ticket);
934 FRequest Request(FActivityNode* Activity);
935 FTicket Send(FActivityNode* Activity);
936
937private:
938 void ReceiveWork();
939 FCriticalSection Lock;
940 std::atomic<uint64> FreeSlots = ~0ull;
941 std::atomic<uint64> Cancels = 0;
942 uint64 PrevFreeSlots = ~0ull;
943 FActivityList Pending;
944 FThrottler Throttler;
945 TArray<FHostGroup> Groups;
946 int32 FailTimeoutMs = GIdleMs;
947 uint32 BusyCount = 0;
948};
949
952{
953 check(BusyCount == 0);
954}
955
958{
959 Trace(Activity, ETrace::ActivityCreate, 0);
960
961 FRequest Ret;
962 Ret.Ptr = Activity;
963 return Ret;
964}
965
968{
969 Trace(Activity, ETrace::RequestBegin);
970
971 uint64 Slot;
972 {
973 FScopeLock _(&Lock);
974
975 for (;; FPlatformProcess::SleepNoStats(0.0f))
976 {
977 uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed);
978 if (!FreeSlotsLoad)
979 {
980 // we don't handle oversubscription at the moment. Could return
981 // activity to Request and return a 0 ticket.
982 check(false);
983 }
985 if (FreeSlots.compare_exchange_weak(FreeSlotsLoad, FreeSlotsLoad - Slot, std::memory_order_relaxed))
986 {
987 break;
988 }
989 }
990 Activity->Slot = int8(63 - FMath::CountLeadingZeros64(Slot));
991
992 // This puts pending requests in reverse order of when they were made
993 // but this will be undone when ReceiveWork() adds work to the queue
994 Pending.Prepend(Activity);
995 }
996
997 return Slot;
998}
999
1002{
1003 return FreeSlots.load(std::memory_order_relaxed) == ~0ull;
1004}
1005
1008{
1009 Throttler.SetLimit(KiBPerSec);
1010}
1011
1014{
1015 if (TimeoutMs > 0)
1016 {
1017 FailTimeoutMs = TimeoutMs;
1018 }
1019 else
1020 {
1021 FailTimeoutMs = GIdleMs; // Reset to default
1022 }
1023}
1024
1027{
1028 Cancels.fetch_or(Ticket, std::memory_order_relaxed);
1029}
1030
// Hands newly submitted activities over to the host group that matches each
// activity's host, creating that group the first time the host is seen.
// Does nothing when the free-slot mask is unchanged since the previous call.
// Every activity handed to a group bumps BusyCount by one.
//
// NOTE(review): listing lines 1042 and 1045 are absent from this extract;
// `Collected` is presumably declared and filled from `Pending` on those lines
// - confirm against the real file.
1032void FEventLoop::FImpl::ReceiveWork()
1033{
1034 uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed);
// NOTE(review): the comparison below reads the atomic FreeSlots a second time
// rather than using the FreeSlotsLoad snapshot taken on the line above. If
// FreeSlots changes between the two reads, PrevFreeSlots ends up assigned from
// a different value than the one that was compared. Confirm whether
// `FreeSlotsLoad == PrevFreeSlots` was intended.
1035 if (FreeSlots == PrevFreeSlots)
1036 {
1037 return;
1038 }
1039 PrevFreeSlots = FreeSlotsLoad;
1040
1041 // Fetch the pending activities from out in the wild
1043 {
1044 FScopeLock _(&Lock);
1046 }
1047
1048 // Pending is in the reverse of the order that requests were made. Adding
1049 // activities to their corresponding group will reverse this reversal.
1050
1051 // Group activities by their host.
1052 FActivityNode* Activity = Collected.Detach();
1053 for (FActivityNode* Next; Activity != nullptr; Activity = Next)
1054 {
// Unlink the node before handing it on; the work queue's AddActivity checks
// that Next is null.
1055 Next = Activity->Next;
1056 Activity->Next = nullptr;
1057
// Groups are matched by the address of their FHost, not by host name.
1058 FHost& Host = *(Activity->GetHost());
1059 auto Pred = [&Host] (const FHostGroup& Lhs) { return &Lhs.GetHost() == &Host; };
1060 FHostGroup* Group = Groups.FindByPredicate(Pred);
1061 if (Group == nullptr)
1062 {
1063 Group = &(Groups.Emplace_GetRef(Host));
1064 }
1065
1066 Group->AddActivity(Activity);
1067 ++BusyCount;
1068 }
1069}
1070
1073{
1074 TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::Tick);
1075
1076 ReceiveWork();
1077
1078 // We limit recv sizes as a way to control bandwidth use.
1079 int32 RecvAllowance = Throttler.GetAllowance();
1080 if (RecvAllowance <= 0)
1081 {
1082 if (PollTimeoutMs == 0)
1083 {
1084 return BusyCount;
1085 }
1086
1087 int32 ThrottleWaitMs = -RecvAllowance;
1088 if (PollTimeoutMs > 0)
1089 {
1090 ThrottleWaitMs = FMath::Min(ThrottleWaitMs, PollTimeoutMs);
1091 }
1092 FPlatformProcess::SleepNoStats(float(ThrottleWaitMs) / 1000.0f);
1093
1094 RecvAllowance = Throttler.GetAllowance();
1095 if (RecvAllowance <= 0)
1096 {
1097 return BusyCount;
1098 }
1099 }
1100
1101 uint64 CancelsLoad = Cancels.load(std::memory_order_relaxed);
1102
1103 uint32 NowMs;
1104 {
1105 // 4.2MM seconds will give us 50 days of uptime.
1106 static uint64 Freq = 0;
1107 static uint64 Base = 0;
1108 if (Freq == 0)
1109 {
1112 }
1113 uint64 NowBig = ((FPlatformTime::Cycles64() - Base) * 1000) / Freq;
1114 NowMs = uint32(NowBig);
1115 check(NowMs == NowBig);
1116 }
1117
1118 // Tick groups and then remove ones that are idle
1119 FTickState TickState = {
1121 .RecvAllowance = RecvAllowance,
1122 .PollTimeoutMs = PollTimeoutMs,
1123 .FailTimeoutMs = FailTimeoutMs,
1124 .NowMs = NowMs,
1125 };
1126 for (FHostGroup& Group : Groups)
1127 {
1128 Group.Tick(TickState);
1129 }
1130
1131 for (uint32 i = 0, n = Groups.Num(); i < n; ++i)
1132 {
1133 FHostGroup& Group = Groups[i];
1134 if (Group.IsBusy())
1135 {
1136 continue;
1137 }
1138
1139 Groups.RemoveAtSwap(i, EAllowShrinking::No);
1140 --n, --i;
1141 }
1142
1143 Throttler.ReturnUnused(RecvAllowance);
1144
1146 for (FActivityNode* Activity = TickState.DoneList.Detach(); Activity != nullptr;)
1147 {
1148 FActivityNode* Next = Activity->Next;
1149 ReturnedSlots |= (1ull << Activity->Slot);
1150
1151 FActivityNode::Destroy(Activity);
1152
1153 --BusyCount;
1154 Activity = Next;
1155 }
1156
1157 uint32 BusyBias = 0;
1158 if (ReturnedSlots)
1159 {
1160 uint64 LatestFree = FreeSlots.fetch_add(ReturnedSlots, std::memory_order_relaxed);
1161 BusyBias += (LatestFree != PrevFreeSlots);
1162 PrevFreeSlots += ReturnedSlots;
1163 }
1164
1166 {
1167 Cancels.fetch_and(~Mask, std::memory_order_relaxed);
1168 }
1169
1170 return BusyCount + BusyBias;
1171}
1172
1173
1174
1178uint32 FEventLoop::Tick(int32 PollTimeoutMs) { return Impl->Tick(PollTimeoutMs); }
1179bool FEventLoop::IsIdle() const { return Impl->IsIdle(); }
1180void FEventLoop::Cancel(FTicket Ticket) { return Impl->Cancel(Ticket); }
1182void FEventLoop::SetFailTimeout(int32 Ms) { return Impl->SetFailTimeout(Ms); }
1183
1186 FAnsiStringView Method,
1187 FAnsiStringView Url,
1188 const FRequestParams* Params)
1189{
1190 Params = (Params != nullptr) ? Params : &GDefaultParams;
1191
1193 ActivityParams.Method = Method;
1194 ActivityParams.BufferSize = Params->BufferSize;
1195 ActivityParams.bFollow30x = (Params->bAutoRedirect == true);
1196 ActivityParams.bAllowChunked = (Params->bAllowChunked == true);
1197 ActivityParams.ContentSizeEst = Params->ContentSizeEst;
1198 ActivityParams.HttpVersion = Params->HttpVersion;
1199
1200 FActivityNode* Activity = FActivityNode::Create(ActivityParams, Url, Params->VerifyCert);
1201 if (Activity == nullptr)
1202 {
1203 return FRequest();
1204 }
1205
1206 return Impl->Request(Activity);
1207}
1208
1211 FAnsiStringView Method,
1212 FAnsiStringView Path,
1213 FConnectionPool& Pool,
1214 const FRequestParams* Params)
1215{
1216 check(Pool.Ptr != nullptr);
1217 check(Params == nullptr || Params->VerifyCert == ECertRootsRefType::None); // add cert to FConPool instead
1218 check(Params == nullptr || Params->HttpVersion == EHttpVersion::Default); // set protocol version via FConPool
1219
1220 Params = (Params != nullptr) ? Params : &GDefaultParams;
1221
1223 ActivityParams.Method = Method;
1224 ActivityParams.Path = Path;
1225 ActivityParams.Host = Pool.Ptr;
1226 ActivityParams.bFollow30x = (Params->bAutoRedirect == true);
1227 ActivityParams.bAllowChunked = (Params->bAllowChunked == true);
1228 ActivityParams.ContentSizeEst = Params->ContentSizeEst;
1230 check(Activity != nullptr);
1231
1232 return Impl->Request(Activity);
1233}
1234
1236bool FEventLoop::Redirect(const FTicketStatus& Status, FTicketSink& OuterSink)
1237{
1238 const FResponse& Response = Status.GetResponse();
1239
1240 switch (Response.GetStatusCode())
1241 {
1242 case 301: // RedirectMoved
1243 case 302: // RedirectFound
1244 case 307: // RedirectTemp
1245 case 308: break; // RedirectPerm
1246 default: return false;
1247 }
1248
1249 FAnsiStringView Location = Response.GetHeader("Location");
1250 if (Location.IsEmpty())
1251 {
1252 // todo: turn source activity into an error?
1253 return false;
1254 }
1255
1256 const auto& Activity = (FActivity&)Response; // todo: yuk
1257
1258 // should we ever hit this, we'll fix it
1259 check(Activity.GetMethod().Equals("HEAD", ESearchCase::IgnoreCase) || Response.GetContentLength() == 0);
1260
1261 // Original method should remain unchanged
1262 FAnsiStringView Method = Activity.GetMethod();
1263 check(!Method.IsEmpty());
1264
1265 FRequestParams RequestParams = {
1266 .bAutoRedirect = true,
1267 };
1268
1269 FRequest ForwardRequest;
1270 if (!Location.StartsWith("http://") && !Location.StartsWith("https://"))
1271 {
1272 if (Location[0] != '/')
1273 {
1274 return false;
1275 }
1276
1277 const FHost& Host = *(Activity.GetHost());
1278
1280 Url << ((Host.GetVerifyCert() != ECertRootsRefType::None) ? "https" : "http");
1281 Url << "://";
1282 Url << Host.GetHostName();
1283 Url << ":" << Host.GetPort();
1284 Url << Location;
1285
1286 RequestParams.VerifyCert = Host.GetVerifyCert();
1287 new (&ForwardRequest) FRequest(Request(Method, Url, &RequestParams));
1288 }
1289 else
1290 {
1291 new (&ForwardRequest) FRequest(Request(Method, Location, &RequestParams));
1292 }
1293
1294 // Transfer original request headers
1295 Activity.EnumerateHeaders([&ForwardRequest] (FAnsiStringView Name, FAnsiStringView Value)
1296 {
1297 ForwardRequest.Header(Name, Value);
1298 return true;
1299 });
1300
1301 // Send the request
1303
1304 // todo: activity slots should be swapped so original slot matches ticket
1305
1306 return true;
1307}
1308
1311{
1312 FActivityNode* Activity = nullptr;
1313 Swap(Activity, Request.Ptr);
1314
1315 // Intercept sink calls to catch 30x status codes and follow them
1316 if (Activity->bFollow30x)
1317 {
1318 auto RedirectSink = [
1319 this,
1320 OuterSink=MoveTemp(Sink)
1321 ] (const FTicketStatus& Status) mutable
1322 {
1323 if (Status.GetId() == FTicketStatus::EId::Response)
1324 {
1325 if (Redirect(Status, OuterSink))
1326 {
1327 return;
1328 }
1329 }
1330
1331 if (OuterSink)
1332 {
1333 return OuterSink(Status);
1334 }
1335 };
1336 Sink = MoveTemp(RedirectSink);
1337 }
1338
1339 Activity->SetSink(MoveTemp(Sink), SinkParam);
1340 return Impl->Send(Activity);
1341}
1342
1343// }}}
1344
1345} // namespace UE::IoStore::HTTP
1346
#define check(expr)
Definition AssertionMacros.h:314
#define UE_NONCOPYABLE(TypeName)
Definition CoreMiscDefines.h:457
FPlatformTypes::int8 int8
An 8-bit signed integer.
Definition Platform.h:1121
FPlatformTypes::int64 int64
A 64-bit signed integer.
Definition Platform.h:1127
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
FPlatformTypes::UPTRINT UPTRINT
An unsigned integer the same size as a pointer.
Definition Platform.h:1146
FPlatformTypes::uint64 uint64
A 64-bit unsigned integer.
Definition Platform.h:1117
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define TRACE_CPUPROFILER_EVENT_SCOPE(Name)
Definition CpuProfilerTrace.h:528
UE::FPlatformRecursiveMutex FCriticalSection
Definition CriticalSection.h:53
auto Response
Definition ExternalRpcRegistry.cpp:598
@ Num
Definition MetalRHIPrivate.h:234
#define MIN_int32
Definition NumericLimits.h:16
#define ON_SCOPE_EXIT
Definition ScopeExit.h:73
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
FRWLock Lock
Definition UnversionedPropertySerialization.cpp:921
memcpy(InputBufferBase, BinkBlocksData, BinkBlocksSize)
uint8_t uint8
Definition binka_ue_file_header.h:8
uint16_t uint16
Definition binka_ue_file_header.h:7
uint32_t uint32
Definition binka_ue_file_header.h:6
Definition ScopeLock.h:141
Definition Array.h:670
Definition StringBuilder.h:509
constexpr bool IsEmpty() const
Definition StringView.h:180
ViewType Mid(int32 Position, int32 CharCount=MAX_int32) const
Definition StringView.h:606
constexpr int32 Len() const
Definition StringView.h:174
constexpr const CharType * GetData() const
Definition StringView.h:160
Definition Loop.inl:167
int32 IsEmpty() const
Definition Loop.inl:175
void Join(FActivityList &Rhs)
Definition Loop.inl:242
FActivityList(const FActivityList &)=delete
void operator=(FActivityList &&Rhs)
Definition Loop.inl:172
~FActivityList()
Definition Loop.inl:195
void PopAppend(FActivityList &ToList)
Definition Loop.inl:306
void Reverse()
Definition Loop.inl:229
FActivityNode * Pop()
Definition Loop.inl:279
void Prepend(FActivityNode *Node)
Definition Loop.inl:256
void Append(FActivityNode *Node)
Definition Loop.inl:264
FActivityList(FActivityList &&Rhs)
Definition Loop.inl:171
FActivityNode * MoveToHead(LAMBDA &&Predicate)
Definition Loop.inl:202
void PopPrepend(FActivityList &ToList)
Definition Loop.inl:298
FActivityNode * GetHead() const
Definition Loop.inl:176
FActivityNode * Detach()
Definition Loop.inl:177
static UE_API FCertRootsRef NoTls()
Definition Peer.inl:400
static UE_API FCertRootsRef Default()
Definition Peer.inl:406
FTicket Send(FActivityNode *Activity)
Definition Loop.inl:967
FRequest Request(FActivityNode *Activity)
Definition Loop.inl:957
void Cancel(FTicket Ticket)
Definition Loop.inl:1026
bool IsIdle() const
Definition Loop.inl:1001
uint32 Tick(int32 PollTimeoutMs=0)
Definition Loop.inl:1072
~FImpl()
Definition Loop.inl:951
void Throttle(uint32 KiBPerSec)
Definition Loop.inl:1007
void SetFailTimeout(int32 TimeoutMs)
Definition Loop.inl:1013
UE_API FTicket Send(FRequest &&Request, FTicketSink Sink, UPTRINT SinkParam=0)
Definition Loop.inl:1310
UE_API FRequest Request(FAnsiStringView Method, FAnsiStringView Url, const FRequestParams *Params=nullptr)
Definition Loop.inl:1185
UE_API uint32 Tick(int32 PollTimeoutMs=0)
Definition Loop.inl:1178
UE_API void SetFailTimeout(int32 TimeoutMs)
Definition Loop.inl:1182
UE_API void Throttle(uint32 KiBPerSec)
Definition Loop.inl:1181
UE_API ~FEventLoop()
Definition Loop.inl:1177
UE_API bool IsIdle() const
Definition Loop.inl:1179
UE_API void Cancel(FTicket Ticket)
Definition Loop.inl:1180
UE_API FEventLoop()
Definition Loop.inl:1176
Definition Loop.inl:763
const FHost & GetHost() const
Definition Loop.inl:767
FHostGroup(FHost &InHost)
Definition Loop.inl:784
void Tick(FTickState &State)
Definition Loop.inl:869
void AddActivity(FActivityNode *Activity)
Definition Loop.inl:912
bool IsBusy() const
Definition Loop.inl:766
Definition ConnectionPool.inl:10
uint32 GetMaxInflight() const
Definition ConnectionPool.inl:34
FOutcome Connect(FSocket &Socket)
Definition ConnectionPool.inl:155
FCertRootsRef GetVerifyCert() const
Definition ConnectionPool.inl:32
EHttpVersion GetHttpVersion() const
Definition ConnectionPool.inl:35
bool IsPooled() const
Definition ConnectionPool.inl:36
FAnsiStringView GetHostName() const
Definition ConnectionPool.inl:38
Definition Peer.inl:527
FOutcome GetPendingTransactId()
Definition Peer.inl:641
FOutcome Handshake()
Definition Peer.inl:594
Definition Misc.inl:73
static FOutcome Error(const char *Message, int32 Code=-1)
Definition Misc.inl:151
static FOutcome None()
Definition Misc.inl:80
Definition Loop.inl:414
FWaiter GetWaiter() const
Definition Loop.inl:449
void SetMaxInflight(uint32 Maximum)
Definition Loop.inl:443
void Fail(FTickState &State, FOutcome Reason)
Definition Loop.inl:464
bool Tick(FTickState &State)
Definition Loop.inl:657
void TickSend(FTickState &State, FHost &Host, FPoller &Poller)
Definition Loop.inl:678
void Unwait()
Definition Loop.inl:418
bool IsValid() const
Definition Peer.inl:45
FWaitable GetWaitable() const
Definition Peer.inl:42
Definition Socket.inl:207
bool Register(const FWaitable &Waitable)
Definition Socket.inl:213
Definition Client.h:126
Definition Client.h:150
Definition Socket.inl:367
Definition Misc.inl:502
Definition Client.h:195
UE_API UPTRINT GetParam() const
Definition Api.inl:175
UE_API FResponse & GetResponse() const
Definition Api.inl:196
UE_API EId GetId() const
Definition Api.inl:160
Definition Socket.inl:130
Definition Socket.inl:281
static int32 Wait(TArrayView< FWaiter > Waiters, FPoller &Poller, int32 TimeoutMs)
Definition Socket.inl:321
Definition Loop.inl:331
void AddActivity(FActivityNode *Activity)
Definition Loop.inl:347
FActivityNode * PopActivity()
Definition Loop.inl:359
void TickCancels(FTickState &State)
Definition Loop.inl:375
bool HasWork() const
Definition Loop.inl:334
@ IgnoreCase
Definition CString.h:26
Definition HttpServerHttpVersion.h:7
Definition ExpressionParserTypes.h:21
State
Definition PacketHandler.h:88
Definition Client.h:20
EHttpVersion
Definition Client.h:24
UPTRINT FCertRootsRef
Definition Client.h:56
uint64 FTicket
Definition Client.h:57
UE_STRING_CLASS Result(Forward< LhsType >(Lhs), RhsLen)
Definition String.cpp.inl:732
U16 Index
Definition radfft.cpp:71
static uint64 Cycles64()
Definition AndroidPlatformTime.h:34
static double GetSecondsPerCycle64()
Definition GenericPlatformTime.h:196
static FORCENOINLINE CORE_API void Free(void *Original)
Definition UnrealMemory.cpp:685
static const FCertRootsRef None
Definition Peer.inl:30
bool bFollow30x
Definition Loop.inl:34
Definition Loop.inl:30
bool bFollow30x
Definition Loop.inl:42
int8 Slot
Definition Loop.inl:41
static FActivityNode * Create(FParams &Params, FAnsiStringView Url={}, FCertRootsRef VerifyCert={})
Definition Loop.inl:57
FActivityNode * Next
Definition Loop.inl:40
static void Destroy(FActivityNode *Activity)
Definition Loop.inl:156
Definition Activity.inl:66
FAnsiStringView Method
Definition Activity.inl:67
Definition Activity.inl:64
void Cancel()
Definition Activity.inl:238
friend void Trace(const FActivity *, ETrace, uint32)
void SetSink(FTicketSink &&InSink, UPTRINT Param)
Definition Activity.inl:225
FOutcome Tick(FHttpPeer &Peer, int32 *MaxRecvSize=nullptr)
Definition Activity.inl:641
FHost * Host
Definition Activity.inl:149
FHost * GetHost() const
Definition Activity.inl:339
Definition Loop.inl:317
uint64 Cancels
Definition Loop.inl:319
uint32 NowMs
Definition Loop.inl:323
int32 & RecvAllowance
Definition Loop.inl:320
int32 FailTimeoutMs
Definition Loop.inl:322
class FWorkQueue * Work
Definition Loop.inl:324
FActivityList DoneList
Definition Loop.inl:318
int32 PollTimeoutMs
Definition Loop.inl:321
Definition Misc.inl:203