UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
QueuedThreadPoolWrapper.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
9#include "Containers/Array.h"
10#include "Containers/Map.h"
11#include "CoreMinimal.h"
16#include "HAL/CriticalSection.h"
17#include "HAL/Platform.h"
19#include "HAL/PlatformCrt.h"
20#include "HAL/PlatformProcess.h"
22#include "HAL/UnrealMemory.h"
24#include "Misc/IQueuedWork.h"
25#include "Misc/MemStack.h"
26#include "Misc/ScopeLock.h"
27#include "QueuedThreadPool.h"
28#include "ScopeRWLock.h"
29#include "Stats/Stats.h"
30#include "Templates/Atomic.h"
31#include "Templates/Function.h"
33
34#include <atomic>
35
37
44{
45public:
53
57 CORE_API void Pause();
58
63
67 CORE_API void SetMaxConcurrency(int32 MaxConcurrency = -1);
68
71 CORE_API int32 GetNumThreads() const override;
// Number of work items currently scheduled on the wrapped pool. Relaxed
// load: callers get a point-in-time snapshot, not a synchronized value.
72 int32 GetCurrentConcurrency() const { return CurrentConcurrency.load(std::memory_order_relaxed); }
73
74protected:
76 {
77 public:
79
82
83 FScheduledWork(const FScheduledWork&&) = delete;
85
86 CORE_API ~FScheduledWork() override;
87
88 const TCHAR * GetDebugName() const final
89 {
90 // return DebugName of innerwork, if any
91 if ( Work )
92 {
93 return Work->GetDebugName();
94 }
95 else
96 {
97 return TEXT("FScheduledWork");
98 }
99 }
100 protected:
101 virtual void DoThreadedWork() override
102 {
103 {
104 // Add this object as an execution context that can be retrieved via
105 // FExecutionResourceContext::Get() if a task needs to hold on the
106 // resources acquired (i.e. Concurrency Limit, Memory Pressure, etc...)
107 // longer than for the DoThreadedWork() scope.
// NOTE(review): source lines 108-110 are absent from this listing — they
// presumably establish that execution-resource scope and invoke the wrapped
// Work item's DoThreadedWork(); confirm against the original header.
109
111 }
112
// Drop our reference once execution is done; if this was the last reference,
// Release() hands the item back to ParentPool->Schedule() (see Release()).
113 Release();
114 }
115 private:
116 FReturnedRefCountValue AddRef() const override
117 {
118 return FReturnedRefCountValue{uint32(NumRefs.Increment())};
119 }
120
121 uint32 Release() const override
122 {
123 uint32 Refs = uint32(NumRefs.Decrement());
124
125 // When the last ref is released, we call the schedule function of the parent pool
126 // so that OnUnschedule can release any resources acquired by the OnSchedule function and
127 // the scheduling of the next work items can proceed.
128 if (Refs == 0)
129 {
130 ParentPool->Schedule(const_cast<FScheduledWork*>(this));
131 }
132 return Refs;
133 }
134
135 uint32 GetRefCount() const override
136 {
137 return uint32(NumRefs.GetValue());
138 }
139
// NOTE(review): the declaration (source line 140) is missing from this
// listing — presumably void Init(FQueuedThreadPoolWrapper* InParentPool,
// IQueuedWork* InWork, EQueuedWorkPriority InPriority); confirm upstream.
141 {
// An item must be fully released (refcount 0) before being (re)initialized.
142 check(GetRefCount() == 0);
143 ParentPool = InParentPool;
144 Work = InWork;
145 Priority = InPriority;
// Take the initial reference on behalf of the scheduled execution.
146 AddRef();
147 }
148
149
150
151 void Abandon() override
152 {
153 Work->Abandon();
154
155 Release();
156 }
157
// Forwards the wrapped work item's scheduling flags unchanged.
158 EQueuedWorkFlags GetQueuedWorkFlags() const override
159 {
160 return Work->GetQueuedWorkFlags();
161 }
162
// Forwards the wrapped work item's declared memory requirement unchanged.
163 int64 GetRequiredMemory() const override
164 {
165 return Work->GetRequiredMemory();
166 }
167
// Accessor for the wrapped (inner) work item; null after Reset().
168 IQueuedWork* GetInnerWork() const
169 {
170 return Work;
171 }
172
// Priority this work item was queued with (assigned during Init).
173 EQueuedWorkPriority GetPriority() const
174 {
175 return Priority;
176 }
177
// Detach the inner work item so this FScheduledWork can be recycled;
// GetInnerWork()/GetDebugName() observe the null afterwards.
178 void Reset()
179 {
180 Work = nullptr;
181 }
182
183 mutable FThreadSafeCounter NumRefs;
185 FQueuedThreadPoolWrapper* ParentPool;
186 protected:
188 private:
190 };
191
192 // A critical section is used since we need reentrancy support from the same thread
195
196 // Can be overriden to dynamically control the maximum concurrency
197 virtual int32 GetMaxConcurrency() const { return MaxConcurrency.load(std::memory_order_relaxed); }
198
199 // Can be overriden to know when work has been scheduled.
200 virtual void OnScheduled(const IQueuedWork*) {}
201
202 // Can be overriden to know when work has been unscheduled.
203 virtual void OnUnscheduled(const IQueuedWork*) {}
204
205 // Can be overriden to allocate a more specialized version if needed.
207private:
208 CORE_API FScheduledWork* AllocateWork(IQueuedWork* InnerWork, EQueuedWorkPriority Priority);
209 CORE_API bool CanSchedule(EQueuedWorkPriority Priority) const;
210 CORE_API bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR* Name) override;
211 CORE_API void Destroy() override;
212 CORE_API void Schedule(FScheduledWork* Work = nullptr);
213 CORE_API void ReleaseWorkNoLock(FScheduledWork* Work);
214 CORE_API bool TryRetractWorkNoLock(EQueuedWorkPriority InPriority);
215
217
218 FQueuedThreadPool* WrappedQueuedThreadPool;
221 std::atomic<int32> MaxConcurrency;
222 int32 MaxTaskToSchedule;
223 std::atomic<int32> CurrentConcurrency;
224 EQueuedWorkPriority WrappedQueuePriority;
225 bool bIsScheduling = false;
226};
227
261
265{
266public:
// NOTE(review): the constructor signature (source lines 267-270) is missing
// from this listing — per the generated index it is
// FQueuedThreadPoolTaskGraphWrapper(TFunction<ENamedThreads::Type(EQueuedWorkPriority)> InPriorityMapper = nullptr).
271 : TaskCount(0)
272 , bIsExiting(0)
273 {
// NOTE(review): the condition line (source line 274) is missing — presumably
// `if (InPriorityMapper)`: use the caller-provided mapper when one is bound.
275 {
276 PriorityMapper = InPriorityMapper;
277 }
278 else
279 {
// Fall back to the default EQueuedWorkPriority -> named-thread mapping.
280 PriorityMapper = [this](EQueuedWorkPriority InPriority) { return GetDefaultPriorityMapping(InPriority); };
281 }
282 }
283
// NOTE(review): the constructor signature (source lines 284-287) is missing
// from this listing — per the generated index it is
// FQueuedThreadPoolTaskGraphWrapper(ENamedThreads::Type InDesiredThread).
288 : TaskCount(0)
289 , bIsExiting(0)
290 {
// Every priority maps to the single caller-chosen named thread.
291 PriorityMapper = [InDesiredThread](EQueuedWorkPriority InPriority) { return InDesiredThread; };
292 }
293
298private:
// NOTE(review): the declaration (source line 299) is missing — per the
// generated index it is
// void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override.
300 {
// New work must not arrive once Destroy() has begun draining.
301 check(bIsExiting == false);
302 TaskCount++;
// NOTE(review): source lines 303-304 are missing — presumably the
// FFunctionGraphTask::CreateAndDispatchWhenReady( call that this lambda,
// null-prerequisites argument and thread argument belong to; confirm upstream.
305 {
// Scoped FMemMark: frees FMemStack allocations made by the work on exit.
306 FMemMark Mark(FMemStack::Get());
307 InQueuedWork->DoThreadedWork();
// Decrements TaskCount and signals Finished when exiting (see OnTaskCompleted).
308 OnTaskCompleted(InQueuedWork);
309 },
311 nullptr,
// Route the task to the named thread chosen by the priority mapper.
312 PriorityMapper(InPriority)
313 );
314 }
315
// Retraction is unsupported on this backend: once dispatched to the task
// graph a work item cannot be pulled back, so always report failure.
316 bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
317 {
318 // The task graph doesn't support retraction for now
319 return false;
320 }
321
322 void OnTaskCompleted(IQueuedWork* InQueuedWork)
323 {
324 if (--TaskCount == 0 && bIsExiting)
325 {
326 Finished.Notify();
327 }
328 }
329
330 int32 GetNumThreads() const override
331 {
// NOTE(review): the body (source line 332) is missing from this listing —
// given the index entries for FTaskGraphInterface it presumably returns
// FTaskGraphInterface::Get().GetNumWorkerThreads(); confirm upstream.
333 }
334
// NOTE(review): the signature (source line 335) is missing — per the ctor
// fallback at source line 280 this is
// ENamedThreads::Type GetDefaultPriorityMapping(EQueuedWorkPriority InPriority).
336 {
// NOTE(review): source lines 337-345 are mostly missing; the surviving brace
// pairs suggest two conditional branches assigning DesiredThread (the index
// lists AnyHiPriThreadNormalTask / AnyNormalThreadNormalTask /
// AnyBackgroundThreadNormalTask as candidates) — confirm upstream.
339 {
341 }
343 {
345 }
346 return DesiredThread;
347 }
348protected:
// Nothing to create: the underlying task graph already exists and is owned
// externally, so the thread-count/stack/priority arguments are ignored.
349 bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR* Name) override
350 {
351 return true;
352 }
353
354 void Destroy() override
355 {
// Flag shutdown first so the last completing task knows to signal Finished
// (see OnTaskCompleted).
356 bIsExiting = true;
357
// Block until in-flight tasks drain; Finished is a manual-reset event, so
// a notification that races this check is not lost.
358 if (TaskCount != 0)
359 {
360 Finished.Wait();
361 }
362 }
363private:
365 TAtomic<uint32> TaskCount;
366 TAtomic<bool> bIsExiting;
367 UE::FManualResetEvent Finished;
368};
369
373{
374 /* Internal data of the scheduler used for cancellation */
375 struct FQueuedWorkInternalData : TConcurrentLinearObject<FQueuedWorkInternalData, FTaskGraphBlockAllocationTag>, IQueuedWorkInternalData
376 {
// NOTE(review): source line 377 is missing from this listing — presumably
// the LowLevelTasks::FTask Task; member that Retract() cancels below.
378
// Forward cancellation to the embedded low-level task; returns TryCancel's
// result (true when the task was cancelled and will not run).
379 virtual bool Retract()
380 {
381 return Task.TryCancel();
382 }
383 };
384public:
392
397
398 void* operator new(size_t size)
399 {
400 return FMemory::Malloc(size, 128);
401 }
402
// Counterpart to the aligned operator new: release through FMemory.
403 void operator delete(void* ptr)
404 {
405 FMemory::Free(ptr);
406 }
407
411 void Pause()
412 {
// Stop feeding new work to the scheduler — ScheduleTasks() checks this flag
// before each dequeue. Work already handed to the scheduler keeps running.
413 bIsPaused = true;
414 }
415
// NOTE(review): the declaration (source lines 416-419) is missing — per the
// generated index it is void Resume(int32 InNumQueuedWork = -1).
420 {
// Launch up to InNumQueuedWork pending items. uint32(-1) wraps to UINT_MAX,
// so the default argument drains everything that is queued.
421 for (uint32 i = 0; i < uint32(InNumQueuedWork); i++)
422 {
423 FQueuedWorkInternalData* QueuedWork = Dequeue();
424 if (!QueuedWork)
425 {
426 break;
427 }
428 TaskCount.fetch_add(1, std::memory_order_acquire);
// NOTE(review): source line 429 is missing — presumably the
// Scheduler->TryLaunch(QueuedWork->Task, ...) call matching ScheduleTasks().
430 }
431
// Only a full resume (default argument) clears the paused state.
432 if (InNumQueuedWork == -1)
433 {
434 bIsPaused = false;
435 }
436
// Kick the scheduling loop in case more work remains and we are unpaused.
437 bool bWakeUpWorker = true;
438 ScheduleTasks(bWakeUpWorker);
439 }
440
441private:
// Drain the per-priority queues into the low-level scheduler until either
// the queues are empty or the pool is paused.
442 void ScheduleTasks(bool &bWakeUpWorker)
443 {
444 while (!bIsPaused)
445 {
446 FQueuedWorkInternalData* QueuedWork = Dequeue();
447 if (QueuedWork)
448 {
// When the caller passes bWakeUpWorker == false (e.g. FinalizeExecution on a
// finishing worker) the first launch prefers the local queue with no wake-up;
// every subsequent launch goes to the global queue and wakes a worker.
449 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, bWakeUpWorker ? LowLevelTasks::EQueuePreference::GlobalQueuePreference : LowLevelTasks::EQueuePreference::LocalQueuePreference, bWakeUpWorker));
450 TaskCount.fetch_add(1, std::memory_order_acquire);
451 bWakeUpWorker = true;
452 }
453 else
454 {
455 break;
456 }
457 }
458 }
459
// Deleter hook invoked when a low-level task is destroyed (see the TDeleter
// binding in AddQueuedWork); balances the TaskCount increments made at launch.
460 void FinalizeExecution()
461 {
// fetch_sub returns the previous value, so 1 means this was the last task;
// signal Destroy() if it is waiting for the drain.
462 if (TaskCount.fetch_sub(1, std::memory_order_release) == 1 && bIsExiting)
463 {
464 Finished.Notify();
465 }
466 else
467 {
// A worker just freed up: pull more queued work in, preferring this
// worker's local queue (no wake-up needed for the first item).
468 bool bWakeUpWorker = false;
469 ScheduleTasks(bWakeUpWorker);
470 }
471 }
472
// NOTE(review): the declaration (source line 473) is missing — per the
// override pattern it is
// void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override.
474 {
475 check(bIsExiting == false);
476
// Per-item control block, referenced through InQueuedWork->InternalData.
477 FQueuedWorkInternalData* QueuedWorkInternalData = new FQueuedWorkInternalData();
478 InQueuedWork->InternalData = QueuedWorkInternalData;
479 EQueuedWorkPriority Priority = PriorityMapper(InPriority);
480
// NOTE(review): source lines 481-483 are missing — presumably the mapping of
// Priority and GetQueuedWorkFlags() to the local TaskPriority and Flags used below.
484
// The TDeleter runs FinalizeExecution() when the task object is destroyed,
// which decrements TaskCount and pulls in further queued work.
485 QueuedWorkInternalData->Task.Init(TEXT("FQueuedLowLevelThreadPoolTask"), TaskPriority, [InQueuedWork, InternalData = InQueuedWork->InternalData, Deleter = LowLevelTasks::TDeleter<FQueuedLowLevelThreadPool, &FQueuedLowLevelThreadPool::FinalizeExecution>{ this }]
486 {
487 FMemMark Mark(FMemStack::Get());
488 InQueuedWork->DoThreadedWork();
489 }, Flags);
490
491 if (!bIsPaused)
492 {
493 TaskCount.fetch_add(1, std::memory_order_acquire);
// NOTE(review): source line 494 is missing — presumably the immediate
// Scheduler->TryLaunch(QueuedWorkInternalData->Task, ...) launch.
495 }
496 else
497 {
// NOTE(review): source line 498 is missing — presumably
// Enqueue(Priority, QueuedWorkInternalData); deferring the launch to Resume().
499 }
500 }
501
502 bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
503 {
504 bool bCancelled = false;
505 if(InQueuedWork->InternalData.IsValid())
506 {
507 bCancelled = InQueuedWork->InternalData->Retract();
508 InQueuedWork->InternalData = nullptr;
509 }
510
511 bool bWakeUpWorker = true;
512 ScheduleTasks(bWakeUpWorker);
513 return bCancelled;
514 }
515
516 int32 GetNumThreads() const override
517 {
518 return Scheduler->GetNumWorkers();
519 }
520
521protected:
// NOTE(review): the signature (source line 522) is missing — per the
// generated index it is bool Create(uint32 InNumQueuedThreads, uint32
// InStackSize, EThreadPriority InThreadPriority, const TCHAR* InName) override.
// The backing low-level scheduler already exists, so nothing to create.
523 {
524 return true;
525 }
526
527 void Destroy() override
528 {
// Refuse new work (checked in AddQueuedWork) and tell FinalizeExecution to
// signal Finished once the task count drains to zero.
529 bIsExiting = true;
530
// Retract everything still sitting in our pending priority queues.
531 while (true)
532 {
533 FQueuedWorkInternalData* QueuedWork = Dequeue();
534 if (!QueuedWork)
535 {
536 break;
537 }
538
539 verify(QueuedWork->Retract());
540 TaskCount++;
// NOTE(review): source line 541 is missing — presumably a launch/release of
// the cancelled task so its TDeleter fires FinalizeExecution and balances
// the TaskCount increment above; confirm against the original header.
542 }
543
// Wait for in-flight and just-cancelled tasks to finish tearing down.
544 if (TaskCount != 0)
545 {
546 Finished.Wait();
547 }
548 }
549
550private:
552
553 inline FQueuedWorkInternalData* Dequeue()
554 {
555 for (int32 i = 0; i < int32(EQueuedWorkPriority::Count); i++)
556 {
557 FQueuedWorkInternalData* QueuedWork = PendingWork[i].dequeue();
558 if (QueuedWork)
559 {
560 return QueuedWork;
561 }
562 }
563 return nullptr;
564 }
565
566 inline void Enqueue(EQueuedWorkPriority Priority, FQueuedWorkInternalData* Item)
567 {
568 PendingWork[int32(Priority)].enqueue(Item);
569 }
570
571 LowLevelTasks::FScheduler* Scheduler = nullptr;
573
574 std::atomic_uint TaskCount{0};
575 std::atomic_bool bIsExiting{false};
576 std::atomic_bool bIsPaused{false};
577 UE::FManualResetEvent Finished;
578};
#define check(expr)
Definition AssertionMacros.h:314
#define verifySlow(expr)
Definition AssertionMacros.h:334
#define verify(expr)
Definition AssertionMacros.h:319
#define TEXT(x)
Definition Platform.h:1272
FPlatformTypes::TCHAR TCHAR
Either ANSICHAR or WIDECHAR, depending on whether the platform supports wide characters or the requir...
Definition Platform.h:1135
FPlatformTypes::int64 int64
A 64-bit signed integer.
Definition Platform.h:1127
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
#define QUICK_USE_CYCLE_STAT(StatId, GroupId)
Definition Stats.h:668
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
UE::FPlatformRecursiveMutex FCriticalSection
Definition CriticalSection.h:53
EThreadPriority
Definition GenericPlatformAffinity.h:26
EQueuedWorkFlags
Definition IQueuedWork.h:10
constexpr auto DefaultQueuedWorkPriorityWrapper
Definition QueuedThreadPoolWrapper.h:36
EQueuedWorkPriority
Definition QueuedThreadPool.h:14
uint32_t uint32
Definition binka_ue_file_header.h:6
Definition FAAArrayQueue.h:81
void enqueue(T *item, EnqueueHazard &Hazard)
Definition FAAArrayQueue.h:209
T * dequeue(DequeueHazard &Hazard)
Definition FAAArrayQueue.h:291
Definition ExecutionResource.h:29
Definition MemStack.h:506
Definition QueuedThreadPoolWrapper.h:373
void Destroy() override
Definition QueuedThreadPoolWrapper.h:527
bool Create(uint32 InNumQueuedThreads, uint32 InStackSize, EThreadPriority InThreadPriority, const TCHAR *InName) override
Definition QueuedThreadPoolWrapper.h:522
void Pause()
Definition QueuedThreadPoolWrapper.h:411
~FQueuedLowLevelThreadPool()
Definition QueuedThreadPoolWrapper.h:393
void Resume(int32 InNumQueuedWork=-1)
Definition QueuedThreadPoolWrapper.h:419
FQueuedLowLevelThreadPool(TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper=DefaultQueuedWorkPriorityWrapper, LowLevelTasks::FScheduler *InScheduler=&LowLevelTasks::FScheduler::Get())
Definition QueuedThreadPoolWrapper.h:388
Definition QueuedThreadPoolWrapper.h:234
FQueuedThreadPoolDynamicWrapper(FQueuedThreadPool *InWrappedQueuedThreadPool, int32 InMaxConcurrency=-1, TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper=DefaultQueuedWorkPriorityWrapper)
Definition QueuedThreadPoolWrapper.h:241
void Sort(TFunctionRef< bool(const IQueuedWork *Lhs, const IQueuedWork *Rhs)> Predicate)
Definition QueuedThreadPoolWrapper.h:255
void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
Definition QueuedThreadPoolWrapper.h:246
Definition QueuedThreadPoolWrapper.h:265
~FQueuedThreadPoolTaskGraphWrapper()
Definition QueuedThreadPoolWrapper.h:294
void Destroy() override
Definition QueuedThreadPoolWrapper.h:354
FQueuedThreadPoolTaskGraphWrapper(ENamedThreads::Type InDesiredThread)
Definition QueuedThreadPoolWrapper.h:287
FQueuedThreadPoolTaskGraphWrapper(TFunction< ENamedThreads::Type(EQueuedWorkPriority)> InPriorityMapper=nullptr)
Definition QueuedThreadPoolWrapper.h:270
bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR *Name) override
Definition QueuedThreadPoolWrapper.h:349
Definition QueuedThreadPoolWrapper.h:76
virtual void DoThreadedWork() override
Definition QueuedThreadPoolWrapper.h:101
CORE_API FScheduledWork()
Definition QueuedThreadPoolWrapper.cpp:20
IQueuedWork * Work
Definition QueuedThreadPoolWrapper.h:187
FScheduledWork(const FScheduledWork &)=delete
CORE_API ~FScheduledWork() override
Definition QueuedThreadPoolWrapper.cpp:25
const TCHAR * GetDebugName() const final
Definition QueuedThreadPoolWrapper.h:88
FScheduledWork & operator=(const FScheduledWork &&)=delete
FScheduledWork(const FScheduledWork &&)=delete
FScheduledWork & operator=(const FScheduledWork &)=delete
Definition QueuedThreadPoolWrapper.h:44
CORE_API ~FQueuedThreadPoolWrapper()
Definition QueuedThreadPoolWrapper.cpp:40
CORE_API void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
Definition QueuedThreadPoolWrapper.cpp:124
virtual int32 GetMaxConcurrency() const
Definition QueuedThreadPoolWrapper.h:197
virtual void OnScheduled(const IQueuedWork *)
Definition QueuedThreadPoolWrapper.h:200
virtual void OnUnscheduled(const IQueuedWork *)
Definition QueuedThreadPoolWrapper.h:203
CORE_API bool RetractQueuedWork(IQueuedWork *InQueuedWork) override
Definition QueuedThreadPoolWrapper.cpp:134
CORE_API int32 GetNumThreads() const override
Definition QueuedThreadPoolWrapper.cpp:160
virtual FScheduledWork * AllocateScheduledWork()
Definition QueuedThreadPoolWrapper.h:206
CORE_API void SetMaxConcurrency(int32 MaxConcurrency=-1)
Definition QueuedThreadPoolWrapper.cpp:100
CORE_API void Pause()
Definition QueuedThreadPoolWrapper.cpp:108
FCriticalSection Lock
Definition QueuedThreadPoolWrapper.h:193
int32 GetCurrentConcurrency() const
Definition QueuedThreadPoolWrapper.h:72
FThreadPoolPriorityQueue QueuedWork
Definition QueuedThreadPoolWrapper.h:194
Definition QueuedThreadPool.h:105
Definition ScopeLock.h:141
static CORE_API FTaskGraphInterface & Get()
Definition TaskGraph.cpp:1794
virtual int32 GetNumWorkerThreads()=0
Definition QueuedThreadPool.h:53
CORE_API void Sort(EQueuedWorkPriority InPriorityBucket, TFunctionRef< bool(const IQueuedWork *A, const IQueuedWork *B)> Predicate)
Definition ThreadingBase.cpp:1035
Definition ThreadSafeCounter.h:14
int32 Increment()
Definition ThreadSafeCounter.h:52
int32 Decrement()
Definition ThreadSafeCounter.h:75
int32 GetValue() const
Definition ThreadSafeCounter.h:120
Definition ExecutionResource.h:10
Definition IQueuedWork.h:35
Definition IQueuedWork.h:62
virtual int64 GetRequiredMemory() const
Definition IQueuedWork.h:86
virtual void DoThreadedWork()=0
virtual EQueuedWorkFlags GetQueuedWorkFlags() const
Definition IQueuedWork.h:81
virtual void Abandon()=0
virtual const TCHAR * GetDebugName() const
Definition IQueuedWork.h:94
Definition Scheduler.h:103
bool TryLaunch(FTask &Task, EQueuePreference QueuePreference=EQueuePreference::DefaultPreference, bool bWakeUpWorker=true)
Definition Scheduler.h:189
uint32 GetNumWorkers() const
Definition Scheduler.h:199
static FScheduler & Get()
Definition Scheduler.h:215
Definition Task.h:310
Definition Task.h:153
Definition Array.h:670
Definition Atomic.h:538
Definition ConcurrentLinearAllocator.h:571
Definition AssetRegistryState.h:50
Definition AndroidPlatformMisc.h:14
Definition UnrealString.h.inl:34
static FORCEINLINE FMemStack & Get()
Definition ThreadSingleton.h:101
Definition ManualResetEvent.h:15
void Notify()
Definition ManualResetEvent.h:83
void Wait()
Definition ManualResetEvent.h:33
Type
Definition TaskGraphInterfaces.h:57
@ AnyBackgroundThreadNormalTask
Definition TaskGraphInterfaces.h:106
@ AnyHiPriThreadNormalTask
Definition TaskGraphInterfaces.h:100
@ AnyNormalThreadNormalTask
Definition TaskGraphInterfaces.h:103
Definition Scheduler.cpp:25
ETaskPriority
Definition Task.h:18
ETaskFlags
Definition Task.h:93
int
Definition TestServer.py:515
static FGraphEventRef CreateAndDispatchWhenReady(TUniqueFunction< void()> InFunction, TStatId InStatId=TStatId{}, const FGraphEventArray *InPrerequisites=nullptr, ENamedThreads::Type InDesiredThread=ENamedThreads::AnyThread)
Definition TaskGraphInterfaces.h:1128
static FORCENOINLINE CORE_API void Free(void *Original)
Definition UnrealMemory.cpp:685
Definition RefCounting.h:29