for performance reasons on very noisy/busy setups that may have *lots* of peers, use of an alternate port to service diagnostic and activity log transfers helps ensure the traffic port doesn't become overloaded with data;

pull/51/head
Bryan Biedenkapp 2 years ago
parent 8ca4ed5a40
commit c4ca72581f

@ -144,6 +144,10 @@ system:
# Flag indicating the TGID information for this master will be sent to its peers.
sendTalkgroups: true
# Flag indicating the FNE should use an alternate port dedicated to diagnostic and activity
# log processing. This port number is always: master port + 1 (so for example, a master port
# of 62031 will use 62032 for diagnostic and activity messages.)
useAlternatePortForDiagnostics: false
# Flag indicating whether or not the host activity log will be sent to the network.
allowActivityTransfer: true
# Flag indicating whether or not the host diagnostic log will be sent to the network.

@ -48,6 +48,7 @@ BaseNetwork::BaseNetwork(uint32_t peerId, bool duplex, bool debug, bool slot1, b
m_slot1(slot1),
m_slot2(slot2),
m_duplex(duplex),
m_useAlternatePortForDiagnostics(false),
m_allowActivityTransfer(allowActivityTransfer),
m_allowDiagnosticTransfer(allowDiagnosticTransfer),
m_debug(debug),
@ -128,6 +129,7 @@ bool BaseNetwork::writeGrantReq(const uint8_t mode, const uint32_t srcId, const
/// Writes the local activity log to the network.
/// </summary>
/// <param name="message"></param>
/// <param name="useAlternatePort"></param>
/// <returns></returns>
bool BaseNetwork::writeActLog(const char* message)
{
@ -145,13 +147,14 @@ bool BaseNetwork::writeActLog(const char* message)
::strcpy(buffer + 11U, message);
return writeMaster({ NET_FUNC_TRANSFER, NET_TRANSFER_SUBFUNC_ACTIVITY }, (uint8_t*)buffer, (uint32_t)len + 12U,
0U, 0U);
0U, 0U, false, m_useAlternatePortForDiagnostics);
}
/// <summary>
/// Writes the local diagnostics log to the network.
/// </summary>
/// <param name="message"></param>
/// <param name="useAlternatePort"></param>
/// <returns></returns>
bool BaseNetwork::writeDiagLog(const char* message)
{
@ -169,7 +172,7 @@ bool BaseNetwork::writeDiagLog(const char* message)
::strcpy(buffer + 11U, message);
return writeMaster({ NET_FUNC_TRANSFER, NET_TRANSFER_SUBFUNC_DIAG }, (uint8_t*)buffer, (uint32_t)len + 12U,
0U, 0U);
0U, 0U, false, m_useAlternatePortForDiagnostics);
}
/// <summary>
@ -312,9 +315,25 @@ uint32_t BaseNetwork::getDMRStreamId(uint32_t slotNo) const
/// <param name="pktSeq"></param>
/// <param name="streamId"></param>
/// <param name="queueOnly"></param>
bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly)
/// <param name="useAlternatePort"></param>
bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId,
bool queueOnly, bool useAlternatePort)
{
m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, m_addr, m_addrLen);
if (useAlternatePort) {
sockaddr_storage addr;
uint32_t addrLen;
std::string address = udp::Socket::address(m_addr);
uint16_t port = udp::Socket::port(m_addr) + 1U;
if (udp::Socket::lookup(address, port, addr, addrLen) == 0) {
m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, addr, addrLen);
}
}
else {
m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, m_addr, m_addrLen);
}
if (queueOnly)
return true;
return m_frameQueue->flushQueue();

@ -209,7 +209,7 @@ namespace network
/// <summary>Helper to send a data message to the master.</summary>
bool writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false);
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool useAlternatePort = false);
/** Digital Mobile Radio */
/// <summary>Reads DMR raw frame data from the DMR ring buffer.</summary>
@ -268,6 +268,8 @@ namespace network
__PROTECTED_READONLY_PROPERTY(bool, duplex, Duplex);
protected:
bool m_useAlternatePortForDiagnostics;
bool m_allowActivityTransfer;
bool m_allowDiagnosticTransfer;

