UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
Pipe.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
7#include "CoreTypes.h"
9#include "Tasks/Task.h"
10#include "Async/EventCount.h"
11#include "Tasks/TaskPrivate.h"
12#include "Templates/Invoke.h"
14
15#include <atomic>
16#include <type_traits>
17
18namespace UE::Tasks
19{
20 // A chain of tasks that are executed one after another. Can be used to synchronise access to a shared resource as FPipe guarantees
21 // non-concurrent tasks execution. FPipe is a replacement for named threads because it's lightweight and flexible -
22 // there can be a large dynamic number of pipes each controlling its own shared resource. Can be used as a replacement for
23 // dedicated threads.
24 // Execution order is FIFO for tasks that don't have prerequisites, i.e. it's the same as launching order.
25 // Adding prerequisites to a pipe task can alter when the task is queued to the pipe, hence can change the execution order.
26 // A pipe must be alive until its last task is completed.
27 // See `FTasksPipeTest` for tests and examples.
28 class FPipe
29 {
30 public:
32
33 // @param InDebugName helps to identify the pipe in debugger and profiler. `UE_SOURCE_LOCATION` can be used as an auto-generated
34 // unique name.
35 explicit FPipe(const TCHAR* InDebugName)
36 : EmptyEventRef(MakeShared<UE::FEventCount>())
37 , DebugName(InDebugName)
38 {}
39
41 {
42 check(!HasWork());
43 }
44
45 // returns `true` if the pipe has any not completed tasks
46 bool HasWork() const
47 {
48 return TaskCount.load(std::memory_order_relaxed) != 0;
49 }
50
51 // waits until the pipe is empty (its last task is executed)
52 // should be used only after no more tasks are launched in the pipe, e.g. preparing for the pipe destruction
54
55 // launches a task in the pipe
56 // @param InDebugName helps to identify the task in debugger and profiler
57 // @param TaskBody a callable with no parameters, usually a lambda but can be also a functor object
58 // or a pointer to a function. TaskBody can return results.
59 // @Priority - task priority, can affect task scheduling once it's passed the pipe
60 // @param TaskFlags - task config options
61 // @return Task instance that can be used to wait for task completion or to obtain the result of task execution
62 template<typename TaskBodyType>
64 (
65 const TCHAR* InDebugName,
67 ETaskPriority Priority = ETaskPriority::Default,
70 )
71 {
72 using FResult = TInvokeResult_T<TaskBodyType>;
74
75 FExecutableTask* Task = FExecutableTask::Create(InDebugName, Forward<TaskBodyType>(TaskBody), Priority, ExtendedPriority, Flags);
76 TaskCount.fetch_add(1, std::memory_order_acq_rel);
77 Task->SetPipe(*this);
78 Task->TryLaunch(sizeof(*Task));
79 return TTask<FResult>{ Task };
80 }
81
82 // launches a task in the pipe, with multiple prerequisites that must be completed before the task is scheduled
83 // @param InDebugName helps to identify the task in debugger and profiler
84 // @param TaskBody a callable with no parameters, usually a lambda but can be also a functor object
85 // or a pointer to a function. TaskBody can return results.
86 // @Priority - task priority, can affect task scheduling once it's passed the pipe
87 // @param TaskFlags - task config options
88 // @return Task instance that can be used to wait for task completion or to obtain the result of task execution
89 template<typename TaskBodyType, typename PrerequisitesCollectionType>
91 (
92 const TCHAR* InDebugName,
95 ETaskPriority Priority = ETaskPriority::Default,
98 )
99 {
100 using FResult = TInvokeResult_T<TaskBodyType>;
102
103 FExecutableTask* Task = FExecutableTask::Create(InDebugName, Forward<TaskBodyType>(TaskBody), Priority, ExtendedPriority, Flags);
104 TaskCount.fetch_add(1, std::memory_order_acq_rel);
105
106 // Order matters here, pipe must be set before prerequisites try to unlock us
107 // otherwise we could race between SetPipe and TryUnlock.
108 // Pipe and NumLock must both be consistent together at the time of Unlock.
109 Task->SetPipe(*this);
111 Task->TryLaunch(sizeof(*Task));
112 return TTask<FResult>{ Task };
113 }
114
115 // checks if pipe's task is being executed by the current thread. Allows to check if accessing a resource protected by a pipe
116 // is thread-safe
117 CORE_API bool IsInContext() const;
118
119 private:
120 friend class Private::FTaskBase;
121
122 // pushes given task into the pipe: adds the task as a subsequent to the last task if any and sets it as the new last task
123 // returns the accounted reference to the previous piped task if we managed to register the given task as its subsequent, otherwise nullptr.
124 // the reference must be released by the caller when not needed anymore
126
127 // pipe holds a "weak" reference to a task. the task must be cleared from the pipe when its execution finished before its completion,
128 // otherwise the next piped task can try to add itself as a subsequent to an already destroyed task.
129 void ClearTask(Private::FTaskBase& Task);
130
131 // notifications about pipe's task execution
132 void ExecutionStarted();
133 void ExecutionFinished();
134 private:
135 // pipe builds a chain (a linked list) of tasks and so needs to store only the last one. the last task is null if the pipe is not blocked
136 std::atomic<Private::FTaskBase*> LastTask{ nullptr };
137 std::atomic<uint64> TaskCount { 0 };
138 TSharedRef<UE::FEventCount> EmptyEventRef;
139 public:
141 {
142 return DebugName;
143 }
144
145 private:
146 const TCHAR* const DebugName;
147 };
148}
#define FORCENOINLINE
Definition AndroidPlatform.h:142
#define check(expr)
Definition AssertionMacros.h:314
FPlatformTypes::TCHAR TCHAR
Either ANSICHAR or WIDECHAR, depending on whether the platform supports wide characters or the requirements of the licensee.
Definition Platform.h:1135
typename TInvokeResult< FuncType, ArgTypes... >::Type TInvokeResult_T
Definition Invoke.h:135
TSharedRef< InObjectType, InMode > MakeShared(InArgTypes &&... Args)
Definition SharedPointer.h:2009
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
Definition SharedPointer.h:153
Definition Pipe.h:29
TTask< TInvokeResult_T< TaskBodyType > > Launch(const TCHAR *InDebugName, TaskBodyType &&TaskBody, PrerequisitesCollectionType &&Prerequisites, ETaskPriority Priority=ETaskPriority::Default, EExtendedTaskPriority ExtendedPriority=EExtendedTaskPriority::None, ETaskFlags Flags=ETaskFlags::None)
Definition Pipe.h:91
CORE_API bool WaitUntilEmpty(FTimespan Timeout=FTimespan::MaxValue())
Definition Pipe.cpp:58
~FPipe()
Definition Pipe.h:40
bool HasWork() const
Definition Pipe.h:46
CORE_API bool IsInContext() const
Definition Pipe.cpp:135
FORCENOINLINE const TCHAR * GetDebugName() const
Definition Pipe.h:140
TTask< TInvokeResult_T< TaskBodyType > > Launch(const TCHAR *InDebugName, TaskBodyType &&TaskBody, ETaskPriority Priority=ETaskPriority::Default, EExtendedTaskPriority ExtendedPriority=EExtendedTaskPriority::None, ETaskFlags Flags=ETaskFlags::None)
Definition Pipe.h:64
FPipe(const TCHAR *InDebugName)
Definition Pipe.h:35
Definition TaskPrivate.h:120
Definition TaskPrivate.h:925
Definition Task.h:191
Definition AnalyticsProviderLog.h:8
TStaticArray< Private::FTaskBase *, sizeof...(TaskTypes)> Prerequisites(TaskTypes &... Tasks)
Definition Task.h:365
ETaskFlags
Definition TaskPrivate.h:89
EExtendedTaskPriority
Definition TaskPrivate.h:60
Definition AdvancedWidgetsModule.cpp:13
Definition Timespan.h:76
static FTimespan MaxValue()
Definition Timespan.h:686