TDME2  1.9.200
UDPServerIOThread.cpp
Go to the documentation of this file.
1 #include <string.h>
2 
3 #include <memory>
4 #include <string>
5 #include <typeinfo>
6 #include <unordered_map>
7 
8 #include <tdme/tdme.h>
17 #include <tdme/utilities/Console.h>
19 #include <tdme/utilities/RTTI.h>
20 #include <tdme/utilities/Time.h>
21 
22 using std::make_unique;
23 using std::string;
24 using std::to_string;
25 using std::unique_ptr;
26 using std::unordered_map;
27 
43 
// Resend thresholds in milliseconds, one entry per retry. The ack resend loop in this file indexes this table with
// Message::retries and resends once the current time exceeds Message::time plus the selected entry; a message whose
// retries counter has reached MESSAGEACK_RESENDTIMES_TRIES is dropped from the ack map instead of being resent.
44 const uint64_t UDPServerIOThread::MESSAGEACK_RESENDTIMES[UDPServerIOThread::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
45 
// Creates an IO thread for the given UDP server; nothing is started here.
// - id: numeric id of this IO thread, used to tag the log lines written by this thread
// - server: the UDP server this thread works for; its host, port and statistics are accessed from this thread
// - maxCCU: base of the queue limits (maxCCU * 20) that sendMessage() enforces
// - startUpBarrier: stored here; the thread body resets it to nullptr after its start up phase
// The thread and both mutexes only receive their names here.
46 UDPServerIOThread::UDPServerIOThread(const unsigned int id, UDPServer *server, const unsigned int maxCCU, Barrier* startUpBarrier) :
47  Thread("nioudpserveriothread"),
48  id(id),
49  server(server),
50  maxCCU(maxCCU),
51  startUpBarrier(startUpBarrier),
52  messageQueueMutex("nioupserveriothread_messagequeue"),
53  messageMapAckMutex("nioupserveriothread_messagequeueack") {
54  // intentionally empty, the initializer list above does all the work
55 }
56 
58  //
60  while (messageQueue.empty() == false) {
61  delete messageQueue.front();
62  messageQueue.pop();
63  }
65  //
67  for (const auto& [messageId, message]: messageMapAck) delete message;
68  messageMapAck.clear();
70 }
71 
73  Console::println("UDPServerIOThread[" + to_string(id) + "]::run(): start");
74 
75  // wait on startup barrier
77  startUpBarrier = nullptr;
78 
79  // catch kernel event and server socket exceptions
80  try {
81  // create server socket
82  socket = unique_ptr<UDPSocket>(UDPSocket::createServerSocket(server->host, server->port));
83 
84  // initialize kernel event mechanismn
87 
88  // do event loop
89  auto lastMessageQueueAckTime = Time::getCurrentMillis();
90  while (isStopRequested() == false) {
91  auto now = Time::getCurrentMillis();
92 
93  // process ack messages every 25ms
94  if (now >= lastMessageQueueAckTime + 25L) {
96  lastMessageQueueAckTime = now;
97  }
98 
99  // do kernel event mechanism
100  auto events = kem.doKernelEventMechanism();
101 
102  // iterate the event list
103  for (auto i = 0; i < events; i++) {
104  NIOInterest keInterest;
105  void* nil;
106 
107  // decode kernel event
108  kem.decodeKernelEvent(i, keInterest, (void*&)nil);
109 
110  // interests
111  auto hasReadInterest = (keInterest & NIO_INTEREST_READ) == NIO_INTEREST_READ;
112  auto hasWriteInterest = (keInterest & NIO_INTEREST_WRITE) == NIO_INTEREST_WRITE;
113 
114  // process read interest
115  if (hasReadInterest == true) {
116  ssize_t bytesReceived;
117  string ip;
118  unsigned int port;
119  char message[512];
120 
121  // receive datagrams as long as its size > 0 and read would not block
122  while ((bytesReceived = socket->read(ip, port, (void*)message, sizeof(message))) > 0) {
123  //
124  AtomicOperations::increment(server->statistics.received);
125 
126  // process event, catch and handle client related exceptions
127  UDPServerClient* client = nullptr;
128  UDPServerClient* clientNew = nullptr;
129  try {
130  // transfer buffer to string stream
131  auto packet = make_unique<UDPPacket>();
132  packet->putBytes((const uint8_t*)message, bytesReceived);
133  packet->reset();
134 
135  // validate datagram
136  server->validate(packet.get());
137 
138  // identify datagram
139  UDPServer::MessageType messageType;
140  uint32_t clientId;
141  uint32_t messageId;
142  uint8_t retries;
143  server->identify(packet.get(), messageType, clientId, messageId, retries);
144 
145  // process message depending on messageType
146  switch(messageType) {
148  {
149  //
150  AtomicOperations::increment(server->statistics.accepts);
151 
152  // check if client is connected already
153  client = server->getClientByIp(ip, port);
154  if (client != nullptr) {
155  // delete packet
156  client->sendConnected();
157  client->releaseReference();
158  // we are done
159  break;
160  }
161 
162  // create client
163  clientNew = server->accept(
165  ip,
166  port
167  );
168 
169  // assign server
170  clientNew->server = server;
171 
172  // add client to server
173  server->addClient(clientNew);
174 
175  // switch from client new to client
176  client = clientNew;
177  clientNew = nullptr;
178 
179  // send connected ack
180  client->sendConnected();
181 
182  // set/register client in Server
183  if (client->setKey(client->getKey()) == false) {
184  throw NetworkServerException("Client key is already in use");
185  }
186 
187  // fire on init
188  client->init();
189 
190  // we are done
191  break;
192  }
194  {
195  // look up client
196  client = server->lookupClient(clientId);
197  // check if client ip, port matches datagram ip and prt
198  if (client->ip != ip || client->port != port) {
199  //
200  client->releaseReference();
201  throw NetworkServerException("message invalid");
202  }
203  // delegate
204  client->onPacketReceived(packet.release(), messageId, retries);
205  break;
206  }
208  {
209  client = server->lookupClient(clientId);
210  server->processAckReceived(client, messageId);
211  break;
212  }
213  default:
214  throw NetworkServerException("Invalid message type");
215  }
216  } catch(Exception& exception) {
217  // log
218  Console::println(
219  "UDPServerIOThread[" +
220  to_string(id) +
221  "]::run(): " +
222  (RTTI::demangle(typeid(exception).name())) +
223  ": " +
224  (exception.what())
225  );
226 
227  if (clientNew != nullptr) {
228  delete clientNew;
229  }
230  // in case it was a client related exception
231  if (client != nullptr) {
232  // otherwise shut down client
233  client->shutdown();
234  }
235  //
236  AtomicOperations::increment(server->statistics.errors);
237  }
238  }
239  }
240 
241  // process write interest
242  while (hasWriteInterest == true) {
243  // fetch batch of messages to be send
244  MessageQueue messageQueueBatch;
246  for (int i = 0; i < MESSAGEQUEUE_SEND_BATCH_SIZE && messageQueue.empty() == false; i++) {
247  auto message = messageQueue.front();
248  messageQueueBatch.push(message);
249  messageQueue.pop();
250  }
252 
253  // try to send batch
254  while (messageQueueBatch.empty() == false) {
255  auto message = messageQueueBatch.front();
256  if (socket->write(message->ip, message->port, (void*)message->message, message->bytes) == -1) {
257  // sending would block, stop trying to sendin
258  AtomicOperations::increment(server->statistics.errors);
259  break;
260  } else {
261  // success, remove message from message queue batch and continue
262  delete message;
263  messageQueueBatch.pop();
264  //
265  AtomicOperations::increment(server->statistics.sent);
266  }
267  }
268 
269  // re add messages not sent in batch to message queue
270  if (messageQueueBatch.empty() == true) {
272  if (messageQueue.empty() == true) {
274  socket.get(),
277  nullptr
278  );
279 
280  // no more data to send, so stop the loop
281  hasWriteInterest = false;
282  }
284  } else {
286  do {
287  auto message = messageQueueBatch.front();
288  messageQueue.push(message);
289  messageQueueBatch.pop();
290  } while (messageQueueBatch.empty() == false);
292 
293  // we did not send all batched messages, so stop the loop
294  hasWriteInterest = false;
295  }
296  }
297  }
298  }
299 
300  //
301  } catch (Exception &exception) {
302  // log
303  Console::println(
304  "UDPServerIOThread[" +
305  to_string(id) +
306  "]::run(): " +
307  (RTTI::demangle(typeid(exception).name())) +
308  ": " +
309  (exception.what())
310  );
311  }
312 
313  // exit gracefully
315  socket->close();
316 
317  // log
318  Console::println("UDPServerIOThread[" + to_string(id) + "]::run(): done");
319 }
320 
// Builds a Message from the given packet and queues it for sending to the given client.
// - client: supplies target ip, port and clientId of the message
// - messageType, messageId: copied into the queued message
// - packet: its first getSize() bytes are copied into the message buffer; its position is restored afterwards
// - safe: if true, a copy of the message is additionally stored in messageMapAck under messageId, which is the map
//   the ack resend loop and processAckReceived() work on
// - deletePacket: if true, the packet is deleted right after its bytes have been copied
// Throws NetworkServerException if a safe message with this messageId is already stored in messageMapAck, or if
// messageMapAck or messageQueue already hold more than maxCCU * 20 entries.
// NOTE(review): no check of getSize() against the capacity of Message::message is visible here - confirm that the
// buffer can always hold the packet.
// NOTE(review): if the message queue overflow check throws for a safe message, the copy that was put into
// messageMapAck just before is not removed here - confirm this is intended.
// NOTE(review): this listing skips several source lines (see the gaps in the line numbers); presumably they hold the
// mutex lock/unlock calls and the beginning of the socket interest call - confirm against the original file.
321 void UDPServerIOThread::sendMessage(const UDPServerClient* client, const uint8_t messageType, const uint32_t messageId, const UDPPacket* packet, const bool safe, const bool deletePacket) {
322  // create message and fill in addressing and bookkeeping data
323  auto message = make_unique<Message>();
324  message->ip = client->ip;
325  message->port = client->port;
326  message->time = Time::getCurrentMillis();
327  message->messageType = messageType;
328  message->clientId = client->clientId;
329  message->messageId = messageId;
330  message->retries = 0;
331  message->bytes = packet->getSize();
332 
333  // store current position which should be end of packet
334  auto position = packet->getPosition();
335  // rewind the packet, so that its bytes are copied from the very beginning
336  packet->reset();
337  packet->getBytes((uint8_t*)message->message, message->bytes);
338  // restore position to end of stream
339  packet->setPosition(position);
340 
341  // delete packet if requested, its bytes have been copied into the message already
342  if (deletePacket == true) delete packet;
343 
344  // requires ack and retransmission?
345  if (safe == true) {
347  // check if message has already been pushed to ack map
348  auto it = messageMapAck.find(messageId);
349  if (it != messageMapAck.end()) {
350  // it is in the ack map already, so unlock and report
352  throw NetworkServerException("message already on message queue ack");
353  }
354  // check if ack map is full
355  if (messageMapAck.size() > maxCCU * 20) {
357  throw NetworkServerException("message queue ack overflow");
358  }
359  // push to ack map:
360  // the ack map gets its own copy, as the original message is handed over to the message queue below
361  auto messageAck = new Message();
362  *messageAck = *message;
363  messageMapAck[messageId] = messageAck;
365  }
366 
367  // push to message queue
369 
370  // check if message queue is full
371  if (messageQueue.size() > maxCCU * 20) {
373  throw NetworkServerException("message queue overflow");
374  }
375  messageQueue.push(message.release());
376 
377  // set nio interest, only required if the queue just changed from empty to non empty
378  if (messageQueue.size() == 1) {
380  socket.get(),
383  nullptr
384  );
385  }
386 
387  // done
389 }
390 
// Handles an acknowledgement for the message with the given message id.
// - client: the client the acknowledgement is attributed to; its reference is released here in every case
// - messageId: key of the message in messageMapAck
// If a message with this id is stored in messageMapAck and its ip and port equal those of the client, the stored
// message is deleted and removed from the map. An unknown message id is silently accepted.
// Throws NetworkServerException if the message exists, but its ip or port differ from those of the client; the stored
// message is kept in this case.
// NOTE(review): this listing skips source lines 395 and 409; presumably they hold the mutex lock/unlock calls around
// the map access - confirm against the original file.
391 void UDPServerIOThread::processAckReceived(UDPServerClient* client, const uint32_t messageId) {
392  bool messageAckValid = true;
393 
394  // delete message from ack map
396  auto it = messageMapAck.find(messageId);
397  if (it != messageMapAck.end()) {
398  // message exists
399  auto messageAck = it->second;
400  // the ack is only valid if it belongs to the ip and port the message was addressed to
401  messageAckValid = messageAck->ip == client->ip && messageAck->port == client->port;
402  // remove if valid
403  if (messageAckValid == true) {
404  // delete the stored message and remove its entry from ack map
405  delete it->second;
406  messageMapAck.erase(it);
407  }
408  }
410 
411  // release the client reference before a possible throw below
412  client->releaseReference();
413 
414  // check if message ack was valid
415  if (messageAckValid == false) {
416  throw NetworkServerException("message ack invalid");
417  }
418 }
419 
421  MessageQueue messageQueueResend;
422  auto now = Time::getCurrentMillis();
423 
425  auto it = messageMapAck.begin();
426  while (it != messageMapAck.end()) {
427  auto messageAck = it->second;
428  // message ack timed out?
429  // most likely the client is gone
430  if (messageAck->retries == MESSAGEACK_RESENDTIMES_TRIES) {
431  // delete from message map ack
432  delete it->second;
433  messageMapAck.erase(it++);
434  // skip
435  continue;
436  } else
437  // message should be resend?
438  if (now > (messageAck->time + (MESSAGEACK_RESENDTIMES[messageAck->retries]))) {
439  // increase tries
440  messageAck->retries++;
441 
442  // construct message
443  auto message = new Message();
444  *message = *messageAck;
445 
446  // recreate packet header with updated hash and retries
447  UDPPacket packet;
448  packet.putBytes((const uint8_t*)message->message, message->bytes);
449  packet.reset();
450  server->writeHeader(&packet, (UDPServer::MessageType)message->messageType, message->clientId, message->messageId, message->retries);
451  packet.getBytes((uint8_t*)message->message, message->bytes);
452 
453  // and push to be resent
454  messageQueueResend.push(message);
455  }
456  ++it;
457  }
459 
460  // reissue messages to be resent
461  if (messageQueueResend.empty() == false) {
463  do {
464  auto message = messageQueueResend.front();
465  messageQueue.push(message);
466  messageQueueResend.pop();
467 
468  // set nio interest
469  if (messageQueue.size() == 1) {
471  socket.get(),
474  nullptr
475  );
476  }
477  } while (messageQueueResend.empty() == false);
479  }
480 }
const UDPPacket * getBytes(uint8_t *bytes, uint16_t byteCount) const
Get raw bytes from packet.
Definition: UDPPacket.h:268
const UDPPacket * setPosition(uint16_t position) const
Set position.
Definition: UDPPacket.h:62
uint16_t getSize() const
Get size of packet.
Definition: UDPPacket.h:39
uint16_t getPosition() const
Get position.
Definition: UDPPacket.h:53
UDPPacket * putBytes(const uint8_t *bytes, uint16_t byteCount)
Puts raw bytes into packet.
Definition: UDPPacket.h:290
void reset() const
Reset position for read.
Definition: UDPPacket.h:80
Base exception class for network server exceptions.
Base class for network UDP server clients.
void init()
initiates this network client
const string & getKey() const
Client identification key.
void shutdown()
Shuts down this network client.
virtual void onPacketReceived(const UDPPacket *packet, const uint32_t messageId=0, const uint8_t retries=0)
Event, which will be called if packet has been received, defaults to worker thread pool.
const bool setKey(const string &key)
sets the clients identification key
void sendConnected()
Sends a connect message to the client.
void processAckMessages()
Cleans up timed-out safe messages and reissues messages not yet acknowledged by the client.
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
STATIC_DLL_IMPEXT static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, const UDPPacket *packet, const bool safe, const bool deletePacket)
Pushes a message to be sent, takes over ownership of frame.
Base class for network UDP servers.
Definition: UDPServer.h:42
UDPServer_Statistics statistics
Definition: UDPServer.h:218
virtual void identify(const UDPPacket *packet, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
Definition: UDPServer.cpp:161
virtual void validate(const UDPPacket *packet)
Validates a client message.
Definition: UDPServer.cpp:214
virtual UDPServerClient * accept(const uint32_t clientId, const string &ip, const uint16_t port)
method to implement for accepting clients
Definition: UDPServer.cpp:157
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
Definition: UDPServer.cpp:476
UDPServerClient * getClientByIp(const string &ip, const uint16_t port)
Returns client by host name and port.
Definition: UDPServer.cpp:411
void addClient(UDPServerClient *client)
maps a new client to a given client id
Definition: UDPServer.cpp:269
UDPServerClient * lookupClient(const uint32_t clientId)
Looks up a client by client id.
Definition: UDPServer.cpp:378
const uint32_t allocateClientId()
Allocates a client id for a new client.
Definition: UDPServer.cpp:481
virtual void writeHeader(UDPPacket *packet, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Definition: UDPServer.cpp:226
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
Shuts down the kernel event mechanism.
void setSocketInterest(NetworkSocket *socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
Sets a non blocked socket io interest.
int doKernelEventMechanism()
Do the kernel event mechanism.
void initKernelEventMechanism(const unsigned int maxSockets)
Initializes the kernel event mechanism.
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
Decodes a kernel event.
Barrier implementation.
Definition: Barrier.h:21
bool wait()
Waits on barrier.
Definition: Barrier.cpp:28
Mutex implementation.
Definition: Mutex.h:19
void unlock()
Unlocks this mutex.
Definition: Mutex.h:54
void lock()
Locks the mutex, additionally mutex locks will block until other locks have been unlocked.
Definition: Mutex.h:47
Base class for threads.
Definition: Thread.h:20
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.h:77
Console class.
Definition: Console.h:29
Run time type information utility class.
Definition: RTTI.h:14
virtual void releaseReference()
Releases a reference, thus decrementing the counter and delete it if reference counter is zero.
Definition: Reference.h:38
Time utility class.
Definition: Time.h:20
const NIOInterest NIO_INTEREST_NONE
Definition: NIOInterest.h:11
const NIOInterest NIO_INTEREST_READ
Definition: NIOInterest.h:12
uint8_t NIOInterest
type definition for network IO interest
Definition: NIOInterest.h:10
const NIOInterest NIO_INTEREST_WRITE
Definition: NIOInterest.h:13
std::exception Exception
Exception base class.
Definition: Exception.h:18