39 static void Destroy(FActivityNode* Activity);
50FActivityNode::FActivityNode(
const FParams& Params)
52, bFollow30x(Params.bFollow30x)
62 check(!Params.Method.IsEmpty());
66 uint32 BufferSize = Params.BufferSize;
67 BufferSize = (BufferSize >= 128) ? BufferSize : 128;
68 BufferSize = (BufferSize + 15) & ~15;
73 check(Params.Host !=
nullptr);
74 check(Params.Host->IsPooled());
76 Params.Buffer = (
char*)(Ptr + 1);
77 Params.BufferSize =
uint16(BufferSize);
78 Params.bIsKeepAlive =
true;
104 uint32 BufferSize = Params.BufferSize;
105 BufferSize = (BufferSize >= 128) ? BufferSize : 128;
106 BufferSize +=
sizeof(
FHost);
107 BufferSize += HostName.
Len() + 1;
108 BufferSize = (BufferSize + 15) & ~15;
140 .HttpVersion= Version,
141 .VerifyCert = VerifyCert,
146 check(Params.Host ==
nullptr);
150 Params.BufferSize =
uint16(AllocSize -
ptrdiff_t(Params.Buffer - (
char*)Ptr));
151 Params.bIsKeepAlive =
false;
159 Ptr->~FActivityNode();
186 template <
typename LAMBDA>
197 check(Head ==
nullptr);
201template <
typename LAMBDA>
206 for (; Node !=
nullptr; Prev = Node, Node = Node->
Next)
208 if (!Predicate(Node))
219 Prev->Next = Node->
Next;
246 Swap(Head, Rhs.Head);
251 for (; Tail->
Next !=
nullptr; Tail = Tail->
Next);
274 for (; Tail->
Next !=
nullptr; Tail = Tail->
Next);
293 Node->
Next =
nullptr;
300 check(Head !=
nullptr);
308 check(Head !=
nullptr);
355 ActiveSlots |= (1ull << Activity->
Slot);
362 if (Activity ==
nullptr)
367 check(ActiveSlots & (1ull << Activity->
Slot));
368 ActiveSlots ^= (1ull << Activity->
Slot);
377 if (State.Cancels == 0 || (State.Cancels & ActiveSlots) == 0)
394 Activity->
Next =
nullptr;
396 if (
uint64 Slot = (1ull << Activity->
Slot); (State.Cancels & Slot) == 0)
404 State.DoneList.Prepend(Activity);
433 uint8 IsKeepAlive = 0;
434 uint8 InflightNum = 0;
435 uint8 InflightMax = 1;
436 bool bNegotiating =
false;
437 bool bWaiting =
false;
473 Activity->Fail(Reason);
474 State.DoneList.Prepend(Activity);
483 bNegotiating =
false;
487bool FPeerGroup::MaybeStartNewWork(
FTickState& State)
489 if (InflightNum >= InflightMax)
return false;
490 if (IsKeepAlive == 0)
return false;
491 if (!State.Work->HasWork())
return false;
502 if (InflightNum == InflightMax)
507 Activity = State.Work->PopActivity();
509 while (Activity !=
nullptr);
515void FPeerGroup::Negotiate(FTickState& State)
534 bNegotiating =
false;
535 return SendInternal(State);
539void FPeerGroup::RecvInternal(FTickState& State)
541 check(bNegotiating ==
false);
544 FActivityNode* Activity =
nullptr;
557 [TransactId] (FActivityNode* Activity)
559 return Activity->GetTransactId() == TransactId;
563 if (Activity ==
nullptr)
580 check(Activity !=
nullptr);
582 IsKeepAlive &=
uint8(Activity->GetTransaction()->IsKeepAlive() ==
true);
583 LastUseMs =
State.NowMs;
602 return RecvInternal(State);
611 if (MaybeStartNewWork(State))
617 if (IsKeepAlive == 0)
624void FPeerGroup::SendInternal(FTickState& State)
626 check(bNegotiating ==
false);
627 check(IsKeepAlive == 1);
630 FActivityNode* Activity = Send.
GetHead();
652 return SendInternal(State);
669 if (!Recv.
IsEmpty() && State.RecvAllowance)
681 if (InflightNum >= InflightMax)
693 LastUseMs = State.NowMs;
742 MaybeStartNewWork(State);
748 return Negotiate(State);
750 return SendInternal(State);
766 bool IsBusy()
const {
return BusyCount != 0; }
778 int32 WaitTimeAccum = 0;
788 PeerGroups.SetNum(
Num);
792 Group.SetMaxInflight(Inflight);
801 for (
uint32 i = 0, n = PeerGroups.Num(); i < n; ++i)
824 if (PollTimeoutMs < 0)
826 PollTimeoutMs =
State.FailTimeoutMs;
835 WaitTimeAccum += PollTimeoutMs;
837 if (
State.PollTimeoutMs < 0 || WaitTimeAccum >=
State.FailTimeoutMs)
858 PeerGroups[
Index].Unwait();
873 if (BusyCount = Work.
HasWork(); BusyCount)
885 Group.TickSend(State, Host, Poller);
890 if (
int32 Result =
Wait(State); Result < 0)
898 Group.Fail(State, Reason);
907 BusyCount += (
Group.Tick(State) ==
true);
940 std::atomic<uint64> FreeSlots = ~0
ull;
941 std::atomic<uint64> Cancels = 0;
953 check(BusyCount == 0);
975 for (;; FPlatformProcess::SleepNoStats(0.0f))
990 Activity->
Slot =
int8(63 - FMath::CountLeadingZeros64(Slot));
1003 return FreeSlots.load(std::memory_order_relaxed) == ~0
ull;
1017 FailTimeoutMs = TimeoutMs;
1028 Cancels.fetch_or(
Ticket, std::memory_order_relaxed);
1032void FEventLoop::FImpl::ReceiveWork()
1035 if (FreeSlots == PrevFreeSlots)
1052 FActivityNode* Activity =
Collected.Detach();
1053 for (FActivityNode*
Next; Activity !=
nullptr; Activity =
Next)
1055 Next = Activity->Next;
1056 Activity->Next =
nullptr;
1058 FHost& Host = *(Activity->GetHost());
1059 auto Pred = [&Host] (
const FHostGroup& Lhs) {
return &Lhs.GetHost() == &Host; };
1060 FHostGroup*
Group = Groups.FindByPredicate(
Pred);
1061 if (
Group ==
nullptr)
1063 Group = &(Groups.Emplace_GetRef(Host));
1066 Group->AddActivity(Activity);
1079 int32 RecvAllowance = Throttler.GetAllowance();
1080 if (RecvAllowance <= 0)
1082 if (PollTimeoutMs == 0)
1088 if (PollTimeoutMs > 0)
1094 RecvAllowance = Throttler.GetAllowance();
1095 if (RecvAllowance <= 0)
1121 .RecvAllowance = RecvAllowance,
1122 .PollTimeoutMs = PollTimeoutMs,
1123 .FailTimeoutMs = FailTimeoutMs,
1128 Group.Tick(TickState);
1131 for (
uint32 i = 0, n = Groups.Num(); i < n; ++i)
1143 Throttler.ReturnUnused(RecvAllowance);
1167 Cancels.fetch_and(~
Mask, std::memory_order_relaxed);
1190 Params = (Params !=
nullptr) ? Params : &GDefaultParams;
1201 if (Activity ==
nullptr)
1206 return Impl->Request(Activity);
1216 check(Pool.Ptr !=
nullptr);
1220 Params = (Params !=
nullptr) ? Params : &GDefaultParams;
1230 check(Activity !=
nullptr);
1232 return Impl->Request(Activity);
1246 default:
return false;
1250 if (Location.IsEmpty())
1256 const auto& Activity = (FActivity&)
Response;
1266 .bAutoRedirect =
true,
1270 if (!Location.StartsWith(
"http://") && !Location.StartsWith(
"https://"))
1272 if (Location[0] !=
'/')
1277 const FHost& Host = *(Activity.GetHost());
1282 Url << Host.GetHostName();
1283 Url <<
":" << Host.GetPort();
1340 return Impl->Send(Activity);
#define check(expr)
Definition AssertionMacros.h:314
#define UE_NONCOPYABLE(TypeName)
Definition CoreMiscDefines.h:457
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define TRACE_CPUPROFILER_EVENT_SCOPE(Name)
Definition CpuProfilerTrace.h:528
UE::FPlatformRecursiveMutex FCriticalSection
Definition CriticalSection.h:53
auto Response
Definition ExternalRpcRegistry.cpp:598
#define MIN_int32
Definition NumericLimits.h:16
#define ON_SCOPE_EXIT
Definition ScopeExit.h:73
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
FRWLock Lock
Definition UnversionedPropertySerialization.cpp:921
memcpy(InputBufferBase, BinkBlocksData, BinkBlocksSize)
Definition ScopeLock.h:141
Definition StringBuilder.h:509
constexpr bool IsEmpty() const
Definition StringView.h:180
ViewType Mid(int32 Position, int32 CharCount=MAX_int32) const
Definition StringView.h:606
constexpr int32 Len() const
Definition StringView.h:174
constexpr const CharType * GetData() const
Definition StringView.h:160
int32 IsEmpty() const
Definition Loop.inl:175
void Join(FActivityList &Rhs)
Definition Loop.inl:242
FActivityList(const FActivityList &)=delete
void operator=(FActivityList &&Rhs)
Definition Loop.inl:172
~FActivityList()
Definition Loop.inl:195
void PopAppend(FActivityList &ToList)
Definition Loop.inl:306
void Reverse()
Definition Loop.inl:229
FActivityNode * Pop()
Definition Loop.inl:279
void Prepend(FActivityNode *Node)
Definition Loop.inl:256
void Append(FActivityNode *Node)
Definition Loop.inl:264
FActivityList(FActivityList &&Rhs)
Definition Loop.inl:171
FActivityNode * MoveToHead(LAMBDA &&Predicate)
Definition Loop.inl:202
void PopPrepend(FActivityList &ToList)
Definition Loop.inl:298
FActivityNode * GetHead() const
Definition Loop.inl:176
FActivityNode * Detach()
Definition Loop.inl:177
static UE_API FCertRootsRef NoTls()
Definition Peer.inl:400
static UE_API FCertRootsRef Default()
Definition Peer.inl:406
FTicket Send(FActivityNode *Activity)
Definition Loop.inl:967
FRequest Request(FActivityNode *Activity)
Definition Loop.inl:957
void Cancel(FTicket Ticket)
Definition Loop.inl:1026
bool IsIdle() const
Definition Loop.inl:1001
uint32 Tick(int32 PollTimeoutMs=0)
Definition Loop.inl:1072
~FImpl()
Definition Loop.inl:951
void Throttle(uint32 KiBPerSec)
Definition Loop.inl:1007
void SetFailTimeout(int32 TimeoutMs)
Definition Loop.inl:1013
UE_API FTicket Send(FRequest &&Request, FTicketSink Sink, UPTRINT SinkParam=0)
Definition Loop.inl:1310
UE_API FRequest Request(FAnsiStringView Method, FAnsiStringView Url, const FRequestParams *Params=nullptr)
Definition Loop.inl:1185
UE_API uint32 Tick(int32 PollTimeoutMs=0)
Definition Loop.inl:1178
UE_API void SetFailTimeout(int32 TimeoutMs)
Definition Loop.inl:1182
UE_API void Throttle(uint32 KiBPerSec)
Definition Loop.inl:1181
UE_API ~FEventLoop()
Definition Loop.inl:1177
UE_API bool IsIdle() const
Definition Loop.inl:1179
UE_API void Cancel(FTicket Ticket)
Definition Loop.inl:1180
UE_API FEventLoop()
Definition Loop.inl:1176
const FHost & GetHost() const
Definition Loop.inl:767
FHostGroup(FHost &InHost)
Definition Loop.inl:784
void Tick(FTickState &State)
Definition Loop.inl:869
void AddActivity(FActivityNode *Activity)
Definition Loop.inl:912
bool IsBusy() const
Definition Loop.inl:766
Definition ConnectionPool.inl:10
uint32 GetMaxInflight() const
Definition ConnectionPool.inl:34
FOutcome Connect(FSocket &Socket)
Definition ConnectionPool.inl:155
FCertRootsRef GetVerifyCert() const
Definition ConnectionPool.inl:32
EHttpVersion GetHttpVersion() const
Definition ConnectionPool.inl:35
bool IsPooled() const
Definition ConnectionPool.inl:36
FAnsiStringView GetHostName() const
Definition ConnectionPool.inl:38
FOutcome GetPendingTransactId()
Definition Peer.inl:641
FOutcome Handshake()
Definition Peer.inl:594
static FOutcome Error(const char *Message, int32 Code=-1)
Definition Misc.inl:151
static FOutcome None()
Definition Misc.inl:80
FWaiter GetWaiter() const
Definition Loop.inl:449
void SetMaxInflight(uint32 Maximum)
Definition Loop.inl:443
void Fail(FTickState &State, FOutcome Reason)
Definition Loop.inl:464
bool Tick(FTickState &State)
Definition Loop.inl:657
void TickSend(FTickState &State, FHost &Host, FPoller &Poller)
Definition Loop.inl:678
void Unwait()
Definition Loop.inl:418
bool IsValid() const
Definition Peer.inl:45
FWaitable GetWaitable() const
Definition Peer.inl:42
Definition Socket.inl:207
bool Register(const FWaitable &Waitable)
Definition Socket.inl:213
Definition Socket.inl:367
UE_API UPTRINT GetParam() const
Definition Api.inl:175
UE_API FResponse & GetResponse() const
Definition Api.inl:196
UE_API EId GetId() const
Definition Api.inl:160
Definition Socket.inl:130
Definition Socket.inl:281
static int32 Wait(TArrayView< FWaiter > Waiters, FPoller &Poller, int32 TimeoutMs)
Definition Socket.inl:321
void AddActivity(FActivityNode *Activity)
Definition Loop.inl:347
FActivityNode * PopActivity()
Definition Loop.inl:359
void TickCancels(FTickState &State)
Definition Loop.inl:375
bool HasWork() const
Definition Loop.inl:334
@ IgnoreCase
Definition CString.h:26
Definition HttpServerHttpVersion.h:7
Definition ExpressionParserTypes.h:21
State
Definition PacketHandler.h:88
EHttpVersion
Definition Client.h:24
UPTRINT FCertRootsRef
Definition Client.h:56
uint64 FTicket
Definition Client.h:57
UE_STRING_CLASS Result(Forward< LhsType >(Lhs), RhsLen)
Definition String.cpp.inl:732
U16 Index
Definition radfft.cpp:71
static uint64 Cycles64()
Definition AndroidPlatformTime.h:34
static FORCENOINLINE CORE_API void Free(void *Original)
Definition UnrealMemory.cpp:685
static const FCertRootsRef None
Definition Peer.inl:30
bool bFollow30x
Definition Loop.inl:34
bool bFollow30x
Definition Loop.inl:42
int8 Slot
Definition Loop.inl:41
static FActivityNode * Create(FParams &Params, FAnsiStringView Url={}, FCertRootsRef VerifyCert={})
Definition Loop.inl:57
FActivityNode * Next
Definition Loop.inl:40
static void Destroy(FActivityNode *Activity)
Definition Loop.inl:156
Definition Activity.inl:66
FAnsiStringView Method
Definition Activity.inl:67
Definition Activity.inl:64
void Cancel()
Definition Activity.inl:238
friend void Trace(const FActivity *, ETrace, uint32)
void SetSink(FTicketSink &&InSink, UPTRINT Param)
Definition Activity.inl:225
FOutcome Tick(FHttpPeer &Peer, int32 *MaxRecvSize=nullptr)
Definition Activity.inl:641
FHost * Host
Definition Activity.inl:149
FHost * GetHost() const
Definition Activity.inl:339
uint64 Cancels
Definition Loop.inl:319
uint32 NowMs
Definition Loop.inl:323
int32 & RecvAllowance
Definition Loop.inl:320
int32 FailTimeoutMs
Definition Loop.inl:322
class FWorkQueue * Work
Definition Loop.inl:324
FActivityList DoneList
Definition Loop.inl:318
int32 PollTimeoutMs
Definition Loop.inl:321