UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
ParallelFor.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3/*=============================================================================
4	ParallelFor.h: TaskGraph library
5=============================================================================*/
6
7#pragma once
8
9#include "AutoRTFM.h"
13#include "Containers/Array.h"
15#include "CoreGlobals.h"
16#include "CoreTypes.h"
18#include "HAL/Event.h"
20#include "HAL/PlatformMisc.h"
21#include "HAL/PlatformTime.h"
24#include "Misc/App.h"
26#include "Misc/EnumClassFlags.h"
27#include "Misc/Fork.h"
28#include "Misc/MemStack.h"
29#include "Misc/Timespan.h"
32#include "Stats/Stats.h"
33#include "Templates/Function.h"
37
38#include <atomic>
39
40namespace UE { namespace LLMPrivate { class FTagData; } }
41
44
45// Flags controlling the ParallelFor's behavior.
47{
48 // Default behavior
49 None,
50
51 //Mostly used for testing, when used, ParallelFor will run single threaded instead.
53
54 //Offers better work distribution among threads at the cost of a little bit more synchronization.
55 //This should be used for tasks with highly variable computational time.
56 Unbalanced = 2,
57
58 // if running on the rendering thread, make sure the ProcessThread is called when idle
60
61 // tasks should run on background priority threads
63};
64
66
68{
69
70 // Helper to call body with context reference
71 template <typename FunctionType, typename ContextType>
72 inline void CallBody(const FunctionType& Body, const TArrayView<ContextType>& Contexts, int32 TaskIndex, int32 Index)
73 {
74 Body(Contexts[TaskIndex], Index);
75 }
76
77 // Helper specialization for "no context", which changes the assumed body call signature
78 template <typename FunctionType>
79 inline void CallBody(const FunctionType& Body, const TArrayView<TYPE_OF_NULLPTR>&, int32, int32 Index)
80 {
81 Body(Index);
82 }
83
85 {
89 {
90 NumThreadTasks = FMath::Min(int32(LowLevelTasks::FScheduler::Get().GetNumWorkers()), (Num + (MinBatchSize/2))/MinBatchSize);
91 }
92
93 if (!LowLevelTasks::FScheduler::Get().IsWorkerThread())
94 {
95 NumThreadTasks++; //named threads help with the work
96 }
97
98 // don't go wider than number of cores
100
101 return FMath::Max(NumThreadTasks, 1);
102 }
103
115 template<typename BodyType, typename PreWorkType, typename ContextType>
117 {
118 if (Num == 0)
119 {
120 // Contract is that prework should always be called even when number of tasks is 0.
121 // We omit the trace scope here to avoid noise when the prework is empty since this amounts to just calling a function anyway with nothing specific to parallelfor itself.
123 return;
124 }
125
128 check(Num >= 0);
129
130 // If we are executing the parallel for from within a transaction, run it single-threaded.
131 if (AutoRTFM::IsClosed())
132 {
134 }
135
136 int32 NumWorkers = GetNumberOfThreadTasks(Num, MinBatchSize, Flags);
137
138 if (!Contexts.IsEmpty())
139 {
140 // Use at most as many workers as there are contexts when task contexts are used.
141 NumWorkers = FMath::Min(NumWorkers, Contexts.Num());
142 }
143
144 //single threaded mode
145 if (NumWorkers <= 1)
146 {
147 // do the prework
149 // no threads, just do it and return
150 for(int32 Index = 0; Index < Num; Index++)
151 {
152 CallBody(Body, Contexts, 0, Index);
153 }
154 return;
155 }
156
157 //calculate the batch sizes
158 int32 BatchSize = 1;
159 int32 NumBatches = Num;
161 if (!bIsUnbalanced)
162 {
163 for (int32 Div = 6; Div; Div--)
164 {
165 if (Num >= (NumWorkers * Div))
166 {
167 BatchSize = FMath::DivideAndRoundUp<int32>(Num, (NumWorkers * Div));
168 NumBatches = FMath::DivideAndRoundUp<int32>(Num, BatchSize);
169
170 if (NumBatches >= NumWorkers)
171 {
172 break;
173 }
174 }
175 }
176 }
177 NumWorkers--; //Decrement one because this function will work on it locally
178 checkSlow(BatchSize * NumBatches >= Num);
179
180 //Try to inherit the Priority from the caller
181 // Anything scheduled by the task graph is latency sensitive because it might impact the frame rate. Anything else is not (i.e. Worker / Background threads).
188
191
194 {
196 }
197 else if (bBackgroundPriority)
198 {
200 }
201
202 struct FTracedTask
203 {
205 std::atomic<TaskTrace::FId> TraceId = TaskTrace::InvalidId;
206 };
207
208 //shared data between tasks
210 : public TConcurrentLinearObject<FParallelForData, FTaskGraphBlockAllocationTag>
213 {
215
217 : DebugName(InDebugName)
218 , Num(InNum)
219 , BatchSize(InBatchSize)
220 , NumBatches(InNumBatches)
221 , Contexts(InContexts)
222 , Body(InBody)
225 {
226 IncompleteBatches.store(NumBatches, std::memory_order_relaxed);
227 Tasks.AddDefaulted(InNumWorkers);
228
229 CaptureInheritedContext();
230 }
231
233 {
234 for (FTracedTask& Task : Tasks)
235 {
236 if (Task.TraceId != TaskTrace::InvalidId)
237 {
238 TaskTrace::Destroyed(Task.TraceId);
239 }
240 }
241 }
242
244 {
245 const int32 WorkerIndex = LaunchedWorkers.fetch_add(1, std::memory_order_relaxed);
246 return WorkerIndex >= Tasks.Num() ? -1 : WorkerIndex;
247 }
248
249 const TCHAR* DebugName;
250 std::atomic_int BatchItem { 0 };
251 std::atomic_int IncompleteBatches { 0 };
252 std::atomic_int LaunchedWorkers { 0 };
253 int32 Num;
254 int32 BatchSize;
255 int32 NumBatches;
256 const TArrayView<ContextType>& Contexts;
257 const BodyType& Body;
260
262 };
264
265 // Each task has an executor.
267 {
268 mutable FDataHandle Data;
269 int32 WorkerIndex;
270 mutable bool bReschedule = false;
271
272 public:
274 : Data(MoveTemp(InData))
275 , WorkerIndex(InWorkerIndex)
276 {
277 }
278
279 FParallelExecutor(const FParallelExecutor&) = delete;
281
283 {
284 if (Data.IsValid() && bReschedule)
285 {
286 FParallelExecutor::LaunchTask(MoveTemp(Data), WorkerIndex);
287 }
288 }
289
290 inline const FDataHandle& GetData() const
291 {
292 return Data;
293 }
294
295 inline bool operator()(const bool bIsMaster = false) const noexcept
296 {
297 UE::FInheritedContextScope InheritedContextScope = Data->RestoreInheritedContext();
298 FMemMark Mark(FMemStack::Get());
299
301 if (!bIsMaster)
302 {
303 TraceId = Data->Tasks[WorkerIndex].TraceId;
305 }
307 {
308 if (!bIsMaster)
309 {
311 }
312 };
313
314 const int32 NumBatches = Data->NumBatches;
315
316 // We're going to consume one ourself, so we need at least 2 left to consider launching a new worker
317 // We also do not launch a worker from the master as we already launched one before doing prework.
318 if (bIsMaster == false && Data->BatchItem.load(std::memory_order_relaxed) + 2 <= NumBatches)
319 {
321 }
322
323 TRACE_CPUPROFILER_EVENT_SCOPE_TEXT(Data->DebugName);
324
325 auto Now = [] { return FTimespan::FromSeconds(FPlatformTime::Seconds()); };
329
331 {
332 Start = Now();
334 }
335
336 const int32 Num = Data->Num;
337 const int32 BatchSize = Data->BatchSize;
338 const TArrayView<ContextType>& Contexts = Data->Contexts;
339 const BodyType& Body = Data->Body;
340
341 const bool bSaveLastBlockForMaster = (Num > NumBatches);
342 for(;;)
343 {
344 int32 BatchIndex = Data->BatchItem.fetch_add(1, std::memory_order_relaxed);
345
346 // Save the last block for the master to avoid an event
347 if (bSaveLastBlockForMaster && BatchIndex >= NumBatches - 1)
348 {
349 if (!bIsMaster)
350 {
351 return false;
352 }
353 BatchIndex = (NumBatches - 1);
354 }
355
356 int32 StartIndex = BatchIndex * BatchSize;
357 int32 EndIndex = FMath::Min<int32>(StartIndex + BatchSize, Num);
358 for (int32 Index = StartIndex; Index < EndIndex; Index++)
359 {
360 CallBody(Body, Contexts, WorkerIndex, Index);
361 }
362
363 // We need to decrement IncompleteBatches when processing a Batch because we need to know if we are the last one
364 // so that if the main thread is the last one we can avoid an FEvent call.
365
366 // Memory ordering is also very important here as it is what's making sure memory manipulated
367 // by the parallelfor is properly published before exiting so that it's safe to be read
368 // without other synchronization mechanism.
369 if (StartIndex < Num && Data->IncompleteBatches.fetch_sub(1, std::memory_order_acq_rel) == 1)
370 {
371 if (!bIsMaster)
372 {
373 Data->FinishedSignal->Trigger();
374 }
375
376 return true;
377 }
378 else if (EndIndex >= Num)
379 {
380 return false;
381 }
382 else if (!bIsBackgroundPriority)
383 {
384 continue;
385 }
386
387 auto PassedTime = [Start, &Now]() { return Now() - Start; };
389 {
390 // Abort and reschedule to give higher priority tasks a chance to run
391 bReschedule = true;
392 return false;
393 }
394 }
395 }
396
397 static void LaunchTask(FDataHandle&& InData, int32 InWorkerIndex, bool bWakeUpWorker = true)
398 {
400
401 if (TracedTask.TraceId != TaskTrace::InvalidId) // reused task
402 {
404 }
405
406 const TCHAR* DebugName = InData->DebugName;
408
410 TaskTrace::Launched(TracedTask.TraceId, DebugName, false, ENamedThreads::AnyThread, 0);
411
414 }
415
417 {
418 const int32 WorkerIndex = InData->GetNextWorkerIndexToLaunch();
419 if (WorkerIndex != -1)
420 {
421 LaunchTask(FDataHandle(InData), WorkerIndex);
422 }
423 }
424 };
425
426 //launch all the worker tasks
428 FDataHandle Data = new FParallelForData(DebugName, Num, BatchSize, NumBatches, NumWorkers, Contexts, Body, FinishedSignal, Priority);
429
430 // Launch the first worker before we start doing prework
431 FParallelExecutor::LaunchAnotherWorkerIfNeeded(Data);
432
433 // do the prework
435
436 // help with the parallel-for to prevent deadlocks
437 FParallelExecutor LocalExecutor(MoveTemp(Data), NumWorkers);
438 const bool bFinishedLast = LocalExecutor(true);
439
440 if (!bFinishedLast)
441 {
444 {
445 // FinishedSignal waits here if some other thread finishes the last item
446 // Data must live on until all of the tasks are cleared which might be long after this function exits
447 while (!FinishedSignal->Wait(1))
448 {
450 }
451 }
452 else
453 {
454 // FinishedSignal waits here if some other thread finishes the last item
455 // Data must live on until all of the tasks are cleared which might be long after this function exits
457
459 {
461 FinishedSignal->Wait();
462 }
463 else
464 {
465 // This can spawn new threads to handle tasks
466 FinishedSignal->Wait();
467 }
468 }
469 }
470 checkSlow(LocalExecutor.GetData()->BatchItem.load(std::memory_order_relaxed) * LocalExecutor.GetData()->BatchSize >= LocalExecutor.GetData()->Num);
471 }
472}
473
487
495template<typename FunctionType>
497{
498 ParallelForImpl::ParallelForInternal(TEXT("ParallelFor Task"), Num, 1, Body, [](){}, Flags, TArrayView<TYPE_OF_NULLPTR>());
499}
500
510template<typename FunctionType>
511inline void ParallelForTemplate(const TCHAR* DebugName, int32 Num, int32 MinBatchSize, const FunctionType& Body, EParallelForFlags Flags = EParallelForFlags::None)
512{
513 ParallelForImpl::ParallelForInternal(DebugName, Num, MinBatchSize, Body, [](){}, Flags, TArrayView<TYPE_OF_NULLPTR>());
514}
515
530
543inline void ParallelFor(const TCHAR* DebugName, int32 Num, int32 MinBatchSize, TFunctionRef<void(int32)> Body, EParallelForFlags Flags = EParallelForFlags::None)
544{
545 ParallelForImpl::ParallelForInternal(DebugName, Num, MinBatchSize, Body, [](){}, Flags, TArrayView<TYPE_OF_NULLPTR>());
546}
547
562
575
590
603template <typename ContextType, typename ContextAllocatorType, typename ContextConstructorType, typename BodyType, typename PreWorkType>
625
637template <typename ContextType, typename ContextAllocatorType, typename BodyType, typename PreWorkType>
655
667template <typename ContextType, typename BodyType, typename PreWorkType>
679
693template <typename ContextType, typename ContextAllocatorType, typename ContextConstructorType, typename FunctionType>
695{
696 if (Num > 0)
697 {
699 OutContexts.Reset();
700 OutContexts.AddUninitialized(NumContexts);
701 for (int32 ContextIndex = 0; ContextIndex < NumContexts; ++ContextIndex)
702 {
703 new(&OutContexts[ContextIndex]) ContextType(ContextConstructor(ContextIndex, NumContexts));
704 }
706 }
707}
708
721template <typename ContextType, typename ContextAllocatorType, typename ContextConstructorType, typename FunctionType>
726
737template <typename ContextType, typename ContextAllocatorType, typename FunctionType>
748
763template <typename ContextType, typename ContextAllocatorType, typename ContextConstructorType, typename FunctionType>
765{
766 if (Num > 0)
767 {
769 OutContexts.Reset();
770 OutContexts.AddUninitialized(NumContexts);
771 for (int32 ContextIndex = 0; ContextIndex < NumContexts; ++ContextIndex)
772 {
773 new(&OutContexts[ContextIndex]) ContextType(ContextConstructor(ContextIndex, NumContexts));
774 }
775 ParallelForImpl::ParallelForInternal(DebugName, Num, MinBatchSize, Body, [](){}, Flags, TArrayView<ContextType>(OutContexts));
776 }
777}
778
791template <typename ContextType, typename ContextAllocatorType, typename FunctionType>
793{
794 if (Num > 0)
795 {
797 OutContexts.Reset();
798 OutContexts.AddDefaulted(NumContexts);
799 ParallelForImpl::ParallelForInternal(DebugName, Num, MinBatchSize, Body, [](){}, Flags, TArrayView<ContextType>(OutContexts));
800 }
801}
802
814template <typename ContextType, typename FunctionType>
816{
817 ParallelForImpl::ParallelForInternal(TEXT("ParallelFor Task"), Num, MinBatchSize, Body, [](){}, Flags, Contexts);
818}
819
832template <typename ContextType, typename FunctionType>
833inline void ParallelForWithExistingTaskContext(const TCHAR* DebugName, TArrayView<ContextType> Contexts, int32 Num, int32 MinBatchSize, const FunctionType& Body, EParallelForFlags Flags = EParallelForFlags::None)
834{
835 ParallelForImpl::ParallelForInternal(DebugName, Num, MinBatchSize, Body, [](){}, Flags, Contexts);
836}
#define checkSlow(expr)
Definition AssertionMacros.h:332
#define check(expr)
Definition AssertionMacros.h:314
#define verify(expr)
Definition AssertionMacros.h:319
ETaskTag
Definition CoreGlobals.h:642
@ ERenderingThread
CORE_API int32 GParallelForBackgroundYieldingTimeoutMs
Definition ParallelFor.cpp:8
void ParallelForWithPreWorkWithTaskContext(const TCHAR *DebugName, TArray< ContextType, ContextAllocatorType > &OutContexts, int32 Num, int32 MinBatchSize, ContextConstructorType &&ContextConstructor, BodyType &&Body, PreWorkType &&CurrentThreadWorkToDoBeforeHelping, EParallelForFlags Flags=EParallelForFlags::None)
Definition ParallelFor.h:604
EParallelForFlags
Definition ParallelFor.h:47
void ParallelForWithExistingTaskContext(TArrayView< ContextType > Contexts, int32 Num, int32 MinBatchSize, const FunctionType &Body, EParallelForFlags Flags=EParallelForFlags::None)
Definition ParallelFor.h:815
void ParallelFor(int32 Num, TFunctionRef< void(int32)> Body, bool bForceSingleThread, bool bPumpRenderingThread=false)
Definition ParallelFor.h:481
void ParallelForWithPreWork(int32 Num, TFunctionRef< void(int32)> Body, TFunctionRef< void()> CurrentThreadWorkToDoBeforeHelping, bool bForceSingleThread, bool bPumpRenderingThread=false)
Definition ParallelFor.h:556
CORE_API bool GParallelForDisableOversubscription
Definition ParallelFor.cpp:15
void ParallelForWithPreWorkWithExistingTaskContext(const TCHAR *DebugName, TArrayView< ContextType > Contexts, int32 Num, int32 MinBatchSize, BodyType &&Body, PreWorkType &&CurrentThreadWorkToDoBeforeHelping, EParallelForFlags Flags=EParallelForFlags::None)
Definition ParallelFor.h:668
void ParallelForTemplate(int32 Num, const FunctionType &Body, EParallelForFlags Flags=EParallelForFlags::None)
Definition ParallelFor.h:496
void ParallelForWithTaskContext(const TCHAR *DebugName, TArray< ContextType, ContextAllocatorType > &OutContexts, int32 Num, const ContextConstructorType &ContextConstructor, const FunctionType &Body, EParallelForFlags Flags=EParallelForFlags::None)
Definition ParallelFor.h:694
#define TEXT(x)
Definition Platform.h:1272
FPlatformTypes::TCHAR TCHAR
Either ANSICHAR or WIDECHAR, depending on whether the platform supports wide characters or the requir...
Definition Platform.h:1135
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
#define SCOPE_CYCLE_COUNTER(Stat)
Definition Stats.h:650
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define TRACE_CPUPROFILER_EVENT_SCOPE(Name)
Definition CpuProfilerTrace.h:528
#define TRACE_CPUPROFILER_EVENT_SCOPE_TEXT(Name)
Definition CpuProfilerTrace.h:532
#define ENUM_CLASS_FLAGS(Enum)
Definition EnumClassFlags.h:6
@ Num
Definition MetalRHIPrivate.h:234
CORE_API int32 GParallelForBackgroundYieldingTimeoutMs
Definition ParallelFor.cpp:8
CORE_API bool GParallelForDisableOversubscription
Definition ParallelFor.cpp:15
#define ON_SCOPE_EXIT
Definition ScopeExit.h:73
auto GetData(const TStringConversion< Converter, DefaultConversionSize > &Conversion) -> decltype(Conversion.Get())
Definition StringConv.h:802
CORE_API bool IsInActualRenderingThread()
Definition ThreadingBase.cpp:258
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
static CORE_API bool ShouldUseThreadingForPerformance()
Definition App.cpp:300
Definition Event.h:135
static CORE_API bool IsForkedMultithreadInstance()
Definition Fork.cpp:103
Definition MemStack.h:506
static CORE_API FTaskGraphInterface & Get()
Definition TaskGraph.cpp:1794
virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread)=0
static CORE_API ETaskTag GetCurrentTag()
Definition ThreadingBase.cpp:175
Definition RefCounting.h:283
static FScheduler & Get()
Definition Scheduler.h:215
Definition Task.h:310
Definition ArrayView.h:139
UE_FORCEINLINE_HINT constexpr SizeType Num() const
Definition ArrayView.h:380
constexpr bool IsEmpty() const
Definition ArrayView.h:370
Definition Array.h:670
Definition ConcurrentLinearAllocator.h:571
Definition AssetRegistryState.h:50
Definition RefCounting.h:454
static FORCEINLINE FMemStack & Get()
Definition ThreadSingleton.h:101
Definition InheritedContext.h:118
CORE_API FInheritedContextScope RestoreInheritedContext()
Definition InheritedContext.cpp:8
Definition InheritedContext.h:54
@ AnyThread
Definition TaskGraphInterfaces.h:67
UE_FORCEINLINE_HINT Type GetRenderThread_Local()
Definition TaskGraphInterfaces.h:128
ETaskPriority
Definition Task.h:18
UE_FORCEINLINE_HINT bool TryLaunch(FTask &Task, EQueuePreference QueuePreference=EQueuePreference::DefaultPreference, bool bWakeUpWorker=true)
Definition Scheduler.h:181
Definition ParallelFor.h:68
int32 GetNumberOfThreadTasks(int32 Num, int32 MinBatchSize, EParallelForFlags Flags)
Definition ParallelFor.h:84
void CallBody(const FunctionType &Body, const TArrayView< ContextType > &Contexts, int32 TaskIndex, int32 Index)
Definition ParallelFor.h:72
void ParallelForInternal(const TCHAR *DebugName, int32 Num, int32 MinBatchSize, BodyType Body, PreWorkType CurrentThreadWorkToDoBeforeHelping, EParallelForFlags Flags, const TArrayView< ContextType > &Contexts)
Definition ParallelFor.h:116
void TASK_CORE_API Started(FId TaskId)
Definition TaskTrace.h:80
void TASK_CORE_API Launched(FId TaskId, const TCHAR *DebugName, bool bTracked, ENamedThreads::Type ThreadToExecuteOn, uint64 TaskSize)
Definition TaskTrace.h:77
const FId InvalidId
Definition TaskTrace.h:39
void TASK_CORE_API Destroyed(FId TaskId)
Definition TaskTrace.h:83
FId TASK_CORE_API GenerateTaskId()
Definition TaskTrace.h:74
uint64 FId
Definition TaskTrace.h:37
void TASK_CORE_API Completed(FId TaskId)
Definition TaskTrace.h:82
Definition AdvancedWidgetsModule.cpp:13
U16 Index
Definition radfft.cpp:71
static CORE_API int32 NumberOfCoresIncludingHyperthreads()
Definition AndroidPlatformMisc.cpp:977
static double Seconds()
Definition AndroidPlatformTime.h:20
Definition Timespan.h:76
static FTimespan MinValue()
Definition Timespan.h:699
static FTimespan FromSeconds(double Seconds)
Definition Timespan.h:673
static FTimespan FromMilliseconds(double Milliseconds)
Definition Timespan.h:649