17 #ifndef INC_KY_AMP_THREAD_MGR_H
18 #define INC_KY_AMP_THREAD_MGR_H
23 #ifdef KY_ENABLE_SOCKETS
// IMessageHandler implementation. The body of Handle() is not visible in this
// listing. NOTE(review): the class name suggests incoming messages are simply
// dropped - confirm against the full source.
45 class DiscardMessageHandler :
public IMessageHandler
// Handles one message; the parameter is left unnamed in the declaration.
47 virtual void Handle(Message*)
// Reports Handle_AfterDecompression as the mode in which this handler runs.
52 virtual Mode HandleMode() {
return Handle_AfterDecompression; }
// ThreadMgr derives from RefCountBase<ThreadMgr, StatAmp_Server>.
61 class ThreadMgr :
public RefCountBase<ThreadMgr, StatAmp_Server>
// Constructor.
//   initSocketLib            - flag; the nested BroadCastRcvContext constructor
//                              takes a parameter of the same name.
//   sendCallback             - SendInterface pointer (see member SendCallback).
//   connectionCallback       - ConnStatusInterface pointer (see member
//                              ConnectionChangedCallback).
//   sendQueueWaitEvent       - Kaim::Event pointer (see member SendQueueWaitEvent).
//   socketFactory            - SocketImplFactory pointer (see member SocketFactory).
//   msgTypeRegistry          - optional, defaults to NULL.
//   processorForSocketLoop   - defaults to -1.
//   processorForCompressLoop - defaults to -1.
// NOTE(review): -1 presumably means "no processor affinity" - confirm in the
// implementation, which is not part of this header listing.
64 ThreadMgr(
bool initSocketLib, SendInterface* sendCallback,
65 ConnStatusInterface* connectionCallback, Kaim::Event* sendQueueWaitEvent,
66 SocketImplFactory* socketFactory, MessageTypeRegistry* msgTypeRegistry = NULL,
67 SInt32 processorForSocketLoop = -1, SInt32 processorForCompressLoop = -1);
// Sets the DoForceConnectionBySkippingSetupMessages flag to true. Nothing
// visible in this header resets it.
71 void ForceConnectionBySkippingSetupMessages() { DoForceConnectionBySkippingSetupMessages =
true; }
// Synchronous initialization taking an IP address string, a port and a
// broadcast port. Defined out of line.
72 void InitSync(
const char* ipAddress, UInt32 port, UInt32 broadcastPort);
// Synchronous connect with a timeout expressed in milliseconds (per the
// parameter name). NOTE(review): the meaning of the bool result is not visible
// here - presumably true on success.
73 bool ConnectSync(UInt32 timeoutMs);
// Synchronous counterpart of StartBroadcastRecv(); takes the port to use.
74 void StartBroadcastRecvSync(UInt32 port);
// The declarations that follow are only compiled when KY_ENABLE_THREADS is
// defined. NOTE(review): the matching #endif is not visible in this listing,
// so the exact extent of the guarded region must be checked in the full file.
75 #if defined(KY_ENABLE_THREADS)
// Asynchronous counterpart of InitSync(); same parameters, returns a bool
// whose meaning is not visible here (presumably success).
77 bool InitAsync(
const char* ipAddress, UInt32 port, UInt32 broadcastPort);
// Starts broadcast reception on the given port (see BroadcastRecvThread).
79 void StartBroadcastRecv(UInt32 port);
// Supplies the application and file names used for broadcast information
// (see members BroadcastApp / BroadcastFile and BroadcastInfoLock).
82 void SetBroadcastInfo(
const char* connectedApp,
const char* connectedFile);
// Sets the heartbeat interval (see member HeartbeatIntervalMillisecs).
// NOTE(review): unit is presumably milliseconds, matching the member name.
88 void SetHeartbeatInterval(UInt32 heartbeatInterval);
// --- Simple const accessors: each returns the named member or queue size. ---

