UDocumentation UE5.7 10.02.2026 (Source)
API documentation for Unreal Engine 5.7
MultichannelTcpSocket.h
Go to the documentation of this file.
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "CoreMinimal.h"
7#include "Misc/ScopedEvent.h"
8#include "Misc/ScopeLock.h"
12
13class FSocket;
14
23{
24 enum
25 {
31 ControlChannel = 0
32 };
33
34public:
35
43 : BandwidthLatencyProduct(InBandwidthLatencyProduct)
44 , RemoteReceiverBytesReceived(0)
45 , Receiver(InSocket, FOnMultichannelTcpReceive::CreateRaw(this, &FMultichannelTcpSocket::HandleReceiverReceive))
46 , Sender(InSocket, FOnMultichannelTcpOkToSend::CreateRaw(this, &FMultichannelTcpSocket::HandleSenderOkToSend))
47 , Socket(InSocket)
48 { }
49
50public:
51
62 {
63 check((Channel != ControlChannel) && Data); // Channel 0 is reserved, must want data
64
65 if (Count > 0)
66 {
67 for (int32 Attempt = 0; Attempt < 2; Attempt++)
68 {
70 {
71 FScopeLock ScopeLock(&ReceiveBuffersCriticalSection);
72
73 FReceiveBuffer* ChannelBuffer = ReceiveBuffers.FindRef(Channel);
74
75 if (!ChannelBuffer)
76 {
77 ChannelBuffer = new FReceiveBuffer();
78
79 ReceiveBuffers.Add(Channel, ChannelBuffer);
80 }
81
82 // Would be bad to have multiple listeners
83 check(!ChannelBuffer->EventToResumeWhenDataIsReady && !ChannelBuffer->BytesRequiredToResume);
84
85 if (ChannelBuffer->Buffer.Num() >= Count)
86 {
87 FMemory::Memcpy(Data, ChannelBuffer->Buffer.GetData(), Count);
88
89 if (ChannelBuffer->Buffer.Num() == Count)
90 {
91 ReceiveBuffers.Remove(Channel);
92 }
93 else
94 {
95 ChannelBuffer->Buffer.RemoveAt(0, Count);
96 }
97
98 return Count;
99 }
100
101 if (!Attempt) // if someone woke us up with insufficient data, we simply error out
102 {
104
105 ChannelBuffer->EventToResumeWhenDataIsReady = LocalEventToRestart;
106 ChannelBuffer->BytesRequiredToResume = Count;
107 }
108 }
109
110 delete LocalEventToRestart; // wait here for sufficient data
111 }
112 }
113
114 return 0;
115 }
116
126 {
127 FScopeLock ScopeLock(&ReceiveBuffersCriticalSection);
128
129 FReceiveBuffer* ChannelBuffer = ReceiveBuffers.FindRef(Channel);
130
131 return ChannelBuffer ? ChannelBuffer->Buffer.Num() : 0;
132 }
133
145 int32 PollingReceive( uint8* Data, int32 MaxCount, uint32 Channel )
146 {
147 check(Channel != ControlChannel && Data && MaxCount > 0); // Channel 0 is reserved, must want data
148
149 FScopeLock ScopeLock(&ReceiveBuffersCriticalSection);
150
151 FReceiveBuffer* ChannelBuffer = ReceiveBuffers.FindRef(Channel);
152
153 if (!ChannelBuffer)
154 {
155 return 0;
156 }
157
158 check(!ChannelBuffer->EventToResumeWhenDataIsReady && !ChannelBuffer->BytesRequiredToResume && ChannelBuffer->Buffer.Num()); // would be bad to have multiple listeners
159
160 int32 Count = FMath::Min<int32>(ChannelBuffer->Buffer.Num(), MaxCount);
161
162 FMemory::Memcpy(Data, ChannelBuffer->Buffer.GetData(), Count);
163
164 if (ChannelBuffer->Buffer.Num() == Count)
165 {
166 ReceiveBuffers.Remove(Channel);
167 }
168 else
169 {
170 ChannelBuffer->Buffer.RemoveAt(0, Count);
171 }
172
173 return Count;
174 }
175
186 void Send( const uint8* Data, int32 Count, uint32 Channel )
187 {
188 check((Channel != ControlChannel) && Data && Count); // Channel 0 is reserved, must have data
189 Sender.Send(Data, Count, Channel);
190 }
191
192private:
193
195 struct FReceiveBuffer
196 {
199
201 int32 BytesRequiredToResume;
202
204 FScopedEvent* EventToResumeWhenDataIsReady;
205
207 FReceiveBuffer( )
208 : BytesRequiredToResume(0)
209 , EventToResumeWhenDataIsReady(NULL)
210 { }
211 };
212
213 // Callback for receiving data from the receiver thread.
214 void HandleReceiverReceive( const TArray<uint8>& Payload, uint32 Channel, bool bForceByteswapping )
215 {
216 if (Channel == ControlChannel)
217 {
218 FMemoryReader Ar(Payload);
219
220 Ar.SetByteSwapping(bForceByteswapping);
221
224
225 RemoteReceiverBytesReceived = RemoteBytesReceived;
226
227 Sender.AttemptResumeSending();
228
229 return;
230 }
231
232 // process the actual payload
233 {
234 FScopeLock ScopeLock(&ReceiveBuffersCriticalSection);
235 FReceiveBuffer* ChannelBuffer = ReceiveBuffers.FindRef(Channel);
236
237 if (!ChannelBuffer)
238 {
239 ChannelBuffer = new FReceiveBuffer();
240 ReceiveBuffers.Add(Channel, ChannelBuffer);
241 }
242
243 ChannelBuffer->Buffer += Payload;
244
245 if (ChannelBuffer->Buffer.Num() >= ChannelBuffer->BytesRequiredToResume && ChannelBuffer->EventToResumeWhenDataIsReady)
246 {
247 ChannelBuffer->EventToResumeWhenDataIsReady->Trigger();
248 ChannelBuffer->EventToResumeWhenDataIsReady = NULL;
249 ChannelBuffer->BytesRequiredToResume = 0;
250 }
251 }
252
253 // send a control channel message back
254 {
256 uint64 BytesReceived = Receiver.GetBytesReceived();
257 Ar << BytesReceived;
258 Sender.Send(Ar.GetData(), Ar.Num(), ControlChannel);
259 }
260 }
261
263 bool HandleSenderOkToSend( int32 PayloadSize, uint32 Channel )
264 {
265 if (Channel == ControlChannel)
266 {
267 return true; // always ok to send on the control channel
268 }
269
270 bool Result = (Sender.GetBytesSent() + PayloadSize) < (RemoteReceiverBytesReceived + BandwidthLatencyProduct);
271
272 return Result;
273 }
274
275private:
276
278 uint64 BandwidthLatencyProduct;
279
281 TMap<uint32, FReceiveBuffer*> ReceiveBuffers;
282
284 FCriticalSection ReceiveBuffersCriticalSection;
285
287 int64 RemoteReceiverBytesReceived;
288
291
294
296 FSocket* Socket;
297};
#define NULL
Definition oodle2base.h:134
#define check(expr)
Definition AssertionMacros.h:314
FPlatformTypes::int64 int64
A 64-bit signed integer.
Definition Platform.h:1127
FPlatformTypes::int32 int32
A 32-bit signed integer.
Definition Platform.h:1125
FPlatformTypes::uint64 uint64
A 64-bit unsigned integer.
Definition Platform.h:1117
UE_FORCEINLINE_HINT TSharedRef< CastToType, Mode > StaticCastSharedRef(TSharedRef< CastFromType, Mode > const &InSharedRef)
Definition SharedPointer.h:127
UE::FPlatformRecursiveMutex FCriticalSection
Definition CriticalSection.h:53
uint8_t uint8
Definition binka_ue_file_header.h:8
uint32_t uint32
Definition binka_ue_file_header.h:6
Definition BufferArchive.h:48
Definition MemoryReader.h:17
Definition MultichannelTcpReceiver.h:29
int32 GetBytesReceived()
Definition MultichannelTcpReceiver.h:60
Definition MultichannelTcpSender.h:31
int32 GetBytesSent()
Definition MultichannelTcpSender.h:76
void Send(const uint8 *Data, int32 Count, uint32 Channel)
Definition MultichannelTcpSender.h:89
void AttemptResumeSending()
Definition MultichannelTcpSender.h:67
Definition MultichannelTcpSocket.h:23
int32 DataAvailable(uint32 Channel)
Definition MultichannelTcpSocket.h:125
FMultichannelTcpSocket(FSocket *InSocket, uint64 InBandwidthLatencyProduct)
Definition MultichannelTcpSocket.h:42
int32 PollingReceive(uint8 *Data, int32 MaxCount, uint32 Channel)
Definition MultichannelTcpSocket.h:145
int32 BlockingReceive(uint8 *Data, int32 Count, uint32 Channel)
Definition MultichannelTcpSocket.h:61
void Send(const uint8 *Data, int32 Count, uint32 Channel)
Definition MultichannelTcpSocket.h:186
Definition ScopeLock.h:141
Definition ScopedEvent.h:19
Definition Sockets.h:19
Definition Array.h:670
UE_REWRITE SizeType Num() const
Definition Array.h:1144
UE_NODEBUG UE_FORCEINLINE_HINT ElementType * GetData() UE_LIFETIMEBOUND
Definition Array.h:1027
Definition UnrealString.h.inl:34
const SIZE_T PayloadSize
Definition UDPPing.cpp:1293
UE_STRING_CLASS Result(Forward< LhsType >(Lhs), RhsLen)
Definition String.cpp.inl:732
static UE_FORCEINLINE_HINT void * Memcpy(void *Dest, const void *Src, SIZE_T Count)
Definition UnrealMemory.h:160