UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
LocalWorkQueue.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
6#include "Async/EventCount.h"
11#include <atomic>
12
// Wrapper around a callable of type LAMBDA. Invoking the wrapper calls the
// stored callable with the wrapper itself as the first argument, followed by
// the caller's arguments, which gives the callable a handle through which it
// can call itself again.
// NOTE(review): this listing omits original line 14, which presumably holds
// the `class TYCombinator` header that opens the body below - confirm against
// the real header file.
13template<typename LAMBDA>
15{
// The wrapped callable, stored by value.
16 LAMBDA Lambda;
17
18public:
// NOTE(review): original line 19 is missing from this listing; the index at
// the end of this page names it `constexpr TYCombinator(LAMBDA &&InLambda)`.
// Only its empty body is visible here, so how it initializes Lambda cannot be
// confirmed from this view.
20 {}
21
// Copy-initializes the stored callable from InLambda.
22 constexpr TYCombinator(const LAMBDA& InLambda): Lambda(InLambda)
23 {}
24
// Const call operator: calls Lambda with a const reference to this wrapper as
// the first argument, then Args passed through Forward<ARGS>.
// The trailing decltype makes the return type that of the call expression, so
// this overload only participates when that expression is well-formed.
25 template<typename... ARGS>
26 constexpr auto operator()(ARGS&&... Args) const -> decltype(Lambda(static_cast<const TYCombinator<LAMBDA>&>(*this), Forward<ARGS>(Args)...))
27 {
28 return Lambda(static_cast<const TYCombinator<LAMBDA>&>(*this), Forward<ARGS>(Args)...);
29 }
30
// Non-const call operator: same as above, but passes a mutable reference to
// this wrapper as the first argument.
31 template<typename... ARGS>
32 constexpr auto operator()(ARGS&&... Args) -> decltype(Lambda(static_cast<TYCombinator<LAMBDA>&>(*this), Forward<ARGS>(Args)...))
33 {
34 return Lambda(static_cast<TYCombinator<LAMBDA>&>(*this), Forward<ARGS>(Args)...);
35 }
36};
37
// Factory function taking a callable by forwarding reference.
// NOTE(review): the only body line (original line 41) is missing from this
// listing, so the returned expression is not visible here. Presumably it
// constructs a TYCombinator from Lambda - confirm against the real header.
38template<typename LAMBDA>
39constexpr auto MakeYCombinator(LAMBDA&& Lambda)
40{
42}
43
// Work queue of TaskType pointers. Run() drains it on the calling thread, and
// AddWorkers() sets up additional worker lambdas that drain it as well.
// NOTE(review): this listing omits several original lines: 45 (presumably the
// `class TLocalWorkQueue` header), 47, 51, 64, 68-94 (the constructor region),
// 110-111, 116, 128 and 136. Comments below describe only the visible code.
44template<typename TaskType>
46{
48
// State shared between this queue object and the worker lambdas created in
// AddWorkers(), each of which captures its own TRefCountPtr copy.
49 struct FInternalData : public TConcurrentLinearObject<FInternalData, FTaskGraphBlockAllocationTag>, public FThreadSafeRefCountedObject
50 {
// NOTE(review): original line 51 is missing here; it presumably declares the
// `TaskQueue` member that the code below enqueues to and dequeues from.
// Count of worker lambdas currently between their fetch_add and fetch_sub.
52 std::atomic_int ActiveWorkers {0};
// Set to true at the end of Run(). AddTask(), AddWorkers() and the worker
// loop assert that it is still false.
53 std::atomic_bool CheckDone {false};
// Notified by a worker whose decrement brings ActiveWorkers to zero; Run()
// waits on it.
54 UE::FEventCount FinishedEvent;
55
// Asserts that no worker is counted as active and that the task queue is
// empty when the shared state is destroyed.
56 ~FInternalData()
57 {
58 check(ActiveWorkers == 0);
59 check(TaskQueue.dequeue() == nullptr);
60 }
61 };
62
// Ref-counted handle to the shared state.
63 TRefCountPtr<FInternalData> InternalData;
// NOTE(review): original line 64 is missing; it presumably declares the
// `Priority` member passed to TaskHandle->Init in AddWorkers().
// Pointer to the work callback. In the visible code it is only assigned in
// Run(), where it receives the address of Run()'s by-value parameter.
65 TFunctionRef<void(TaskType*)>* DoWork = nullptr;
66
67public:
// NOTE(review): original lines 68-94 are missing from this listing. The index
// at the end of this page places the constructor
// `TLocalWorkQueue(TaskType *InitialWork, LowLevelTasks::ETaskPriority InPriority=...)`
// at line 68; its body is not visible here.
95
96public:
// Enqueues NewWork on the shared task queue.
// Asserts that Run() has not yet marked the queue as done.
97 inline void AddTask(TaskType* NewWork)
98 {
99 check(!InternalData->CheckDone.load(std::memory_order_relaxed));
100 InternalData->TaskQueue.enqueue(NewWork);
101 }
102
// Sets up NumWorkers worker lambdas that drain the task queue through the
// callback registered by Run().
// Asserts that the queue is not done and that DoWork is set. In the visible
// code DoWork is only set inside Run(), which implies this is meant to be
// called while Run() is executing.
103 inline void AddWorkers(uint16 NumWorkers)
104 {
105 check(!InternalData->CheckDone.load(std::memory_order_relaxed));
106 check(DoWork != nullptr);
107
108 for (uint16 Index = 0; Index < NumWorkers; Index++)
109 {
// NOTE(review): original lines 110-111 are missing; they presumably create
// `TaskHandle`, which is used below.
112
// Copy the callback pointer into a local so the lambda captures the pointer
// value rather than `this`.
113 TFunctionRef<void(TaskType*)>* LocalDoWork = DoWork;
// The lambda captures its own ref-counted copy of InternalData, plus
// TaskHandle.
114 TaskHandle->Init(TEXT("TLocalWorkQueue::AddWorkers"), Priority, [LocalDoWork, InternalData = InternalData, TaskHandle]()
115 {
// NOTE(review): original line 116 is missing from this listing.
// Register as an active worker before touching the queue.
117 InternalData->ActiveWorkers.fetch_add(1, std::memory_order_acquire);
// Process tasks until a dequeue returns nullptr.
118 while (TaskType* Work = InternalData->TaskQueue.dequeue())
119 {
120 check(!InternalData->CheckDone.load(std::memory_order_relaxed));
121 (*LocalDoWork)(Work);
122 }
// fetch_sub returns the previous value: 1 means this worker was the last
// active one, so signal FinishedEvent.
123 if (InternalData->ActiveWorkers.fetch_sub(1, std::memory_order_release) == 1)
124 {
125 InternalData->FinishedEvent.Notify();
126 }
127 });
// NOTE(review): original line 128 is missing; presumably it hands TaskHandle
// to the scheduler - confirm against the real header.
129 }
130 }
131
// Processes tasks on the calling thread with InDoWork. Returns once the queue
// is empty and ActiveWorkers is zero, then marks the queue as done.
// NOTE(review): DoWork is set to the address of the by-value parameter and is
// not reset in the visible code, so it refers to a destroyed object after
// Run() returns.
132 inline void Run(TFunctionRef<void(TaskType*)> InDoWork)
133 {
134 DoWork = &InDoWork;
135
// NOTE(review): original line 136 is missing from this listing.
137
138 while (true)
139 {
// Sample the worker count BEFORE the dequeue attempt; it is checked again
// after a failed dequeue below.
140 bool bNoActiveWorkers = InternalData->ActiveWorkers.load(std::memory_order_acquire) == 0;
141 if (TaskType* Work = InternalData->TaskQueue.dequeue())
142 {
143 InDoWork(Work);
144 }
// Queue was empty, and no worker was active either before or after the
// dequeue attempt: finished. Presumably both samples are needed so that work
// added by a worker running during the dequeue is not missed.
145 else if (bNoActiveWorkers && InternalData->ActiveWorkers.load(std::memory_order_acquire) == 0)
146 {
147 break;
148 }
149 else
150 {
// Queue was empty but workers were active: prepare to wait, then re-check the
// count so the wait is skipped if the workers have already finished.
151 auto Token = InternalData->FinishedEvent.PrepareWait();
152 if (InternalData->ActiveWorkers.load(std::memory_order_acquire) == 0)
153 {
154 continue;
155 }
156
157 TRACE_CPUPROFILER_EVENT_SCOPE(TLocalWorkQueue::WaitingForWorkers);
158 InternalData->FinishedEvent.Wait(Token);
159 }
160 }
161
// Mark the queue as done and assert that nothing is left in it.
162 InternalData->CheckDone.store(true);
163 check(InternalData->TaskQueue.dequeue() == nullptr);
164 }
165};
OODEFFUNC typedef void(OODLE_CALLBACK t_fp_OodleCore_Plugin_Free)(void *ptr)
#define check(expr)
Definition AssertionMacros.h:314
#define verify(expr)
Definition AssertionMacros.h:319
#define UE_NONCOPYABLE(TypeName)
Definition CoreMiscDefines.h:457
#define TEXT(x)
Definition Platform.h:1272
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define TRACE_CPUPROFILER_EVENT_SCOPE(Name)
Definition CpuProfilerTrace.h:528
constexpr auto MakeYCombinator(LAMBDA &&Lambda)
Definition LocalWorkQueue.h:39
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
uint16_t uint16
Definition binka_ue_file_header.h:7
Definition FAAArrayQueue.h:81
T * dequeue(DequeueHazard &Hazard)
Definition FAAArrayQueue.h:291
Definition RefCounting.h:283
Definition Task.h:310
static CORE_API const FTask * GetActiveTask()
Definition Scheduler.cpp:656
Definition ConcurrentLinearAllocator.h:571
Definition AssetRegistryState.h:50
Definition LocalWorkQueue.h:46
TLocalWorkQueue(TaskType *InitialWork, LowLevelTasks::ETaskPriority InPriority=LowLevelTasks::ETaskPriority::Count)
Definition LocalWorkQueue.h:68
void AddTask(TaskType *NewWork)
Definition LocalWorkQueue.h:97
void AddWorkers(uint16 NumWorkers)
Definition LocalWorkQueue.h:103
void Run(TFunctionRef< void(TaskType *)> InDoWork)
Definition LocalWorkQueue.h:132
Definition RefCounting.h:454
Definition SharedPointer.h:692
Definition LocalWorkQueue.h:15
constexpr auto operator()(ARGS &&... Args) -> decltype(Lambda(static_cast< TYCombinator< LAMBDA > & >(*this), Forward< ARGS >(Args)...))
Definition LocalWorkQueue.h:32
constexpr TYCombinator(const LAMBDA &InLambda)
Definition LocalWorkQueue.h:22
constexpr TYCombinator(LAMBDA &&InLambda)
Definition LocalWorkQueue.h:19
constexpr auto operator()(ARGS &&... Args) const -> decltype(Lambda(static_cast< const TYCombinator< LAMBDA > & >(*this), Forward< ARGS >(Args)...))
Definition LocalWorkQueue.h:26
ETaskPriority
Definition Task.h:18
U16 Index
Definition radfft.cpp:71