// Returns the stored IpAddress string by const reference.
91 const StringLH& GetAddress()
const {
return IpAddress; }
// Returns the stored Port.
92 UInt32 GetPort()
const {
return Port; }
// Returns BroadcastRecvPort.
93 UInt32 GetBroadcastRecvPort()
const {
return BroadcastRecvPort; }
// Returns the current connection status; defined out of line.
95 ConnStatusInterface::StatusType GetConnectionStatus()
const;
// Number of entries currently reported by MsgReceivedQueue.
97 UInt32 GetReceivedQueueSize()
const {
return MsgReceivedQueue.GetSize(); }
// Number of entries currently reported by MsgUncompressedQueue.
98 UInt32 GetUncompressedQueueSize()
const {
return MsgUncompressedQueue.GetSize(); }
// Number of entries currently reported by MsgSendQueue.
99 UInt32 GetSendQueueSize()
const {
return MsgSendQueue.GetSize(); }
// Number of entries currently reported by MsgCompressedQueue.
100 UInt32 GetCompressedQueueSize()
const {
return MsgCompressedQueue.GetSize(); }
// Returns the SendRate atomic counter, converted to UInt32.
101 UInt32 GetSendRate()
const {
return SendRate; }
// Returns the ReceiveRate atomic counter, converted to UInt32.
102 UInt32 GetReceiveRate()
const {
return ReceiveRate; }
// True when nothing is pending on the reception side: the count of messages
// held outside the queues (OutOfQueueInReceptionMessages) plus the sizes of
// the uncompressed and received queues add up to zero.
// The three values are read one after another, not as one atomic snapshot.
104 bool IsReceptionPipelineEmpty()
const {
return (OutOfQueueInReceptionMessages + GetUncompressedQueueSize() + GetReceivedQueueSize()) == 0; }
// True when GetTotalMessagesInEmissionPipeline() reports zero.
105 bool IsEmissionPipelineEmpty()
const {
return GetTotalMessagesInEmissionPipeline() == 0; }
// Sum of everything pending on the emission side: MessageInEmissionStream,
// OutOfQueueInEmissionMessages, and the sizes of the compressed and send
// queues. As above, the terms are sampled individually.
107 UInt32 GetTotalMessagesInEmissionPipeline()
const {
return (MessageInEmissionStream + OutOfQueueInEmissionMessages + GetCompressedQueueSize() + GetSendQueueSize()); }
// Socket validity query; defined out of line.
110 bool IsValidSocket();
// Connection validity query; defined out of line.
112 bool IsValidConnection();
// NOTE(review): the two enumerators below are presumably part of the
// MessageValidity enum returned by SendAmpMessage(); the enum header and any
// earlier enumerators are not visible in this listing.
117 UnregisteredMessageType,
118 UnversionnedMessageType
// Submits one message for sending and returns a MessageValidity value.
120 MessageValidity SendAmpMessage(Message* msg);
// Synchronous update step. wereActionPerformed is an in/out reference.
// NOTE(review): presumably set to true when the call did some work - confirm.
121 void UpdateSync(
bool & wereActionPerformed);
// Broadcast reception step; the meaning of the bool result is not visible here.
122 bool BroadcastRecv();
// Hands a whole list of messages to MsgSendQueue via PushListToBack().
// listSize is forwarded unchanged. Only one statement of the body is visible
// in this listing.
125 void SendMultipleAmpMessages(List<Message>& msgList, UInt32 listSize)
127 MsgSendQueue.PushListToBack(msgList, listSize);
// Sends a log message of the given LogMessageId; defined out of line.
132 void SendLog(
const String& logMessage, LogMessageId messageType);
// Defined out of line. NOTE(review): presumably empties the send queue(s).
134 void ClearAllMessagesToSend();
// Returns the next received message as an owning Ptr<Message>.
136 Ptr<Message> GetNextReceivedMessage();
// Moves received messages out of MsgUncompressedQueue via PopToList().
// listSize is passed by reference, so PopToList() can write to it. Only one
// statement of the body is visible in this listing.
137 void GetAllReceivedMessages(List<Message>& msgList, UInt32& listSize)
139 MsgUncompressedQueue.PopToList(msgList, listSize);
// Defined out of line. NOTE(review): presumably removes received messages of
// the same type as msg (compare MsgQueue::ClearMsgType) - confirm.
143 void ClearReceivedMessages(
const Message* msg);
// Read-only access to the MsgTypeRegistry member.
145 const MessageTypeRegistry& GetMsgTypeRegistry()
const {
return MsgTypeRegistry; }
// Creates a message and reads it from the given File; returns a raw pointer.
// NOTE(review): ownership of the returned Message is not stated here.
148 Message* CreateAndReadMessage(File& str);
// Fills msgList with connection setup messages; defined out of line.
152 void CreateConnectionSetupMessages(List<Message>& msgList);
// Enumerators of ExitStatus (the enum header is not visible in this listing).
// Values are explicit and contiguous, 0 through 3.
// NOTE(review): the names suggest "not exiting", "exit now", and "exit once
// the emission / reception pipeline is empty" - confirm in the implementation.
157 ExitStatus_NotExiting = 0,
158 ExitStatus_Immediate = 1,
159 ExitStatus_WaitForEmissionEmptiness = 2,
160 ExitStatus_WaitForReceptionEmptiness = 3,
// Accessor for the exit status; its body is not visible in this listing
// (the member holding the value is AtomicInt<ExitStatus> Exiting).
163 ExitStatus GetExitStatus()
// Atomic counters for send statistics.
// NOTE(review): units and update points are not visible in this header.
172 AtomicInt<UInt32> SendStatLoop;
173 AtomicInt<UInt32> SendStatTime;
174 AtomicInt<UInt32> MaxSendStatTime;
175 AtomicInt<UInt32> MaxSendRate;
// Two alternative values for the default heartbeat interval (0 and 1000).
// NOTE(review): defining the same name twice only compiles if each line sits
// in a different preprocessor branch; the #if/#else lines are not visible in
// this listing, so the selecting condition must be checked in the full file.
186 DefaultHeartbeatIntervalMillisecs = 0,
188 DefaultHeartbeatIntervalMillisecs = 1000,
// Fragments of the nested MsgQueue class (its header is not visible here).
// The destructor empties the queue by calling Clear().
199 ~MsgQueue() { Clear(); }
// Appends one message at the back of the queue.
200 void PushBack(Message* msg);
// Defined out of line. NOTE(review): presumably removes queued messages of
// the same type as msg - confirm.
203 void ClearMsgType(
const Message* msg);
// Returns the current queue size (see the atomic Size member).
204 UInt32 GetSize()
const;
// Bulk append: takes a list and its size.
206 void PushListToBack(List<Message>& msgList, UInt32 listSize);
// Bulk removal into msgList; listSize is a reference parameter.
207 void PopToList(List<Message>& msgList, UInt32& listSize);
// Element count kept as an atomic integer.
213 AtomicInt<UInt32> Size;
// State used by the send/receive calls (Send() and Receive() take it by
// reference). Allocation goes through NewOverrideBase<Stat_Default_Mem>.
217 class SendReceiveContext :
public NewOverrideBase<Stat_Default_Mem>
// Takes the send and receive streams (see StreamSend / StreamReceived).
220 SendReceiveContext(Ptr<AmpStream> streamSend, Ptr<AmpStream> streamReceived);
// NOTE(review): time unit of the timestamp is not visible here.
222 UInt64 LastSampleTime;
224 UInt32 BytesReceived;
// Send-side progress: a data-left count and a raw pointer into a send buffer.
// NOTE(review): the pointer is non-owning as far as this header shows.
225 UPInt StreamSendDataLeft;
226 const char* SendBuffer;
// Silent-frame bookkeeping and the current LogSilentMode.
228 UInt32 SilentFrameCount;
229 UInt32 MaxSilentFrameCount;
230 LogSilentMode SilentMode;
233 UInt64 SendStatTicks;
// Fixed-size receive buffer of ThreadMgr::BufferSize bytes.
235 char BufferReceived[ThreadMgr::BufferSize];
// Reference-counted streams for outgoing and incoming data.
237 Ptr<AmpStream> StreamSend;
238 Ptr<AmpStream> StreamReceived;
// State for broadcast reception. Allocation goes through
// NewOverrideBase<Stat_Default_Mem>.
241 class BroadCastRcvContext :
public NewOverrideBase<Stat_Default_Mem>
// Forwards both constructor arguments to the WiiuSocket member.
244 BroadCastRcvContext(
bool initSocketLib, SocketImplFactory* socketFactory)
245 : WiiuSocket(initSocketLib, socketFactory)
// The destructor explicitly calls Destroy() on WiiuSocket.
249 ~BroadCastRcvContext()
251 WiiuSocket.Destroy();
// NOTE(review): time unit of the attempt timestamp is not visible here.
255 UInt64 LastWiiuAttempt;
// Two fixed-size buffers of BufferSize bytes each.
256 char WiiuBuffer[BufferSize];
257 char BufferReceived[BufferSize];
// --- ThreadMgr data members ---

