UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
ConsumeAllMpmcQueue.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "CoreTypes.h"
6#include "HAL/UnrealMemory.h"
9#include <atomic>
10#include <type_traits>
11
12namespace UE
13{
15 {
18 };
19
24 template<typename T, typename AllocatorType = FMemory>
26 {
27 private:
28 struct FNode
29 {
30 std::atomic<FNode*> Next{ nullptr };
32 };
33
34 std::atomic<FNode*> Head{ nullptr };
35
36 public:
38
40
42 {
43 static_assert(std::is_trivially_destructible_v<FNode>);
44 if (Head.load(std::memory_order_acquire) != nullptr)
45 {
46 ConsumeAllLifo([](T&&) {});
47 }
48 }
49
50 //Push an Item to the Queue and
51 //returns EConsumeAllMpmcQueueResult::WasEmpty if the Queue was empty before
52 //or EConsumeAllMpmcQueueResult::HadItems if there were already items in it
53 template <typename... ArgTypes>
55 {
56 FNode* New = ::new(AllocatorType::Malloc(sizeof(FNode), alignof(FNode))) FNode;
57 ::new ((void*)&New->Item) T(Forward<ArgTypes>(Args)...);
58
59 //Atomically append to the top of the Queue
60 FNode* Prev = Head.load(std::memory_order_relaxed);
61 do
62 {
63 New->Next.store(Prev, std::memory_order_relaxed);
64 } while (!Head.compare_exchange_weak(Prev, New, std::memory_order_acq_rel, std::memory_order_relaxed));
65
67 }
68
69 //Take all items off the Queue atomically and consumes them in LIFO order
70 //returns EConsumeAllMpmcQueueResult::WasEmpty if the Queue was empty
71 //or EConsumeAllMpmcQueueResult::HadItems if there were items to consume
72 template<typename F>
74 {
75 return ConsumeAll<false>(Consumer);
76 }
77
78 //Take all items off the Queue atomically and consumes them in FIFO order
79 //at the cost of the reversing the Links once
80 //returns EConsumeAllMpmcQueueResult::WasEmpty if the Queue was empty
81 //or EConsumeAllMpmcQueueResult::HadItems if there were items to consume
82 template<typename F>
84 {
85 return ConsumeAll<true>(Consumer);
86 }
87
88 // the result can be relied upon only in special cases (e.g. debug checks), as the state can change concurrently. use with caution
89 [[nodiscard]] bool IsEmpty() const
90 {
91 return Head.load(std::memory_order_relaxed) == nullptr;
92 }
93
94 private:
95 template<bool bReverse, typename F>
96 inline EConsumeAllMpmcQueueResult ConsumeAll(const F& Consumer)
97 {
98 //pop the entire Stack
99 FNode* Node = Head.exchange(nullptr, std::memory_order_acq_rel);
100
101 if (Node == nullptr)
102 {
104 }
105
106 if (bReverse) //reverse the links to FIFO Order if requested
107 {
108 FNode* Prev = nullptr;
109 while (Node)
110 {
111 FNode* Tmp = Node;
112 Node = Node->Next.exchange(Prev, std::memory_order_relaxed);
113 Prev = Tmp;
114 }
115 Node = Prev;
116 }
117
118 while (Node) //consume the nodes of the Queue
119 {
120 FNode* Next = Node->Next.load(std::memory_order_relaxed);
121 T* ValuePtr = Node->Item.GetTypedPtr();
122 Consumer(MoveTemp(*ValuePtr));
123 DestructItem(ValuePtr);
124 AllocatorType::Free(Node);
125 Node = Next;
126 }
127
129 }
130 };
131}
FORCEINLINE constexpr void DestructItem(ElementType *Element)
Definition MemoryOps.h:56
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
T * New(FMemStackBase &Mem, int32 Count=1, int32 Align=DEFAULT_ALIGNMENT)
Definition MemStack.h:259
UE_INTRINSIC_CAST UE_REWRITE constexpr std::remove_reference_t< T > && MoveTemp(T &&Obj) noexcept
Definition UnrealTemplate.h:520
Definition ConsumeAllMpmcQueue.h:26
EConsumeAllMpmcQueueResult ConsumeAllLifo(const F &Consumer)
Definition ConsumeAllMpmcQueue.h:73
bool IsEmpty() const
Definition ConsumeAllMpmcQueue.h:89
EConsumeAllMpmcQueueResult ConsumeAllFifo(const F &Consumer)
Definition ConsumeAllMpmcQueue.h:83
~TConsumeAllMpmcQueue()
Definition ConsumeAllMpmcQueue.h:41
UE_NONCOPYABLE(TConsumeAllMpmcQueue)
EConsumeAllMpmcQueueResult ProduceItem(ArgTypes &&... Args)
Definition ConsumeAllMpmcQueue.h:54
Definition AdvancedWidgetsModule.cpp:13
EConsumeAllMpmcQueueResult
Definition ConsumeAllMpmcQueue.h:15
Definition TypeCompatibleBytes.h:24