UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
MultichannelTcpSender.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "CoreMinimal.h"
7#include "NetworkMessage.h"
8#include "Misc/ScopedEvent.h"
9#include "Misc/ScopeLock.h"
10#include "HAL/RunnableThread.h"
11#include "HAL/Runnable.h"
13
14class Error;
15class FSocket;
16
24
25
30 : public FRunnable
31{
// Anonymous enum used to declare a compile-time integer constant inside the class.
32 enum
33 {
// Upper bound on how many bytes Run() copies out of a channel buffer in one pass (128 KiB minus 8).
// NOTE(review): the "- 8" presumably reserves room for per-packet framing — confirm against the receiver.
35 MaxPacket = 128 * 1024 - 8
36 };
37
38public:
39
47 : BytesSent(0)
48 , Socket(InSocket)
49 , OkToSendDelegate(InOkToSendDelegate)
50 {
51 Thread = FRunnableThread::Create(this, TEXT("FMultichannelTCPSender"), 8 * 1024, TPri_AboveNormal);
52 }
53
56 {
57 if (Thread)
58 {
59 Thread->Kill(true);
60 delete Thread;
61 }
62 }
63
64public:
65
68 {
69 FScopeLock ScopeLock(&SendBuffersCriticalSection);
71 }
72
77 {
78 return BytesSent;
79 }
80
/**
 * Appends Count bytes from Data to the pending buffer kept for the given channel.
 *
 * The whole body runs while holding SendBuffersCriticalSection.
 *
 * @param Data Source bytes; Count bytes are copied out of it.
 * @param Count Number of bytes to append. Asserted to be non-zero.
 * @param Channel Key of the per-channel buffer the bytes are appended to.
 */
89 void Send( const uint8* Data, int32 Count, uint32 Channel )
90 {
91 FScopeLock ScopeLock(&SendBuffersCriticalSection);
// NOTE(review): this only rejects 0; a negative Count would pass the check — confirm callers never pass one.
92 check(Count);
93
// Look up the buffer already queued for this channel; a null result is treated as "none yet" below.
94 TArray<uint8>* SendBuffer = SendBuffers.FindRef(Channel);
95
96 if (!SendBuffer)
97 {
// First data for this channel: allocate a buffer on the heap and register it in the map.
98 SendBuffer = new TArray<uint8>();
99 SendBuffers.Add(Channel, SendBuffer);
100 }
101
// Remember where the existing content ends so the new bytes land right after it.
102 int32 OldCount = SendBuffer->Num();
103
// Grow by Count bytes without initializing them; the Memcpy below fills them in.
104 SendBuffer->AddUninitialized(Count);
105
106 FMemory::Memcpy(SendBuffer->GetData() + OldCount, Data, Count);
107
// NOTE(review): this listing skips source line 108 here, so any statement on it is not visible — check the original header.
109 }
110
111public:
112
113 // FRunnable interface
114
/**
 * FRunnable initialization hook; this class has nothing to set up here.
 *
 * @return Always true.
 */
115 virtual bool Init( )
116 {
117 return true;
118 }
119
/**
 * Thread body: on every pass picks one pending channel buffer, peels off up to MaxPacket bytes
 * from it and streams Magic, Channel and the extracted bytes into Ar for sending.
 *
 * NOTE(review): this listing omits source lines 127, 172, 179, 181 and 187, so the declarations of
 * LocalEventToRestart, Ar and Magic and the condition guarding the failure branch are not visible here.
 *
 * @return Always 0; in the visible code the loop is only left through the break in the failure branch.
 */
120 virtual uint32 Run( )
121 {
122 while (1)
123 {
// Bytes extracted for this pass; stays empty when nothing can be sent.
124 TArray<uint8> Data;
125 uint32 Channel = 0;
126
// Inner scope bounds the lock: buffer selection and extraction happen under SendBuffersCriticalSection.
128 {
129 FScopeLock ScopeLock(&SendBuffersCriticalSection);
130
131 TArray<uint8>* SendBuffer = NULL;
132
// Choose the entry with the smallest channel key; every stored buffer is asserted to be non-empty.
133 for (TMap<uint32, TArray<uint8>*>::TIterator It(SendBuffers); It; ++It)
134 {
135 check(It.Value()->Num());
136
137 if (!SendBuffer || It.Key() < Channel)
138 {
139 Channel = It.Key();
140 SendBuffer = It.Value();
141 }
142 }
143
144 if (SendBuffer)
145 {
146 int32 Num = SendBuffer->Num();
147 check(Num > 0);
148
// Take at most MaxPacket bytes in this pass.
149 int32 Size = FMath::Min<int32>(Num, MaxPacket);
150
// Ask the delegate whether Size bytes may go out on this channel; if it returns false,
// nothing is extracted and Data stays empty for this pass.
151 if (OkToSendDelegate.Execute(Size, Channel))
152 {
153 Data.AddUninitialized(Size);
154 FMemory::Memcpy(Data.GetData(), SendBuffer->GetData(), Size);
155
156 if (Size < Num)
157 {
// Partial extraction: drop the copied prefix and keep the remainder queued.
158 SendBuffer->RemoveAt(0, Size);
159 }
160 else
161 {
// Buffer fully consumed: free it and remove its entry from the map.
162 delete SendBuffer;
163
164 SendBuffers.Remove(Channel);
165 }
166
167 check(Data.Num());
168 }
169 }
170 else
171 {
// Nothing is queued for any channel: publish the local event through the EventToRestart member.
// NOTE(review): the statement that creates LocalEventToRestart is presumably on the omitted line 172 — confirm.
173 EventToRestart = LocalEventToRestart;
174 }
175 }
176
// The lock has been released by this point; the send path works on the private copy in Data.
177 if (Data.Num())
178 {
180
182
// Serialization order: Magic, then Channel, then the payload bytes.
183 Ar << Magic;
184 Ar << Channel;
185 Ar << Data;
186
// Failure branch (its condition is on the omitted line 187): log the error and leave the loop, ending Run().
188 {
189 UE_LOG(LogMultichannelTCP, Error, TEXT("Failed to send payload."));
190
191 break;
192 }
193
// Add the payload size to the running total atomically.
194 FPlatformAtomics::InterlockedAdd(&BytesSent, Data.Num());
195 }
196
// NOTE(review): presumably FScopedEvent's destructor waits until the event is triggered, which is what
// makes this delete block — confirm in ScopedEvent.h. When the delegate refused the send above, no visible
// line assigns LocalEventToRestart, so the loop would come straight back around — confirm.
197 delete LocalEventToRestart; // block here if we don't have any data
198 }
199
200 return 0;
201 }
202
/**
 * FRunnable stop hook; intentionally empty in this class.
 *
 * NOTE(review): Run() only leaves its loop through the send-failure break in the visible code, and this
 * hook does nothing, so it is not clear from here how the thread is asked to finish — confirm.
 */
203 virtual void Stop( ) { }
204
205 virtual void Exit( )
206 {
207 FScopeLock ScopeLock(&SendBuffersCriticalSection);
208
209 for (TMap<uint32, TArray<uint8>*>::TIterator It(SendBuffers); It; ++It)
210 {
211 delete It.Value();
212 }
213 }
214
215protected:
216
219 {
220 if (EventToRestart)
221 {
222 FScopedEvent* LocalEventToRestart = EventToRestart;
223 EventToRestart = NULL;
225 }
226 }
227
228private:
229
231 int32 BytesSent;
232
234 FScopedEvent* EventToRestart;
235
237 TMap<uint32, TArray<uint8>*> SendBuffers;
238
240 FCriticalSection SendBuffersCriticalSection;
241
243 FSocket* Socket;
244
246 FRunnableThread* Thread;
247
248private:
249
251 FOnMultichannelTcpOkToSend OkToSendDelegate;
252};
#define NULL
Definition oodle2base.h:134
#define check(expr)
Definition AssertionMacros.h:314
#define TEXT(x)
Definition Platform.h:1272
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
UE::FPlatformRecursiveMutex FCriticalSection
Definition CriticalSection.h:53
#define DECLARE_DELEGATE_RetVal_TwoParams(ReturnValueType, DelegateName, Param1Type, Param2Type)
Definition DelegateCombinations.h:63
@ TPri_AboveNormal
Definition GenericPlatformAffinity.h:28
#define UE_LOG(CategoryName, Verbosity, Format,...)
Definition LogMacros.h:270
@ Num
Definition MetalRHIPrivate.h:234
@ MultichannelMagic
Definition MultichannelTcpGlobals.h:14
uint32 Size
Definition VulkanMemory.cpp:4034
uint8_t uint8
Definition binka_ue_file_header.h:8
uint32_t uint32
Definition binka_ue_file_header.h:6
Definition BufferArchive.h:48
Definition MultichannelTcpSender.h:31
virtual uint32 Run()
Definition MultichannelTcpSender.h:120
virtual bool Init()
Definition MultichannelTcpSender.h:115
int32 GetBytesSent()
Definition MultichannelTcpSender.h:76
~FMultichannelTcpSender()
Definition MultichannelTcpSender.h:55
void Send(const uint8 *Data, int32 Count, uint32 Channel)
Definition MultichannelTcpSender.h:89
FMultichannelTcpSender(FSocket *InSocket, const FOnMultichannelTcpOkToSend &InOkToSendDelegate)
Definition MultichannelTcpSender.h:46
void AttemptResumeSendingInternal()
Definition MultichannelTcpSender.h:218
void AttemptResumeSending()
Definition MultichannelTcpSender.h:67
virtual void Stop()
Definition MultichannelTcpSender.h:203
virtual void Exit()
Definition MultichannelTcpSender.h:205
Definition RunnableThread.h:20
virtual bool Kill(bool bShouldWait=true)=0
static CORE_API FRunnableThread * Create(class FRunnable *InRunnable, const TCHAR *ThreadName, uint32 InStackSize=0, EThreadPriority InThreadPri=TPri_Normal, uint64 InThreadAffinityMask=FPlatformAffinity::GetNoAffinityMask(), EThreadCreateFlags InCreateFlags=EThreadCreateFlags::None)
Definition ThreadingBase.cpp:862
Definition Runnable.h:20
Definition ScopeLock.h:141
Definition ScopedEvent.h:19
void Trigger()
Definition ScopedEvent.h:29
Definition NetworkMessage.h:116
Definition Sockets.h:19
Definition Array.h:670
UE_FORCEINLINE_HINT SizeType AddUninitialized()
Definition Array.h:1664
UE_REWRITE SizeType Num() const
Definition Array.h:1144
void RemoveAt(SizeType Index, EAllowShrinking AllowShrinking=UE::Core::Private::AllowShrinkingByDefault< AllocatorType >())
Definition Array.h:2083
UE_NODEBUG UE_FORCEINLINE_HINT ElementType * GetData() UE_LIFETIMEBOUND
Definition Array.h:1027
Definition Array.h:64
Definition UnrealString.h.inl:34
static UE_FORCEINLINE_HINT void * Memcpy(void *Dest, const void *Src, SIZE_T Count)
Definition UnrealMemory.h:160
static SOCKETS_API bool WrapAndSendPayload(const TArray< uint8 > &Payload, const FSimpleAbstractSocket &Socket)
Definition NetworkMessage.cpp:69