@ -51,6 +51,7 @@ HostFNE::HostFNE(const std::string& confFile) :
m_confFile(confFile),
m_conf(),
m_network(nullptr),
m_diagNetwork(nullptr),
m_dmrEnabled(false),
m_p25Enabled(false),
m_nxdnEnabled(false),
@ -60,6 +61,7 @@ HostFNE::HostFNE(const std::string& confFile) :
m_pingTime(5U),
m_maxMissedPings(5U),
m_updateLookupTime(10U),
m_useAlternatePortForDiagnostics(false),
m_allowActivityTransfer(false),
m_allowDiagnosticTransfer(false),
m_RESTAPI(nullptr)
@ -196,6 +198,22 @@ int HostFNE::run()
networkLoop.run();
networkLoop.setName("dvmfne:network-loop");
ThreadFunc diagNetworkLoop([&, this]() {
if (g_killed)
return;
if (m_diagNetwork != nullptr) {
while (!g_killed) {
m_diagNetwork->processNetwork();
Thread::sleep(5U);
}
}
});
if (m_useAlternatePortForDiagnostics) {
diagNetworkLoop.run();
diagNetworkLoop.setName("dvmfne:diag-network-loop");
}
// main execution loop
while (!g_killed) {
uint32_t ms = stopWatch.elapsed();
@ -210,6 +228,8 @@ int HostFNE::run()
// clock master
if (m_network != nullptr)
m_network->clock(ms);
if (m_diagNetwork != nullptr)
m_diagNetwork->clock(ms);
// clock peers
for (auto network : m_peerNetworks) {
@ -228,12 +248,20 @@ int HostFNE::run()
// shutdown threads
networkLoop.wait();
if (m_useAlternatePortForDiagnostics) {
diagNetworkLoop.wait();
}
if (m_network != nullptr) {
m_network->close();
delete m_network;
}
if (m_diagNetwork != nullptr) {
m_diagNetwork->close();
delete m_diagNetwork;
}
for (auto network : m_peerNetworks) {
network::Network* peerNetwork = network.second;
if (peerNetwork != nullptr)
@ -286,6 +314,7 @@ bool HostFNE::readParams()
m_updateLookupTime = 10U;
}
m_useAlternatePortForDiagnostics = systemConf["useAlternatePortForDiagnostics"].as<bool>(false);
m_allowActivityTransfer = systemConf["allowActivityTransfer"].as<bool>(true);
m_allowDiagnosticTransfer = systemConf["allowDiagnosticTransfer"].as<bool>(true);
@ -296,6 +325,7 @@ bool HostFNE::readParams()
LogInfo(" Send Talkgroups: %s", sendTalkgroups ? "yes" : "no");
LogInfo(" Use Alternate Port for Diagnostics: %s", m_useAlternatePortForDiagnostics ? "yes" : "no");
LogInfo(" Allow Activity Log Transfer: %s", m_allowActivityTransfer ? "yes" : "no");
LogInfo(" Allow Diagnostic Log Transfer: %s", m_allowDiagnosticTransfer ? "yes" : "no");
@ -487,6 +517,23 @@ bool HostFNE::createMasterNetwork()
m_network->setPresharedKey(presharedKey);
}
// setup alternate port for diagnostics/activity logging
if (m_useAlternatePortForDiagnostics) {
m_diagNetwork = new DiagNetwork(this, m_network, address, port + 1U);
bool ret = m_diagNetwork->open();
if (!ret) {
delete m_diagNetwork;
m_diagNetwork = nullptr;
LogError(LOG_HOST, "failed to initialize diagnostic log networking!");
m_useAlternatePortForDiagnostics = false; // this isn't fatal so just disable alternate port
}
if (encrypted) {
m_diagNetwork->setPresharedKey(presharedKey);
}
}
return true;
}

@ -19,6 +19,7 @@
#include "common/yaml/Yaml.h"
#include "common/Timer.h"
#include "network/FNENetwork.h"
#include "network/DiagNetwork.h"
#include "network/PeerNetwork.h"
#include "network/RESTAPI.h"
@ -58,6 +59,7 @@ private:
friend class network::fne::TagP25Data;
friend class network::fne::TagNXDNData;
network::FNENetwork* m_network;
network::DiagNetwork* m_diagNetwork;
bool m_dmrEnabled;
bool m_p25Enabled;
@ -72,6 +74,8 @@ private:
uint32_t m_maxMissedPings;
uint32_t m_updateLookupTime;
bool m_useAlternatePortForDiagnostics;
bool m_allowActivityTransfer;
bool m_allowDiagnosticTransfer;

