UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
TaskConcurrencyLimiter.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "Tasks/Task.h"
9#include "CoreTypes.h"
10
11#include <AtomicQueue.h>
12
13namespace UE::Tasks
14{
15 namespace TaskConcurrencyLimiter_Private
16 {
17 // a queue of free slots in range [0 .. max_concurrency). Initially contains all slots in the range.
19 {
20 public:
// Constructs the free-slot queue from MaxConcurrency (presumably its capacity - confirm against
// AtomicQueueB) and pre-fills it with one entry per slot.
// Entries are stored shifted by IndexOffset, so the pushed values run from IndexOffset up to
// MaxConcurrency + IndexOffset - 1; Alloc() removes the shift again before returning a slot.
21 explicit FConcurrencySlots(uint32 MaxConcurrency)
22 : FreeSlots(MaxConcurrency)
23 {
24 for (uint32 Index = IndexOffset; Index < MaxConcurrency + IndexOffset; ++Index)
25 {
26 FreeSlots.push(Index);
27 }
28 }
29
// Attempts to take a free slot from FreeSlots via try_pop.
// On success the popped value is converted back to the caller-visible numbering by subtracting
// IndexOffset, written to Slot, and true is returned.
// Returns false when try_pop fails; in that case this function does not adjust Slot.
30 bool Alloc(uint32& Slot)
31 {
32 if (FreeSlots.try_pop(Slot))
33 {
34 Slot -= IndexOffset;
35 return true;
36 }
37
38 return false;
39 }
40
// Returns Slot to the free-slot queue.
// The value is shifted by IndexOffset before being pushed, mirroring the subtraction done in Alloc().
41 void Release(uint32 Slot)
42 {
43 FreeSlots.push(Slot + IndexOffset);
44 }
45
46 private:
47 // this queue uses 0 as a special "null" value. to work around this, slots are shifted by one for storage, thus ending up in
48 // [1 .. max_concurrency] range
49 static constexpr int32 IndexOffset = 1;
50 atomic_queue::AtomicQueueB<uint32> FreeSlots; // a bounded lock-free FIFO queue
51 };
52
53 // implementation details of FTaskConcurrencyLimiter
54 class FPimpl : public TSharedFromThis<FPimpl>
55 {
56 public:
58 : ConcurrencySlots(InMaxConcurrency)
59 , TaskPriority(InTaskPriority)
60 {
61 }
62
64
// Wraps TaskFunction in a low-level task and hands that task to AddWorkItem().
// DebugName and the limiter's TaskPriority are passed to Task->Init() together with a lambda that:
//   1. reads the concurrency slot from the task's user data,
//   2. calls TaskFunction(ConcurrencySlot),
//   3. calls CompleteWorkItem(ConcurrencySlot).
// The lambda captures a shared reference to this FPimpl and the Task handle itself (see the
// capture-list comments below).
// NOTE(review): the statement that declares `Task` (original source line 68) is missing from this
// listing, so its exact type and how it is created cannot be confirmed here.
// NOTE(review): TaskFunction is a forwarding reference but is always moved with MoveTemp, so an
// lvalue argument would be moved from - confirm this is intended.
65 template<typename TaskFunctionType>
66 void Push(const TCHAR* DebugName, TaskFunctionType&& TaskFunction)
67 {
69
70 Task->Init(
71 DebugName,
72 TaskPriority,
73 [
74 TaskFunction = MoveTemp(TaskFunction),
75 this,
76 Pimpl = TSharedFromThis<FPimpl>::AsShared(), // to keep it alive
77 Task // self-destruct
78 ]()
79 {
80 // We can't pass the ConcurrencySlot in the lambda during creation as
81 // it's not actually acquired yet. The value will be passed using
82 // the user data when the task is launched.
83 uint32 ConcurrencySlot = (uint32)(UPTRINT)Task->GetUserData();
84
85 TaskFunction(ConcurrencySlot);
// Runs only after the user function has returned, with the same slot value.
86 CompleteWorkItem(ConcurrencySlot);
87 }
88 );
89
// Hand the raw task pointer to the out-of-line queueing code.
90 AddWorkItem(Task.Get());
91 }
92
94
95 private:
96 void CORE_API AddWorkItem(LowLevelTasks::FTask* Task);
97 void CORE_API ProcessQueue(uint32 ConcurrencySlot, bool bSkipFirstWakeUp);
98 void CORE_API ProcessQueueFromWorker(uint32 ConcurrencySlot);
99 void CORE_API ProcessQueueFromPush(uint32 ConcurrencySlot);
100 void CORE_API CompleteWorkItem(uint32 ConcurrencySlot);
101
102 FConcurrencySlots ConcurrencySlots; // free slots queue. used also to limit concurrency
103 ETaskPriority TaskPriority;
104 TLockFreePointerListFIFO<LowLevelTasks::FTask, PLATFORM_CACHE_LINE_SIZE> WorkQueue; // a queue of user-provided task functions
105 std::atomic<uint32> NumWorkItems { 0 };
106 std::atomic<FEvent*> CompletionEvent { nullptr };
107 };
108
109 } // namespace TaskConcurrencyLimiter_Private
110
117 {
118 public:
// Creates the shared implementation object (FPimpl), passing both arguments straight through.
// MaxConcurrency: forwarded to FPimpl, where it initialises the free-slot queue.
// TaskPriority:   forwarded to FPimpl, which passes it to Task->Init() for every pushed task;
//                 defaults to ETaskPriority::Default.
125 explicit FTaskConcurrencyLimiter(uint32 MaxConcurrency, ETaskPriority TaskPriority = ETaskPriority::Default)
126 : Pimpl(MakeShared<TaskConcurrencyLimiter_Private::FPimpl>(MaxConcurrency, TaskPriority))
127 {
128 }
129
// Forwards DebugName and TaskFunction to FPimpl::Push().
// TaskFunction is later called with a single uint32 argument, the concurrency slot.
// NOTE(review): TaskFunction is a forwarding reference but is passed on with MoveTemp, so an
// lvalue argument would be moved from - confirm this is intended.
139 template<typename TaskFunctionType>
140 void Push(const TCHAR* DebugName, TaskFunctionType&& TaskFunction)
141 {
142 Pimpl->Push(DebugName, MoveTemp(TaskFunction));
143 }
144
156 {
157 return Pimpl->Wait(Timeout);
158 }
159
160 private:
162 };
163
164} // namespace UE::Tasks
FPlatformTypes::TCHAR TCHAR
Either ANSICHAR or WIDECHAR, depending on whether the platform supports wide characters or the requirements of the licensee.
Definition Platform.h:1135
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
FPlatformTypes::UPTRINT UPTRINT
An unsigned integer the same size as a pointer.
Definition Platform.h:1146
TSharedRef< InObjectType, InMode > MakeShared(InArgTypes &&... Args)
Definition SharedPointer.h:2009
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
uint32_t uint32
Definition binka_ue_file_header.h:6
Definition Task.h:310
Definition LockFreeList.h:910
Definition SharedPointer.h:1640
Definition SharedPointer.h:692
Definition SharedPointer.h:153
Definition TaskConcurrencyLimiter.h:117
FTaskConcurrencyLimiter(uint32 MaxConcurrency, ETaskPriority TaskPriority=ETaskPriority::Default)
Definition TaskConcurrencyLimiter.h:125
bool Wait(FTimespan Timeout=FTimespan::MaxValue())
Definition TaskConcurrencyLimiter.h:155
void Push(const TCHAR *DebugName, TaskFunctionType &&TaskFunction)
Definition TaskConcurrencyLimiter.h:140
bool Alloc(uint32 &Slot)
Definition TaskConcurrencyLimiter.h:30
void Release(uint32 Slot)
Definition TaskConcurrencyLimiter.h:41
FConcurrencySlots(uint32 MaxConcurrency)
Definition TaskConcurrencyLimiter.h:21
Definition TaskConcurrencyLimiter.h:55
bool CORE_API Wait(FTimespan Timeout)
Definition TaskConcurrencyLimiter.cpp:35
void Push(const TCHAR *DebugName, TaskFunctionType &&TaskFunction)
Definition TaskConcurrencyLimiter.h:66
FPimpl(uint32 InMaxConcurrency, ETaskPriority InTaskPriority)
Definition TaskConcurrencyLimiter.h:57
CORE_API ~FPimpl()
Definition TaskConcurrencyLimiter.cpp:14
Definition AnalyticsProviderLog.h:8
U16 Index
Definition radfft.cpp:71
Definition Timespan.h:76
static FTimespan MaxValue()
Definition Timespan.h:686