22 #ifdef KY_ENABLE_SOCKETS
44 class DiscardMessageHandler :
public IMessageHandler
46 virtual void Handle(Message*)
51 virtual Mode HandleMode() {
return Handle_AfterDecompression; }
60 class ThreadMgr :
public RefCountBase<ThreadMgr, StatAmp_Server>
63 ThreadMgr(
bool initSocketLib, SendInterface* sendCallback,
64 ConnStatusInterface* connectionCallback, Kaim::Event* sendQueueWaitEvent,
65 SocketImplFactory* socketFactory, MessageTypeRegistry* msgTypeRegistry = NULL,
66 SInt32 processorForSocketLoop = -1,
SInt32 processorForCompressLoop = -1);
70 void ForceConnectionBySkippingSetupMessages() { DoForceConnectionBySkippingSetupMessages =
true; }
71 void InitSync(
const char* ipAddress,
UInt32 port,
UInt32 broadcastPort);
72 bool ConnectSync(
UInt32 timeoutMs);
73 void StartBroadcastRecvSync(
UInt32 port);
75 bool InitAsync(
const char* ipAddress,
UInt32 port,
UInt32 broadcastPort);
77 void StartBroadcastRecv(
UInt32 port);
79 void SetBroadcastInfo(
const char* connectedApp,
const char* connectedFile);
85 void SetHeartbeatInterval(
UInt32 heartbeatInterval);
88 const StringLH& GetAddress()
const {
return IpAddress; }
89 UInt32 GetPort()
const {
return Port; }
90 UInt32 GetBroadcastRecvPort()
const {
return BroadcastRecvPort; }
92 ConnStatusInterface::StatusType GetConnectionStatus()
const;
94 UInt32 GetReceivedQueueSize()
const {
return MsgReceivedQueue.GetSize(); }
95 UInt32 GetUncompressedQueueSize()
const {
return MsgUncompressedQueue.GetSize(); }
96 UInt32 GetSendQueueSize()
const {
return MsgSendQueue.GetSize(); }
97 UInt32 GetCompressedQueueSize()
const {
return MsgCompressedQueue.GetSize(); }
98 UInt32 GetSendRate()
const {
return SendRate; }
99 UInt32 GetReceiveRate()
const {
return ReceiveRate; }
101 bool IsReceptionPipelineEmpty()
const {
return (OutOfQueueInReceptionMessages + GetUncompressedQueueSize() + GetReceivedQueueSize()) == 0; }
102 bool IsEmissionPipelineEmpty()
const {
return GetTotalMessagesInEmissionPipeline() == 0; }
104 UInt32 GetTotalMessagesInEmissionPipeline()
const {
return (MessageInEmissionStream + OutOfQueueInEmissionMessages + GetCompressedQueueSize() + GetSendQueueSize()); }
107 bool IsValidSocket();
109 bool IsValidConnection();
114 UnregisteredMessageType,
115 UnversionnedMessageType
117 MessageValidity SendAmpMessage(Message* msg);
118 void UpdateSync(
bool & wereActionPerformed);
119 bool BroadcastRecv();
122 void SendMultipleAmpMessages(List<Message>& msgList,
UInt32 listSize)
124 MsgSendQueue.PushListToBack(msgList, listSize);
129 void SendLog(
const String& logMessage, LogMessageId messageType);
131 void ClearAllMessagesToSend();
133 Ptr<Message> GetNextReceivedMessage();
134 void GetAllReceivedMessages(List<Message>& msgList,
UInt32& listSize)
136 MsgUncompressedQueue.PopToList(msgList, listSize);
140 void ClearReceivedMessages(
const Message* msg);
142 const MessageTypeRegistry& GetMsgTypeRegistry()
const {
return MsgTypeRegistry; }
145 Message* CreateAndReadMessage(File& str);
149 void CreateConnectionSetupMessages(List<Message>& msgList);
154 ExitStatus_NotExiting = 0,
155 ExitStatus_Immediate = 1,
156 ExitStatus_WaitForEmissionEmptiness = 2,
157 ExitStatus_WaitForReceptionEmptiness = 3,
160 ExitStatus GetExitStatus()
169 std::atomic<UInt32> SendStatLoop;
170 std::atomic<UInt32> SendStatTime;
171 std::atomic<UInt32> MaxSendStatTime;
172 std::atomic<UInt32> MaxSendRate;
182 DefaultHeartbeatIntervalMillisecs = 1000,
192 ~MsgQueue() { Clear(); }
193 void PushBack(Message* msg);
196 void ClearMsgType(
const Message* msg);
199 void PushListToBack(List<Message>& msgList,
UInt32 listSize);
200 void PopToList(List<Message>& msgList,
UInt32& listSize);
206 std::atomic<UInt32> Size;
210 class SendReceiveContext :
public NewOverrideBase<Stat_Default_Mem>
213 SendReceiveContext(Ptr<AmpStream> streamSend, Ptr<AmpStream> streamReceived);
218 UPInt StreamSendDataLeft;
219 const char* SendBuffer;
222 UInt32 MaxSilentFrameCount;
223 LogSilentMode SilentMode;
228 char BufferReceived[ThreadMgr::BufferSize];
230 Ptr<AmpStream> StreamSend;
231 Ptr<AmpStream> StreamReceived;
234 class BroadCastRcvContext :
public NewOverrideBase<Stat_Default_Mem>
237 BroadCastRcvContext(
bool initSocketLib, SocketImplFactory* socketFactory)
238 : WiiuSocket(initSocketLib, socketFactory)
242 ~BroadCastRcvContext()
244 WiiuSocket.Destroy();
249 char WiiuBuffer[BufferSize];
250 char BufferReceived[BufferSize];
254 SInt32 ProcessorForSocketLoop;
255 SInt32 ProcessorForCompressLoop;
256 Ptr<Thread> SocketThread;
257 Ptr<Thread> BroadcastThread;
258 Ptr<Thread> BroadcastRecvThread;
259 Ptr<Thread> CompressThread;
262 Lock BroadcastInfoLock;
264 StringLH BroadcastApp;
265 StringLH BroadcastFile;
270 bool DoForceConnectionBySkippingSetupMessages;
273 BroadcastSocket BroadcastSock;
274 SendReceiveContext* SendReceiveCallContext;
275 BroadCastRcvContext* BroadCastRcvCallContext;
278 mutable Lock InitLock;
279 mutable Lock StatusLock;
280 std::atomic<ExitStatus> Exiting;
283 ConnStatusInterface::StatusType ConnectionStatus;
285 std::atomic<UInt32> SendRate;
286 std::atomic<UInt32> ReceiveRate;
289 MsgQueue MsgReceivedQueue;
290 MsgQueue MsgSendQueue;
291 MsgQueue MsgUncompressedQueue;
292 MsgQueue MsgCompressedQueue;
293 Kaim::Event* SendQueueWaitEvent;
294 UInt32 LimitCheckHysterisisPercent;
295 void CheckHeapLimit();
297 std::atomic<UInt32> OutOfQueueInEmissionMessages;
298 std::atomic<UInt32> MessageInEmissionStream;
299 std::atomic<UInt32> OutOfQueueInReceptionMessages;
302 SendInterface* SendCallback;
303 ConnStatusInterface* ConnectionChangedCallback;
304 SocketImplFactory* SocketFactory;
306 MessageTypeRegistry MsgTypeRegistry;
307 friend class DisconnectionMessageHandler;
310 static int SocketThreadLoop(Thread* sendThread,
void* param);
311 static int BroadcastThreadLoop(Thread* broadcastThread,
void* param);
312 static int BroadcastRecvThreadLoop(Thread* broadcastThread,
void* param);
313 static int CompressThreadLoop(Thread* uncompressThread,
void* param);
318 bool SendReceiveCall(
bool & wereActionPerformed);
319 bool Send(SendReceiveContext& context,
bool& couldNotSend);
320 bool Receive(SendReceiveContext& context,
bool& couldNotReceive);
323 bool IsServer()
const;
324 bool IsRunning()
const;
325 bool IsExiting()
const;
326 void ResetLastReceivedTime();
327 bool SocketConnect();
328 Message* RetrieveMessageForSending();
329 bool SendReceiveLoop();
330 bool BroadcastLoop();
331 bool BroadcastRecvLoop();
333 void UpdateStatus(ConnStatusInterface::StatusType eStatus,
const char* pcMessage);
336 void UpdateLastReceivedTime(
bool updateStatus);
339 std::atomic<UInt32> HeartbeatIntervalMillisecs;
346 inline Locker(
bool activated, Lock *plock)
348 pLock = activated ? plock :
nullptr;
354 {
if (pLock) pLock->Unlock(); }
364 DbgML_SocketThread = 1,
365 DbgML_CompressThread = 1<<1,
366 DbgML_BroadcastThread = 1<<2,
369 DbgML_Emission = 1<<3,
370 DbgML_Reception = 1<<4,
371 DbgML_SleepTime = 1<<5,
375 DbgML_HeartbeatTolerance = 1<<7,
390 void SetDebugMsgLog(
UInt32 DebugMsgLogEnum_Mask);
std::int32_t SInt32
int32_t
Definition: SF_Types.h:131
std::uint32_t UInt32
uint32_t
Definition: SF_Types.h:137
The Autodesk Navigation namespace.
Definition: gamekitcrowddispersion.cpp:17
std::uint64_t UInt64
uint64_t
Definition: SF_Types.h:138