@ -0,0 +1,273 @@
// SPDX-License-Identifier: GPL-2.0-only
/**
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* @package DVM / Converged FNE Software
* @license GPLv2 License (https://opensource.org/licenses/GPL-2.0)
*
* Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
#include "common/Log.h"
#include "common/Utils.h"
#include "network/DiagNetwork.h"
#include "fne/ActivityLog.h"
#include "HostFNE.h"
using namespace network;
using namespace network::fne;
#include <cassert>
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
/// <summary>
/// Initializes a new instance of the DiagNetwork class.
/// </summary>
/// <param name="host"></param>
/// <param name="network"></param>
/// <param name="address">Network Hostname/IP address to listen on.</param>
/// <param name="port">Network port number.</param>
DiagNetwork::DiagNetwork(HostFNE* host, FNENetwork* fneNetwork, const std::string& address, uint16_t port) :
BaseNetwork(fneNetwork->m_peerId, true, fneNetwork->m_debug, true, true, fneNetwork->m_allowActivityTransfer, fneNetwork->m_allowDiagnosticTransfer),
m_fneNetwork(fneNetwork),
m_host(host),
m_address(address),
m_port(port)
{
assert(fneNetwork != nullptr);
assert(host != nullptr);
assert(!address.empty());
assert(port > 0U);
}
/// <summary>
/// Finalizes a instance of the DiagNetwork class.
/// </summary>
DiagNetwork::~DiagNetwork() = default;
/// <summary>
/// Sets endpoint preshared encryption key.
/// </summary>
void DiagNetwork::setPresharedKey(const uint8_t* presharedKey)
{
m_socket->setPresharedKey(presharedKey);
}
/// <summary>
/// Process a data frames from the network.
/// </summary>
void DiagNetwork::processNetwork()
{
if (m_status != NET_STAT_MST_RUNNING) {
return;
}
sockaddr_storage address;
uint32_t addrLen;
frame::RTPHeader rtpHeader;
frame::RTPFNEHeader fneHeader;
int length = 0U;
// read message
UInt8Array buffer = m_frameQueue->read(length, address, addrLen, &rtpHeader, &fneHeader);
if (length > 0) {
if (m_debug)
Utils::dump(1U, "Network Message", buffer.get(), length);
uint32_t peerId = fneHeader.getPeerId();
NetPacketRequest* req = new NetPacketRequest();
req->network = m_fneNetwork;
req->peerId = peerId;
req->address = address;
req->addrLen = addrLen;
req->rtpHeader = rtpHeader;
req->fneHeader = fneHeader;
req->length = length;
req->buffer = new uint8_t[length];
::memcpy(req->buffer, buffer.get(), length);
::pthread_create(&req->thread, NULL, threadedNetworkRx, req);
}
}
/// <summary>
/// Updates the timer by the passed number of milliseconds.
/// </summary>
/// <param name="ms"></param>
void DiagNetwork::clock(uint32_t ms)
{
if (m_status != NET_STAT_MST_RUNNING) {
return;
}
}
/// <summary>
/// Opens connection to the network.
/// </summary>
/// <returns></returns>
bool DiagNetwork::open()
{
if (m_debug)
LogMessage(LOG_NET, "Opening Network");
m_status = NET_STAT_MST_RUNNING;
m_socket = new udp::Socket(m_address, m_port);
// reinitialize the frame queue
if (m_frameQueue != nullptr) {
delete m_frameQueue;
m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug);
}
bool ret = m_socket->open();
if (!ret) {
m_status = NET_STAT_INVALID;
}
return ret;
}
/// <summary>
/// Closes connection to the network.
/// </summary>
void DiagNetwork::close()
{
if (m_debug)
LogMessage(LOG_NET, "Closing Network");
m_socket->close();
m_status = NET_STAT_INVALID;
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/// <summary>
/// Process a data frames from the network.
/// </summary>
void* DiagNetwork::threadedNetworkRx(void* arg)
{
NetPacketRequest* req = (NetPacketRequest*)arg;
if (req != nullptr) {
FNENetwork* network = req->network;
if (req->length > 0) {
uint32_t peerId = req->fneHeader.getPeerId();
uint32_t streamId = req->fneHeader.getStreamId();
std::stringstream peerName;
peerName << peerId << ":diag-rx-pckt";
if (pthread_kill(req->thread, 0) == 0) {
::pthread_setname_np(req->thread, peerName.str().c_str());
}
// update current peer packet sequence and stream ID
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end()) && streamId != 0U) {
FNEPeerConnection* connection = network->m_peers[peerId];
uint16_t pktSeq = req->rtpHeader.getSequence();
if (connection != nullptr) {
if (pktSeq == RTP_END_OF_CALL_SEQ) {
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(0U);
} else {
if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq())) {
LogWarning(LOG_NET, "PEER %u stream %u out-of-sequence; %u != %u", peerId, streamId, pktSeq, connection->pktNextSeq());
}
connection->currStreamId(streamId);
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(pktSeq + 1);
if (connection->pktNextSeq() > UINT16_MAX) {
connection->pktNextSeq(0U);
}
}
}
network->m_peers[peerId] = connection;
}
// process incoming message frame opcodes
switch (req->fneHeader.getFunction()) {
case NET_FUNC_TRANSFER:
{
if (req->fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer
if (network->m_allowActivityTransfer) {
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
uint8_t rawPayload[req->length - 11U];
::memset(rawPayload, 0x00U, req->length - 11U);
::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
std::string payload(rawPayload, rawPayload + (req->length - 11U));
::ActivityLog("%u %s", peerId, payload.c_str());
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
}
else if (req->fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_DIAG) { // Peer Diagnostic Log Transfer
if (network->m_allowDiagnosticTransfer) {
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
uint8_t rawPayload[req->length - 11U];
::memset(rawPayload, 0x00U, req->length - 11U);
::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
std::string payload(rawPayload, rawPayload + (req->length - 11U));
bool currState = g_disableTimeDisplay;
g_disableTimeDisplay = true;
::Log(9999U, nullptr, "%u %s", peerId, payload.c_str());
g_disableTimeDisplay = currState;
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length);
}
}
break;
default:
// diagostic network ignores unknowns for everything else...
break;
}
}
if (req->buffer != nullptr)
delete req->buffer;
delete req;
}
return nullptr;
}

@ -0,0 +1,77 @@
// SPDX-License-Identifier: GPL-2.0-only
/**
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* @package DVM / Converged FNE Software
* @license GPLv2 License (https://opensource.org/licenses/GPL-2.0)
*
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
*
*/
#if !defined(__DIAG_NETWORK_H__)
#define __DIAG_NETWORK_H__
#include "fne/Defines.h"
#include "common/network/BaseNetwork.h"
#include "fne/network/FNENetwork.h"
#include <string>
#include <pthread.h>
// ---------------------------------------------------------------------------
// Class Prototypes
// ---------------------------------------------------------------------------
class HOST_SW_API HostFNE;
namespace network
{
// ---------------------------------------------------------------------------
// Class Declaration
// Implements the diagnostic/activity log networking logic.
// ---------------------------------------------------------------------------
class HOST_SW_API DiagNetwork : public BaseNetwork {
public:
/// <summary>Initializes a new instance of the DiagNetwork class.</summary>
DiagNetwork(HostFNE* host, FNENetwork* fneNetwork, const std::string& address, uint16_t port);
/// <summary>Finalizes a instance of the DiagNetwork class.</summary>
~DiagNetwork() override;
/// <summary>Gets the current status of the network.</summary>
NET_CONN_STATUS getStatus() { return m_status; }
/// <summary>Sets endpoint preshared encryption key.</summary>
void setPresharedKey(const uint8_t* presharedKey);
/// <summary>Process a data frames from the network.</summary>
void processNetwork();
/// <summary>Updates the timer by the passed number of milliseconds.</summary>
void clock(uint32_t ms) override;
/// <summary>Opens connection to the network.</summary>
bool open() override;
/// <summary>Closes connection to the network.</summary>
void close() override;
private:
friend class FNENetwork;
FNENetwork* m_fneNetwork;
HostFNE* m_host;
std::string m_address;
uint16_t m_port;
NET_CONN_STATUS m_status;
/// <summary>Entry point to process a given network packet.</summary>
static void* threadedNetworkRx(void* arg);
};
} // namespace network
#endif // __FNE_NETWORK_H__

