gwnavruntime/visualdebug/amp/Amp_ThreadMgr.h Source File

Amp_ThreadMgr.h
Go to the documentation of this file.
1 /*
2 * Copyright 2016 Autodesk, Inc. All rights reserved.
3 * Use of this software is subject to the terms of the Autodesk license agreement and any attachments or Appendices thereto provided at the time of installation or download,
4 * or which otherwise accompanies this software in either electronic or hard copy form, or which is signed by you and accepted by Autodesk.
5 */
6 
7 /**************************************************************************
8 
9 Filename : Amp_ThreadMgr.h
10 Content : Manages the socket threads
11 Created : December 2009
12 Authors : Alex Mantzaris
13 
14 **************************************************************************/
15 
16 
17 #pragma once
18 
19 
21 
22 #ifdef KY_ENABLE_SOCKETS
23 
33 
34 
35 namespace Kaim {
36  // Forward declarations
37  class Lock;
38  class Event;
39 namespace Net {
40 namespace AMP {
41 
42 
43 // Utility class allows to discard messages as soon as they arrive
44 class DiscardMessageHandler : public IMessageHandler
45 {
46  virtual void Handle(Message*)
47  {
48  //Do nothing, it allows the ThreadMgr to discard messages as they arrive
49  }
50 
51  virtual Mode HandleMode() { return Handle_AfterDecompression; }
52 };
53 
54 // Manages the socket connection threads for AMP
55 // An instance of this class is contained both in the AMP server singleton and in the AMP client
56 // The caller can pass in callback objects for notification
57 // The SendInterface::OnSendLoop gets called from the socket thread for app-specific processing
58 // The ConnStatusInterface::OnStatusChanged gets called whenever the connection status is updated
59 // A MsgHandler can be specified for immediate message handling, bypassing the received queue (for AS debugger)
60 class ThreadMgr : public RefCountBase<ThreadMgr, StatAmp_Server>
61 {
62 public:
63  ThreadMgr(bool initSocketLib, SendInterface* sendCallback,
64  ConnStatusInterface* connectionCallback, Kaim::Event* sendQueueWaitEvent,
65  SocketImplFactory* socketFactory, MessageTypeRegistry* msgTypeRegistry = NULL,
66  SInt32 processorForSocketLoop = -1, SInt32 processorForCompressLoop = -1);
67  virtual ~ThreadMgr();
68 
69  // Initialize
70  void ForceConnectionBySkippingSetupMessages() { DoForceConnectionBySkippingSetupMessages = true; }
71  void InitSync(const char* ipAddress, UInt32 port, UInt32 broadcastPort);
72  bool ConnectSync(UInt32 timeoutMs);
73  void StartBroadcastRecvSync(UInt32 port);
74  // Initialize AMP - the threads for sending and receiving messages
75  bool InitAsync(const char* ipAddress, UInt32 port, UInt32 broadcastPort);
76  // Initialize the broadcast receive thread
77  void StartBroadcastRecv(UInt32 port);
78  // Set the connected app
79  void SetBroadcastInfo(const char* connectedApp, const char* connectedFile);
80 
81  // Uninitialize AMP - Performs thread cleanup
82  void UninitAmp();
83 
84  // Heartbeat interval
85  void SetHeartbeatInterval(UInt32 heartbeatInterval);
86 
87  // Accessors
88  const StringLH& GetAddress() const { return IpAddress; }
89  UInt32 GetPort() const { return Port; }
90  UInt32 GetBroadcastRecvPort() const { return BroadcastRecvPort; }
91 
92  ConnStatusInterface::StatusType GetConnectionStatus() const;
93 
94  UInt32 GetReceivedQueueSize() const { return MsgReceivedQueue.GetSize(); }
95  UInt32 GetUncompressedQueueSize() const { return MsgUncompressedQueue.GetSize(); }
96  UInt32 GetSendQueueSize() const { return MsgSendQueue.GetSize(); }
97  UInt32 GetCompressedQueueSize() const { return MsgCompressedQueue.GetSize(); }
98  UInt32 GetSendRate() const { return SendRate; }
99  UInt32 GetReceiveRate() const { return ReceiveRate; }
100 
101  bool IsReceptionPipelineEmpty() const { return (OutOfQueueInReceptionMessages + GetUncompressedQueueSize() + GetReceivedQueueSize()) == 0; }
102  bool IsEmissionPipelineEmpty() const { return GetTotalMessagesInEmissionPipeline() == 0; }
103 
104  UInt32 GetTotalMessagesInEmissionPipeline() const { return (MessageInEmissionStream + OutOfQueueInEmissionMessages + GetCompressedQueueSize() + GetSendQueueSize()); }
105 
106  // Has a socket been successfully created? Thread-safe
107  bool IsValidSocket();
108  // Has a message been received from the other end recently? Thread-safe
109  bool IsValidConnection();
110  // Place a message on the queue to be sent. Thread-safe
111  enum MessageValidity
112  {
113  MessageIsValid = 0,
114  UnregisteredMessageType,
115  UnversionnedMessageType
116  };
117  MessageValidity SendAmpMessage(Message* msg);
118  void UpdateSync(bool & wereActionPerformed); //< should only be called if init with InitSync
119  bool BroadcastRecv(); //< automatically called in secondary thread in Async, or must be called regularly in Sync mode
120 
121  // User MUST ensure that messages are valid
122  void SendMultipleAmpMessages(List<Message>& msgList, UInt32 listSize)
123  {
124  MsgSendQueue.PushListToBack(msgList, listSize);
125  CheckHeapLimit();
126  }
127 
128  // Send a log message. Thread-safe
129  void SendLog(const String& logMessage, LogMessageId messageType);
130 
131  void ClearAllMessagesToSend();
132  // Retrieve the next received message from the queue. Thread-safe
133  Ptr<Message> GetNextReceivedMessage();
134  void GetAllReceivedMessages(List<Message>& msgList, UInt32& listSize)
135  {
136  MsgUncompressedQueue.PopToList(msgList, listSize);
137  }
138 
139  // Clear all messages of the specified type from the received queue
140  void ClearReceivedMessages(const Message* msg);
141 
142  const MessageTypeRegistry& GetMsgTypeRegistry() const { return MsgTypeRegistry; } // only provided as const because changing it while running would be unsafe
143  // UNSAFE: This function should only be accessed from outside when ThreadMgr is running synchronous (InitSync, ConnectSync, UpdateSync..)
144  // Read a stream and creates a message of the appropriate type
145  Message* CreateAndReadMessage(File& str);
146 
147  // UNSAFE: This function should only be accessed from outside when ThreadMgr is running synchronous (InitSync, ConnectSync, UpdateSync..)
148  // Create messages expected to be sent for connection
149  void CreateConnectionSetupMessages(List<Message>& msgList);
150 
151  // used internally
152  enum ExitStatus
153  {
154  ExitStatus_NotExiting = 0, // not exiting
155  ExitStatus_Immediate = 1, // exit immediately
156  ExitStatus_WaitForEmissionEmptiness = 2, // exit once all queued messages were sent including the disconnection message
157  ExitStatus_WaitForReceptionEmptiness = 3, // exit once all queued messages were received
158  };
159 
160  ExitStatus GetExitStatus()
161  {
162  return Exiting;
163  }
164 
165 public:
166 
167 
168  // Stats
169  std::atomic<UInt32> SendStatLoop;
170  std::atomic<UInt32> SendStatTime;
171  std::atomic<UInt32> MaxSendStatTime;
172  std::atomic<UInt32> MaxSendRate;
173 
174 
175 private:
176  enum
177  {
178  // maximum packet size - larger messages are broken up
179  BufferSize = 8*1024, // 8 kBytes is the default size of Wii U socket's internal buffers
180 
181  // If no message has been sent for this long, we send a heartbeat message
182  DefaultHeartbeatIntervalMillisecs = 1000,
183  };
184 
185 
186  // MsgQueue encapsulates a message queue
187  // It is a thread-safe list that keeps track of its size
188  class MsgQueue
189  {
190  public:
191  MsgQueue();
192  ~MsgQueue() { Clear(); }
193  void PushBack(Message* msg);
194  Message* PopFront();
195  void Clear();
196  void ClearMsgType(const Message* msg);
197  UInt32 GetSize() const;
198 
199  void PushListToBack(List<Message>& msgList, UInt32 listSize);
200  void PopToList(List<Message>& msgList, UInt32& listSize);
201 
202  public: // internal
203  bool IsAsync;
204  Lock QueueLock;
205  List<Message> Queue;
206  std::atomic<UInt32> Size;
207  };
208 
209 
210  class SendReceiveContext : public NewOverrideBase<Stat_Default_Mem>
211  {
212  public:
213  SendReceiveContext(Ptr<AmpStream> streamSend, Ptr<AmpStream> streamReceived);
214 
215  UInt64 LastSampleTime;
216  UInt32 BytesSent;
217  UInt32 BytesReceived;
218  UPInt StreamSendDataLeft;
219  const char* SendBuffer;
220 
221  UInt32 SilentFrameCount;
222  UInt32 MaxSilentFrameCount;
223  LogSilentMode SilentMode;
224 
225  UInt32 SendStatLoop;
226  UInt64 SendStatTicks;
227 
228  char BufferReceived[ThreadMgr::BufferSize];
229 
230  Ptr<AmpStream> StreamSend;
231  Ptr<AmpStream> StreamReceived;
232  };
233 
234  class BroadCastRcvContext : public NewOverrideBase<Stat_Default_Mem>
235  {
236  public:
237  BroadCastRcvContext(bool initSocketLib, SocketImplFactory* socketFactory)
238  : WiiuSocket(initSocketLib, socketFactory)
239  , LastWiiuAttempt(0)
240  {}
241 
242  ~BroadCastRcvContext()
243  {
244  WiiuSocket.Destroy();
245  }
246 
247  Socket WiiuSocket;
248  UInt64 LastWiiuAttempt;
249  char WiiuBuffer[BufferSize];
250  char BufferReceived[BufferSize];
251  };
252 
253  bool IsAsync;
254  SInt32 ProcessorForSocketLoop;
255  SInt32 ProcessorForCompressLoop;
256  Ptr<Thread> SocketThread; // Takes messages from the send queue and sends them
257  Ptr<Thread> BroadcastThread; // Broadcasts listener socket IP address and port
258  Ptr<Thread> BroadcastRecvThread; // Listens for broadcast messages
259  Ptr<Thread> CompressThread; // Uncompresses/compresses received/sending messages
260 
261  UInt32 Port;
262  Lock BroadcastInfoLock;
263  UInt32 BroadcastPort;
264  StringLH BroadcastApp;
265  StringLH BroadcastFile;
266  UInt32 BroadcastRecvPort;
267  StringLH IpAddress;
268  bool Server; // Server or Client?
269  bool InitSocketLib;
270  bool DoForceConnectionBySkippingSetupMessages; // typically used to skip RegistryMessage and SetupMessage when simulating socket from file;
271  Lock SocketLock;
272  Socket Sock;
273  BroadcastSocket BroadcastSock; // used for receive _XOR_ send
274  SendReceiveContext* SendReceiveCallContext;
275  BroadCastRcvContext* BroadCastRcvCallContext;
276 
277  // Status is checked from send and received threads
278  mutable Lock InitLock;
279  mutable Lock StatusLock;
280  std::atomic<ExitStatus> Exiting;
281  UInt64 LastSendHeartbeat;
282  UInt64 LastRcvdHeartbeat;
283  ConnStatusInterface::StatusType ConnectionStatus;
284  bool LastConnected;
285  std::atomic<UInt32> SendRate;
286  std::atomic<UInt32> ReceiveRate;
287 
288  // Message queues
289  MsgQueue MsgReceivedQueue;
290  MsgQueue MsgSendQueue;
291  MsgQueue MsgUncompressedQueue;
292  MsgQueue MsgCompressedQueue;
293  Kaim::Event* SendQueueWaitEvent;
294  UInt32 LimitCheckHysterisisPercent;
295  void CheckHeapLimit();
296 
297  std::atomic<UInt32> OutOfQueueInEmissionMessages; // used to maintained a count of messages that are treated but out of MsgCompressedQueue and MsgSendQueue
298  std::atomic<UInt32> MessageInEmissionStream; // Indicates if there's a message copied in AmpStream
299  std::atomic<UInt32> OutOfQueueInReceptionMessages; // used to maintained a count of messages that are treated but out of MsgUncompressedQueue and MsgReceivedQueue
300 
301  // Callback interfaces
302  SendInterface* SendCallback; // send thread callback
303  ConnStatusInterface* ConnectionChangedCallback; // connection changed callback
304  SocketImplFactory* SocketFactory;
305 
306  MessageTypeRegistry MsgTypeRegistry;
307  friend class DisconnectionMessageHandler;
308 
309  // Thread functions
310  static int SocketThreadLoop(Thread* sendThread, void* param);
311  static int BroadcastThreadLoop(Thread* broadcastThread, void* param);
312  static int BroadcastRecvThreadLoop(Thread* broadcastThread, void* param);
313  static int CompressThreadLoop(Thread* uncompressThread, void* param);
314 
315  // Private helper functions
316  bool UnCompress();
317  bool Compress();
318  bool SendReceiveCall(bool & wereActionPerformed);
319  bool Send(SendReceiveContext& context, bool& couldNotSend);
320  bool Receive(SendReceiveContext& context, bool& couldNotReceive);
321  int Broadcast();
322 
323  bool IsServer() const;
324  bool IsRunning() const;
325  bool IsExiting() const;
326  void ResetLastReceivedTime();
327  bool SocketConnect();
328  Message* RetrieveMessageForSending();
329  bool SendReceiveLoop();
330  bool BroadcastLoop();
331  bool BroadcastRecvLoop();
332  bool CompressLoop();
333  void UpdateStatus(ConnStatusInterface::StatusType eStatus, const char* pcMessage);
334 
335 public: //internal
336  void UpdateLastReceivedTime(bool updateStatus);
337 
338 public: //internal
339  std::atomic<UInt32> HeartbeatIntervalMillisecs;
340 };
341 
342 // helper class to enable or disable lock based on a boolean variable
343 class Locker
344 {
345 public:
346  inline Locker(bool activated, Lock *plock)
347  {
348  pLock = activated ? plock : nullptr;
349  if (pLock)
350  pLock->DoLock();
351  }
352 
353  inline ~Locker()
354  { if (pLock) pLock->Unlock(); }
355 
356 public:
357  Lock *pLock;
358 };
359 
360 // helper functions, their implementation only exists in debug
361 enum DebugMsgLogEnum
362 {
363  // Following values activate messages for specific Threads they must be completed by other values:
364  DbgML_SocketThread = 1,
365  DbgML_CompressThread = 1<<1,
366  DbgML_BroadcastThread = 1<<2,
367 
368  // Following values should be completed with DbgML_*Thread masks :
369  DbgML_Emission = 1<<3, // activate log for messages in the Emission pipeline (Send, Compress ...)
370  DbgML_Reception = 1<<4, // activate log for messages in the reception pipeline (Receive, Uncompress ...)
371  DbgML_SleepTime = 1<<5, // activate log in order to know when a thread is set to sleep
372 
373  // Independent values
374  DbgML_Rate = 1<<6, // activate log for displaying reception and emission rate
375  DbgML_HeartbeatTolerance = 1<<7, // if set the Heartbeat Interval is multiplied by 100
376 };
377 
378 
379 
380 } // namespace AMP
381 } // namespace Net
382 } // namespace Kaim
383 
384 #endif
385 
386 
387 namespace Kaim {
388 namespace Net {
389 namespace AMP {
390 void SetDebugMsgLog(UInt32 DebugMsgLogEnum_Mask); // used to toggle extreme verbosity of the ThreadMgr in debug only, do nothing in other modes.
391 UInt32 GetDebugMsgLog(); //< return a mask whose values are defined in DebugMsgLogEnum
392 } // namespace AMP
393 } // namespace Net
394 } // namespace Kaim
395 
396 
std::int32_t SInt32
int32_t
Definition: SF_Types.h:131
std::uint32_t UInt32
uint32_t
Definition: SF_Types.h:137
The Autodesk Navigation namespace.
Definition: gamekitcrowddispersion.cpp:17
std::uint64_t UInt64
uint64_t
Definition: SF_Types.h:138