diff --git a/src/common/Thread.h b/src/common/Thread.h index 326d0080..0a75292c 100644 --- a/src/common/Thread.h +++ b/src/common/Thread.h @@ -18,6 +18,7 @@ #include "common/Defines.h" #include + #include // --------------------------------------------------------------------------- diff --git a/src/common/lookups/LookupTable.h b/src/common/lookups/LookupTable.h index f013e83a..0b219483 100644 --- a/src/common/lookups/LookupTable.h +++ b/src/common/lookups/LookupTable.h @@ -114,9 +114,11 @@ namespace lookups { try { m_table.at(id); + m_mutex.unlock(); return true; } catch (...) { + m_mutex.unlock(); return false; } } @@ -145,6 +147,8 @@ namespace lookups /// True, if lookup table was loaded, otherwise false. virtual bool load() = 0; + /// Saves the table from the lookup table in memory. + /// True, if lookup table was saved, otherwise false. virtual bool save() = 0; }; } // namespace lookups diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index cf531371..2cfb4fe6 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -14,7 +14,6 @@ #include "common/edac/SHA256.h" #include "common/network/json/json.h" #include "common/Log.h" -#include "common/ThreadFunc.h" #include "common/Utils.h" #include "network/FNENetwork.h" #include "network/fne/TagDMRData.h" @@ -206,6 +205,113 @@ void FNENetwork::processNetwork() } } +/// +/// Updates the timer by the passed number of milliseconds. +/// +/// +void FNENetwork::clock(uint32_t ms) +{ + if (m_status != NET_STAT_MST_RUNNING) { + return; + } + + uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + m_maintainenceTimer.clock(ms); + if (m_maintainenceTimer.isRunning() && m_maintainenceTimer.hasExpired()) { + // check to see if any peers have been quiet (no ping) longer than allowed + std::vector peersToRemove = std::vector(); + for (auto peer : m_peers) { + uint32_t id = peer.first; + FNEPeerConnection* connection = peer.second; + if (connection != nullptr) { + if (connection->connected()) { + uint64_t dt = connection->lastPing() + (m_host->m_pingTime * m_host->m_maxMissedPings); + if (dt < now) { + LogInfoEx(LOG_NET, "PEER %u timed out, dt = %u, now = %u", id, dt, now); + peersToRemove.push_back(id); + } + } + } + } + + // remove any peers + for (uint32_t peerId : peersToRemove) { + FNEPeerConnection* connection = m_peers[peerId]; + m_peers.erase(peerId); + if (connection != nullptr) { + delete connection; + } + + erasePeerAffiliations(peerId); + } + + // roll the RTP timestamp if no call is in progress + if (!m_callInProgress) { + frame::RTPHeader::resetStartTime(); + m_frameQueue->clearTimestamps(); + } + + m_maintainenceTimer.start(); + } +} + +/// +/// Opens connection to the network. +/// +/// +bool FNENetwork::open() +{ + if (m_debug) + LogMessage(LOG_NET, "Opening Network"); + + m_status = NET_STAT_MST_RUNNING; + m_maintainenceTimer.start(); + + m_socket = new udp::Socket(m_address, m_port); + + // reinitialize the frame queue + if (m_frameQueue != nullptr) { + delete m_frameQueue; + m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug); + } + + bool ret = m_socket->open(); + if (!ret) { + m_status = NET_STAT_INVALID; + } + + return ret; +} + +/// +/// Closes connection to the network. +/// +void FNENetwork::close() +{ + if (m_debug) + LogMessage(LOG_NET, "Closing Network"); + + if (m_status == NET_STAT_MST_RUNNING) { + uint8_t buffer[1U]; + ::memset(buffer, 0x00U, 1U); + + for (auto peer : m_peers) { + writePeer(peer.first, { NET_FUNC_MST_CLOSING, NET_SUBFUNC_NOP }, buffer, 1U, (ushort)0U, 0U); + } + } + + m_socket->close(); + + m_maintainenceTimer.stop(); + + m_status = NET_STAT_INVALID; +} + +// --------------------------------------------------------------------------- +// Private Class Members +// --------------------------------------------------------------------------- + /// /// Process a data frames from the network. /// @@ -776,113 +882,6 @@ void* FNENetwork::threadedNetworkRx(void* arg) return nullptr; } -/// -/// Updates the timer by the passed number of milliseconds. -/// -/// -void FNENetwork::clock(uint32_t ms) -{ - if (m_status != NET_STAT_MST_RUNNING) { - return; - } - - uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - m_maintainenceTimer.clock(ms); - if (m_maintainenceTimer.isRunning() && m_maintainenceTimer.hasExpired()) { - // check to see if any peers have been quiet (no ping) longer than allowed - std::vector peersToRemove = std::vector(); - for (auto peer : m_peers) { - uint32_t id = peer.first; - FNEPeerConnection* connection = peer.second; - if (connection != nullptr) { - if (connection->connected()) { - uint64_t dt = connection->lastPing() + (m_host->m_pingTime * m_host->m_maxMissedPings); - if (dt < now) { - LogInfoEx(LOG_NET, "PEER %u timed out, dt = %u, now = %u", id, dt, now); - peersToRemove.push_back(id); - } - } - } - } - - // remove any peers - for (uint32_t peerId : peersToRemove) { - FNEPeerConnection* connection = m_peers[peerId]; - m_peers.erase(peerId); - if (connection != nullptr) { - delete connection; - } - - erasePeerAffiliations(peerId); - } - - // roll the RTP timestamp if no call is in progress - if (!m_callInProgress) { - frame::RTPHeader::resetStartTime(); - m_frameQueue->clearTimestamps(); - } - - m_maintainenceTimer.start(); - } -} - -/// -/// Opens connection to the network. -/// -/// -bool FNENetwork::open() -{ - if (m_debug) - LogMessage(LOG_NET, "Opening Network"); - - m_status = NET_STAT_MST_RUNNING; - m_maintainenceTimer.start(); - - m_socket = new udp::Socket(m_address, m_port); - - // reinitialize the frame queue - if (m_frameQueue != nullptr) { - delete m_frameQueue; - m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug); - } - - bool ret = m_socket->open(); - if (!ret) { - m_status = NET_STAT_INVALID; - } - - return ret; -} - -/// -/// Closes connection to the network. -/// -void FNENetwork::close() -{ - if (m_debug) - LogMessage(LOG_NET, "Closing Network"); - - if (m_status == NET_STAT_MST_RUNNING) { - uint8_t buffer[1U]; - ::memset(buffer, 0x00U, 1U); - - for (auto peer : m_peers) { - writePeer(peer.first, { NET_FUNC_MST_CLOSING, NET_SUBFUNC_NOP }, buffer, 1U, (ushort)0U, 0U); - } - } - - m_socket->close(); - - m_maintainenceTimer.stop(); - - m_status = NET_STAT_INVALID; -} - -// --------------------------------------------------------------------------- -// Private Class Members -// --------------------------------------------------------------------------- - /// /// Helper to erase the peer from the peers affiliations list. /// diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index ebe2b81e..2e692e57 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -26,6 +26,8 @@ #include #include +#include + // --------------------------------------------------------------------------- // Class Prototypes // --------------------------------------------------------------------------- @@ -209,8 +211,6 @@ namespace network /// Process a data frames from the network. void processNetwork(); - /// Entry point to process a given network packet. - static void* threadedNetworkRx(void* arg); /// Updates the timer by the passed number of milliseconds. void clock(uint32_t ms) override; @@ -267,6 +267,9 @@ namespace network bool m_reportPeerPing; bool m_verbose; + /// Entry point to process a given network packet. + static void* threadedNetworkRx(void* arg); + /// Helper to erase the peer from the peers affiliations list. bool erasePeerAffiliations(uint32_t peerId); /// Helper to erase the peer from the peers list. diff --git a/src/fne/network/fne/TagDMRData.cpp b/src/fne/network/fne/TagDMRData.cpp index 3069205a..2c6cd261 100644 --- a/src/fne/network/fne/TagDMRData.cpp +++ b/src/fne/network/fne/TagDMRData.cpp @@ -212,6 +212,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId // repeat traffic to the connected peers if (m_network->m_peers.size() > 0U) { + uint32_t i = 0U; for (auto peer : m_network->m_peers) { if (peerId != peer.first) { // is this peer ignored? @@ -219,6 +220,11 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId continue; } + // every 5 peers flush the queue + if (i % 5U == 0U) { + m_network->m_frameQueue->flushQueue(); + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); @@ -234,6 +240,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId if (!m_network->m_callInProgress) m_network->m_callInProgress = true; + i++; } } m_network->m_frameQueue->flushQueue(); diff --git a/src/fne/network/fne/TagNXDNData.cpp b/src/fne/network/fne/TagNXDNData.cpp index 9ac0311e..fde01e24 100644 --- a/src/fne/network/fne/TagNXDNData.cpp +++ b/src/fne/network/fne/TagNXDNData.cpp @@ -182,6 +182,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI // repeat traffic to the connected peers if (m_network->m_peers.size() > 0U) { + uint32_t i = 0U; for (auto peer : m_network->m_peers) { if (peerId != peer.first) { // is this peer ignored? @@ -189,6 +190,11 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI continue; } + // every 5 peers flush the queue + if (i % 5U == 0U) { + m_network->m_frameQueue->flushQueue(); + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); @@ -204,6 +210,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI if (!m_network->m_callInProgress) m_network->m_callInProgress = true; + i++; } } m_network->m_frameQueue->flushQueue(); diff --git a/src/fne/network/fne/TagP25Data.cpp b/src/fne/network/fne/TagP25Data.cpp index dd5500fa..41213224 100644 --- a/src/fne/network/fne/TagP25Data.cpp +++ b/src/fne/network/fne/TagP25Data.cpp @@ -246,6 +246,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId // repeat traffic to the connected peers if (m_network->m_peers.size() > 0U) { + uint32_t i = 0U; for (auto peer : m_network->m_peers) { if (peerId != peer.first) { // is this peer ignored? @@ -253,6 +254,11 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId continue; } + // every 5 peers flush the queue + if (i % 5U == 0U) { + m_network->m_frameQueue->flushQueue(); + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); @@ -268,6 +274,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId if (!m_network->m_callInProgress) m_network->m_callInProgress = true; + i++; } } m_network->m_frameQueue->flushQueue();