UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
ClosableMpscQueue.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5// HEADER_UNIT_SKIP - Not included directly
6
7#include "CoreTypes.h"
10#include <atomic>
11
15template<typename T>
17{
18public:
20
22
// Destructor body: releases whatever is still linked in the queue when it was never closed.
// NOTE(review): the signature line is missing from this extract; the symbol index at the end of the page
// lists `~TClosableMpscQueue()` at header line 23 - confirm against the real header.
24 {
// a null `Head` is the "closed" marker; Close() passes the nodes to Close_NonMember, which deletes them,
// so there is nothing left to release here
25 if (Head.load(std::memory_order_relaxed) == nullptr)
26 {
27 return; // closed
28 }
29
// walk the list starting right after the sentinel (the sentinel itself is a member, not heap allocated)
// NOTE(review): relaxed loads presumably rely on no producer running concurrently with destruction - confirm
30 FNode* Tail = Sentinel.Next.load(std::memory_order_relaxed);
31 while (Tail != nullptr)
32 {
// read the link before the node is deleted
33 FNode* Next = Tail->Next.load(std::memory_order_relaxed);
// the item was placement-constructed into `Value` by Enqueue, so it is destroyed explicitly
34 DestructItem((T*)&Tail->Value);
35 delete Tail;
36 Tail = Next;
37 }
38 }
39
43 template <typename... ArgTypes>
44 bool Enqueue(ArgTypes&&... Args)
45 {
46 FNode* Prev = Head.load(std::memory_order_acquire);
47 if (Prev == nullptr)
48 {
49 return false; // already closed
50 }
51
52 FNode* New = new FNode;
53 ::new ((void*)&New->Value) T(Forward<ArgTypes>(Args)...);
54
55 while (!Head.compare_exchange_weak(Prev, New, std::memory_order_release) && Prev != nullptr) // linearisation point
56 {
57 }
58
59 if (Prev == nullptr)
60 {
61 DestructItem((T*)&New->Value);
62 delete New;
63 return false;
64 }
65
66 Prev->Next.store(New, std::memory_order_release);
67
68 return true;
69 }
70
76 template<typename F>
77 bool Close(const F& Consumer)
78 {
79 FNode* Tail = &Sentinel;
80
81 // we need to take note of the head at the moment of nullifying it because it can be still unreacheable from the tail
82 FNode* const Head_Local = Head.exchange(nullptr, std::memory_order_acq_rel); // linearisation point
83
84 // the queue is closed at this point, and the user is free to destroy it
85 // no members should be accessed
86 Close_NonMember(Head_Local, Tail, Consumer);
87
88 return Head_Local != nullptr;
89 }
90
91 [[nodiscard]] bool IsClosed() const
92 {
93 return Head.load(std::memory_order_relaxed) == nullptr;
94 }
95
96private:
// Node of the intrusive singly linked list the queue is built from.
97 struct FNode
98 {
// link to the node that was enqueued right after this one; stays null until that node's producer
// stores it (see the last store in Enqueue)
99 std::atomic<FNode*> Next{ nullptr };
// NOTE(review): Enqueue and Close_NonMember access a `Value` member that holds the storage for a T;
// its declaration (header line 100) is missing from this extract - confirm against the real header
101 };
102
// list anchor embedded in the queue itself; it is skipped when consuming and never deleted
103 FNode Sentinel;
// most recently published node (the sentinel while the queue is empty); nullptr marks the queue as closed
104 std::atomic<FNode*> Head{ &Sentinel };
105
106private:
107 template<typename F>
108 static void Close_NonMember(FNode* Head, FNode* Tail, const F& Consumer)
109 {
110 if (Head == Tail /* empty */ || Head == nullptr /* already closed */)
111 {
112 return;
113 }
114
115 auto GetNext = [](FNode* Node)
116 {
117 FNode* Next;
118 // producers can be still updating `Next`, we need to loop until we detect that the list is fully linked
119 do
120 {
121 Next = Node->Next.load(std::memory_order_relaxed);
122 } while (Next == nullptr); // <- This loop has the potential for live locking if enqueue was not completed (e.g was running at lower priority)
123
124 return Next;
125 };
126
127 // skip sentinel, do it outside of the main loop to avoid unnecessary branching in it
128 Tail = GetNext(Tail);
129
130 auto Consume = [&Consumer](FNode* Node)
131 {
132 T* ValuePtr = (T*)&Node->Value;
133 Consumer(MoveTemp(*ValuePtr));
134 DestructItem(ValuePtr);
135 delete Node;
136 };
137
138 while (Tail != Head)
139 {
140 FNode* Next = GetNext(Tail);
141 Consume(Tail);
142 Tail = Next;
143 }
144
145 Consume(Head);
146 }
147};
FORCEINLINE constexpr void DestructItem(ElementType *Element)
Definition MemoryOps.h:56
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
T * New(FMemStackBase &Mem, int32 Count=1, int32 Align=DEFAULT_ALIGNMENT)
Definition MemStack.h:259
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
Definition ClosableMpscQueue.h:17
bool Close(const F &Consumer)
Definition ClosableMpscQueue.h:77
TClosableMpscQueue()=default
~TClosableMpscQueue()
Definition ClosableMpscQueue.h:23
bool IsClosed() const
Definition ClosableMpscQueue.h:91
UE_NONCOPYABLE(TClosableMpscQueue)
bool Enqueue(ArgTypes &&... Args)
Definition ClosableMpscQueue.h:44
uint32 GetNext(uint32 Index, const IndexType *NextIndexData, const uint32 NextIndexCount)
Definition CompactHashTable.h:116
Definition TypeCompatibleBytes.h:24