UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
FAAArrayQueue.h
Go to the documentation of this file.
1/******************************************************************************
2 * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of Concurrency Freaks nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 ******************************************************************************
27 */
28
29#ifndef _FAA_ARRAY_QUEUE_HP_H_
30#define _FAA_ARRAY_QUEUE_HP_H_
31
32#include <atomic>
33#include "HazardPointer.h"
34#include "HAL/PlatformProcess.h"
35
79template<typename T>
81{
// Number of item slots in each Node (it sizes the Node::items array).
82 static constexpr long BUFFER_SIZE = 1024;
83
84private:
// One segment of the queue's linked list: a fixed array of BUFFER_SIZE item
// slots, the enqueue/dequeue cursors into that array, and a link to the
// following segment.
85 struct Node
86 {
// Next slot index to hand to a dequeuer; advanced with fetch_add(1).
87 std::atomic<int> deqidx;
// Item slots. nullptr marks a slot that no enqueuer has written yet;
// dequeuers overwrite a slot with GetTakenPointer() when they claim it.
88 std::atomic<T*> items[BUFFER_SIZE];
// Next slot index to hand to an enqueuer; advanced with fetch_add(1).
89 std::atomic<int> enqidx;
// Following node in the list, or nullptr while this node is the last one.
90 std::atomic<Node*> next;
91
92 // Builds a node with `item` already stored in slot 0, so enqidx starts at 1.
// All remaining slots are cleared to nullptr. Relaxed stores are used here;
// NOTE(review): presumably safe because the node is not yet published to
// other threads at construction time - confirm against callers.
93 Node(T* item) : deqidx{0}, enqidx{1}, next{nullptr}
94 {
95 items[0].store(item, std::memory_order_relaxed);
96 for (long i = 1; i < BUFFER_SIZE; i++)
97 {
98 items[i].store(nullptr, std::memory_order_relaxed);
99 }
100 }
101
// Atomically replaces `next` with `val` if it currently equals `cmp`.
// Returns true when the replacement happened. `cmp` is taken by value, so
// the caller does not see the observed value on failure.
102 bool casNext(Node *cmp, Node *val)
103 {
104 return next.compare_exchange_strong(cmp, val);
105 }
106 };
107
// Atomically replaces `tail` with `val` if it currently equals `cmp`.
// Returns true when the replacement happened, false if `tail` held a
// different node.
108 bool casTail(Node *cmp, Node *val)
109 {
110 return tail.compare_exchange_strong(cmp, val);
111 }
112
// Atomically replaces `head` with `val` if it currently equals `cmp`.
// Returns true when the replacement happened, false if `head` held a
// different node.
113 bool casHead(Node *cmp, Node *val)
114 {
115 return head.compare_exchange_strong(cmp, val);
116 }
117
118 // Node pointers for the two ends of the list: dequeue reads from `head`, enqueue appends at `tail`.
// Each pointer is aligned to twice the platform cache line size.
// NOTE(review): presumably this keeps the two hot pointers on separate
// cache lines to avoid false sharing - confirm.
119 alignas(PLATFORM_CACHE_LINE_SIZE * 2) std::atomic<Node*> head;
120 alignas(PLATFORM_CACHE_LINE_SIZE * 2) std::atomic<Node*> tail;
121
// Returns the marker value (all address bits set) that dequeueInternal writes
// into a slot when it claims it. Because the marker is non-null, an enqueuer's
// nullptr-to-item compare-exchange on that slot fails afterwards.
123 inline static T* GetTakenPointer()
124 {
125 return reinterpret_cast<T*>(~uintptr_t(0));
126 }
127
128public:
// Body of the default constructor FAAArrayQueue(). The page's reference index
// places its declaration at listing line 129, which is absent from this listing.
// Creates one sentinel node and makes both head and tail point at it.
130 {
131 Node* sentinelNode = new Node(nullptr);
// Node's constructor assumes slot 0 is pre-filled and sets enqidx to 1;
// the sentinel holds no item, so reset enqidx to make slot 0 available.
132 sentinelNode->enqidx.store(0, std::memory_order_relaxed);
133 head.store(sentinelNode, std::memory_order_relaxed);
134 tail.store(sentinelNode, std::memory_order_relaxed);
135 }
136
// Body of the destructor ~FAAArrayQueue(). The page's reference index places
// its declaration at listing line 137, which is absent from this listing.
// The T* values returned while draining are discarded, not deleted here.
138 {
139 while (dequeue() != nullptr); // Drain the queue
140 delete head.load(); // Delete the last node
141 }
142
// Hazard-pointer handle for the enqueue side. The queue builds one over `tail`
// (see the `return {tail, Hazards};` body) and callers pass it to
// enqueue(T*, EnqueueHazard&).
// It privately wraps THazardPointer<Node, true>; the meaning of the `true`
// template argument is defined in HazardPointer.h and is not visible here.
143 class EnqueueHazard : private THazardPointer<Node, true>
144 {
// The binding constructor is private; only FAAArrayQueue<T> (a friend) can
// attach a handle to a queue pointer and hazard collection.
145 friend class FAAArrayQueue<T>;
146 inline EnqueueHazard(std::atomic<Node*>& Hazard, FHazardPointerCollection& Collection) : THazardPointer<Node, true>(Hazard, Collection)
147 {}
148
149 public:
// Default-constructed handle, not yet bound by the queue.
150 inline EnqueueHazard() = default;
// Move construction forwards to the THazardPointer move constructor.
151 inline EnqueueHazard(EnqueueHazard&& Hazard) : THazardPointer<Node, true>(MoveTemp(Hazard))
152 {}
153
// Body of the move assignment operator; the page's reference index lists
// `EnqueueHazard & operator=(EnqueueHazard &&Other)` at listing line 154,
// which is absent from this listing. It forwards to the base class.
155 {
156 static_cast<THazardPointer<Node, true>&>(*this) = MoveTemp(Other);
157 return *this;
158 }
159 };
160
161private:
// Shared implementation behind both enqueue() overloads.
//
// item   - pointer to store; it must be non-null, because nullptr is the
//          marker for an unwritten slot.
// Hazard - hazard-pointer object bound to `tail`. NOTE(review): Get() and
//          Retire() are defined in HazardPointer.h; presumably Get() returns
//          the protected current tail and Retire() drops that protection -
//          confirm.
//
// Loops until the item is stored, either in a slot of the current tail node
// or as slot 0 of a freshly linked node.
162 template<typename HazardType>
163 void enqueueInternal(T* item, HazardType& Hazard)
164 {
165 checkSlow(item);
166 while (true)
167 {
168 Node* ltail = Hazard.Get();
// Reserve a slot index in this node. The counter keeps increasing past
// BUFFER_SIZE once the node is full.
169 const int idx = ltail->enqidx.fetch_add(1);
170 if (idx > BUFFER_SIZE-1)
171 { // This node is full
// The tail has already moved on; retry against the new tail.
172 if (ltail != tail.load())
173 {
174 continue;
175 }
176 Node* lnext = ltail->next.load();
177 if (lnext == nullptr)
178 {
// No successor yet: build one that already carries `item` in slot 0
// and try to link it after the current tail.
179 Node* newNode = new Node(item);
180 if (ltail->casNext(nullptr, newNode))
181 {
// Linked. The tail swing result is ignored: if it fails, tail no
// longer equals ltail.
182 casTail(ltail, newNode);
183 Hazard.Retire();
184 return;
185 }
// Another thread linked its node first; discard ours and retry.
186 delete newNode;
187 }
188 else
189 {
// A successor exists but tail still points here; advance tail.
190 casTail(ltail, lnext);
191 }
192 continue;
193 }
// Publish the item only if the slot is still empty. The exchange fails
// when a dequeuer has already marked the slot as taken; loop to reserve
// another slot in that case.
194 T* itemnull = nullptr;
195 if (ltail->items[idx].compare_exchange_strong(itemnull, item))
196 {
197 Hazard.Retire();
198 return;
199 }
200 }
201 }
202
203public:
// Body of getTailHazard(). The page's reference index lists
// `EnqueueHazard getTailHazard()` at listing line 204, which is absent from
// this listing. Returns an EnqueueHazard bound to `tail` and the queue's
// `Hazards` collection.
205 {
206 return {tail, Hazards};
207 };
208
// Appends `item` using a caller-supplied EnqueueHazard instead of building a
// hazard pointer per call.
// `item` must be non-null (enqueueInternal asserts this with checkSlow).
209 inline void enqueue(T* item, EnqueueHazard& Hazard)
210 {
211 enqueueInternal(item, Hazard);
212 }
213
// Appends `item` using a temporary hazard pointer over `tail`, built for this
// call only.
// `item` must be non-null (enqueueInternal asserts this with checkSlow).
214 inline void enqueue(T* item)
215 {
216 THazardPointer<Node> Hazard(tail, Hazards);
217 enqueueInternal(item, Hazard);
218 }
219
// Hazard-pointer handle for the dequeue side. The queue builds one over `head`
// (see the `return {head, Hazards};` body) and callers pass it to
// dequeue(DequeueHazard&).
// It privately wraps THazardPointer<Node, true>; the meaning of the `true`
// template argument is defined in HazardPointer.h and is not visible here.
220 class DequeueHazard : private THazardPointer<Node, true>
221 {
// The binding constructor is private; only FAAArrayQueue<T> (a friend) can
// attach a handle to a queue pointer and hazard collection.
222 friend class FAAArrayQueue<T>;
223 inline DequeueHazard(std::atomic<Node*>& Hazard, FHazardPointerCollection& Collection) : THazardPointer<Node, true>(Hazard, Collection)
224 {}
225
226 public:
// Default-constructed handle, not yet bound by the queue.
227 inline DequeueHazard() = default;
// Move construction forwards to the THazardPointer move constructor.
228 inline DequeueHazard(DequeueHazard&& Hazard) : THazardPointer<Node, true>(MoveTemp(Hazard))
229 {}
230
// Body of the move assignment operator; the page's reference index lists
// `DequeueHazard & operator=(DequeueHazard &&Other)` at listing line 231,
// which is absent from this listing. It forwards to the base class.
232 {
233 static_cast<THazardPointer<Node, true>&>(*this) = MoveTemp(Other);
234 return *this;
235 }
236 };
237
238private:
// Shared implementation behind both dequeue() overloads.
//
// Hazard - hazard-pointer object bound to `head`. NOTE(review): Get() and
//          Retire() are defined in HazardPointer.h; presumably Get() returns
//          the protected current head and Retire() drops that protection -
//          confirm.
//
// Returns the dequeued item, or nullptr when the queue is observed empty.
239 template<typename HazardType>
240 T* dequeueInternal(HazardType& Hazard)
241 {
242 while (true)
243 {
244 Node* lhead = Hazard.Get();
// Empty check: every reserved slot in this node has been handed out and
// there is no following node.
245 if (lhead->deqidx.load() >= lhead->enqidx.load() && lhead->next.load() == nullptr)
246 {
247 break;
248 }
// Claim a slot index in this node. The counter keeps increasing past
// BUFFER_SIZE once the node is exhausted.
249 const int idx = lhead->deqidx.fetch_add(1);
250 if (idx > BUFFER_SIZE-1)
251 { // This node has been drained, check if there is another one
252 Node* lnext = lhead->next.load();
253 if (lnext == nullptr)
254 {
255 break; // No more nodes in the queue
256 }
// Only the thread whose head swing succeeds hands the old node to the
// hazard collection for deletion.
257 if (casHead(lhead, lnext))
258 {
259 Hazard.Retire();
260 Hazards.Delete(lhead);
261 }
262 continue;
263 }
264
265 // When there are more consumers than producers we can end up stealing
266 // empty slots that producers have reserved but not yet had time to write into.
267 // This leads to a lot of retries on the producers' side, so help this case
268 // by spinning just a little bit when we know the deqidx we got is valid
269 // and is likely to be written to very soon.
270 if (lhead->items[idx].load() == nullptr && idx <= lhead->enqidx.load())
271 {
// Re-check the slot up to 10 times before giving up on it.
// NOTE(review): listing line 274 (the loop body) is absent from this
// listing; the page's reference index lists Yield(), so the body
// presumably yields the thread - confirm against the real header.
272 for (int32 Try = 0; lhead->items[idx].load() == nullptr && Try < 10; ++Try)
273 {
275 }
276 }
277
// Mark the slot as taken and fetch what it held. A nullptr result means no
// producer had written it yet; the marker now stops a late write to this
// slot, so claim another index.
278 T* item = lhead->items[idx].exchange(GetTakenPointer());
279 if (item == nullptr)
280 {
281 continue;
282 }
283 Hazard.Retire();
284 return item;
285 }
// Reached via `break`: the queue was observed empty.
286 Hazard.Retire();
287 return nullptr;
288 }
289
290public:
// Removes and returns the next item using a caller-supplied DequeueHazard
// instead of building a hazard pointer per call.
// Returns nullptr when the queue is observed empty.
291 inline T* dequeue(DequeueHazard& Hazard)
292 {
293 return dequeueInternal(Hazard);
294 }
295
// Body of getHeadHazard(). The page's reference index lists
// `DequeueHazard getHeadHazard()` at listing line 296, which is absent from
// this listing. Returns a DequeueHazard bound to `head` and the queue's
// `Hazards` collection.
297 {
298 return {head, Hazards};
299 };
300
// Removes and returns the next item using a temporary hazard pointer over
// `head`, built for this call only.
// Returns nullptr when the queue is observed empty.
301 inline T* dequeue()
302 {
303 THazardPointer<Node> Hazard(head, Hazards);
304 return dequeueInternal(Hazard);
305 }
306};
307
308#endif /* _FAA_ARRAY_QUEUE_HP_H_ */
#define checkSlow(expr)
Definition AssertionMacros.h:332
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
#define PLATFORM_CACHE_LINE_SIZE
Definition Platform.h:938
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
return true
Definition ExternalRpcRegistry.cpp:601
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
Definition FAAArrayQueue.h:221
DequeueHazard & operator=(DequeueHazard &&Other)
Definition FAAArrayQueue.h:231
DequeueHazard(DequeueHazard &&Hazard)
Definition FAAArrayQueue.h:228
Definition FAAArrayQueue.h:144
EnqueueHazard & operator=(EnqueueHazard &&Other)
Definition FAAArrayQueue.h:154
EnqueueHazard(EnqueueHazard &&Hazard)
Definition FAAArrayQueue.h:151
Definition FAAArrayQueue.h:81
void enqueue(T *item)
Definition FAAArrayQueue.h:214
EnqueueHazard getTailHazard()
Definition FAAArrayQueue.h:204
void enqueue(T *item, EnqueueHazard &Hazard)
Definition FAAArrayQueue.h:209
T * dequeue()
Definition FAAArrayQueue.h:301
~FAAArrayQueue()
Definition FAAArrayQueue.h:137
FAAArrayQueue()
Definition FAAArrayQueue.h:129
T * dequeue(DequeueHazard &Hazard)
Definition FAAArrayQueue.h:291
DequeueHazard getHeadHazard()
Definition FAAArrayQueue.h:296
Definition HazardPointer.h:58
Definition HazardPointer.h:245
static void Yield()
Definition GenericPlatformProcess.h:950