6 #include <unordered_map>
22 using std::make_unique;
25 using std::unique_ptr;
26 using std::unordered_map;
// Out-of-class definition of the static table UDPServerIOThread::MESSAGEACK_RESENDTIMES,
// declared with MESSAGEACK_RESENDTIMES_TRIES entries; seven initializers are supplied here.
// NOTE(review): the values are presumably millisecond delays applied per retry when
// re-sending unacknowledged messages — confirm against processAckMessages().
44 const uint64_t UDPServerIOThread::MESSAGEACK_RESENDTIMES[UDPServerIOThread::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
46 UDPServerIOThread::UDPServerIOThread(
const unsigned int id,
UDPServer *server,
const unsigned int maxCCU,
Barrier* startUpBarrier) :
47 Thread(
"nioudpserveriothread"),
51 startUpBarrier(startUpBarrier),
52 messageQueueMutex(
"nioupserveriothread_messagequeue"),
53 messageMapAckMutex(
"nioupserveriothread_messagequeueack") {
67 for (
const auto& [messageId, message]:
messageMapAck)
delete message;
73 Console::println(
"UDPServerIOThread[" + to_string(
id) +
"]::run(): start");
89 auto lastMessageQueueAckTime = Time::getCurrentMillis();
91 auto now = Time::getCurrentMillis();
94 if (now >= lastMessageQueueAckTime + 25L) {
96 lastMessageQueueAckTime = now;
103 for (
auto i = 0; i < events; i++) {
115 if (hasReadInterest ==
true) {
116 ssize_t bytesReceived;
122 while ((bytesReceived =
socket->read(ip, port, (
void*)message,
sizeof(message))) > 0) {
131 auto packet = make_unique<UDPPacket>();
132 packet->putBytes((
const uint8_t*)message, bytesReceived);
143 server->
identify(packet.get(), messageType, clientId, messageId, retries);
146 switch(messageType) {
154 if (client !=
nullptr) {
198 if (client->
ip != ip || client->
port != port) {
219 "UDPServerIOThread[" +
222 (RTTI::demangle(
typeid(exception).
name())) +
227 if (clientNew !=
nullptr) {
231 if (client !=
nullptr) {
242 while (hasWriteInterest ==
true) {
248 messageQueueBatch.push(message);
254 while (messageQueueBatch.empty() ==
false) {
255 auto message = messageQueueBatch.front();
256 if (
socket->write(message->ip, message->port, (
void*)message->message, message->bytes) == -1) {
263 messageQueueBatch.pop();
270 if (messageQueueBatch.empty() ==
true) {
281 hasWriteInterest =
false;
287 auto message = messageQueueBatch.front();
289 messageQueueBatch.pop();
290 }
while (messageQueueBatch.empty() ==
false);
294 hasWriteInterest =
false;
304 "UDPServerIOThread[" +
307 (RTTI::demangle(
typeid(exception).
name())) +
318 Console::println(
"UDPServerIOThread[" + to_string(
id) +
"]::run(): done");
323 auto message = make_unique<Message>();
324 message->ip = client->
ip;
325 message->port = client->
port;
326 message->time = Time::getCurrentMillis();
327 message->messageType = messageType;
328 message->clientId = client->
clientId;
329 message->messageId = messageId;
330 message->retries = 0;
331 message->bytes = packet->
getSize();
337 packet->
getBytes((uint8_t*)message->message, message->bytes);
342 if (deletePacket ==
true)
delete packet;
361 auto messageAck =
new Message();
362 *messageAck = *message;
392 bool messageAckValid =
true;
399 auto messageAck = it->second;
401 messageAckValid = messageAck->ip == client->
ip && messageAck->port == client->
port;
403 if (messageAckValid ==
true) {
415 if (messageAckValid ==
false) {
422 auto now = Time::getCurrentMillis();
427 auto messageAck = it->second;
440 messageAck->retries++;
444 *message = *messageAck;
448 packet.
putBytes((
const uint8_t*)message->message, message->bytes);
451 packet.
getBytes((uint8_t*)message->message, message->bytes);
454 messageQueueResend.push(message);
461 if (messageQueueResend.empty() ==
false) {
464 auto message = messageQueueResend.front();
466 messageQueueResend.pop();
477 }
while (messageQueueResend.empty() ==
false);
const UDPPacket * getBytes(uint8_t *bytes, uint16_t byteCount) const
Get raw bytes from packet.
const UDPPacket * setPosition(uint16_t position) const
Set position.
uint16_t getSize() const
Get size of packet.
uint16_t getPosition() const
Get position.
UDPPacket * putBytes(const uint8_t *bytes, uint16_t byteCount)
Puts raw bytes into packet.
void reset() const
Reset position for read.
Base exception class for network server exceptions.
Base class for network UDP server clients.
void init()
initiates this network client
const string & getKey() const
Client identification key.
void shutdown()
Shuts down this network client.
virtual void onPacketReceived(const UDPPacket *packet, const uint32_t messageId=0, const uint8_t retries=0)
Event, which will be called if packet has been received, defaults to worker thread pool.
const bool setKey(const string &key)
Sets the client's identification key.
void sendConnected()
Sends a connect message to the client.
UDP Network server IO thread.
static const int MESSAGEACK_RESENDTIMES_TRIES
virtual void run()
thread program
MessageQueue messageQueue
MessageMapAck messageMapAck
void processAckMessages()
Cleans up timed-out safe messages and reissues messages not being acknowledged by the client.
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes the reception of an acknowledgement.
queue< Message * > MessageQueue
~UDPServerIOThread()
Destructor.
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
unique_ptr< UDPSocket > socket
STATIC_DLL_IMPEXT static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
void sendMessage(const UDPServerClient *client, const uint8_t messageType, const uint32_t messageId, const UDPPacket *packet, const bool safe, const bool deletePacket)
Pushes a message to be sent; takes over ownership of the frame.
Base class for network UDP servers.
UDPServer_Statistics statistics
virtual void identify(const UDPPacket *packet, MessageType &messageType, uint32_t &connectionId, uint32_t &messageId, uint8_t &retries)
Identifies a client message.
virtual void validate(const UDPPacket *packet)
Validates a client message.
virtual UDPServerClient * accept(const uint32_t clientId, const string &ip, const uint16_t port)
method to implement for accepting clients
void processAckReceived(UDPServerClient *client, const uint32_t messageId)
Processes the reception of an acknowledgement.
UDPServerClient * getClientByIp(const string &ip, const uint16_t port)
Returns client by host name and port.
void addClient(UDPServerClient *client)
maps a new client to a given client id
UDPServerClient * lookupClient(const uint32_t clientId)
Looks up a client by client id.
@ MESSAGETYPE_ACKNOWLEDGEMENT
const uint32_t allocateClientId()
Allocates a client id for a new client.
virtual void writeHeader(UDPPacket *packet, MessageType messageType, const uint32_t clientId, const uint32_t messageId, const uint8_t retries)
Writes a message header to message.
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
Shuts down the kernel event mechanism.
void setSocketInterest(NetworkSocket *socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
Sets a non-blocking socket I/O interest.
int doKernelEventMechanism()
Do the kernel event mechanism.
void initKernelEventMechanism(const unsigned int maxSockets)
Initializes the kernel event mechanism.
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
Decodes a kernel event.
bool wait()
Waits on barrier.
void unlock()
Unlocks this mutex.
void lock()
Locks the mutex; additional mutex locks will block until other locks have been unlocked.
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
virtual void releaseReference()
Releases a reference, thus decrementing the counter and deleting it if the reference counter is zero.
const NIOInterest NIO_INTEREST_NONE
const NIOInterest NIO_INTEREST_READ
uint8_t NIOInterest
type definition for network IO interest
const NIOInterest NIO_INTEREST_WRITE
std::exception Exception
Exception base class.