UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
UdpSocketReceiver.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "CoreMinimal.h"
6#include "HAL/Runnable.h"
10#include "Sockets.h"
11#include "SocketSubsystem.h"
12
14#include "IPAddress.h"
15
16#include <atomic>
17
22
30
31
36 : public FRunnable
37 , private FSingleThreadRunnable
38{
39public:
40
49 : Socket(InSocket)
50 , bStopping(false)
51 , Thread(nullptr)
52 , ThreadName(InThreadName)
53 , WaitTime(InWaitTime)
54 {
55 check(Socket != nullptr);
57
59 }
60
63 {
64 if (Thread != nullptr)
65 {
66 Thread->Kill(true);
67 delete Thread;
68 }
69 }
70
71public:
72
75 {
76 MaxReadBufferSize = InMaxReadBufferSize;
77 }
78
81 {
82 ThreadStackSize = InTheadStackSize;
83 }
84
86 void Start()
87 {
88 Thread = FRunnableThread::Create(this, *ThreadName, ThreadStackSize, TPri_AboveNormal, FPlatformAffinity::GetPoolThreadMask());
89 }
90
100 {
101 check(Thread == nullptr);
102 return DataReceivedDelegate;
103 }
104
105public:
106
107 //~ FRunnable interface
108
110 {
111 return this;
112 }
113
114 virtual bool Init() override
115 {
116 return true;
117 }
118
119 virtual uint32 Run() override
120 {
121 while (!bStopping.load(std::memory_order_acquire))
122 {
123 Update(WaitTime);
124 }
125
126 return 0;
127 }
128
129 virtual void Stop() override
130 {
131 bStopping.store(true, std::memory_order_release);
132 }
133
134 virtual void Exit() override { }
135
136protected:
137
140 {
142 {
143 return;
144 }
145
147 TSharedRef<FInternetAddr> Sender = SocketSubsystem->CreateInternetAddr();
148 uint32 Size;
149
150 while (Socket->HasPendingData(Size))
151 {
153 Reader->SetNumUninitialized(FMath::Min(Size, MaxReadBufferSize));
154
155 int32 Read = 0;
156
157 if (Socket->RecvFrom(Reader->GetData(), Reader->Num(), Read, *Sender))
158 {
159 ensure((uint32)Read <= MaxReadBufferSize);
160 Reader->RemoveAt(Read, Reader->Num() - Read, EAllowShrinking::No);
161 DataReceivedDelegate.ExecuteIfBound(Reader, FIPv4Endpoint(Sender));
162 }
163 }
164 }
165
166protected:
167
168 //~ FSingleThreadRunnable interface
169
170 virtual void Tick() override
171 {
173 }
174
175private:
176
178 FSocket* Socket;
179
181 ISocketSubsystem* SocketSubsystem;
182
184 std::atomic<bool> bStopping;
185
187 FRunnableThread* Thread;
188
190 FString ThreadName;
191
193 FTimespan WaitTime;
194
196 uint32 MaxReadBufferSize = 65507u;
197
199 uint32 ThreadStackSize = 128 * 1024;
200
201private:
202
204 FOnSocketDataReceived DataReceivedDelegate;
205};
#define check(expr)
Definition AssertionMacros.h:314
#define ensure( InExpression)
Definition AssertionMacros.h:464
FPlatformTypes::TCHAR TCHAR
Either ANSICHAR or WIDECHAR, depending on whether the platform supports wide characters or the requir...
Definition Platform.h:1135
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
#define TRACE_CPUPROFILER_EVENT_SCOPE(Name)
Definition CpuProfilerTrace.h:528
#define DECLARE_DELEGATE_TwoParams(DelegateName, Param1Type, Param2Type)
Definition DelegateCombinations.h:57
@ TPri_AboveNormal
Definition GenericPlatformAffinity.h:28
#define PLATFORM_SOCKETSUBSYSTEM
Definition SocketSubsystem.h:44
@ SOCKTYPE_Datagram
Definition SocketTypes.h:37
TSharedPtr< FArrayReader, ESPMode::ThreadSafe > FArrayReaderPtr
Definition UdpSocketReceiver.h:21
uint32 Size
Definition VulkanMemory.cpp:4034
uint32_t uint32
Definition binka_ue_file_header.h:6
static const uint64 GetPoolThreadMask()
Definition AndroidPlatformAffinity.h:38
Definition RunnableThread.h:20
virtual bool Kill(bool bShouldWait=true)=0
static CORE_API FRunnableThread * Create(class FRunnable *InRunnable, const TCHAR *ThreadName, uint32 InStackSize=0, EThreadPriority InThreadPri=TPri_Normal, uint64 InThreadAffinityMask=FPlatformAffinity::GetNoAffinityMask(), EThreadCreateFlags InCreateFlags=EThreadCreateFlags::None)
Definition ThreadingBase.cpp:862
Definition Runnable.h:20
Definition SingleThreadRunnable.h:12
Definition Sockets.h:19
virtual bool HasPendingData(uint32 &PendingDataSize)=0
virtual bool Wait(ESocketWaitConditions::Type Condition, FTimespan WaitTime)=0
virtual SOCKETS_API bool RecvFrom(uint8 *Data, int32 BufferSize, int32 &BytesRead, FInternetAddr &Source, ESocketReceiveFlags::Type Flags=ESocketReceiveFlags::None)
Definition Sockets.cpp:63
ESocketType GetSocketType() const
Definition Sockets.h:458
Definition UdpSocketReceiver.h:38
virtual void Stop() override
Definition UdpSocketReceiver.h:129
void Update(const FTimespan &SocketWaitTime)
Definition UdpSocketReceiver.h:139
virtual ~FUdpSocketReceiver()
Definition UdpSocketReceiver.h:62
void SetThreadStackSize(uint32 InTheadStackSize)
Definition UdpSocketReceiver.h:80
virtual uint32 Run() override
Definition UdpSocketReceiver.h:119
FOnSocketDataReceived & OnDataReceived()
Definition UdpSocketReceiver.h:99
virtual void Tick() override
Definition UdpSocketReceiver.h:170
FUdpSocketReceiver(FSocket *InSocket, const FTimespan &InWaitTime, const TCHAR *InThreadName)
Definition UdpSocketReceiver.h:48
virtual bool Init() override
Definition UdpSocketReceiver.h:114
void SetMaxReadBufferSize(uint32 InMaxReadBufferSize)
Definition UdpSocketReceiver.h:74
virtual FSingleThreadRunnable * GetSingleThreadInterface() override
Definition UdpSocketReceiver.h:109
virtual void Exit() override
Definition UdpSocketReceiver.h:134
void Start()
Definition UdpSocketReceiver.h:86
Definition SocketSubsystem.h:58
static SOCKETS_API ISocketSubsystem * Get(const FName &SubsystemName=NAME_None)
Definition SocketSubsystem.cpp:224
virtual TSharedRef< FInternetAddr > CreateInternetAddr(uint32 Address, uint32 Port=0)
Definition SocketSubsystem.h:317
Definition SharedPointer.h:692
Definition SharedPointer.h:153
@ WaitForRead
Definition SocketTypes.h:89
@ false
Definition radaudio_common.h:23
Definition IPv4Endpoint.h:27
Definition Timespan.h:76
static FTimespan Zero()
Definition Timespan.h:747