TDME2  1.9.200
UDPServer.cpp
Go to the documentation of this file.
2 
3 #include <exception>
4 #include <memory>
5 #include <string>
6 #include <string_view>
7 #include <typeinfo>
8 
9 #include <tdme/tdme.h>
10 #include <tdme/math/Math.h>
17 #include <tdme/utilities/Console.h>
18 #include <tdme/utilities/Integer.h>
19 #include <tdme/utilities/RTTI.h>
20 #include <tdme/utilities/Time.h>
21 
22 using std::ios_base;
23 using std::make_unique;
24 using std::string;
25 using std::string_view;
26 using std::to_string;
27 using std::unique_ptr;
28 
29 using tdme::math::Math;
41 
42 UDPServer::UDPServer(const std::string& name, const std::string& host, const unsigned int port, const unsigned int maxCCU) :
43  Server<UDPServerClient, UDPServerGroup>(name, host, port, maxCCU),
44  Thread("nioudpserver"),
45  clientIdMapReadWriteLock("nioudpserver_clientidmap"),
46  clientIpMapReadWriteLock("nioudpserver_clientipmap"),
47  workerThreadPool(nullptr),
48  clientCount(0),
49  messageCount(0) {
50  //
51 }
52 
54 }
55 
57  Console::println("UDPServer::run(): start");
58 
59  {
60  // create start up barrier for io threads
61  auto startUpBarrier = make_unique<Barrier>("nioudpserver_startup_workers", workerThreadPoolCount + 1);
62 
63  // setup worker thread pool
64  workerThreadPool = make_unique<ServerWorkerThreadPool>(workerThreadPoolCount, workerThreadPoolMaxElements, startUpBarrier.get());
65  workerThreadPool->start();
66 
67  // wait on startup barrier and delete it
68  startUpBarrier->wait();
69  }
70 
71  {
72  // create start up barrier for IO threads
73  auto startUpBarrier = make_unique<Barrier>("nioudpserver_startup_iothreads", ioThreadCount + 1);
74 
75  // create and start IO threads
76  ioThreads.resize(ioThreadCount);
77  for(auto i = 0; i < ioThreadCount; i++) {
78  ioThreads[i] = make_unique<UDPServerIOThread>(i, this, (int)Math::ceil((float)maxCCU / (float)ioThreadCount), startUpBarrier.get());
79  ioThreads[i] ->start();
80  }
81 
82  // wait on startup barrier and delete it
83  startUpBarrier->wait();
84  }
85 
86  // init worker thread pool
87  //
88  Console::println("UDPServer::run(): ready");
89 
90  // do main event loop, waiting until stop requested
91  auto lastCleanUpClientsTime = Time::getCurrentMillis();
92  auto lastCleanUpClientsSafeMessagesTime = Time::getCurrentMillis();
93  while (isStopRequested() == false) {
94  // start time
95  auto now = Time::getCurrentMillis();
96 
97  // clean up clients
98  if (now >= lastCleanUpClientsTime + 100L) {
100  lastCleanUpClientsTime = now;
101  }
102 
103  // iterate over clients and clean up safe messages
104  if (now >= lastCleanUpClientsSafeMessagesTime + 100L) {
105  auto _clientKeySet = getClientKeySet();
106  for (const auto& clientKey: _clientKeySet) {
107  auto client = getClientByKey(clientKey);
108 
109  // skip on clients that have been gone
110  if (client == nullptr) continue;
111 
112  // clean up safe messages
113  client->cleanUpSafeMessages();
114 
115  // never forget to release ;)
116  client->releaseReference();
117  }
118  lastCleanUpClientsSafeMessagesTime = now;
119  }
120 
121  // duration
122  auto duration = Time::getCurrentMillis() - now;
123 
124  // wait total of 100ms seconds before repeat
125  if (duration < 100L) {
126  sleep(100L - duration);
127  }
128  }
129 
130  // we stopped accept, now iterate over clients and close them
131  auto _clientKeySet = getClientKeySet();
132  for (const auto& clientKey: _clientKeySet) {
133  auto client = getClientByKey(clientKey);
134  // continue if gone already
135  if (client == nullptr) continue;
136  // client close logic
137  client->close();
138  // remove from udp client list
139  removeClient(client);
140  }
141 
142  // now stop io threads
143  for(const auto& ioThread: ioThreads) {
144  ioThread->stop();
145  ioThread->join();
146  }
147  ioThreads.clear();
148 
149  // stop thread pool
150  workerThreadPool->stop();
151  workerThreadPool = nullptr;
152 
153  //
154  Console::println("UDPServer::run(): done");
155 }
156 
157 UDPServerClient* UDPServer::accept(const uint32_t clientId, const std::string& ip, const uint16_t port) {
158  return nullptr;
159 }
160 
161 void UDPServer::identify(const UDPPacket* packet, MessageType& messageType, uint32_t& connectionId, uint32_t& messageId, uint8_t& retries) {
162  // format 1char_message_type,6_char_connection_id,6_char_message_id,1_char_retries
163  char inMessageType;
164  char inConnectionId[6];
165  char inMessageId[6];
166  char inRetries[1];
167 
168  // check if enough data available
169  if (packet->getSize() <
170  sizeof(inMessageType) +
171  sizeof(inConnectionId) +
172  sizeof(inMessageId) +
173  sizeof(inRetries)) {
174  throw NetworkServerException("Invalid message header size");
175  }
176 
177  // check message type
178  inMessageType = packet->getByte();
179  switch(inMessageType) {
180  case('C'):
181  messageType = MESSAGETYPE_CONNECT;
182  break;
183  case('M'):
184  messageType = MESSAGETYPE_MESSAGE;
185  break;
186  case('A'):
187  messageType = MESSAGETYPE_ACKNOWLEDGEMENT;
188  break;
189  default:
190  throw NetworkServerException("Invalid message type");
191  }
192 
193  // connection id
194  packet->getBytes((uint8_t*)&inConnectionId, sizeof(inConnectionId));
195  if (Integer::viewDecode(string_view(inConnectionId, sizeof(inConnectionId)), connectionId) == false) {
196  throw NetworkServerException("Invalid connection id");
197  }
198 
199  // decode message id
200  packet->getBytes((uint8_t*)&inMessageId, sizeof(inMessageId));
201  if (Integer::viewDecode(string_view(inMessageId, sizeof(inMessageId)), messageId) == false) {
202  throw NetworkServerException("Invalid message id");
203  }
204 
205  // decode retries
206  packet->getBytes((uint8_t*)&inRetries, sizeof(inRetries));
207  uint32_t _retries;
208  if (Integer::viewDecode(string_view(inRetries, sizeof(inRetries)), _retries) == false) {
209  throw NetworkServerException("Invalid retries");
210  }
211  retries = _retries;
212 }
213 
214 void UDPServer::validate(const UDPPacket* packet) {
215 }
216 
218  // 14(messagetype, clientid, messageid, retries)
219  uint8_t emptyHeader[14] =
220  "\0\0\0\0\0\0\0\0\0\0"
221  "\0\0\0";
222 
223  packet->putBytes(emptyHeader, sizeof(emptyHeader));
224 }
225 
226 void UDPServer::writeHeader(UDPPacket* packet, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries) {
227  // store current position which should be end of packet
228  auto position = packet->getPosition();
229  // reset position to be able to write header
230  packet->reset();
231 
232  // message type
233  switch(messageType) {
234  case(MESSAGETYPE_CONNECT):
235  packet->putByte('C');
236  break;
237  case(MESSAGETYPE_MESSAGE):
238  packet->putByte('M');
239  break;
241  packet->putByte('A');
242  break;
243  default:
244  delete packet;
245  throw NetworkServerException("Invalid message type");
246  }
247 
248  // client id
249  string clientIdEncoded;
250  Integer::encode(clientId, clientIdEncoded);
251 
252  // message id
253  string messageIdEncoded;
254  Integer::encode(messageId, messageIdEncoded);
255 
256  // retries
257  string retriesEncoded;
258  Integer::encode((uint32_t)retries, retriesEncoded);
259 
260  // put to packet
261  packet->putBytes((const uint8_t*)clientIdEncoded.data(), clientIdEncoded.size());
262  packet->putBytes((const uint8_t*)messageIdEncoded.data(), messageIdEncoded.size());
263  packet->putByte(retriesEncoded[retriesEncoded.size() - 1]);
264 
265  // restore position to end of stream
266  packet->setPosition(position);
267 }
268 
270  auto clientId = client->clientId;
271 
272  //
274 
275  if (clientIdMap.size() >= maxCCU) {
276  // should actually never happen
278 
279  // failure
280  throw NetworkServerException("too many clients");
281  }
282 
283  // check if client id was mapped already?
284  auto clientIdMapIt = clientIdMap.find(clientId);
285  if (clientIdMapIt != clientIdMap.end()) {
286  // should actually never happen
288 
289  // failure
290  throw NetworkServerException("client id is already mapped");
291  }
292 
293  // prepare client struct for map
294  auto _clientId = new ClientId();
295  _clientId->clientId = clientId;
296  _clientId->client = client;
297  _clientId->time = Time::getCurrentMillis();
298 
299  // put to map
300  clientIdMap[clientId] = _clientId;
301 
302  // put to client ip set
304 
305  // check if ip exists already?
306  string clientIp = client->getIp() + ":" + to_string(client->getPort());
307  auto clientIpMapIt = clientIpMap.find(clientIp);
308  if (clientIpMapIt != clientIpMap.end()) {
309  // should actually never happen
312 
313  // failure
314  throw NetworkServerException("client ip is already registered");
315  }
316 
317  // put to map
318  clientIpMap[clientIp] = client;
319 
320  ///
322 
323  // reference counter +1
324  client->acquireReference();
325 
326  // unlock
328 }
329 
331  uint32_t clientId = client->clientId;
332 
333  //
335 
336  // check if client id was mapped already?
337  auto clientIdMapit = clientIdMap.find(clientId);
338  if (clientIdMapit == clientIdMap.end()) {
339  // should actually never happen
341 
342  // failure
343  throw NetworkServerException("client id is not mapped");
344  }
345 
346  // remove from client id map
347  delete clientIdMapit->second;
348  clientIdMap.erase(clientIdMapit);
349 
350  // remove from client ip set
352 
353  // check if ip exists already?
354  auto clientIp = client->getIp() + ":" + to_string(client->getPort());
355  auto clientIpMapIt = clientIpMap.find(clientIp);
356  if (clientIpMapIt == clientIpMap.end()) {
357  // should actually never happen
360 
361  // failure
362  throw NetworkServerException("client ip is not registered");
363  }
364 
365  // remove from ip map
366  clientIpMap.erase(clientIpMapIt);
367 
368  //
370 
371  // reference counter -1
372  client->releaseReference();
373 
374  // unlock
376 }
377 
378 UDPServerClient* UDPServer::lookupClient(const uint32_t clientId) {
379  UDPServerClient* client = nullptr;
380 
381  //
383 
384  // check if client id was mapped already?
385  auto it = clientIdMap.find(clientId);
386  if (it == clientIdMap.end()) {
387  // should actually never happen
389 
390  // failure
391  throw NetworkServerException("client does not exist");
392  }
393 
394  // get client
395  auto _client = it->second;
396  // update last access time
397  _client->time = Time::getCurrentMillis();
398  // get client
399  client = _client->client;
400 
401  //
402  client->acquireReference();
403 
404  // unlock
406 
407  //
408  return client;
409 }
410 
411 UDPServerClient* UDPServer::getClientByIp(const string& ip, const uint16_t port) {
412  UDPServerClient* client = nullptr;
414  auto clientIp = ip + ":" + to_string(port);
415  auto clientIpMapIt = clientIpMap.find(clientIp);
416  if (clientIpMapIt != clientIpMap.end()) {
417  client = clientIpMapIt->second;
418  client->acquireReference();
419  }
421  return client;
422 }
423 
425  ClientSet clientCloseList;
426 
427  // determine clients that are idle or beeing flagged to be shut down
429 
430  auto now = Time::getCurrentMillis();
431  for (const auto& [clientId, client]: clientIdMap) {
432  if (client->client->shutdownRequested == true ||
433  client->time < now - CLIENT_CLEANUP_IDLETIME) {
434 
435  // acquire reference for worker
436  client->client->acquireReference();
437 
438  // mark for beeing closed
439  clientCloseList.insert(client->client);
440  }
441  }
442 
443  //
445 
446  // erase clients
447  for (auto client: clientCloseList) {
448  // client close logic
449  client->close();
450  // remove from udp client list
451  removeClient(client);
452  }
453 }
454 
455 void UDPServer::sendMessage(const UDPServerClient* client, UDPPacket* packet, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId) {
456  // determine message id by message type
457  uint32_t _messageId;
458  switch(messageType) {
459  case(MESSAGETYPE_CONNECT):
460  case(MESSAGETYPE_MESSAGE):
461  _messageId = AtomicOperations::increment(messageCount);
462  break;
464  _messageId = messageId;
465  break;
466  default:
467  delete packet;
468  throw NetworkServerException("Invalid message type");
469  }
470 
471  unsigned int threadIdx = _messageId % ioThreads.size();
472  writeHeader(packet, messageType, client->clientId, _messageId, 0);
473  ioThreads[threadIdx]->sendMessage(client, (uint8_t)messageType, _messageId, packet, safe, deleteFrame);
474 }
475 
476 void UDPServer::processAckReceived(UDPServerClient* client, const uint32_t messageId) {
477  unsigned int threadIdx = messageId % ioThreads.size();
478  ioThreads[threadIdx]->processAckReceived(client, messageId);
479 }
480 
481 const uint32_t UDPServer::allocateClientId() {
482  return AtomicOperations::increment(clientCount);
483 }
484 
486  auto stats = statistics;
487  statistics.time = Time::getCurrentMillis();
488  statistics.received = 0;
489  statistics.sent = 0;
490  statistics.accepts = 0;
491  statistics.errors = 0;
492  // determine clients that are idle or beeing flagged to be shut down
494  stats.clients = clientIdMap.size();
496  return stats;
497 }
Standard math functions.
Definition: Math.h:19
const UDPPacket * getBytes(uint8_t *bytes, uint16_t byteCount) const
Get raw bytes from packet.
Definition: UDPPacket.h:268
const UDPPacket * setPosition(uint16_t position) const
Set position.
Definition: UDPPacket.h:62
uint16_t getSize() const
Get size of packet.
Definition: UDPPacket.h:39
uint16_t getPosition() const
Get position.
Definition: UDPPacket.h:53
UDPPacket * putByte(uint8_t value)
Puts a byte into packet.
Definition: UDPPacket.h:123
UDPPacket * putBytes(const uint8_t *bytes, uint16_t byteCount)
Puts raw bytes into packet.
Definition: UDPPacket.h:290
void reset() const
Reset position for read.
Definition: UDPPacket.h:80
uint8_t getByte() const
Get a byte from packet.
Definition: UDPPacket.h:106
Base exception class for network server exceptions.
Base class for network servers.
Definition: Server.h:32
UDPServerClient * getClientByKey(const string &clientKey)
retrieve a client by key, the client reference is acquired, must be released after usage
Definition: Server.h:109
ClientKeySet getClientKeySet()
get a copy of current client keys
Definition: Server.h:94
Base class for network UDP server clients.
const uint16_t getPort() const
returns client port
const string & getIp() const
returns client's ip
Base class for network UDP servers.
Definition: UDPServer.h:42
UDPServer_Statistics statistics
Definition: UDPServer.h:218
virtual void identify(const UDPPacket *packet, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
Definition: UDPServer.cpp:161
virtual void validate(const UDPPacket *packet)
Validates a client message.
Definition: UDPServer.cpp:214
virtual void run()
main event loop
Definition: UDPServer.cpp:56
unique_ptr< ServerWorkerThreadPool > workerThreadPool
Definition: UDPServer.h:213
virtual UDPServerClient * accept(const uint32_t clientId, const string &ip, const uint16_t port)
method to implement for accepting clients
Definition: UDPServer.cpp:157
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
Definition: UDPServer.cpp:476
UDPServerClient * getClientByIp(const string &ip, const uint16_t port)
Returns client by host name and port.
Definition: UDPServer.cpp:411
static void initializeHeader(UDPPacket *packet)
Writes an empty header to packet.
Definition: UDPServer.cpp:217
virtual ~UDPServer()
destructor
Definition: UDPServer.cpp:53
const UDPServer_Statistics getStatistics()
Definition: UDPServer.cpp:485
void addClient(UDPServerClient *client)
maps a new client to a given client id
Definition: UDPServer.cpp:269
UDPServerClient * lookupClient(const uint32_t clientId)
Looks up a client by client id.
Definition: UDPServer.cpp:378
vector< unique_ptr< UDPServerIOThread > > ioThreads
Definition: UDPServer.h:212
void cleanUpClients()
Clean up clients that have been idle for some time or are flagged to be shut down.
Definition: UDPServer.cpp:424
static const uint64_t CLIENT_CLEANUP_IDLETIME
Definition: UDPServer.h:128
unordered_set< UDPServerClient * > ClientSet
Definition: UDPServer.h:136
void sendMessage(const UDPServerClient *client, UDPPacket *packet, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId=MESSAGE_ID_NONE)
pushes a message to be sent, takes over ownership of frame
Definition: UDPServer.cpp:455
const uint32_t allocateClientId()
Allocates a client id for a new client.
Definition: UDPServer.cpp:481
virtual void writeHeader(UDPPacket *packet, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Definition: UDPServer.cpp:226
void removeClient(UDPServerClient *client)
removes a client
Definition: UDPServer.cpp:330
Barrier implementation.
Definition: Barrier.h:21
Implementation for read/write lock.
Definition: ReadWriteLock.h:17
void writeLock()
Locks for writing / exclusive lock.
Definition: ReadWriteLock.h:43
void unlock()
Unlocks this read write lock.
Definition: ReadWriteLock.h:50
void readLock()
Locks for reading / shared lock.
Definition: ReadWriteLock.h:36
Base class for threads.
Definition: Thread.h:20
static void sleep(const uint64_t milliseconds)
sleeps current thread for given time in milliseconds
Definition: Thread.h:48
bool isStopRequested()
Returns if stop has been requested.
Definition: Thread.h:77
Console class.
Definition: Console.h:29
Integer class.
Definition: Integer.h:25
Run time type information utility class.
Definition: RTTI.h:14
virtual void releaseReference()
Releases a reference, thus decrementing the counter and delete it if reference counter is zero.
Definition: Reference.h:38
virtual void acquireReference()
Acquires a reference, incrementing the counter.
Definition: Reference.h:31
Time utility class.
Definition: Time.h:20