UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
LocalQueue.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4#include "CoreTypes.h"
5#include "Math/RandomStream.h"
9
10#include <atomic>
11
12#if AGGRESSIVE_MEMORY_SAVING
13 #define LOCALQUEUEREGISTRYDEFAULTS_MAX_LOCALQUEUES 1024
14 #define LOCALQUEUEREGISTRYDEFAULTS_MAX_ITEMCOUNT 512
15#else
16 #define LOCALQUEUEREGISTRYDEFAULTS_MAX_LOCALQUEUES 1024
17 #define LOCALQUEUEREGISTRYDEFAULTS_MAX_ITEMCOUNT 1024
18#endif
19
20namespace LowLevelTasks
21{
22namespace LocalQueue_Impl
23{
24template<uint32 NumItems>
26{
 // Slot sentinel values. Real payloads are object pointers stored as uintptr_t,
 // so 0 and 1 can never collide with a valid item (Put() asserts this).
27 enum class ESlotState : uintptr_t
28 {
29 Free = 0, //The slot is free and items can be put there
30 Taken = 1, //The slot is in the process of being stolen; the stealing thread either commits the steal or restores the original item
31 };
32
33protected:
 // Single-producer insert: writes Item into the slot after Head and bumps Head.
 // Only the queue-owning thread may call Put/Get (Head is non-atomic);
 // Steal() may run concurrently from any other thread.
34 //insert an item at the head position (this is only safe on a single thread, shared with Get)
35 inline bool Put(uintptr_t Item)
36 {
 // 0 and 1 are reserved as the ESlotState sentinels.
37 checkSlow(Item != uintptr_t(ESlotState::Free));
38 checkSlow(Item != uintptr_t(ESlotState::Taken));
39
40 uint32 Idx = (Head + 1) % NumItems;
41 uintptr_t Slot = ItemSlots[Idx].Value.load(std::memory_order_acquire);
42
43 if (Slot == uintptr_t(ESlotState::Free))
44 {
 // Release-publish the item before advancing Head so a concurrent Steal()
 // observes either Free or the fully written item, never a torn state.
45 ItemSlots[Idx].Value.store(Item, std::memory_order_release);
46 Head++;
47 checkSlow(Head % NumItems == Idx);
48 return true;
49 }
 // Next slot is not Free: the ring is full (or the slot is still mid-steal).
 // Callers fall back to the global overflow queue in this case.
50 return false;
51 }
52
 // Single-consumer remove from the Head side. Put() inserts at Head and Get()
 // removes at Head, so relative to Put this is LIFO (newest item first);
 // Steal() takes the oldest item from the Tail side instead.
 // (The original comment said "FIFO", which contradicts the Head-side pop.)
53 //remove an item at the head position in LIFO order (this is only safe on a single thread, shared with Put)
54 inline bool Get(uintptr_t& Item)
55 {
56 uint32 Idx = Head % NumItems;
57 uintptr_t Slot = ItemSlots[Idx].Value.load(std::memory_order_acquire);
58
 // Slot > Taken means it holds a real item (not Free==0 / Taken==1). The CAS
 // races against Steal() when only one item remains: whichever thread flips
 // the slot first owns the item.
59 if (Slot > uintptr_t(ESlotState::Taken) && ItemSlots[Idx].Value.compare_exchange_strong(Slot, uintptr_t(ESlotState::Free), std::memory_order_acq_rel))
60 {
61 Head--;
62 checkSlow((Head + 1) % NumItems == Idx);
63 Item = Slot;
64 return true;
65 }
66 return false;
67 }
68
 // Multi-consumer remove from the Tail side (oldest item first, i.e. FIFO
 // relative to Put; the original comment said "LIFO"). Tail is a monotonically
 // increasing version counter: the slot index is Tail % NumItems and the raw
 // counter value doubles as an ABA guard for the commit below.
69 //remove an item at the tail position in FIFO order (this can be done from any thread including the one that accesses the head)
70 inline bool Steal(uintptr_t& Item)
71 {
72 do
73 {
74 uint32 IdxVer = Tail.load(std::memory_order_acquire);
75 uint32 Idx = IdxVer % NumItems;
76 uintptr_t Slot = ItemSlots[Idx].Value.load(std::memory_order_acquire);
77
78 if (Slot == uintptr_t(ESlotState::Free))
79 {
80 // Once we find a free slot, we need to verify if it's been freed by another steal
81 // so check back the Tail value to make sure it wasn't incremented since we first read the value.
82 // If we don't do this, some threads might not see that other threads
83 // have already stolen the slot, and will wrongly return that no more tasks are available to steal.
84 if (IdxVer != Tail.load(std::memory_order_acquire))
85 {
86 continue; // Loop again since tail has changed
87 }
88 return false;
89 }
 // Try to claim the item by marking the slot Taken. compare_exchange_weak
 // may fail spuriously; the surrounding do/while simply retries.
90 else if (Slot != uintptr_t(ESlotState::Taken) && ItemSlots[Idx].Value.compare_exchange_weak(Slot, uintptr_t(ESlotState::Taken), std::memory_order_acq_rel))
91 {
 // Commit only if Tail is still the version we claimed under; otherwise
 // another steal already advanced past this slot while we held it.
92 if(IdxVer == Tail.load(std::memory_order_acquire))
93 {
94 uint32 Prev = Tail.fetch_add(1, std::memory_order_release); (void)Prev;
95 checkSlow(Prev % NumItems == Idx);
96 ItemSlots[Idx].Value.store(uintptr_t(ESlotState::Free), std::memory_order_release);
97 Item = Slot;
98 return true;
99 }
 // Roll back: restore the original item so the owner / other thieves
 // still see it, then retry from the (new) Tail.
100 ItemSlots[Idx].Value.store(Slot, std::memory_order_release);
101 }
102 } while(true);
103 }
104
105private:
 // Each slot is over-aligned to 2x a cache line — presumably to prevent false
 // sharing between concurrently CAS-ing thieves (and to defeat adjacent-line
 // prefetch effects) — TODO confirm intent against upstream.
106 struct FAlignedElement
107 {
108 alignas(PLATFORM_CACHE_LINE_SIZE * 2) std::atomic<uintptr_t> Value = {};
109 };
110
 // Head is only touched by the owning thread (Put/Get), hence non-atomic.
 // It starts at ~0u so the first Put() lands on index (Head + 1) % NumItems == 0.
111 alignas(PLATFORM_CACHE_LINE_SIZE * 2) uint32 Head { ~0u };
 // Tail is the shared version counter used by all stealing threads; see Steal().
112 alignas(PLATFORM_CACHE_LINE_SIZE * 2) std::atomic_uint Tail { 0 };
113 alignas(PLATFORM_CACHE_LINE_SIZE * 2) FAlignedElement ItemSlots[NumItems] = {};
114};
115
 // Typed facade over TWorkStealingQueueBase2: stores Type* payloads by
 // reinterpreting them as uintptr_t. Relies on valid object pointers never
 // equalling the reserved sentinel values 0 (Free) or 1 (Taken) — the base
 // class asserts this on Put().
116template<typename Type, uint32 NumItems>
117class TWorkStealingQueue2 final : protected TWorkStealingQueueBase2<NumItems>
118{
119 using PointerType = Type*;
120
121public:
 // Single-producer insert; returns false when the ring has no free slot.
122 inline bool Put(PointerType Item)
123 {
124 return TWorkStealingQueueBase2<NumItems>::Put(reinterpret_cast<uintptr_t>(Item));
125 }
126
 // Single-consumer pop from the Head side (newest item first).
127 inline bool Get(PointerType& Item)
128 {
129 return TWorkStealingQueueBase2<NumItems>::Get(reinterpret_cast<uintptr_t&>(Item));
130 }
131
 // Any-thread steal from the Tail side (oldest item first).
132 inline bool Steal(PointerType& Item)
133 {
134 return TWorkStealingQueueBase2<NumItems>::Steal(reinterpret_cast<uintptr_t&>(Item));
135 }
136};
137}
138
139namespace Private {
140
142{
145};
146
147/********************************************************************************************************************************************
148 * A LocalQueueRegistry is a collection of lock-free queues that store pointers to Items; there are thread-local LocalQueues with LocalItems. *
149 * LocalQueues can only be Enqueued and Dequeued by the Thread they were installed on, but Items can be stolen from any Thread. *
150 * There is a global OverflowQueue that is used when a LocalQueue goes out of scope (to dump all its remaining Items into), *
151 * when a Thread has no LocalQueue installed, or when the LocalQueue is at capacity. A new LocalQueue always registers itself. *
152 * A Dequeue operation always starts from a LocalQueue; then the GlobalQueue is checked. *
153 * Finally, Items may be stolen from other LocalQueues that are registered with the LocalQueueRegistry. *
154 ********************************************************************************************************************************************/
155template<uint32 NumLocalItems = LOCALQUEUEREGISTRYDEFAULTS_MAX_ITEMCOUNT, uint32 MaxLocalQueues = LOCALQUEUEREGISTRYDEFAULTS_MAX_LOCALQUEUES>
157{
 // PCG-style output hash over a persistent State, used to pick a random
 // starting victim queue for stealing (the multiplier/shift constants match the
 // well-known pcg_hash permutation — presumably; confirm against upstream).
 // NOTE(review): original line 160 — the declaration/seed of `State`, likely a
 // thread_local seeded from a platform cycle counter judging by the member
 // index at the bottom of this page — is missing from this listing.
158 static uint32 Rand()
159 {
161 State = State * 747796405u + 2891336453u;
162 State = ((State >> ((State >> 28u) + 4u)) ^ State) * 277803737u;
163 return (State >> 22u) ^ State;
164 }
165
166public:
167 class TLocalQueue;
168
169private:
172 using DequeueHazard = typename FOverflowQueueType::DequeueHazard;
173
174public:
176 {
177 template<uint32, uint32>
179
180 public:
181 TLocalQueue() = default;
182
187
 // Body of void Init(TLocalQueueRegistry& InRegistry, ELocalQueueType InQueueType)
 // (the signature, original line 188, is missing from this listing; see the
 // member index at the bottom of the page). One-shot initialization: asserts on
 // double-init, registers this queue with the registry and caches per-priority
 // dequeue hazards for the overflow queues.
 // NOTE(review): original line 196 is also missing here — presumably
 // `Registry = &InRegistry;`, given Registry is dereferenced just below and
 // QueueType is assigned in parallel on line 197 — confirm against upstream.
189 {
190 if (bIsInitialized.exchange(true, std::memory_order_relaxed))
191 {
192 checkf(false, TEXT("Trying to initialize local queue more than once"));
193 }
194 else
195 {
197 QueueType = InQueueType;
198
199 // Local queues are never unregistered, everything is shutdown at once.
200 Registry->AddLocalQueue(this);
201 for (int32 PriorityIndex = 0; PriorityIndex < int32(ETaskPriority::Count); ++PriorityIndex)
202 {
203 DequeueHazards[PriorityIndex] = Registry->OverflowQueues[PriorityIndex].getHeadHazard();
204 }
205 }
206 }
207
 // Teardown body: if this queue was initialized (one-shot via the exchange),
 // drains every per-priority local ring into the registry's global overflow
 // queues so no tasks are lost.
 // NOTE(review): the signature line (original 208) is missing from this
 // listing — presumably the destructor or an Uninit()-style method; confirm
 // against the upstream header.
209 {
210 if (bIsInitialized.exchange(false, std::memory_order_relaxed))
211 {
212 for (int32 PriorityIndex = 0; PriorityIndex < int32(ETaskPriority::Count); PriorityIndex++)
213 {
214 while (true)
215 {
216 FTask* Item;
217 if (!LocalQueues[PriorityIndex].Get(Item))
218 {
219 break;
220 }
221 Registry->OverflowQueues[PriorityIndex].enqueue(Item);
222 }
223 }
224 }
225 }
226
 // Puts Item into the thread-local ring for the given priority; when the ring
 // is full, overflows into the registry's global queue for that priority.
227 // add an item to the local queue and overflow into the global queue if full
228 // NOTE(review): stale comment — this previously read "returns true if we should wake a worker", but the function returns void; original line 231 (presumably a checkSlow) is also missing from this listing
229 inline void Enqueue(FTask* Item, uint32 PriorityIndex)
230 {
232 checkSlow(PriorityIndex < int32(ETaskPriority::Count));
233 checkSlow(Item != nullptr);
234
235 if (!LocalQueues[PriorityIndex].Put(Item))
236 {
237 Registry->OverflowQueues[PriorityIndex].enqueue(Item);
238 }
239 }
240
 // Body of FTask* StealLocal(bool GetBackGroundTasks) (the signature, original
 // line 241, is missing from this listing; see the member index below). Pops
 // from this thread's own rings via Steal() (Tail side) in priority order.
 // NOTE(review): original line 243 — presumably the computation of MaxPriority
 // from GetBackGroundTasks — is also missing; confirm against upstream.
242 {
244
245 for (int32 PriorityIndex = 0; PriorityIndex < MaxPriority; ++PriorityIndex)
246 {
247 FTask* Item;
248 if (LocalQueues[PriorityIndex].Steal(Item))
249 {
250 return Item;
251 }
252 }
253 return nullptr;
254 }
255
 // Body of FTask* Dequeue(bool GetBackGroundTasks): for each priority, first
 // pops the thread-local ring (Get, Head side), then the global overflow queue
 // via the dequeue hazard cached in Init().
 // NOTE(review): the signature (original line 257) and the MaxPriority
 // computation (original line 259) are missing from this listing.
256 // Check both the local and global queue in priority order
258 {
260
261 for (int32 PriorityIndex = 0; PriorityIndex < MaxPriority; ++PriorityIndex)
262 {
263 FTask* Item;
264 if (LocalQueues[PriorityIndex].Get(Item))
265 {
266 return Item;
267 }
268
269 Item = Registry->OverflowQueues[PriorityIndex].dequeue(DequeueHazards[PriorityIndex]);
270 if (Item)
271 {
272 return Item;
273 }
274 }
275 return nullptr;
276 }
277
 // Body of FTask* DequeueSteal(bool GetBackGroundTasks) (the signature,
 // original line 278, is missing from this listing): steals from other
 // registered queues via the registry, reusing the cached random victim index
 // across calls and re-seeding it only when StealItem invalidated it.
279 {
280 if (CachedRandomIndex == InvalidIndex)
281 {
282 CachedRandomIndex = Rand();
283 }
284
285 FTask* Result = Registry->StealItem(CachedRandomIndex, CachedPriorityIndex, GetBackGroundTasks);
286 if (Result)
287 {
288 return Result;
289 }
290 return nullptr;
291 }
292
293 private:
294 static constexpr uint32 InvalidIndex = ~0u;
 // NOTE(review): original line 295 — presumably the per-priority array of
 // thread-local work-stealing rings (`LocalQueues`, used throughout the
 // methods above) — is missing from this listing; confirm against upstream.
296 DequeueHazard DequeueHazards[uint32(ETaskPriority::Count)];
297 TLocalQueueRegistry* Registry = nullptr;
 // Victim/priority cursors persisted between DequeueSteal()/StealItem() calls.
298 uint32 CachedRandomIndex = InvalidIndex;
299 uint32 CachedPriorityIndex = 0;
300 ELocalQueueType QueueType;
 // One-shot init/teardown latch; see Init() above.
301 std::atomic<bool> bIsInitialized = false;
302 };
303
305 {
306 }
307
308private:
309 // Add a queue to the Registry. Thread-safe.
310 void AddLocalQueue(TLocalQueue* QueueToAdd)
311 {
 // fetch_add reserves a unique slot; readers may observe the incremented
 // count before the pointer below is published, which is why StealItem()
 // tolerates null entries.
312 uint32 Index = NumLocalQueues.fetch_add(1, std::memory_order_relaxed);
313 UE_CLOG(Index >= MaxLocalQueues, LowLevelTasks, Fatal, TEXT("Attempting to add more than the maximum allowed number of queues (%d)"), MaxLocalQueues);
314
315 // std::memory_order_release to make sure values are all written to the TLocalQueue before publishing.
316 LocalQueues[Index].store(QueueToAdd, std::memory_order_release);
317 }
318
319 // StealItem tries to steal an Item from a Registered LocalQueue
320 // Thread-safe with AddLocalQueue
321 FTask* StealItem(uint32& CachedRandomIndex, uint32& CachedPriorityIndex, bool GetBackGroundTasks)
322 {
323 uint32 NumQueues = NumLocalQueues.load(std::memory_order_relaxed);
 // NOTE(review): original line 324 is missing from this listing — presumably
 // an early-out when NumQueues == 0, without which the modulo below divides
 // by zero; confirm against the upstream header.
325 CachedRandomIndex = CachedRandomIndex % NumQueues;
326
 // NOTE(review): as listed, the loops index LocalQueues[Index] and
 // [PriorityIndex] directly while CachedRandomIndex/CachedPriorityIndex are
 // advanced but never used for indexing — which would make the randomized
 // starting point ineffective. Verify whether the upstream header indexes
 // by the cached cursors instead.
327 for (uint32 Index = 0; Index < NumLocalQueues; Index++)
328 {
329 // Test for null in case we race on reading NumLocalQueues reserved index before the pointer is set
330 if (TLocalQueue* LocalQueue = LocalQueues[Index].load(std::memory_order_acquire))
331 {
332 for(uint32 PriorityIndex = 0; PriorityIndex < MaxPriority; PriorityIndex++)
333 {
334 FTask* Item;
335 if (LocalQueue->LocalQueues[PriorityIndex].Steal(Item))
336 {
337 return Item;
338 }
339 CachedPriorityIndex = ++CachedPriorityIndex < MaxPriority ? CachedPriorityIndex : 0;
340 }
341 CachedRandomIndex = ++CachedRandomIndex < NumQueues ? CachedRandomIndex : 0;
342 }
343 }
 // Nothing stolen anywhere: reset the caller's cursors so the next attempt
 // re-seeds the random victim index (see DequeueSteal).
344 CachedPriorityIndex = 0;
345 CachedRandomIndex = TLocalQueue::InvalidIndex;
346 return nullptr;
347 }
348
349public:
350 // enqueue an Item directly into the Global OverflowQueue, bypassing any
 // thread-local ring (used when the calling thread has no local queue).
351 void Enqueue(FTask* Item, uint32 PriorityIndex)
352 {
353 check(PriorityIndex < int32(ETaskPriority::Count));
354 check(Item != nullptr);
355
356 OverflowQueues[PriorityIndex].enqueue(Item);
357 }
358
359 // grab an Item directly from the Global OverflowQueue, highest priority first
 // Body of FTask* DequeueGlobal(bool GetBackGroundTasks = true) — the signature
 // (original line 360) and the MaxPriority computation (original line 362) are
 // missing from this listing; see the member index at the bottom of the page.
361 {
363
364 for (int32 PriorityIndex = 0; PriorityIndex < MaxPriority; ++PriorityIndex)
365 {
366 if (FTask* Item = OverflowQueues[PriorityIndex].dequeue())
367 {
368 return Item;
369 }
370 }
371 return nullptr;
372 }
373
 // Body of FTask* DequeueSteal(bool GetBackGroundTasks) (the signature,
 // original line 374, is missing from this listing): stateless counterpart of
 // TLocalQueue::DequeueSteal — seeds a fresh random victim index on every call
 // instead of persisting cursors between calls.
375 {
376 uint32 CachedRandomIndex = Rand();
377 uint32 CachedPriorityIndex = 0;
378 FTask* Result = StealItem(CachedRandomIndex, CachedPriorityIndex, GetBackGroundTasks);
379 if (Result)
380 {
381 return Result;
382 }
383 return nullptr;
384 }
385
386 // Not thread-safe.
 // Clears every registered queue pointer and resets the count to zero so the
 // registry can be reused; the TLocalQueue objects themselves are not torn
 // down or notified here.
387 void Reset()
388 {
389 uint32 NumQueues = NumLocalQueues.load(std::memory_order_relaxed);
390 for (uint32 Index = 0; Index < NumQueues; Index++)
391 {
392 LocalQueues[Index].store(0, std::memory_order_relaxed);
393 }
394
395 NumLocalQueues.store(0, std::memory_order_release);
396 }
397
398private:
 // One global overflow queue per task priority; receives items when a local
 // ring is full, when a local queue is drained at teardown, or via the direct
 // Enqueue() above.
399 FOverflowQueueType OverflowQueues[uint32(ETaskPriority::Count)];
 // Pointers to registered thread-local queues, published with release order in
 // AddLocalQueue(); entries may transiently be null while being registered.
400 std::atomic<TLocalQueue*> LocalQueues[MaxLocalQueues] { nullptr };
401 std::atomic<uint32> NumLocalQueues {0};
402};
403
404} // namespace Private
405
406}
OODEFFUNC typedef void(OODLE_CALLBACK t_fp_OodleCore_Plugin_Free)(void *ptr)
#define checkSlow(expr)
Definition AssertionMacros.h:332
#define check(expr)
Definition AssertionMacros.h:314
#define checkf(expr, format,...)
Definition AssertionMacros.h:315
#define TEXT(x)
Definition Platform.h:1272
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
#define PLATFORM_CACHE_LINE_SIZE
Definition Platform.h:938
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
void Init()
Definition LockFreeList.h:4
#define UE_CLOG(Condition, CategoryName, Verbosity, Format,...)
Definition LogMacros.h:298
uint32_t uint32
Definition binka_ue_file_header.h:6
void enqueue(T *item, EnqueueHazard &Hazard)
Definition FAAArrayQueue.h:209
Definition Task.h:310
bool Put(PointerType Item)
Definition LocalQueue.h:122
bool Steal(PointerType &Item)
Definition LocalQueue.h:132
bool Get(PointerType &Item)
Definition LocalQueue.h:127
bool Steal(uintptr_t &Item)
Definition LocalQueue.h:70
bool Put(uintptr_t Item)
Definition LocalQueue.h:35
bool Get(uintptr_t &Item)
Definition LocalQueue.h:54
FTask * StealLocal(bool GetBackGroundTasks)
Definition LocalQueue.h:241
FTask * Dequeue(bool GetBackGroundTasks)
Definition LocalQueue.h:257
void Init(TLocalQueueRegistry &InRegistry, ELocalQueueType InQueueType)
Definition LocalQueue.h:188
void Enqueue(FTask *Item, uint32 PriorityIndex)
Definition LocalQueue.h:229
FTask * DequeueSteal(bool GetBackGroundTasks)
Definition LocalQueue.h:278
TLocalQueue(TLocalQueueRegistry &InRegistry, ELocalQueueType InQueueType)
Definition LocalQueue.h:183
FTask * DequeueGlobal(bool GetBackGroundTasks=true)
Definition LocalQueue.h:360
TLocalQueueRegistry()
Definition LocalQueue.h:304
void Enqueue(FTask *Item, uint32 PriorityIndex)
Definition LocalQueue.h:351
FTask * DequeueSteal(bool GetBackGroundTasks)
Definition LocalQueue.h:374
void Reset()
Definition LocalQueue.h:387
@ NumQueues
Definition TaskGraphInterfaces.h:74
ELocalQueueType
Definition LocalQueue.h:142
Definition Scheduler.cpp:25
ETaskPriority
Definition Task.h:18
Definition OverriddenPropertySet.cpp:45
U16 Index
Definition radfft.cpp:71
static uint32 Cycles()
Definition AndroidPlatformTime.h:27