@ -613,7 +613,15 @@ void* FNENetwork::threadedNetworkRx(void* arg)
connection->lastACLUpdate(now);
network->m_peers[peerId] = connection;
network->writePeerACK(peerId);
// attach extra notification data to the RPTC ACK to notify the peer of
// the use of the alternate diagnostic port
uint8_t buffer[1U];
buffer[0U] = 0x00U;
if (network->m_host->m_useAlternatePortForDiagnostics) {
buffer[0U] = 0x80U;
}
network->writePeerACK(peerId, buffer, 1U);
LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
json::object peerConfig = connection->config();
@ -766,6 +774,12 @@ void* FNENetwork::threadedNetworkRx(void* arg)
case NET_FUNC_TRANSFER:
{
// are activity/diagnostic transfers occurring from the alternate port?
if (network->m_host->m_useAlternatePortForDiagnostics) {
break; // for performance and other reasons -- simply ignore the entire NET_FUNC_TRANSFER at this point
// since they should be coming from the alternate port anyway
}
if (req->fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer
if (network->m_allowActivityTransfer) {
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
@ -1398,6 +1412,8 @@ bool FNENetwork::writePeerACK(uint32_t peerId, const uint8_t* data, uint32_t len
uint8_t buffer[DATA_PACKET_LENGTH];
::memset(buffer, 0x00U, DATA_PACKET_LENGTH);
__SET_UINT32(peerId, buffer, 0U); // Peer ID
if (data != nullptr && length > 0U) {
::memcpy(buffer + 6U, data, length);
}

@ -58,6 +58,7 @@ namespace network
// Class Prototypes
// ---------------------------------------------------------------------------
class HOST_SW_API DiagNetwork;
class HOST_SW_API FNENetwork;
// ---------------------------------------------------------------------------
@ -236,6 +237,7 @@ namespace network
void close() override;
private:
friend class DiagNetwork;
friend class fne::TagDMRData;
fne::TagDMRData* m_tagDMR;
friend class fne::TagP25Data;
@ -324,8 +326,6 @@ namespace network
bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason = NET_CONN_NAK_GENERAL_FAILURE);
/// <summary>Helper to send a NAK response to the specified peer.</summary>
bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen);
};
} // namespace network

