TDME2  1.9.200
UDPClient.cpp
Go to the documentation of this file.
1 #include <string.h>
2 
3 #include <memory>
4 #include <queue>
5 #include <string>
6 #include <typeinfo>
7 #include <unordered_map>
8 
9 #include <tdme/tdme.h>
19 #include <tdme/utilities/Console.h>
21 #include <tdme/utilities/RTTI.h>
22 #include <tdme/utilities/Time.h>
23 
24 using std::make_unique;
25 using std::queue;
26 using std::unique_ptr;
27 using std::unordered_map;
28 using std::string;
29 
46 
47 const uint64_t UDPClient::MESSAGEACK_RESENDTIMES[UDPClient::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
48 
49 UDPClient::UDPClient(const string& ip, const uint16_t port) :
50  Thread("nioudpclientthread"),
51  messageQueueMutex("nioupclientthread_messagequeue"),
52  messageMapAckMutex("nioupclientthread_messagequeueack"),
53  recvMessageQueueMutex("nioupclientthread_recvmessagequeuemutex"),
54  messageMapSafeMutex("nioupclientthread_messagemasafemutex"),
55  ip(ip),
56  port(port),
57  clientId(0),
58  messageCount(0),
59  initialized(false),
60  connected(false) {
61  //
62 }
63 
65  //
67  while (messageQueue.empty() == false) {
68  delete messageQueue.front();
69  messageQueue.pop();
70  }
72  //
74  for (const auto& [messageId, message]: messageMapAck) delete message;
75  messageMapAck.clear();
77  //
79  while (recvMessageQueue.empty() == false) {
80  delete recvMessageQueue.front();
81  recvMessageQueue.pop();
82  }
84  //
86  for (const auto& [messageId, message]: messageMapSafe) delete message;
87  messageMapSafe.clear();
89 }
90 
92  Console::println("UDPClient::run(): start");
93 
94  // catch kernel event and server socket exceptions
95  try {
96  // create client socket
97  socket = unique_ptr<UDPSocket>(UDPSocket::createClientSocket(NetworkSocket::determineIpVersion(ip)));
98 
99  // initialize kernel event mechanismn
102 
103  // initialized
104  initialized = true;
105 
106  // do event loop
107  auto lastMessageQueueAckTime = Time::getCurrentMillis();
108  auto lastMessageConnectTime = Time::getCurrentMillis();
109  auto lastMessageSafeCleanTime = Time::getCurrentMillis();
110  while(isStopRequested() == false) {
111  auto now = Time::getCurrentMillis();
112 
113  // process connect messages every 25ms
114  if (connected == false && now >= lastMessageConnectTime + 25L) {
115  // send connection message
117  lastMessageConnectTime = now;
118  }
119 
120  // process ack messages every 25ms
121  if (now >= lastMessageQueueAckTime + 25L) {
123  lastMessageQueueAckTime = now;
124  }
125 
126  // process save messages clean up every 25ms
127  if (now >= lastMessageSafeCleanTime + 25L) {
129  lastMessageQueueAckTime = now;
130  }
131 
132  // do kernel event mechanism
133  int events = kem.doKernelEventMechanism();
134 
135  // iterate the event list
136  for(unsigned int i = 0; i < (unsigned int)events; i++) {
137  NIOInterest keInterest;
138  void* nil;
139 
140  // decode kernel event
141  kem.decodeKernelEvent(i, keInterest, (void*&)nil);
142 
143  // interests
144  auto hasReadInterest = (keInterest & NIO_INTEREST_READ) == NIO_INTEREST_READ;
145  auto hasWriteInterest = (keInterest & NIO_INTEREST_WRITE) == NIO_INTEREST_WRITE;
146 
147  // process read interest
148  if (hasReadInterest == true) {
149  ssize_t bytesReceived;
150  string fromIp;
151  unsigned int fromPort;
152  char message[512];
153 
154  // receive datagrams as long as its size > 0 and read would not block
155  while ((bytesReceived = socket->read(fromIp, fromPort, (void*)message, sizeof(message))) > 0) {
156  //
158  //
159  auto clientMessage = unique_ptr<UDPClientMessage>(UDPClientMessage::parse(message, bytesReceived));
160  try {
161  if (clientMessage == nullptr) {
162  throw NetworkClientException("invalid message");
163  }
164  switch(clientMessage->getMessageType()) {
166  {
167  processAckReceived(clientMessage->getMessageId());
168  //
169  break;
170  }
172  {
173  sendMessage(
174  new UDPClientMessage(
176  clientMessage->getClientId(),
177  clientMessage->getMessageId(),
178  clientMessage->getRetryCount() + 1,
179  nullptr
180  ),
181  false
182  );
183  clientId = clientMessage->getClientId();
184  // read client key
185  auto packet = clientMessage->getPacket();
186  clientKey = packet->getString();
187  // we are connected
188  connected = true;
189  //
190  break;
191  }
193  {
194  // check if message queue is full
196  if (recvMessageQueue.size() > 1000) {
198  throw NetworkClientException("recv message queue overflow");
199  }
200  recvMessageQueue.push(clientMessage.release());
202  break;
203  }
205  {
206  break;
207  }
208  }
209  } catch (Exception &exception) {
210  // log
211  Console::println(
212  "UDPClient::run(): " +
213  RTTI::demangle(typeid(exception).name()) +
214  ": " +
215  string(exception.what())
216  );
217 
218  //
219  statistics.errors++;
220 
221  // rethrow to quit communication for now
222  // TODO: maybe find a better way to handle errors
223  // one layer up should be informed about network client problems somehow
224  throw exception;
225  }
226  }
227  }
228 
229  // process write interest
230  while (hasWriteInterest == true) {
231  // fetch batch of messages to be send
232  MessageQueue messageQueueBatch;
234  for (int i = 0; i < MESSAGEQUEUE_SEND_BATCH_SIZE && messageQueue.empty() == false; i++) {
235  auto message = messageQueue.front();
236  messageQueueBatch.push(message);
237  messageQueue.pop();
238  }
240 
241  // try to send batch
242  while (messageQueueBatch.empty() == false) {
243  auto message = messageQueueBatch.front();
244  if (socket->write(ip, port, (void*)message->message, message->bytes) == -1) {
245  // sending would block, stop trying to sendin
246  statistics.errors++;
247  //
248  break;
249  } else {
250  // success, remove message from message queue batch and continue
251  auto message = messageQueueBatch.front();
252  delete message;
253  messageQueueBatch.pop();
254  //
255  statistics.sent++;
256  }
257  }
258 
259  // re add messages not sent in batch to message queue
260  if (messageQueueBatch.empty() == true) {
262  if (messageQueue.empty() == true) {
264  socket.get(),
267  nullptr
268  );
269 
270  // no more data to send, so stop the loop
271  hasWriteInterest = false;
272  }
274  } else {
276  do {
277  auto message = messageQueueBatch.front();
278  messageQueue.push(message);
279  messageQueueBatch.pop();
280  } while (messageQueueBatch.empty() == false);
282 
283  // we did not send all batched messages, so stop the loop
284  hasWriteInterest = false;
285  }
286  }
287  }
288  }
289 
290  //
291  } catch (Exception &exception) {
292  // log
293  Console::println(
294  "UDPClient::run(): " +
295  RTTI::demangle(typeid(exception).name()) +
296  ": " +
297  string(exception.what())
298  );
299  }
300 
301  // exit gracefully
303  socket->close();
304 
305  // log
306  Console::println("UDPClient::run(): done");
307 }
308 
309 void UDPClient::sendMessage(UDPClientMessage* clientMessage, bool safe) {
310  // create message
311  auto clientMessagePtr = unique_ptr<UDPClientMessage>(clientMessage);
312  auto message = make_unique<Message>();
313  message->time = clientMessagePtr->getTime();
314  message->messageType = clientMessagePtr->getMessageType();
315  message->messageId = clientMessagePtr->getMessageId();
316  message->retries = 0;
317  clientMessagePtr->generate(message->message, message->bytes);
318 
319  // requires ack and retransmission ?
320  if (safe == true) {
322  // check if message has already be pushed to ack
323  auto it = messageMapAck.find(message->messageId);
324  if (it != messageMapAck.end()) {
325  // its on ack queue already, so unlock
327  throw NetworkClientException("message already on message queue ack");
328  }
329  // check if message queue is full
330  if (messageMapAck.size() > 1000) {
332  throw NetworkClientException("message queue ack overflow");
333  }
334  // push to message queue ack
335  // create message ack
336  auto messageAck = new Message();
337  *messageAck = *message;
338  messageMapAck[message->messageId] = messageAck;
340  }
341 
342  // push to message queue
344 
345  // check if message queue is full
346  if (messageQueue.size() > 1000) {
348  throw NetworkClientException("message queue overflow");
349  }
350  messageQueue.push(message.release());
351 
352  // set nio interest
353  if (messageQueue.size() == 1) {
355  socket.get(),
358  nullptr
359  );
360  }
361 
362  // done
364 }
365 
366 void UDPClient::processAckReceived(const uint32_t messageId) {
367  auto messageAckValid = true;
368 
369  // delete message from message queue ack
371  auto it = messageMapAck.find(messageId);
372  if (it != messageMapAck.end()) {
373  // message ack valid?
374  messageAckValid = true; //messageAck->ip == client->ip && messageAck->port == client->port;
375  // remove if valid
376  if (messageAckValid == true) {
377  // remove message from message queue ack
378  delete it->second;
379  messageMapAck.erase(it);
380  }
381  }
383 
384  // check if message ack was valid
385  if (messageAckValid == false) {
386  throw NetworkClientException("message ack invalid");
387  }
388 }
389 
391  MessageQueue messageQueueResend;
392  auto now = Time::getCurrentMillis();
393 
395  auto it = messageMapAck.begin();
396  while (it != messageMapAck.end()) {
397  auto messageAck = it->second;
398  // message ack timed out?
399  // most likely the client is gone
400  if (messageAck->retries == MESSAGEACK_RESENDTIMES_TRIES) {
401  // delete from message map ack
402  delete it->second;
403  messageMapAck.erase(it++);
404  // skip
405  continue;
406  } else
407  // message should be resend?
408  if (now > (messageAck->time + (MESSAGEACK_RESENDTIMES[messageAck->retries]))) {
409  // increase tries
410  messageAck->retries++;
411 
412  // construct message
413  auto message = new Message();
414  *message = *messageAck;
415 
416  // parse client message from message raw data
417  auto clientMessage = unique_ptr<UDPClientMessage>(UDPClientMessage::parse(message->message, message->bytes));
418 
419  // increase/set retry
420  clientMessage->retry();
421 
422  // recreate message
423  clientMessage->generate(message->message, message->bytes);
424 
425  // and push to be resent
426  messageQueueResend.push(message);
427  }
428  ++it;
429  }
431 
432  // reissue messages to be resent
433  if (messageQueueResend.empty() == false) {
435  do {
436  auto message = messageQueueResend.front();
437  messageQueue.push(message);
438  messageQueueResend.pop();
439 
440  // set nio interest
441  if (messageQueue.size() == 1) {
443  socket.get(),
446  nullptr
447  );
448  }
449  } while (messageQueueResend.empty() == false);
451  }
452 }
453 
455  auto messageProcessed = false;
456  auto messageId = clientMessage->getMessageId();
457 
458  //
460 
461  // check if message has been already processed
462  auto it = messageMapSafe.find(messageId);
463  if (it != messageMapSafe.end()) {
464  // yep, we did
465  messageProcessed = true;
466  auto message = it->second;
467  message->receptions++;
468  } else {
469  // nope, just remember message
470  auto message = new SafeMessage();
471  message->messageId = messageId;
472  message->receptions = 1;
473  message->time = Time::getCurrentMillis();
474  // TODO: check for overflow
475  messageMapSafe[messageId] = message;
476  }
477 
478  //
480 
481  // always send ack
482  sendMessage(
483  new UDPClientMessage(
485  clientId,
486  clientMessage->getMessageId(),
487  0,
488  nullptr
489  ),
490  false
491  );
492 
493  // return if message should be processed
494  return messageProcessed == true?false:true;
495 }
496 
497 
499  //
501 
502  // check if message has been already processed
503  auto now = Time::getCurrentMillis();
504  auto it = messageMapSafe.begin();
505  while (it != messageMapSafe.end()) {
506  SafeMessage* message = it->second;
507  if (message->time < now - MESSAGESSAFE_KEEPTIME) {
508  delete it->second;
509  messageMapSafe.erase(it++);
510  continue;
511  }
512  ++it;
513  }
514 
515  //
517 }
518 
520  UDPClientMessage* message = nullptr;
522  if (recvMessageQueue.empty() == false) {
523  message = recvMessageQueue.front();
524  recvMessageQueue.pop();
525  }
527  return message;
528 }
529 
531  return new UDPClientMessage(
533  clientId,
534  messageCount++,
535  0,
536  packet
537  );
538 }
539 
541  auto stats = statistics;
542  statistics.time = Time::getCurrentMillis();
543  statistics.received = 0;
544  statistics.sent = 0;
545  statistics.errors = 0;
546  return stats;
547 }
Base exception class for network client exceptions.
static UDPClientMessage * parse(const char message[512], uint16_t bytes)
Parse UDP client message.
void cleanUpSafeMessages()
Clean up safe messages.
Definition: UDPClient.cpp:498
static const int MESSAGEACK_RESENDTIMES_TRIES
Definition: UDPClient.h:184
virtual void run()
Run thread program.
Definition: UDPClient.cpp:91
void processAckReceived(const uint32_t messageId)
Processes ack reveived.
Definition: UDPClient.cpp:366
void processAckMessages()
Process ack messages.
Definition: UDPClient.cpp:390
queue< Message * > MessageQueue
Definition: UDPClient.h:196
static const uint64_t MESSAGESSAFE_KEEPTIME
Definition: UDPClient.h:200
const UDPClient_Statistics getStatistics()
Definition: UDPClient.cpp:540
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
Definition: UDPClient.h:186
unique_ptr< UDPSocket > socket
Definition: UDPClient.h:223
void sendMessage(UDPClientMessage *clientMessage, bool safe)
Pushes a message to be send, takes over ownership of message.
Definition: UDPClient.cpp:309
UDPClientMessage * createMessage(const UDPPacket *packet)
Create message.
Definition: UDPClient.cpp:530
STATIC_DLL_IMPEXT static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Definition: UDPClient.h:185
bool processSafeMessage(UDPClientMessage *clientMessage)
Returns if a message should be processed or already have been processed.
Definition: UDPClient.cpp:454
UDPClientMessage * receiveMessage()
Receive message.
Definition: UDPClient.cpp:519
UDPClient_Statistics statistics
Definition: UDPClient.h:225
Interface to kernel event mechanismns.
void shutdownKernelEventMechanism()
Shutdowns the kernel event mechanism.
void setSocketInterest(NetworkSocket *socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
Sets a non blocked socket io interest.
int doKernelEventMechanism()
Do the kernel event mechanism.
void initKernelEventMechanism(const unsigned int maxSockets)
Initializes the kernel event mechanism.
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
Decodes a kernel event.
Base class of network sockets.
Definition: NetworkSocket.h:17
Mutex implementation.
Definition: Mutex.h:19
void unlock()
Unlocks this mutex.
Definition: Mutex.h:54
void lock()
Locks the mutex, additionally mutex locks will block until other locks have been unlocked.
Definition: Mutex.h:47
Base class for threads.
Definition: Thread.h:20
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.h:77
Console class.
Definition: Console.h:29
Run time type information utility class.
Definition: RTTI.h:14
Time utility class.
Definition: Time.h:20
const NIOInterest NIO_INTEREST_NONE
Definition: NIOInterest.h:11
const NIOInterest NIO_INTEREST_READ
Definition: NIOInterest.h:12
uint8_t NIOInterest
type definition for network IO interest
Definition: NIOInterest.h:10
const NIOInterest NIO_INTEREST_WRITE
Definition: NIOInterest.h:13
std::exception Exception
Exception base class.
Definition: Exception.h:18