UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
DepletableMpmcQueue.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "CoreTypes.h"
6#include "HAL/UnrealMemory.h"
9#include <atomic>
10#include <type_traits>
11
12namespace UE
13{
// Concurrent queue built as a singly linked list of nodes anchored at an embedded `Sentinel` node.
// Producers append a node by atomically exchanging `Tail` and then linking the previous tail to the new node.
// A consumer detaches everything queued so far in one step (`Deplete`) and then walks the detached list.
// Nodes are obtained from `AllocatorType::Malloc` and released with `AllocatorType::Free`.
// Deprecated: the consumer spin-waits for producers to finish linking nodes (see `Deplete_Internal`); the deprecation
// message names TConsumeAllMpmcQueue as the replacement.
24 template<typename T, typename AllocatorType = FMemory>
25 class UE_DEPRECATED(5.3, "This concurrent queue was deprecated because it uses spin-waiting that can cause priority inversion and subsequently deadlocks on some platforms. Please use TConsumeAllMpmcQueue.") TDepletableMpmcQueue final
26 {
27 private:
// list node: link to the next node (null until a producer establishes it)
// NOTE(review): original line 31 is absent from this listing; the `Value` member used below (`Node->Value`, `New->Value`)
// is presumably declared there - confirm against the real header
28 struct FNode
29 {
30 std::atomic<FNode*> Next{ nullptr };
32 };
33
34 FNode Sentinel; // `Sentinel.Next` is the head of the queue
// last node of the list; points at `Sentinel` while the queue is empty (see `IsEmpty` and the reset in `Deplete`)
35 std::atomic<FNode*> Tail{ &Sentinel };
36
37 public:
39
40 [[nodiscard]] TDepletableMpmcQueue() = default;
41
// NOTE(review): the signature on original line 42 is absent from this listing; the body below releases every node still
// linked from `Sentinel`, so this is presumably the destructor - confirm
43 {
44 static_assert(std::is_trivially_destructible_v<FNode>);
45
46 // delete remaining elements: destroy each stored value, then free its node
47 FNode* Node = Sentinel.Next.load(std::memory_order_relaxed);
48 while (Node != nullptr)
49 {
50 DestructItem(Node->Value.GetTypedPtr());
// read the link before the node's memory is released
51 FNode* Next = Node->Next.load(std::memory_order_relaxed);
52 AllocatorType::Free(Node);
53 Node = Next;
54 }
55 }
56
// NOTE(review): the body (original line 60) is absent from this listing; presumably it forwards `Args` to the
// enqueue routine below and discards the result - confirm
57 template <typename... ArgTypes>
58 void Enqueue(ArgTypes&&... Args)
59 {
61 }
62
// Allocates a node, constructs a `T` in it from `Args` and appends it to the queue.
// Returns true if the previous tail was `Sentinel`, i.e. the queue was empty when this node was appended.
// NOTE(review): the signature (original line 64) is absent from this listing
63 template <typename... ArgTypes>
65 {
66 FNode* New = ::new(AllocatorType::Malloc(sizeof(FNode), alignof(FNode))) FNode;
67 ::new ((void*)&New->Value) T(Forward<ArgTypes>(Args)...);
68
69 // switch `Tail` to the new node and only then link the old tail to the new one. The list is not fully linked between these ops,
70 // this is explicitly handled by the consumer by waiting for the link
71
72 FNode* Prev = Tail.exchange(New, std::memory_order_acq_rel); // "acquire" to sync with `Prev->Next` initialisation from concurrent calls,
73 // "release" to make sure the new node is fully constructed before it becomes visible to the consumer or a concurrent enqueueing
74
75 // the following `check` is commented out because it can be reordered after the following `Prev->Next.store` which unlocks
76 // the consumer that will free `Prev`. left commented as documentation
77 // check(Prev->Next.load(std::memory_order_relaxed) == nullptr); // `Tail` is assigned before its Next
78
// publish the link; a consumer spinning in `Deplete_Internal` on `Prev->Next` proceeds once this store becomes visible
79 Prev->Next.store(New, std::memory_order_relaxed);
80
81 return Prev == &Sentinel;
82 }
83
// Detaches all items queued so far and passes each of them, in list order, to `Consumer` as an rvalue (`MoveTemp`).
// After `Consumer` returns for an item, the item is destroyed and its node is freed.
// Returns immediately without calling `Consumer` if the queue is empty.
88 template<typename F>
89 void Deplete(const F& Consumer)
90 {
91 // reset the head so the next consumption can detect that the queue is empty
92 FNode* First = Sentinel.Next.exchange(nullptr, std::memory_order_relaxed);
93 if (First == nullptr)
94 {
95 return; // empty
96 }
97
98 // reset the queue to the empty state. this redirects producers to start from `Sentinel` again.
99 // take note of the tail on resetting it because the list may still be not fully linked and so `Node.Next == nullptr` can't be
100 // used to detect the end of the list
101 FNode* Last = Tail.exchange(&Sentinel, std::memory_order_acq_rel); // "acquire" to sync with producers' tail modifications, and
102 // "release" to force `Sentinel.Next = nullptr` happening before modifying `Tail`
103 check(Last->Next.load(std::memory_order_relaxed) == nullptr); // `Tail` is assigned before its Next
104 // the previously queued items are detached from the instance (as a linked list, though potentially not fully linked yet)
105
106 check(Last != &Sentinel); // can't be empty because of `First != nullptr` above
107 Deplete_Internal(First, Last, Consumer);
108 }
109
110 // the result can be relied upon only in special cases (e.g. debug checks), as the state can change concurrently. use with caution
// returns true when `Tail` currently points at `Sentinel`
111 bool IsEmpty() const
112 {
113 return Tail.load(std::memory_order_relaxed) == &Sentinel;
114 }
115
116 private:
// Consumes the detached list from `First` through `Last` inclusive: every node's value is moved into `Consumer`,
// destroyed, and the node is freed. `Last` marks the end of the list because `Next == nullptr` may only mean
// that a producer has not linked its node yet.
117 template<typename F>
118 static void Deplete_Internal(FNode* First, FNode* Last, const F& Consumer)
119 {
// returns the successor of `Node`, spinning until a producer has stored the link
120 auto GetNext = [](FNode* Node)
121 {
122 FNode* Next = nullptr;
123 // producers can be still updating `Next`, wait until the link to the next element is established
124 while (Next == nullptr) // <- This loop has the potential for live locking if enqueue was not completed (e.g. was running at lower priority)
125 {
126 Next = Node->Next.load(std::memory_order_relaxed);
127 };
128 return Next;
129 };
130
// hands the node's value to `Consumer` as an rvalue, then destroys the value and frees the node
131 auto Consume = [&Consumer](FNode* Node)
132 {
133 T* ValuePtr = (T*)&Node->Value;
134 Consumer(MoveTemp(*ValuePtr));
135 DestructItem(ValuePtr);
136 AllocatorType::Free(Node);
137 };
138
// the successor is fetched before `Consume` because `Consume` frees the node
139 while (First != Last)
140 {
141 FNode* Next = GetNext(First);
142 Consume(First);
143 First = Next;
144 }
145
// `Last` has no successor to wait for, so it is consumed outside the loop
146 Consume(Last);
147 }
148 };
149
// Deprecated alias: `TDepletableMpscQueue<T, AllocatorType>` resolves to `TDepletableMpmcQueue<T, AllocatorType>`.
// The alias carries its own deprecation marker (5.2) with the same message pointing to TConsumeAllMpmcQueue.
151 template<typename T, typename AllocatorType = FMemory>
152 using TDepletableMpscQueue UE_DEPRECATED(5.2, "This concurrent queue was deprecated because it uses spin-waiting that can cause priority inversion and subsequently deadlocks on some platforms. Please use TConsumeAllMpmcQueue.") = TDepletableMpmcQueue<T, AllocatorType>;
154}
OODEFFUNC typedef void(OODLE_CALLBACK t_fp_OodleCore_Plugin_Free)(void *ptr)
#define check(expr)
Definition AssertionMacros.h:314
#define UE_NONCOPYABLE(TypeName)
Definition CoreMiscDefines.h:457
#define UE_DEPRECATED(Version, Message)
Definition CoreMiscDefines.h:302
FORCEINLINE constexpr void DestructItem(ElementType *Element)
Definition MemoryOps.h:56
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define PRAGMA_ENABLE_DEPRECATION_WARNINGS
Definition GenericPlatformCompilerPreSetup.h:12
#define PRAGMA_DISABLE_DEPRECATION_WARNINGS
Definition GenericPlatformCompilerPreSetup.h:8
T * New(FMemStackBase &Mem, int32 Count=1, int32 Align=DEFAULT_ALIGNMENT)
Definition MemStack.h:259
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
Definition AdvancedWidgetsModule.cpp:13
Definition TypeCompatibleBytes.h:24