23 using std::make_unique;
25 using std::string_view;
27 using std::unique_ptr;
42 UDPServer::UDPServer(
const std::string& name,
const std::string& host,
const unsigned int port,
const unsigned int maxCCU) :
45 clientIdMapReadWriteLock(
"nioudpserver_clientidmap"),
46 clientIpMapReadWriteLock(
"nioudpserver_clientipmap"),
47 workerThreadPool(nullptr),
57 Console::println(
"UDPServer::run(): start");
61 auto startUpBarrier = make_unique<Barrier>(
"nioudpserver_startup_workers",
workerThreadPoolCount + 1);
68 startUpBarrier->wait();
73 auto startUpBarrier = make_unique<Barrier>(
"nioudpserver_startup_iothreads",
ioThreadCount + 1);
83 startUpBarrier->wait();
88 Console::println(
"UDPServer::run(): ready");
91 auto lastCleanUpClientsTime = Time::getCurrentMillis();
92 auto lastCleanUpClientsSafeMessagesTime = Time::getCurrentMillis();
95 auto now = Time::getCurrentMillis();
98 if (now >= lastCleanUpClientsTime + 100L) {
100 lastCleanUpClientsTime = now;
104 if (now >= lastCleanUpClientsSafeMessagesTime + 100L) {
106 for (
const auto& clientKey: _clientKeySet) {
110 if (client ==
nullptr)
continue;
113 client->cleanUpSafeMessages();
116 client->releaseReference();
118 lastCleanUpClientsSafeMessagesTime = now;
122 auto duration = Time::getCurrentMillis() - now;
125 if (duration < 100L) {
126 sleep(100L - duration);
132 for (
const auto& clientKey: _clientKeySet) {
135 if (client ==
nullptr)
continue;
154 Console::println(
"UDPServer::run(): done");
164 char inConnectionId[6];
170 sizeof(inMessageType) +
171 sizeof(inConnectionId) +
172 sizeof(inMessageId) +
178 inMessageType = packet->
getByte();
179 switch(inMessageType) {
194 packet->
getBytes((uint8_t*)&inConnectionId,
sizeof(inConnectionId));
195 if (Integer::viewDecode(string_view(inConnectionId,
sizeof(inConnectionId)), connectionId) ==
false) {
200 packet->
getBytes((uint8_t*)&inMessageId,
sizeof(inMessageId));
201 if (Integer::viewDecode(string_view(inMessageId,
sizeof(inMessageId)), messageId) ==
false) {
206 packet->
getBytes((uint8_t*)&inRetries,
sizeof(inRetries));
208 if (Integer::viewDecode(string_view(inRetries,
sizeof(inRetries)), _retries) ==
false) {
219 uint8_t emptyHeader[14] =
220 "\0\0\0\0\0\0\0\0\0\0"
223 packet->
putBytes(emptyHeader,
sizeof(emptyHeader));
233 switch(messageType) {
249 string clientIdEncoded;
250 Integer::encode(clientId, clientIdEncoded);
253 string messageIdEncoded;
254 Integer::encode(messageId, messageIdEncoded);
257 string retriesEncoded;
258 Integer::encode((uint32_t)retries, retriesEncoded);
261 packet->
putBytes((
const uint8_t*)clientIdEncoded.data(), clientIdEncoded.size());
262 packet->
putBytes((
const uint8_t*)messageIdEncoded.data(), messageIdEncoded.size());
263 packet->
putByte(retriesEncoded[retriesEncoded.size() - 1]);
295 _clientId->clientId = clientId;
296 _clientId->client = client;
297 _clientId->time = Time::getCurrentMillis();
306 string clientIp = client->
getIp() +
":" + to_string(client->
getPort());
331 uint32_t clientId = client->
clientId;
347 delete clientIdMapit->second;
354 auto clientIp = client->
getIp() +
":" + to_string(client->
getPort());
395 auto _client = it->second;
397 _client->time = Time::getCurrentMillis();
399 client = _client->client;
414 auto clientIp = ip +
":" + to_string(
port);
417 client = clientIpMapIt->second;
430 auto now = Time::getCurrentMillis();
431 for (
const auto& [clientId, client]:
clientIdMap) {
432 if (client->client->shutdownRequested ==
true ||
436 client->client->acquireReference();
439 clientCloseList.insert(client->client);
447 for (
auto client: clientCloseList) {
458 switch(messageType) {
464 _messageId = messageId;
471 unsigned int threadIdx = _messageId %
ioThreads.size();
473 ioThreads[threadIdx]->sendMessage(client, (uint8_t)messageType, _messageId, packet, safe, deleteFrame);
477 unsigned int threadIdx = messageId %
ioThreads.size();
478 ioThreads[threadIdx]->processAckReceived(client, messageId);
const UDPPacket * getBytes(uint8_t *bytes, uint16_t byteCount) const
Get raw bytes from packet.
const UDPPacket * setPosition(uint16_t position) const
Set position.
uint16_t getSize() const
Get size of packet.
uint16_t getPosition() const
Get position.
UDPPacket * putByte(uint8_t value)
Puts a byte into packet.
UDPPacket * putBytes(const uint8_t *bytes, uint16_t byteCount)
Puts raw bytes into packet.
void reset() const
Reset position for read.
uint8_t getByte() const
Get a byte from packet.
Base exception class for network server exceptions.
Base class for network servers.
UDPServerClient * getClientByKey(const string &clientKey)
retrieve a client by key; the client reference is acquired and must be released after usage
ClientKeySet getClientKeySet()
get a copy of current client keys
int workerThreadPoolMaxElements
int workerThreadPoolCount
Base class for network UDP server clients.
const uint16_t getPort() const
returns client port
const string & getIp() const
returns client's ip
UDP Network server IO thread.
Base class for network UDP servers.
UDPServer_Statistics statistics
virtual void identify(const UDPPacket *packet, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
virtual void validate(const UDPPacket *packet)
Validates a client message.
virtual void run()
main event loop
unique_ptr< ServerWorkerThreadPool > workerThreadPool
virtual UDPServerClient * accept(const uint32_t clientId, const string &ip, const uint16_t port)
method to implement for accepting clients
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes an acknowledgement reception.
UDPServerClient * getClientByIp(const string &ip, const uint16_t port)
Returns client by host name and port.
static void initializeHeader(UDPPacket *packet)
Writes an empty header to packet.
ReadWriteLock clientIpMapReadWriteLock
virtual ~UDPServer()
destructor
const UDPServer_Statistics getStatistics()
void addClient(UDPServerClient *client)
maps a new client to a given client id
UDPServerClient * lookupClient(const uint32_t clientId)
Looks up a client by client id.
vector< unique_ptr< UDPServerIOThread > > ioThreads
void cleanUpClients()
Clean up clients that have been idle for some time or are flagged to be shut down.
static const uint64_t CLIENT_CLEANUP_IDLETIME
unordered_set< UDPServerClient * > ClientSet
@ MESSAGETYPE_ACKNOWLEDGEMENT
ReadWriteLock clientIdMapReadWriteLock
void sendMessage(const UDPServerClient *client, UDPPacket *packet, const bool safe, const bool deleteFrame, const MessageType messageType, const uint32_t messageId=MESSAGE_ID_NONE)
pushes a message to be sent, takes over ownership of frame
const uint32_t allocateClientId()
Allocates a client id for a new client.
virtual void writeHeader(UDPPacket *packet, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
void removeClient(UDPServerClient *client)
removes a client
Implementation for read/write lock.
void writeLock()
Locks for writing / exclusive lock.
void unlock()
Unlocks this read write lock.
void readLock()
Locks for reading / shared lock.
static void sleep(const uint64_t milliseconds)
sleeps current thread for given time in milliseconds
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
virtual void releaseReference()
Releases a reference, thus decrementing the counter and deleting it if the reference counter is zero.
virtual void acquireReference()
Acquires a reference, incrementing the counter.