7 #include <unordered_map>
24 using std::make_unique;
26 using std::unique_ptr;
27 using std::unordered_map;
47 const uint64_t UDPClient::MESSAGEACK_RESENDTIMES[UDPClient::MESSAGEACK_RESENDTIMES_TRIES] = {125L, 250L, 500L, 750L, 1000L, 2000L, 5000L};
49 UDPClient::UDPClient(
const string& ip,
const uint16_t port) :
50 Thread(
"nioudpclientthread"),
51 messageQueueMutex(
"nioupclientthread_messagequeue"),
52 messageMapAckMutex(
"nioupclientthread_messagequeueack"),
53 recvMessageQueueMutex(
"nioupclientthread_recvmessagequeuemutex"),
54 messageMapSafeMutex(
"nioupclientthread_messagemasafemutex"),
74 for (
const auto& [messageId, message]:
messageMapAck)
delete message;
86 for (
const auto& [messageId, message]:
messageMapSafe)
delete message;
92 Console::println(
"UDPClient::run(): start");
97 socket = unique_ptr<UDPSocket>(UDPSocket::createClientSocket(NetworkSocket::determineIpVersion(
ip)));
107 auto lastMessageQueueAckTime = Time::getCurrentMillis();
108 auto lastMessageConnectTime = Time::getCurrentMillis();
109 auto lastMessageSafeCleanTime = Time::getCurrentMillis();
111 auto now = Time::getCurrentMillis();
114 if (
connected ==
false && now >= lastMessageConnectTime + 25L) {
117 lastMessageConnectTime = now;
121 if (now >= lastMessageQueueAckTime + 25L) {
123 lastMessageQueueAckTime = now;
127 if (now >= lastMessageSafeCleanTime + 25L) {
129 lastMessageSafeCleanTime = now; // restart the safe-message clean-up interval (previously reset the ack timer by mistake)
136 for(
unsigned int i = 0; i < (
unsigned int)events; i++) {
148 if (hasReadInterest ==
true) {
149 ssize_t bytesReceived;
151 unsigned int fromPort;
155 while ((bytesReceived =
socket->read(fromIp, fromPort, (
void*)message,
sizeof(message))) > 0) {
161 if (clientMessage ==
nullptr) {
164 switch(clientMessage->getMessageType()) {
176 clientMessage->getClientId(),
177 clientMessage->getMessageId(),
178 clientMessage->getRetryCount() + 1,
183 clientId = clientMessage->getClientId();
185 auto packet = clientMessage->getPacket();
212 "UDPClient::run(): " +
213 RTTI::demangle(
typeid(exception).
name()) +
215 string(exception.what())
230 while (hasWriteInterest ==
true) {
236 messageQueueBatch.push(message);
242 while (messageQueueBatch.empty() ==
false) {
243 auto message = messageQueueBatch.front();
244 if (
socket->write(
ip,
port, (
void*)message->message, message->bytes) == -1) {
251 auto message = messageQueueBatch.front();
253 messageQueueBatch.pop();
260 if (messageQueueBatch.empty() ==
true) {
271 hasWriteInterest =
false;
277 auto message = messageQueueBatch.front();
279 messageQueueBatch.pop();
280 }
while (messageQueueBatch.empty() ==
false);
284 hasWriteInterest =
false;
294 "UDPClient::run(): " +
295 RTTI::demangle(
typeid(exception).
name()) +
297 string(exception.what())
306 Console::println(
"UDPClient::run(): done");
311 auto clientMessagePtr = unique_ptr<UDPClientMessage>(clientMessage);
312 auto message = make_unique<Message>();
313 message->time = clientMessagePtr->getTime();
314 message->messageType = clientMessagePtr->getMessageType();
315 message->messageId = clientMessagePtr->getMessageId();
316 message->retries = 0;
317 clientMessagePtr->generate(message->message, message->bytes);
336 auto messageAck =
new Message();
337 *messageAck = *message;
367 auto messageAckValid =
true;
374 messageAckValid =
true;
376 if (messageAckValid ==
true) {
385 if (messageAckValid ==
false) {
392 auto now = Time::getCurrentMillis();
397 auto messageAck = it->second;
410 messageAck->retries++;
414 *message = *messageAck;
420 clientMessage->retry();
423 clientMessage->generate(message->message, message->bytes);
426 messageQueueResend.push(message);
433 if (messageQueueResend.empty() ==
false) {
436 auto message = messageQueueResend.front();
438 messageQueueResend.pop();
449 }
while (messageQueueResend.empty() ==
false);
455 auto messageProcessed =
false;
465 messageProcessed =
true;
466 auto message = it->second;
467 message->receptions++;
471 message->messageId = messageId;
472 message->receptions = 1;
473 message->time = Time::getCurrentMillis();
494 return messageProcessed ==
true?
false:
true;
503 auto now = Time::getCurrentMillis();
Base exception class for network client exceptions.
const uint32_t getMessageId()
static UDPClientMessage * parse(const char message[512], uint16_t bytes)
Parse UDP client message.
@ MESSAGETYPE_ACKNOWLEDGEMENT
void cleanUpSafeMessages()
Clean up safe messages.
static const int MESSAGEACK_RESENDTIMES_TRIES
virtual void run()
Run thread program.
MessageQueue messageQueue
MessageMapSafe messageMapSafe
MessageMapAck messageMapAck
void processAckReceived(const uint32_t messageId)
Processes ack received.
void processAckMessages()
Process ack messages.
queue< Message * > MessageQueue
static const uint64_t MESSAGESSAFE_KEEPTIME
const UDPClient_Statistics getStatistics()
static const int MESSAGEQUEUE_SEND_BATCH_SIZE
unique_ptr< UDPSocket > socket
void sendMessage(UDPClientMessage *clientMessage, bool safe)
Pushes a message to be sent, takes over ownership of message.
UDPClientMessage * createMessage(const UDPPacket *packet)
Create message.
Mutex recvMessageQueueMutex
STATIC_DLL_IMPEXT static const uint64_t MESSAGEACK_RESENDTIMES[MESSAGEACK_RESENDTIMES_TRIES]
Mutex messageMapSafeMutex
bool processSafeMessage(UDPClientMessage *clientMessage)
Returns whether a message should be processed or has already been processed.
UDPClientMessage * receiveMessage()
Receive message.
UDPClient_Statistics statistics
RecvMessageQueue recvMessageQueue
Interface to kernel event mechanisms.
void shutdownKernelEventMechanism()
Shuts down the kernel event mechanism.
void setSocketInterest(NetworkSocket *socket, const NIOInterest lastInterest, const NIOInterest interest, const void *cookie)
Sets a non-blocking socket I/O interest.
int doKernelEventMechanism()
Do the kernel event mechanism.
void initKernelEventMechanism(const unsigned int maxSockets)
Initializes the kernel event mechanism.
void decodeKernelEvent(const unsigned int index, NIOInterest &interest, void *&cookie)
Decodes a kernel event.
Base class of network sockets.
void unlock()
Unlocks this mutex.
void lock()
Locks the mutex; additional lock attempts will block until the mutex has been unlocked.
bool isStopRequested()
Returns if stop has been requested.
Run time type information utility class.
const NIOInterest NIO_INTEREST_NONE
const NIOInterest NIO_INTEREST_READ
uint8_t NIOInterest
type definition for network IO interest
const NIOInterest NIO_INTEREST_WRITE
std::exception Exception
Exception base class.