gwnavruntime/visualdebug/amp/Amp_ThreadMgr.h Source File

Amp_ThreadMgr.h
Go to the documentation of this file.
1 /*
2 * Copyright 2015 Autodesk, Inc. All rights reserved.
3 * Use of this software is subject to the terms of the Autodesk license agreement and any attachments or Appendices thereto provided at the time of installation or download,
4 * or which otherwise accompanies this software in either electronic or hard copy form, or which is signed by you and accepted by Autodesk.
5 */
6 
7 /**************************************************************************
8 
9 Filename : Amp_ThreadMgr.h
10 Content : Manages the socket threads
11 Created : December 2009
12 Authors : Alex Mantzaris
13 
14 **************************************************************************/
15 
16 
17 #ifndef INC_KY_AMP_THREAD_MGR_H
18 #define INC_KY_AMP_THREAD_MGR_H
19 
20 
22 
23 #ifdef KY_ENABLE_SOCKETS
24 
34 
35 
36 namespace Kaim {
37  // Forward declarations
38  class Lock;
39  class Event;
40 namespace Net {
41 namespace AMP {
42 
43 
44 // Utility class allows to discard messages as soon as they arrive
45 class DiscardMessageHandler : public IMessageHandler
46 {
47  virtual void Handle(Message*)
48  {
49  //Do nothing, it allows the ThreadMgr to discard messages as they arrive
50  }
51 
52  virtual Mode HandleMode() { return Handle_AfterDecompression; }
53 };
54 
55 // Manages the socket connection threads for AMP
56 // An instance of this class is contained both in the AMP server singleton and in the AMP client
57 // The caller can pass in callback objects for notification
58 // The SendInterface::OnSendLoop gets called from the socket thread for app-specific processing
59 // The ConnStatusInterface::OnStatusChanged gets called whenever the connection status is updated
60 // A MsgHandler can be specified for immediate message handling, bypassing the received queue (for AS debugger)
61 class ThreadMgr : public RefCountBase<ThreadMgr, StatAmp_Server>
62 {
63 public:
64  ThreadMgr(bool initSocketLib, SendInterface* sendCallback,
65  ConnStatusInterface* connectionCallback, Kaim::Event* sendQueueWaitEvent,
66  SocketImplFactory* socketFactory, MessageTypeRegistry* msgTypeRegistry = NULL,
67  SInt32 processorForSocketLoop = -1, SInt32 processorForCompressLoop = -1);
68  virtual ~ThreadMgr();
69 
70  // Initialize
71  void ForceConnectionBySkippingSetupMessages() { DoForceConnectionBySkippingSetupMessages = true; }
72  void InitSync(const char* ipAddress, UInt32 port, UInt32 broadcastPort);
73  bool ConnectSync(UInt32 timeoutMs);
74  void StartBroadcastRecvSync(UInt32 port);
75 #if defined(KY_ENABLE_THREADS)
76  // Initialize AMP - the threads for sending and receiving messages
77  bool InitAsync(const char* ipAddress, UInt32 port, UInt32 broadcastPort);
78  // Initialize the broadcast receive thread
79  void StartBroadcastRecv(UInt32 port);
80 #endif
81  // Set the connected app
82  void SetBroadcastInfo(const char* connectedApp, const char* connectedFile);
83 
84  // Uninitialize AMP - Performs thread cleanup
85  void UninitAmp();
86 
87  // Heartbeat interval
88  void SetHeartbeatInterval(UInt32 heartbeatInterval);
89 
90  // Accessors
91  const StringLH& GetAddress() const { return IpAddress; }
92  UInt32 GetPort() const { return Port; }
93  UInt32 GetBroadcastRecvPort() const { return BroadcastRecvPort; }
94 
95  ConnStatusInterface::StatusType GetConnectionStatus() const;
96 
97  UInt32 GetReceivedQueueSize() const { return MsgReceivedQueue.GetSize(); }
98  UInt32 GetUncompressedQueueSize() const { return MsgUncompressedQueue.GetSize(); }
99  UInt32 GetSendQueueSize() const { return MsgSendQueue.GetSize(); }
100  UInt32 GetCompressedQueueSize() const { return MsgCompressedQueue.GetSize(); }
101  UInt32 GetSendRate() const { return SendRate; }
102  UInt32 GetReceiveRate() const { return ReceiveRate; }
103 
104  bool IsReceptionPipelineEmpty() const { return (OutOfQueueInReceptionMessages + GetUncompressedQueueSize() + GetReceivedQueueSize()) == 0; }
105  bool IsEmissionPipelineEmpty() const { return GetTotalMessagesInEmissionPipeline() == 0; }
106 
107  UInt32 GetTotalMessagesInEmissionPipeline() const { return (MessageInEmissionStream + OutOfQueueInEmissionMessages + GetCompressedQueueSize() + GetSendQueueSize()); }
108 
109  // Has a socket been successfully created? Thread-safe
110  bool IsValidSocket();
111  // Has a message been received from the other end recently? Thread-safe
112  bool IsValidConnection();
113  // Place a message on the queue to be sent. Thread-safe
114  enum MessageValidity
115  {
116  MessageIsValid = 0,
117  UnregisteredMessageType,
118  UnversionnedMessageType
119  };
120  MessageValidity SendAmpMessage(Message* msg);
121  void UpdateSync(bool & wereActionPerformed); //< should only be called if init with InitSync
122  bool BroadcastRecv(); //< automatically called in secondary thread in Async, or must be called regularly in Sync mode
123 
124  // User MUST ensure that messages are valid
125  void SendMultipleAmpMessages(List<Message>& msgList, UInt32 listSize)
126  {
127  MsgSendQueue.PushListToBack(msgList, listSize);
128  CheckHeapLimit();
129  }
130 
131  // Send a log message. Thread-safe
132  void SendLog(const String& logMessage, LogMessageId messageType);
133 
134  void ClearAllMessagesToSend();
135  // Retrieve the next received message from the queue. Thread-safe
136  Ptr<Message> GetNextReceivedMessage();
137  void GetAllReceivedMessages(List<Message>& msgList, UInt32& listSize)
138  {
139  MsgUncompressedQueue.PopToList(msgList, listSize);
140  }
141 
142  // Clear all messages of the specified type from the received queue
143  void ClearReceivedMessages(const Message* msg);
144 
145  const MessageTypeRegistry& GetMsgTypeRegistry() const { return MsgTypeRegistry; } // only provided as const because changing it while running would be unsafe
146  // UNSAFE: This function should only be accessed from outside when ThreadMgr is running synchronous (InitSync, ConnectSync, UpdateSync..)
147  // Read a stream and creates a message of the appropriate type
148  Message* CreateAndReadMessage(File& str);
149 
150  // UNSAFE: This function should only be accessed from outside when ThreadMgr is running synchronous (InitSync, ConnectSync, UpdateSync..)
151  // Create messages expected to be sent for connection
152  void CreateConnectionSetupMessages(List<Message>& msgList);
153 
154  // used internally
155  enum ExitStatus
156  {
157  ExitStatus_NotExiting = 0, // not exiting
158  ExitStatus_Immediate = 1, // exit immediately
159  ExitStatus_WaitForEmissionEmptiness = 2, // exit once all queued messages were sent including the disconnection message
160  ExitStatus_WaitForReceptionEmptiness = 3, // exit once all queued messages were received
161  };
162 
163  ExitStatus GetExitStatus()
164  {
165  return Exiting;
166  }
167 
168 public:
169 
170 
171  // Stats
172  AtomicInt<UInt32> SendStatLoop;
173  AtomicInt<UInt32> SendStatTime;
174  AtomicInt<UInt32> MaxSendStatTime;
175  AtomicInt<UInt32> MaxSendRate;
176 
177 
178 private:
179  enum
180  {
181  // maximum packet size - larger messages are broken up
182  BufferSize = 8*1024, // 8 kBytes is the default size of Wii U socket's internal buffers
183 
184  // If no message has been sent for this long, we send a heartbeat message
185 #ifdef KY_OS_WII
186  DefaultHeartbeatIntervalMillisecs = 0, // 0 means no heartbeat
187 #else
188  DefaultHeartbeatIntervalMillisecs = 1000,
189 #endif
190  };
191 
192 
193  // MsgQueue encapsulates a message queue
194  // It is a thread-safe list that keeps track of its size
195  class MsgQueue
196  {
197  public:
198  MsgQueue();
199  ~MsgQueue() { Clear(); }
200  void PushBack(Message* msg);
201  Message* PopFront();
202  void Clear();
203  void ClearMsgType(const Message* msg);
204  UInt32 GetSize() const;
205 
206  void PushListToBack(List<Message>& msgList, UInt32 listSize);
207  void PopToList(List<Message>& msgList, UInt32& listSize);
208 
209  public: // internal
210  bool IsAsync;
211  Lock QueueLock;
212  List<Message> Queue;
213  AtomicInt<UInt32> Size;
214  };
215 
216 
217  class SendReceiveContext : public NewOverrideBase<Stat_Default_Mem>
218  {
219  public:
220  SendReceiveContext(Ptr<AmpStream> streamSend, Ptr<AmpStream> streamReceived);
221 
222  UInt64 LastSampleTime;
223  UInt32 BytesSent;
224  UInt32 BytesReceived;
225  UPInt StreamSendDataLeft;
226  const char* SendBuffer;
227 
228  UInt32 SilentFrameCount;
229  UInt32 MaxSilentFrameCount;
230  LogSilentMode SilentMode;
231 
232  UInt32 SendStatLoop;
233  UInt64 SendStatTicks;
234 
235  char BufferReceived[ThreadMgr::BufferSize];
236 
237  Ptr<AmpStream> StreamSend;
238  Ptr<AmpStream> StreamReceived;
239  };
240 
241  class BroadCastRcvContext : public NewOverrideBase<Stat_Default_Mem>
242  {
243  public:
244  BroadCastRcvContext(bool initSocketLib, SocketImplFactory* socketFactory)
245  : WiiuSocket(initSocketLib, socketFactory)
246  , LastWiiuAttempt(0)
247  {}
248 
249  ~BroadCastRcvContext()
250  {
251  WiiuSocket.Destroy();
252  }
253 
254  Socket WiiuSocket;
255  UInt64 LastWiiuAttempt;
256  char WiiuBuffer[BufferSize];
257  char BufferReceived[BufferSize];
258  };
259 
260  bool IsAsync;
261  SInt32 ProcessorForSocketLoop;
262  SInt32 ProcessorForCompressLoop;
263 #if defined(KY_ENABLE_THREADS)
264  Ptr<Thread> SocketThread; // Takes messages from the send queue and sends them
265  Ptr<Thread> BroadcastThread; // Broadcasts listener socket IP address and port
266  Ptr<Thread> BroadcastRecvThread; // Listens for broadcast messages
267  Ptr<Thread> CompressThread; // Uncompresses/compresses received/sending messages
268 #endif
269 
270  UInt32 Port;
271  Lock BroadcastInfoLock;
272  UInt32 BroadcastPort;
273  StringLH BroadcastApp;
274  StringLH BroadcastFile;
275  UInt32 BroadcastRecvPort;
276  StringLH IpAddress;
277  bool Server; // Server or Client?
278  bool InitSocketLib;
279  bool DoForceConnectionBySkippingSetupMessages; // typically used to skip RegistryMessage and SetupMessage when simulating socket from file;
280  Lock SocketLock;
281  Socket Sock;
282  BroadcastSocket BroadcastSock; // used for receive _XOR_ send
283  SendReceiveContext* SendReceiveCallContext;
284  BroadCastRcvContext* BroadCastRcvCallContext;
285 
286  // Status is checked from send and received threads
287  mutable Lock InitLock;
288  mutable Lock StatusLock;
289  AtomicInt<ExitStatus> Exiting;
290  UInt64 LastSendHeartbeat;
291  UInt64 LastRcvdHeartbeat;
292  ConnStatusInterface::StatusType ConnectionStatus;
293  bool LastConnected;
294  AtomicInt<UInt32> SendRate;
295  AtomicInt<UInt32> ReceiveRate;
296 
297  // Message queues
298  MsgQueue MsgReceivedQueue;
299  MsgQueue MsgSendQueue;
300  MsgQueue MsgUncompressedQueue;
301  MsgQueue MsgCompressedQueue;
302  Kaim::Event* SendQueueWaitEvent;
303  UInt32 LimitCheckHysterisisPercent;
304  void CheckHeapLimit();
305 
306  AtomicInt<UInt32> OutOfQueueInEmissionMessages; // used to maintained a count of messages that are treated but out of MsgCompressedQueue and MsgSendQueue
307  AtomicInt<UInt32> MessageInEmissionStream; // Indicates if there's a message copied in AmpStream
308  AtomicInt<UInt32> OutOfQueueInReceptionMessages; // used to maintained a count of messages that are treated but out of MsgUncompressedQueue and MsgReceivedQueue
309 
310  // Callback interfaces
311  SendInterface* SendCallback; // send thread callback
312  ConnStatusInterface* ConnectionChangedCallback; // connection changed callback
313  SocketImplFactory* SocketFactory;
314 
315  MessageTypeRegistry MsgTypeRegistry;
316  friend class DisconnectionMessageHandler;
317 
318 #if defined(KY_ENABLE_THREADS)
319  // Thread functions
320  static int SocketThreadLoop(Thread* sendThread, void* param);
321  static int BroadcastThreadLoop(Thread* broadcastThread, void* param);
322  static int BroadcastRecvThreadLoop(Thread* broadcastThread, void* param);
323  static int CompressThreadLoop(Thread* uncompressThread, void* param);
324 #endif
325 
326  // Private helper functions
327  bool UnCompress();
328  bool Compress();
329  bool SendReceiveCall(bool & wereActionPerformed);
330  bool Send(SendReceiveContext& context, bool& couldNotSend);
331  bool Receive(SendReceiveContext& context, bool& couldNotReceive);
332  int Broadcast();
333 
334  bool IsServer() const;
335  bool IsRunning() const;
336  bool IsExiting() const;
337  void ResetLastReceivedTime();
338  bool SocketConnect();
339  Message* RetrieveMessageForSending();
340  bool SendReceiveLoop();
341  bool BroadcastLoop();
342  bool BroadcastRecvLoop();
343  bool CompressLoop();
344  void UpdateStatus(ConnStatusInterface::StatusType eStatus, const char* pcMessage);
345 
346 public: //internal
347  void UpdateLastReceivedTime(bool updateStatus);
348 
349 public: //internal
350  AtomicInt<UInt32> HeartbeatIntervalMillisecs;
351 };
352 
353 // helper class to enable or disable lock based on a boolean variable
354 class Locker
355 {
356 public:
357  inline Locker(bool activated, Lock *plock)
358  {
359  pLock = activated ? plock : KY_NULL;
360  if (pLock)
361  pLock->DoLock();
362  }
363 
364  inline ~Locker()
365  { if (pLock) pLock->Unlock(); }
366 
367 public:
368  Lock *pLock;
369 };
370 
371 // helper functions, their implementation only exists in debug
372 enum DebugMsgLogEnum
373 {
374  // Following values activate messages for specific Threads they must be completed by other values:
375  DbgML_SocketThread = 1,
376  DbgML_CompressThread = 1<<1,
377  DbgML_BroadcastThread = 1<<2,
378 
379  // Following values should be completed with DbgML_*Thread masks :
380  DbgML_Emission = 1<<3, // activate log for messages in the Emission pipeline (Send, Compress ...)
381  DbgML_Reception = 1<<4, // activate log for messages in the reception pipeline (Receive, Uncompress ...)
382  DbgML_SleepTime = 1<<5, // activate log in order to know when a thread is set to sleep
383 
384  // Independent values
385  DbgML_Rate = 1<<6, // activate log for displaying reception and emission rate
386  DbgML_HeartbeatTolerance = 1<<7, // if set the Heartbeat Interval is multiplied by 100
387 };
388 
389 
390 
391 } // namespace AMP
392 } // namespace Net
393 } // namespace Kaim
394 
395 #endif
396 
397 
398 namespace Kaim {
399 namespace Net {
400 namespace AMP {
401 void SetDebugMsgLog(UInt32 DebugMsgLogEnum_Mask); // used to toggle extreme verbosity of the ThreadMgr in debug only, do nothing in other modes.
402 UInt32 GetDebugMsgLog(); //< return a mask whose values are defined in DebugMsgLogEnum
403 } // namespace AMP
404 } // namespace Net
405 } // namespace Kaim
406 
407 
408 #endif // INC_KY_AMP_THREAD_MGR_H
#define KY_NULL
Null value.
Definition: types.h:247
Definition: gamekitcrowddispersion.h:20