add missing mutex unlocks for the lookup tables (I'm not sure how this hasn't caused a problem); reorganize the code in FNENetwork and make threadedNetworkRx private; add some counting logic to ensure the voice tag classes flush voice frames every 5 peers to ensure timely delivery of packets;

pull/51/head
Bryan Biedenkapp 2 years ago
parent e348b15d7f
commit 5d6c99da43

@ -18,6 +18,7 @@
#include "common/Defines.h"
#include <string>
#include <pthread.h>
// ---------------------------------------------------------------------------

@ -114,9 +114,11 @@ namespace lookups
{
try {
m_table.at(id);
m_mutex.unlock();
return true;
}
catch (...) {
m_mutex.unlock();
return false;
}
}
@ -145,6 +147,8 @@ namespace lookups
/// <returns>True, if lookup table was loaded, otherwise false.</returns>
virtual bool load() = 0;
/// <summary>Saves the table from the lookup table in memory.</summary>
/// <returns>True, if lookup table was saved, otherwise false.</returns>
virtual bool save() = 0;
};
} // namespace lookups

@ -14,7 +14,6 @@
#include "common/edac/SHA256.h"
#include "common/network/json/json.h"
#include "common/Log.h"
#include "common/ThreadFunc.h"
#include "common/Utils.h"
#include "network/FNENetwork.h"
#include "network/fne/TagDMRData.h"
@ -206,6 +205,113 @@ void FNENetwork::processNetwork()
}
}
/// <summary>
/// Updates the timer by the passed number of milliseconds.
/// </summary>
/// <param name="ms"></param>
void FNENetwork::clock(uint32_t ms)
{
if (m_status != NET_STAT_MST_RUNNING) {
return;
}
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
m_maintainenceTimer.clock(ms);
if (m_maintainenceTimer.isRunning() && m_maintainenceTimer.hasExpired()) {
// check to see if any peers have been quiet (no ping) longer than allowed
std::vector<uint32_t> peersToRemove = std::vector<uint32_t>();
for (auto peer : m_peers) {
uint32_t id = peer.first;
FNEPeerConnection* connection = peer.second;
if (connection != nullptr) {
if (connection->connected()) {
uint64_t dt = connection->lastPing() + (m_host->m_pingTime * m_host->m_maxMissedPings);
if (dt < now) {
LogInfoEx(LOG_NET, "PEER %u timed out, dt = %u, now = %u", id, dt, now);
peersToRemove.push_back(id);
}
}
}
}
// remove any peers
for (uint32_t peerId : peersToRemove) {
FNEPeerConnection* connection = m_peers[peerId];
m_peers.erase(peerId);
if (connection != nullptr) {
delete connection;
}
erasePeerAffiliations(peerId);
}
// roll the RTP timestamp if no call is in progress
if (!m_callInProgress) {
frame::RTPHeader::resetStartTime();
m_frameQueue->clearTimestamps();
}
m_maintainenceTimer.start();
}
}
/// <summary>
/// Opens connection to the network.
/// </summary>
/// <returns></returns>
bool FNENetwork::open()
{
if (m_debug)
LogMessage(LOG_NET, "Opening Network");
m_status = NET_STAT_MST_RUNNING;
m_maintainenceTimer.start();
m_socket = new udp::Socket(m_address, m_port);
// reinitialize the frame queue
if (m_frameQueue != nullptr) {
delete m_frameQueue;
m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug);
}
bool ret = m_socket->open();
if (!ret) {
m_status = NET_STAT_INVALID;
}
return ret;
}
/// <summary>
/// Closes connection to the network.
/// </summary>
void FNENetwork::close()
{
if (m_debug)
LogMessage(LOG_NET, "Closing Network");
if (m_status == NET_STAT_MST_RUNNING) {
uint8_t buffer[1U];
::memset(buffer, 0x00U, 1U);
for (auto peer : m_peers) {
writePeer(peer.first, { NET_FUNC_MST_CLOSING, NET_SUBFUNC_NOP }, buffer, 1U, (ushort)0U, 0U);
}
}
m_socket->close();
m_maintainenceTimer.stop();
m_status = NET_STAT_INVALID;
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/// <summary>
/// Process a data frames from the network.
/// </summary>
@ -776,113 +882,6 @@ void* FNENetwork::threadedNetworkRx(void* arg)
return nullptr;
}
/// <summary>
/// Updates the timer by the passed number of milliseconds.
/// </summary>
/// <param name="ms"></param>
void FNENetwork::clock(uint32_t ms)
{
if (m_status != NET_STAT_MST_RUNNING) {
return;
}
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
m_maintainenceTimer.clock(ms);
if (m_maintainenceTimer.isRunning() && m_maintainenceTimer.hasExpired()) {
// check to see if any peers have been quiet (no ping) longer than allowed
std::vector<uint32_t> peersToRemove = std::vector<uint32_t>();
for (auto peer : m_peers) {
uint32_t id = peer.first;
FNEPeerConnection* connection = peer.second;
if (connection != nullptr) {
if (connection->connected()) {
uint64_t dt = connection->lastPing() + (m_host->m_pingTime * m_host->m_maxMissedPings);
if (dt < now) {
LogInfoEx(LOG_NET, "PEER %u timed out, dt = %u, now = %u", id, dt, now);
peersToRemove.push_back(id);
}
}
}
}
// remove any peers
for (uint32_t peerId : peersToRemove) {
FNEPeerConnection* connection = m_peers[peerId];
m_peers.erase(peerId);
if (connection != nullptr) {
delete connection;
}
erasePeerAffiliations(peerId);
}
// roll the RTP timestamp if no call is in progress
if (!m_callInProgress) {
frame::RTPHeader::resetStartTime();
m_frameQueue->clearTimestamps();
}
m_maintainenceTimer.start();
}
}
/// <summary>
/// Opens connection to the network.
/// </summary>
/// <returns></returns>
bool FNENetwork::open()
{
if (m_debug)
LogMessage(LOG_NET, "Opening Network");
m_status = NET_STAT_MST_RUNNING;
m_maintainenceTimer.start();
m_socket = new udp::Socket(m_address, m_port);
// reinitialize the frame queue
if (m_frameQueue != nullptr) {
delete m_frameQueue;
m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug);
}
bool ret = m_socket->open();
if (!ret) {
m_status = NET_STAT_INVALID;
}
return ret;
}
/// <summary>
/// Closes connection to the network.
/// </summary>
void FNENetwork::close()
{
if (m_debug)
LogMessage(LOG_NET, "Closing Network");
if (m_status == NET_STAT_MST_RUNNING) {
uint8_t buffer[1U];
::memset(buffer, 0x00U, 1U);
for (auto peer : m_peers) {
writePeer(peer.first, { NET_FUNC_MST_CLOSING, NET_SUBFUNC_NOP }, buffer, 1U, (ushort)0U, 0U);
}
}
m_socket->close();
m_maintainenceTimer.stop();
m_status = NET_STAT_INVALID;
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/// <summary>
/// Helper to erase the peer from the peers affiliations list.
/// </summary>

@ -26,6 +26,8 @@
#include <unordered_map>
#include <mutex>
#include <pthread.h>
// ---------------------------------------------------------------------------
// Class Prototypes
// ---------------------------------------------------------------------------
@ -209,8 +211,6 @@ namespace network
/// <summary>Process a data frames from the network.</summary>
void processNetwork();
/// <summary>Entry point to process a given network packet.</summary>
static void* threadedNetworkRx(void* arg);
/// <summary>Updates the timer by the passed number of milliseconds.</summary>
void clock(uint32_t ms) override;
@ -267,6 +267,9 @@ namespace network
bool m_reportPeerPing;
bool m_verbose;
/// <summary>Entry point to process a given network packet.</summary>
static void* threadedNetworkRx(void* arg);
/// <summary>Helper to erase the peer from the peers affiliations list.</summary>
bool erasePeerAffiliations(uint32_t peerId);
/// <summary>Helper to erase the peer from the peers list.</summary>

@ -212,6 +212,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {
uint32_t i = 0U;
for (auto peer : m_network->m_peers) {
if (peerId != peer.first) {
// is this peer ignored?
@ -219,6 +220,11 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
// every 5 peers flush the queue
if (i % 5U == 0U) {
m_network->m_frameQueue->flushQueue();
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
@ -234,6 +240,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
if (!m_network->m_callInProgress)
m_network->m_callInProgress = true;
i++;
}
}
m_network->m_frameQueue->flushQueue();

@ -182,6 +182,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {
uint32_t i = 0U;
for (auto peer : m_network->m_peers) {
if (peerId != peer.first) {
// is this peer ignored?
@ -189,6 +190,11 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
continue;
}
// every 5 peers flush the queue
if (i % 5U == 0U) {
m_network->m_frameQueue->flushQueue();
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
@ -204,6 +210,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
if (!m_network->m_callInProgress)
m_network->m_callInProgress = true;
i++;
}
}
m_network->m_frameQueue->flushQueue();

@ -246,6 +246,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {
uint32_t i = 0U;
for (auto peer : m_network->m_peers) {
if (peerId != peer.first) {
// is this peer ignored?
@ -253,6 +254,11 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
// every 5 peers flush the queue
if (i % 5U == 0U) {
m_network->m_frameQueue->flushQueue();
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
@ -268,6 +274,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
if (!m_network->m_callInProgress)
m_network->m_callInProgress = true;
i++;
}
}
m_network->m_frameQueue->flushQueue();

Loading…
Cancel
Save

Powered by TurnKey Linux.