// Processor indices for the socket and compress loops; the constructor
// defaults both of its corresponding parameters to -1.
261 SInt32 ProcessorForSocketLoop;
262 SInt32 ProcessorForCompressLoop;
// Thread handles exist only when KY_ENABLE_THREADS is defined.
// NOTE(review): the matching #endif is not visible in this listing.
263 #if defined(KY_ENABLE_THREADS)
264 Ptr<Thread> SocketThread;
265 Ptr<Thread> BroadcastThread;
266 Ptr<Thread> BroadcastRecvThread;
267 Ptr<Thread> CompressThread;
// Broadcast settings. NOTE(review): BroadcastInfoLock presumably guards the
// fields below it - confirm in the implementation.
271 Lock BroadcastInfoLock;
272 UInt32 BroadcastPort;
273 StringLH BroadcastApp;
274 StringLH BroadcastFile;
275 UInt32 BroadcastRecvPort;
// Set to true by ForceConnectionBySkippingSetupMessages().
279 bool DoForceConnectionBySkippingSetupMessages;
282 BroadcastSocket BroadcastSock;
// Raw pointers to the nested context objects.
// NOTE(review): ownership and lifetime are not visible in this header.
283 SendReceiveContext* SendReceiveCallContext;
284 BroadCastRcvContext* BroadCastRcvCallContext;
// Declared mutable so that const member functions can take these locks.
287 mutable Lock InitLock;
288 mutable Lock StatusLock;
// Current exit state, stored atomically (see the ExitStatus enumerators).
289 AtomicInt<ExitStatus> Exiting;
// Heartbeat timestamps. NOTE(review): time unit not visible here.
290 UInt64 LastSendHeartbeat;
291 UInt64 LastRcvdHeartbeat;
292 ConnStatusInterface::StatusType ConnectionStatus;
// Rates returned by GetSendRate() / GetReceiveRate().
294 AtomicInt<UInt32> SendRate;
295 AtomicInt<UInt32> ReceiveRate;
// The four message queues whose sizes are exposed by the Get*QueueSize()
// accessors.
298 MsgQueue MsgReceivedQueue;
299 MsgQueue MsgSendQueue;
300 MsgQueue MsgUncompressedQueue;
301 MsgQueue MsgCompressedQueue;
// Event pointer received through the constructor's sendQueueWaitEvent parameter.
302 Kaim::Event* SendQueueWaitEvent;
// Hysteresis percentage for a limit check (identifier spelling kept as is).
303 UInt32 LimitCheckHysterisisPercent;
// Member function declared among the data members; defined out of line.
304 void CheckHeapLimit();
// Counts of messages in flight outside the queues; summed by the
// Is*PipelineEmpty() / GetTotalMessagesInEmissionPipeline() helpers.
306 AtomicInt<UInt32> OutOfQueueInEmissionMessages;
307 AtomicInt<UInt32> MessageInEmissionStream;
308 AtomicInt<UInt32> OutOfQueueInReceptionMessages;
// Callback and factory pointers matching the constructor parameters.
// NOTE(review): presumably non-owning - confirm.
311 SendInterface* SendCallback;
312 ConnStatusInterface* ConnectionChangedCallback;
313 SocketImplFactory* SocketFactory;
// Registry instance returned by GetMsgTypeRegistry().
315 MessageTypeRegistry MsgTypeRegistry;
// Grants DisconnectionMessageHandler access to non-public members.
316 friend class DisconnectionMessageHandler;
// Static thread entry points, compiled only when KY_ENABLE_THREADS is defined.
// Each receives the Thread object and an untyped parameter.
// NOTE(review): param is presumably the owning ThreadMgr - confirm. The
// matching #endif is not visible in this listing.
318 #if defined(KY_ENABLE_THREADS)
320 static int SocketThreadLoop(Thread* sendThread,
void* param);
321 static int BroadcastThreadLoop(Thread* broadcastThread,
void* param);
322 static int BroadcastRecvThreadLoop(Thread* broadcastThread,
void* param);
323 static int CompressThreadLoop(Thread* uncompressThread,
void* param);
// One send/receive pass; wereActionPerformed is an in/out reference.
329 bool SendReceiveCall(
bool & wereActionPerformed);
// Send and Receive operate on a SendReceiveContext and report through a
// bool reference whether the operation could not be carried out.
330 bool Send(SendReceiveContext& context,
bool& couldNotSend);
331 bool Receive(SendReceiveContext& context,
bool& couldNotReceive);
// Const state queries; defined out of line.
334 bool IsServer()
const;
335 bool IsRunning()
const;
336 bool IsExiting()
const;
337 void ResetLastReceivedTime();
338 bool SocketConnect();
// Returns a raw Message pointer for sending.
// NOTE(review): ownership and null-return conditions are not visible here.
339 Message* RetrieveMessageForSending();
// Loop bodies. NOTE(review): presumably driven by the static *ThreadLoop
// entry points above - confirm.
340 bool SendReceiveLoop();
341 bool BroadcastLoop();
342 bool BroadcastRecvLoop();
// Records a new connection status together with a message string.
344 void UpdateStatus(ConnStatusInterface::StatusType eStatus,
const char* pcMessage);
// Refreshes the last-received time; the flag's exact effect is defined out
// of line.
347 void UpdateLastReceivedTime(
bool updateStatus);
// Heartbeat interval stored atomically (see SetHeartbeatInterval()).
350 AtomicInt<UInt32> HeartbeatIntervalMillisecs;
// Fragments of a conditional lock helper named Locker (class header and
// braces are not visible in this listing).
// The constructor keeps the lock pointer only when `activated` is true and
// otherwise stores KY_NULL.
// NOTE(review): whether the constructor also acquires the lock is not visible
// here - presumably it does, mirroring the Unlock() below.
357 inline Locker(
bool activated, Lock *plock)
359 pLock = activated ? plock :
KY_NULL;
// Releases the lock only if a pointer was stored; a null pLock makes this a
// no-op. NOTE(review): presumably the destructor body - its header line is
// not visible.
365 {
if (pLock) pLock->Unlock(); }
// Bit flags for the debug message log mask (enum header not visible here).
// Each enumerator is a distinct single bit, so values can be OR-ed together.
// Flags naming a thread:
375 DbgML_SocketThread = 1,
376 DbgML_CompressThread = 1<<1,
377 DbgML_BroadcastThread = 1<<2,
// Flags naming a direction or a timing aspect:
380 DbgML_Emission = 1<<3,
381 DbgML_Reception = 1<<4,
382 DbgML_SleepTime = 1<<5,
// Bit 6 (1<<6) does not appear in this listing.
386 DbgML_HeartbeatTolerance = 1<<7,
// Sets / returns the current debug log mask, a combination of DbgML_* flags.
401 void SetDebugMsgLog(UInt32 DebugMsgLogEnum_Mask);
402 UInt32 GetDebugMsgLog();
408 #endif // INC_KY_AMP_THREAD_MGR_H
#define KY_NULL
Null value.
Definition: types.h:247
Definition: gamekitcrowddispersion.h:20