@ -531,15 +531,19 @@ void Network::clock(uint32_t ms)
switch (m_status) {
case NET_STAT_WAITING_LOGIN:
LogDebug(LOG_NET, "PEER %u RPTL ACK, performing login exchange, remotePeerId = %u", m_peerId, rtpHeader.getSSRC());
::memcpy(m_salt, buffer.get() + 6U, sizeof(uint32_t));
writeAuthorisation();
m_status = NET_STAT_WAITING_AUTHORISATION;
m_timeoutTimer.start();
m_retryTimer.start();
break;
case NET_STAT_WAITING_AUTHORISATION:
LogDebug(LOG_NET, "PEER %u RPTK ACK, performing configuration exchange, remotePeerId = %u", m_peerId, rtpHeader.getSSRC());
writeConfig();
m_status = NET_STAT_WAITING_CONFIG;
m_timeoutTimer.start();
m_retryTimer.start();
@ -548,10 +552,21 @@ void Network::clock(uint32_t ms)
LogMessage(LOG_NET, "PEER %u RPTC ACK, logged into the master successfully, remotePeerId = %u", m_peerId, rtpHeader.getSSRC());
m_loginStreamId = 0U;
m_remotePeerId = rtpHeader.getSSRC();
pktSeq(true);
m_status = NET_STAT_RUNNING;
m_timeoutTimer.start();
m_retryTimer.start();
Utils::dump(1U, "buffer", buffer.get(), length);
if (length > 6) {
m_useAlternatePortForDiagnostics = (buffer[6U] & 0x80U) == 0x80U;
if (m_useAlternatePortForDiagnostics) {
LogMessage(LOG_NET, "PEER %u RPTC ACK, master commanded alternate port for diagnostics and activity logging, remotePeerId = %u", m_peerId, rtpHeader.getSSRC());
}
}
break;
default:
break;

Loading…
Cancel
Save

Powered by TurnKey Linux.