refactor FNE RTP packet handling, this change better handles a peer end-point transmitting multiple varied RTP streams; refactor promiscuous handling of RTP streams on the host network API; refactor handling non-promiscuous handling of RTP streams on the host network API; bump project version from 4.11F to 4.11G to reflect these larger changes; correct bad use of magic numbers when dealing with signal values;

pull/86/head
Bryan Biedenkapp 11 months ago
parent 4025db8200
commit ef1a0df496

@ -32,6 +32,10 @@ using namespace lookups;
// Global Variables
// ---------------------------------------------------------------------------
#ifndef SIGHUP
#define SIGHUP 1
#endif
int g_signal = 0;
std::string g_progExe = std::string(__EXE_NAME__);
std::string g_iniFile = std::string(DEFAULT_CONF_FILE);
@ -293,15 +297,15 @@ int main(int argc, char** argv)
ret = bridge->run();
delete bridge;
if (g_signal == 2)
if (g_signal == SIGINT)
::LogInfoEx(LOG_HOST, "Exited on receipt of SIGINT");
if (g_signal == 15)
if (g_signal == SIGTERM)
::LogInfoEx(LOG_HOST, "Exited on receipt of SIGTERM");
if (g_signal == 1)
if (g_signal == SIGHUP)
::LogInfoEx(LOG_HOST, "Restarting on receipt of SIGHUP");
} while (g_signal == 1);
} while (g_signal == SIGHUP);
::LogFinalise();
::ActivityLogFinalise();

@ -109,7 +109,7 @@ typedef unsigned long long ulong64_t;
#define VERSION_MAJOR "04"
#define VERSION_MINOR "11"
#define VERSION_REV "F"
#define VERSION_REV "G"
#define __NETVER__ "DVM_R" VERSION_MAJOR VERSION_REV VERSION_MINOR
#define __VER__ VERSION_MAJOR "." VERSION_MINOR VERSION_REV " (R" VERSION_MAJOR VERSION_REV VERSION_MINOR " " __GIT_VER__ ")"

@ -544,12 +544,7 @@ bool BaseNetwork::writeP25TDU(const p25::lc::LC& control, const p25::data::LowSp
return false;
}
uint16_t seq = pktSeq(resetSeq);
if (controlByte == 0x00U) {
seq = RTP_END_OF_CALL_SEQ;
}
return writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, seq, m_p25StreamId);
return writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, m_p25StreamId);
}
/* Writes P25 TSDU frame data to the network. */

@ -32,6 +32,10 @@ using namespace lookups;
// Global Variables
// ---------------------------------------------------------------------------
#ifndef SIGHUP
#define SIGHUP 1
#endif
int g_signal = 0;
std::string g_progExe = std::string(__EXE_NAME__);
std::string g_iniFile = std::string(DEFAULT_CONF_FILE);
@ -209,15 +213,15 @@ int main(int argc, char** argv)
ret = fne->run();
delete fne;
if (g_signal == 2)
if (g_signal == SIGINT)
::LogInfoEx(LOG_HOST, "[STOP] dvmfne:main SIGINT");
if (g_signal == 15)
if (g_signal == SIGTERM)
::LogInfoEx(LOG_HOST, "[STOP] dvmfne:main SIGTERM");
if (g_signal == 1)
if (g_signal == SIGHUP)
::LogInfoEx(LOG_HOST, "[RSTR] dvmfne:main SIGHUP");
} while (g_signal == 1);
} while (g_signal == SIGHUP);
::LogFinalise();
::ActivityLogFinalise();

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2023,2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2023,2024,2025 Bryan Biedenkapp, N2PLL
*
*/
#include "Defines.h"
@ -972,7 +972,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork)
if (ret) {
uint32_t peerId = peerNetwork->getPeerId();
uint32_t slotNo = (data[15U] & 0x80U) == 0x80U ? 2U : 1U;
uint32_t streamId = peerNetwork->getDMRStreamId(slotNo);
uint32_t streamId = peerNetwork->getRxDMRStreamId(slotNo);
m_network->dmrTrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true);
}
@ -985,7 +985,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork)
UInt8Array data = peerNetwork->readP25(ret, length);
if (ret) {
uint32_t peerId = peerNetwork->getPeerId();
uint32_t streamId = peerNetwork->getP25StreamId();
uint32_t streamId = peerNetwork->getRxP25StreamId();
m_network->p25TrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true);
}
@ -998,7 +998,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork)
UInt8Array data = peerNetwork->readNXDN(ret, length);
if (ret) {
uint32_t peerId = peerNetwork->getPeerId();
uint32_t streamId = peerNetwork->getNXDNStreamId();
uint32_t streamId = peerNetwork->getRxNXDNStreamId();
m_network->nxdnTrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true);
}

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
@ -177,20 +177,17 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
if (connection != nullptr) {
if (pktSeq == RTP_END_OF_CALL_SEQ) {
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(0U);
connection->eraseStreamPktSeq(streamId); // attempt to erase packet sequence for the stream
} else {
if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq()) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U))) {
if (connection->hasStreamPktSeq(streamId)) {
uint16_t currPkt = connection->getStreamPktSeq(streamId);
if ((pktSeq != currPkt) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) {
LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(),
streamId, pktSeq, connection->pktNextSeq());
streamId, pktSeq, currPkt);
}
connection->currStreamId(streamId);
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(pktSeq + 1);
if (connection->pktNextSeq() > (RTP_END_OF_CALL_SEQ - 1U)) {
connection->pktNextSeq(0U);
}
connection->incStreamPktSeq(streamId, pktSeq + 1U);
}
}
@ -255,10 +252,6 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
for (auto peer : network->m_peers) {
if (peer.second != nullptr) {
if (peer.second->isSysView()) {
uint32_t peerStreamId = peer.second->currStreamId();
if (streamId == 0U) {
streamId = peerStreamId;
}
sockaddr_storage addr = peer.second->socketStorage();
uint32_t addrLen = peer.second->sockStorageLen();
@ -284,7 +277,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(pktPeerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(pktPeerId, streamId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -322,7 +315,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -341,10 +334,6 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
for (auto peer : network->m_peers) {
if (peer.second != nullptr) {
if (peer.second->isSysView()) {
uint32_t peerStreamId = peer.second->currStreamId();
if (streamId == 0U) {
streamId = peerStreamId;
}
sockaddr_storage addr = peer.second->socketStorage();
uint32_t addrLen = peer.second->sockStorageLen();
@ -374,13 +363,13 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(pktPeerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(pktPeerId, streamId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
network->writePeerNAK(peerId, streamId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length);
}
}
@ -420,7 +409,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
@ -403,8 +403,9 @@ void FNENetwork::close()
uint8_t buffer[1U];
::memset(buffer, 0x00U, 1U);
uint32_t streamId = createStreamId();
for (auto peer : m_peers) {
writePeer(peer.first, { NET_FUNC::MST_CLOSING, NET_SUBFUNC::NOP }, buffer, 1U, (uint16_t)0U, 0U);
writePeer(peer.first, { NET_FUNC::MST_CLOSING, NET_SUBFUNC::NOP }, buffer, 1U, RTP_END_OF_CALL_SEQ, streamId, false);
}
}
@ -459,21 +460,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
// only reset packet sequences if we're a PROTOCOL or RPTC function
if ((req->fneHeader.getFunction() == NET_FUNC::PROTOCOL) ||
(req->fneHeader.getFunction() == NET_FUNC::RPTC)) {
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(0U);
connection->eraseStreamPktSeq(streamId); // attempt to erase packet sequence for the stream
}
} else {
if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq()) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) {
if (connection->hasStreamPktSeq(streamId)) {
uint16_t currPkt = connection->getStreamPktSeq(streamId);
if ((pktSeq != currPkt) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) {
LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(),
streamId, pktSeq, connection->pktNextSeq());
streamId, pktSeq, currPkt);
}
connection->currStreamId(streamId);
connection->pktLastSeq(pktSeq);
connection->pktNextSeq(pktSeq + 1);
if (connection->pktNextSeq() > (RTP_END_OF_CALL_SEQ - 1U)) {
connection->pktNextSeq(0U);
}
connection->incStreamPktSeq(streamId, pktSeq + 1U);
}
}
@ -510,7 +508,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
network->m_tagDMR->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
}
} else {
network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
}
@ -533,7 +531,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
network->m_tagP25->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
}
} else {
network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
}
@ -556,7 +554,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
}
} else {
network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
}
@ -588,9 +586,8 @@ void* FNENetwork::threadedNetworkRx(void* arg)
FNEPeerConnection* connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
connection->lastPing(now);
connection->currStreamId(streamId);
network->setupRepeaterLogin(peerId, connection);
network->setupRepeaterLogin(peerId, streamId, connection);
// check if the peer is in the peer ACL list
if (network->m_peerListLookup->getACL()) {
@ -625,10 +622,9 @@ void* FNENetwork::threadedNetworkRx(void* arg)
connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
connection->lastPing(now);
connection->currStreamId(streamId);
network->erasePeerAffiliations(peerId);
network->setupRepeaterLogin(peerId, connection);
network->setupRepeaterLogin(peerId, streamId, connection);
// check if the peer is in the peer ACL list
if (network->m_peerListLookup->getACL()) {
@ -745,7 +741,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (validHash) {
connection->connectionState(NET_STAT_WAITING_CONFIG);
network->writePeerACK(peerId);
network->writePeerACK(peerId, streamId);
LogInfoEx(LOG_NET, "PEER %u RPTK ACK, completed the login exchange", peerId);
network->m_peers[peerId] = connection;
}
@ -822,7 +818,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
buffer[0U] = 0x80U;
}
network->writePeerACK(peerId, buffer, 1U);
network->writePeerACK(peerId, streamId, buffer, 1U);
LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
json::object peerConfig = connection->config();
@ -943,7 +939,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (dt < now) {
LogInfoEx(LOG_NET, "PEER %u (%s) updating ACL list, dt = %u, now = %u", peerId, connection->identity().c_str(),
dt, now);
if (connection->pktLastSeq() == RTP_END_OF_CALL_SEQ) {
if (connection->streamCount() <= 1) {
network->peerACLUpdate(peerId);
}
connection->lastACLUpdate(now);
@ -963,7 +959,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
payload[7U] = (uint8_t)((now >> 0) & 0xFFU);
network->m_peers[peerId] = connection;
network->writePeerCommand(peerId, { NET_FUNC::PONG, NET_SUBFUNC::NOP }, payload, 8U);
network->writePeerCommand(peerId, { NET_FUNC::PONG, NET_SUBFUNC::NOP }, payload, 8U, streamId, false);
if (network->m_reportPeerPing) {
LogInfoEx(LOG_NET, "PEER %u (%s) ping, pingsReceived = %u, lastPing = %u, now = %u", peerId, connection->identity().c_str(),
@ -971,7 +967,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_REPEATER_PING);
network->writePeerNAK(peerId, streamId, TAG_REPEATER_PING);
}
}
}
@ -1001,7 +997,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (network->m_tagDMR != nullptr) {
network->m_tagDMR->processGrantReq(srcId, dstId, slot, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId);
} else {
network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
break;
@ -1010,7 +1006,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (network->m_tagP25 != nullptr) {
network->m_tagP25->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId);
} else {
network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
break;
@ -1019,18 +1015,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (network->m_tagNXDN != nullptr) {
network->m_tagNXDN->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId);
} else {
network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED);
}
}
break;
default:
network->writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_ILLEGAL_PACKET);
network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("unknown state for grant request from the peer", req->buffer, req->length);
break;
}
}
else {
network->writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1074,7 +1070,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1112,7 +1108,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1122,7 +1118,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
// main traffic port status transfers aren't supported for performance reasons
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
network->writePeerNAK(peerId, streamId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length);
}
}
@ -1138,7 +1134,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
if (aff == nullptr) {
LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str());
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
}
// validate peer (simple validation really)
@ -1161,7 +1157,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1174,7 +1170,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
if (aff == nullptr) {
LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str());
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
}
// validate peer (simple validation really)
@ -1195,7 +1191,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1208,7 +1204,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
if (aff == nullptr) {
LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str());
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
}
// validate peer (simple validation really)
@ -1229,7 +1225,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1242,7 +1238,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
if (aff == nullptr) {
LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str());
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
}
// validate peer (simple validation really)
@ -1263,7 +1259,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1279,7 +1275,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
if (aff == nullptr) {
LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str());
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID);
}
if (aff != nullptr) {
@ -1311,7 +1307,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -1356,13 +1352,13 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_ILLEGAL_PACKET);
network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("unknown announcement opcode from the peer", req->buffer, req->length);
}
}
@ -1545,7 +1541,7 @@ std::string FNENetwork::resolvePeerIdentity(uint32_t peerId)
/* Helper to complete setting up a repeater login request. */
void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection)
void FNENetwork::setupRepeaterLogin(uint32_t peerId, uint32_t streamId, FNEPeerConnection* connection)
{
std::uniform_int_distribution<uint32_t> dist(DVM_RAND_MIN, DVM_RAND_MAX);
connection->salt(dist(m_random));
@ -1560,7 +1556,7 @@ void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connecti
::memset(salt, 0x00U, 4U);
__SET_UINT32(connection->salt(), salt, 0U);
writePeerACK(peerId, salt, 4U);
writePeerACK(peerId, streamId, salt, 4U);
LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId);
}
@ -1607,26 +1603,24 @@ void* FNENetwork::threadedACLUpdate(void* arg)
FNEPeerConnection* connection = network->m_peers[req->peerId];
if (connection != nullptr) {
uint32_t aclStreamId = network->createStreamId();
// if the connection is an external peer, and peer is participating in peer link,
// send the peer proper configuration data
if (connection->isExternalPeer() && connection->isPeerLink()) {
LogInfoEx(LOG_NET, "PEER %u (%s) sending Peer-Link ACL list updates", req->peerId, peerIdentity.c_str());
network->writeWhitelistRIDs(req->peerId, true);
network->writeTGIDs(req->peerId, true);
connection->pktLastSeq(RTP_END_OF_CALL_SEQ - 1U);
network->writePeerList(req->peerId);
network->writeWhitelistRIDs(req->peerId, aclStreamId, true);
network->writeTGIDs(req->peerId, aclStreamId, true);
network->writePeerList(req->peerId, aclStreamId);
}
else {
LogInfoEx(LOG_NET, "PEER %u (%s) sending ACL list updates", req->peerId, peerIdentity.c_str());
network->writeWhitelistRIDs(req->peerId, false);
network->writeBlacklistRIDs(req->peerId);
network->writeTGIDs(req->peerId, false);
connection->pktLastSeq(RTP_END_OF_CALL_SEQ - 1U);
network->writeDeactiveTGIDs(req->peerId);
network->writeWhitelistRIDs(req->peerId, aclStreamId, false);
network->writeBlacklistRIDs(req->peerId, aclStreamId);
network->writeTGIDs(req->peerId, aclStreamId, false);
network->writeDeactiveTGIDs(req->peerId, aclStreamId);
}
}
@ -1638,7 +1632,7 @@ void* FNENetwork::threadedACLUpdate(void* arg)
/* Helper to send the list of whitelisted RIDs to the specified peer. */
void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer)
void FNENetwork::writeWhitelistRIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer)
{
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
@ -1743,7 +1737,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer)
offs += PEER_LINK_BLOCK_SIZE;
writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_RID_LIST },
payload, bufSize, 0U, false, true, true);
payload, bufSize, 0U, streamId, false, true, true);
}
connection->lastPing(now);
@ -1813,7 +1807,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer)
}
writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_WL_RID },
payload, bufSize, true);
payload, bufSize, streamId, true);
}
connection->lastPing(now);
@ -1822,7 +1816,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer)
/* Helper to send the list of whitelisted RIDs to the specified peer. */
void FNENetwork::writeBlacklistRIDs(uint32_t peerId)
void FNENetwork::writeBlacklistRIDs(uint32_t peerId, uint32_t streamId)
{
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
@ -1887,7 +1881,7 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId)
}
writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_BL_RID },
payload, bufSize, true);
payload, bufSize, streamId, true);
}
connection->lastPing(now);
@ -1896,7 +1890,7 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId)
/* Helper to send the list of active TGIDs to the specified peer. */
void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer)
void FNENetwork::writeTGIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer)
{
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
@ -2005,7 +1999,7 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer)
offs += PEER_LINK_BLOCK_SIZE;
writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_TALKGROUP_LIST },
payload, bufSize, 0U, false, true, true);
payload, bufSize, 0U, streamId, false, true, true);
}
connection->lastPing(now);
@ -2086,12 +2080,12 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer)
}
writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_ACTIVE_TGS },
payload, 4U + (tgidList.size() * 5U), true);
payload, 4U + (tgidList.size() * 5U), streamId, true);
}
/* Helper to send the list of deactivated TGIDs to the specified peer. */
void FNENetwork::writeDeactiveTGIDs(uint32_t peerId)
void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, uint32_t streamId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@ -2147,12 +2141,12 @@ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId)
}
writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_DEACTIVE_TGS },
payload, 4U + (tgidList.size() * 5U), true);
payload, 4U + (tgidList.size() * 5U), streamId, true);
}
/* Helper to send the list of peers to the specified peer. */
void FNENetwork::writePeerList(uint32_t peerId)
void FNENetwork::writePeerList(uint32_t peerId, uint32_t streamId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@ -2260,7 +2254,7 @@ void FNENetwork::writePeerList(uint32_t peerId)
offs += PEER_LINK_BLOCK_SIZE;
writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_PEER_LIST },
payload, bufSize, 0U, false, true, true);
payload, bufSize, 0U, streamId, false, true, true);
}
connection->lastPing(now);
@ -2269,22 +2263,26 @@ void FNENetwork::writePeerList(uint32_t peerId)
return;
}
/* Helper to send a data message to the specified peer. */
/* Helper to send a data message to the specified peer with a explicit packet sequence. */
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool directWrite) const
uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const
{
if (streamId == 0U) {
LogError(LOG_NET, "PEER %u, trying to send data with a streamId of 0? BUGBUG", peerId);
}
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
uint32_t peerStreamId = connection->currStreamId();
if (streamId == 0U) {
streamId = peerStreamId;
}
sockaddr_storage addr = connection->socketStorage();
uint32_t addrLen = connection->sockStorageLen();
if (incPktSeq) {
pktSeq = connection->incStreamPktSeq(streamId, pktSeq);
}
if (directWrite)
return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
else {
@ -2299,31 +2297,10 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
return false;
}
/* Helper to send a data message to the specified peer. */
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const
{
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
if (incPktSeq) {
connection->pktLastSeq(connection->pktLastSeq() + 1);
}
uint16_t pktSeq = connection->pktLastSeq();
return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly, directWrite);
}
}
return false;
}
/* Helper to send a command message to the specified peer. */
bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode,
const uint8_t* data, uint32_t length, bool incPktSeq) const
const uint8_t* data, uint32_t length, uint32_t streamId, bool incPktSeq) const
{
assert(peerId > 0);
@ -2335,12 +2312,12 @@ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode
}
uint32_t len = length + 6U;
return writePeer(peerId, opcode, buffer, len, 0U, false, incPktSeq, true);
return writePeer(peerId, opcode, buffer, len, RTP_END_OF_CALL_SEQ, streamId, false, incPktSeq, true);
}
/* Helper to send a ACK response to the specified peer. */
bool FNENetwork::writePeerACK(uint32_t peerId, const uint8_t* data, uint32_t length)
bool FNENetwork::writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* data, uint32_t length)
{
uint8_t buffer[DATA_PACKET_LENGTH];
::memset(buffer, 0x00U, DATA_PACKET_LENGTH);
@ -2351,7 +2328,8 @@ bool FNENetwork::writePeerACK(uint32_t peerId, const uint8_t* data, uint32_t len
::memcpy(buffer + 6U, data, length);
}
return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, false, true);
return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, streamId,
false);
}
/* Helper to log a warning specifying which NAK reason is being sent a peer. */
@ -2393,7 +2371,7 @@ void FNENetwork::logPeerNAKReason(uint32_t peerId, const char* tag, NET_CONN_NAK
/* Helper to send a NAK response to the specified peer. */
bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason)
bool FNENetwork::writePeerNAK(uint32_t peerId, uint32_t streamId, const char* tag, NET_CONN_NAK_REASON reason)
{
assert(peerId > 0);
assert(tag != nullptr);
@ -2405,7 +2383,8 @@ bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REA
__SET_UINT16B((uint16_t)reason, buffer, 10U); // Reason
logPeerNAKReason(peerId, tag, reason);
return writePeer(peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, buffer, 10U, RTP_END_OF_CALL_SEQ, false, true);
return writePeer(peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, buffer, 10U, RTP_END_OF_CALL_SEQ, streamId,
false);
}
/* Helper to send a NAK response to the specified peer. */

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL
*
*/
/**
@ -103,7 +103,6 @@ namespace network
FNEPeerConnection() :
m_id(0U),
m_ccPeerId(0U),
m_currStreamId(0U),
m_socketStorage(),
m_sockStorageLen(0U),
m_address(),
@ -119,8 +118,8 @@ namespace network
m_isSysView(false),
m_isPeerLink(false),
m_config(),
m_pktLastSeq(RTP_END_OF_CALL_SEQ),
m_pktNextSeq(1U)
m_streamSeqMutex(),
m_streamSeqNos()
{
/* stub */
}
@ -133,7 +132,6 @@ namespace network
FNEPeerConnection(uint32_t id, sockaddr_storage& socketStorage, uint32_t sockStorageLen) :
m_id(id),
m_ccPeerId(0U),
m_currStreamId(0U),
m_socketStorage(socketStorage),
m_sockStorageLen(sockStorageLen),
m_address(udp::Socket::address(socketStorage)),
@ -149,8 +147,8 @@ namespace network
m_isSysView(false),
m_isPeerLink(false),
m_config(),
m_pktLastSeq(RTP_END_OF_CALL_SEQ),
m_pktNextSeq(1U)
m_streamSeqMutex(),
m_streamSeqNos()
{
assert(id > 0U);
assert(sockStorageLen > 0U);
@ -158,6 +156,118 @@ namespace network
assert(m_port > 0U);
}
/**
* @brief Helper to return the current count of mapped RTP streams.
* @returns size_t
*/
size_t streamCount()
{
return m_streamSeqNos.size();
}
/**
* @brief Helper to determine if the stream ID has a stored RTP sequence.
* @param streamId Stream ID.
* @returns bool
*/
bool hasStreamPktSeq(uint64_t streamId)
{
bool ret = false;
m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60));
// determine if the stream has a current sequence no and return
{
auto it = m_streamSeqNos.find(streamId);
if (it == m_streamSeqNos.end()) {
ret = false;
}
else {
ret = true;
}
}
m_streamSeqMutex.unlock();
return ret;
}
/**
* @brief Helper to get the stored RTP sequence for the given stream ID.
* @param streamId Stream ID.
* @returns uint16_t
*/
uint16_t getStreamPktSeq(uint64_t streamId)
{
m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60));
// find the current sequence no and return
uint32_t pktSeq = 0U;
{
auto it = m_streamSeqNos.find(streamId);
if (it == m_streamSeqNos.end()) {
pktSeq = RTP_END_OF_CALL_SEQ;
} else {
pktSeq = m_streamSeqNos[streamId];
}
}
m_streamSeqMutex.unlock();
return pktSeq;
}
/**
* @brief Helper to increment the stored RTP sequence for the given stream ID.
* @param streamId Stream ID.
* @param initialSeq Initial sequence number to set.
* @returns uint16_t
*/
uint16_t incStreamPktSeq(uint64_t streamId, uint16_t initialSeq)
{
m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60));
// find the current sequence no, increment and return
uint32_t pktSeq = 0U;
{
auto it = m_streamSeqNos.find(streamId);
if (it == m_streamSeqNos.end()) {
m_streamSeqNos.insert({streamId, initialSeq});
} else {
pktSeq = m_streamSeqNos[streamId];
++pktSeq;
if (pktSeq > RTP_END_OF_CALL_SEQ) {
pktSeq = 0U;
}
m_streamSeqNos[streamId] = pktSeq;
}
}
m_streamSeqMutex.unlock();
return pktSeq;
}
/**
* @brief Helper to erase the stored RTP sequence for the given stream ID.
* @param streamId Stream ID.
* @returns uint16_t
*/
void eraseStreamPktSeq(uint64_t streamId)
{
m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60));
// find the sequence no and erase
{
auto entry = m_streamSeqNos.find(streamId);
if (entry != m_streamSeqNos.end()) {
m_streamSeqNos.erase(streamId);
}
}
m_streamSeqMutex.unlock();
}
public:
/**
* @brief Peer ID.
@ -173,11 +283,6 @@ namespace network
*/
__PROPERTY_PLAIN(uint32_t, ccPeerId);
/**
* @brief Current Stream ID.
*/
__PROPERTY_PLAIN(uint32_t, currStreamId);
/**
* @brief Unix socket storage containing the connected address.
*/
@ -246,14 +351,9 @@ namespace network
*/
__PROPERTY_PLAIN(json::object, config);
/**
* @brief Last received RTP sequence.
*/
__PROPERTY_PLAIN(uint16_t, pktLastSeq);
/**
* @brief Calculated next RTP sequence.
*/
__PROPERTY_PLAIN(uint16_t, pktNextSeq);
private:
std::timed_mutex m_streamSeqMutex;
std::unordered_map<uint64_t, uint16_t> m_streamSeqNos;
};
// ---------------------------------------------------------------------------
@ -526,9 +626,10 @@ namespace network
/**
* @brief Helper to complete setting up a repeater login request.
* @param peerId Peer ID.
* @param streamId Stream ID for the login sequence.
* @param connection Instance of the FNEPeerConnection class.
*/
void setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection);
void setupRepeaterLogin(uint32_t peerId, uint32_t streamId, FNEPeerConnection* connection);
/**
* @brief Helper to send the ACL lists to the specified peer in a separate thread.
@ -545,33 +646,38 @@ namespace network
/**
* @brief Helper to send the list of whitelisted RIDs to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
* @param sendISSI Flag indicating the RID transfer is to an external peer via ISSI.
*/
void writeWhitelistRIDs(uint32_t peerId, bool sendISSI);
void writeWhitelistRIDs(uint32_t peerId, uint32_t streamId, bool sendISSI);
/**
* @brief Helper to send the list of blacklisted RIDs to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
*/
void writeBlacklistRIDs(uint32_t peerId);
void writeBlacklistRIDs(uint32_t peerId, uint32_t streamId);
/**
* @brief Helper to send the list of active TGIDs to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
* @param sendISSI Flag indicating the TGID transfer is to an external peer via ISSI.
*/
void writeTGIDs(uint32_t peerId, bool sendISSI);
void writeTGIDs(uint32_t peerId, uint32_t streamId, bool sendISSI);
/**
* @brief Helper to send the list of deactivated TGIDs to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
*/
void writeDeactiveTGIDs(uint32_t peerId);
void writeDeactiveTGIDs(uint32_t peerId, uint32_t streamId);
/**
* @brief Helper to send the list of peers to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
*/
void writePeerList(uint32_t peerId);
void writePeerList(uint32_t peerId, uint32_t streamId);
/**
* @brief Helper to send a data message to the specified peer.
* @brief Helper to send a data message to the specified peer with a explicit packet sequence.
* @param peerId Peer ID.
* @param opcode FNE network opcode pair.
* @param[in] data Buffer containing message to send to peer.
@ -579,23 +685,11 @@ namespace network
* @param pktSeq RTP packet sequence for this message.
* @param streamId Stream ID for this message.
* @param queueOnly Flag indicating this message should be queued for transmission.
* @param directWrite Flag indicating this message should be immediately directly written.
*/
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool directWrite = false) const;
/**
* @brief Helper to send a data message to the specified peer.
* @param peerId Peer ID.
* @param opcode FNE network opcode pair.
* @param[in] data Buffer containing message to send to peer.
* @param length Length of buffer.
* @param streamId Stream ID for this message.
* @param queueOnly Flag indicating this message should be queued for transmission.
* @param incPktSeq Flag indicating the message should increment the packet sequence after transmission.
* @param directWrite Flag indicating this message should be immediately directly written.
*/
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint32_t streamId, bool queueOnly = false, bool incPktSeq = false, bool directWrite = false) const;
uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool incPktSeq = false, bool directWrite = false) const;
/**
* @brief Helper to send a command message to the specified peer.
@ -603,18 +697,20 @@ namespace network
* @param opcode FNE network opcode pair.
* @param[in] data Buffer containing message to send to peer.
* @param length Length of buffer.
* @param streamId Stream ID for this message.
* @param incPktSeq Flag indicating the message should increment the packet sequence after transmission.
*/
bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data = nullptr, uint32_t length = 0U,
bool incPktSeq = false) const;
bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint32_t streamId, bool incPktSeq) const;
/**
* @brief Helper to send a ACK response to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
* @param[in] data Buffer containing response data to send to peer.
* @param length Length of buffer.
*/
bool writePeerACK(uint32_t peerId, const uint8_t* data = nullptr, uint32_t length = 0U);
bool writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* data = nullptr, uint32_t length = 0U);
/**
* @brief Helper to log a warning specifying which NAK reason is being sent a peer.
@ -626,10 +722,11 @@ namespace network
/**
* @brief Helper to send a NAK response to the specified peer.
* @param peerId Peer ID.
* @param streamId Stream ID for this message.
* @param tag Tag.
* @param reason NAK reason.
*/
bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason = NET_CONN_NAK_GENERAL_FAILURE);
bool writePeerNAK(uint32_t peerId, uint32_t streamId, const char* tag, NET_CONN_NAK_REASON reason = NET_CONN_NAK_GENERAL_FAILURE);
/**
* @brief Helper to send a NAK response to the specified peer.
* @param peerId Peer ID.

@ -58,6 +58,20 @@ void PeerNetwork::setPeerLookups(lookups::PeerListLookup* pidLookup)
m_pidLookup = pidLookup;
}
/* Gets the received DMR stream ID. */
uint32_t PeerNetwork::getRxDMRStreamId(uint32_t slotNo) const
{
assert(slotNo == 1U || slotNo == 2U);
if (slotNo == 1U) {
return m_rxDMRStreamId[0U];
}
else {
return m_rxDMRStreamId[1U];
}
}
/* Checks if the passed peer ID is blocked from sending to this peer. */
bool PeerNetwork::checkBlockedPeer(uint32_t peerId)

@ -63,6 +63,23 @@ namespace network
*/
void setPeerLookups(lookups::PeerListLookup* pidLookup);
/**
* @brief Gets the received DMR stream ID.
* @param slotNo DMR slot to get stream ID for.
* @return uint32_t Stream ID for the given DMR slot.
*/
uint32_t getRxDMRStreamId(uint32_t slotNo) const;
/**
* @brief Gets the received P25 stream ID.
* @return uint32_t Stream ID.
*/
uint32_t getRxP25StreamId() const { return m_rxP25StreamId; }
/**
* @brief Gets the received NXDN stream ID.
* @return uint32_t Stream ID.
*/
uint32_t getRxNXDNStreamId() const { return m_rxNXDNStreamId; }
/**
* @brief Gets the blocked traffic peer ID table.
* @returns std::vector<uint32_t> List of peer IDs this peer network cannot send traffic to.

@ -982,7 +982,7 @@ void TagDMRData::write_CSBK(uint32_t peerId, uint8_t slot, lc::CSBK* csbk)
}
if (peerId > 0U) {
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true);
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false);
} else {
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {

@ -759,5 +759,5 @@ void TagNXDNData::write_Message(uint32_t peerId, lc::RCCH* rcch)
}
uint32_t streamId = m_network->createStreamId();
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true);
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false);
}

@ -494,12 +494,14 @@ void TagP25Data::playbackParrot()
if (message != nullptr) {
if (m_network->m_parrotOnlyOriginating) {
LogMessage(LOG_NET, "P25, Parrot Grant Demand, peer = %u, srcId = %u, dstId = %u", pkt.peerId, srcId, dstId);
m_network->writePeer(pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, 0U, false);
m_network->writePeer(pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength,
RTP_END_OF_CALL_SEQ, m_network->createStreamId(), false);
} else {
// repeat traffic to the connected peers
for (auto peer : m_network->m_peers) {
LogMessage(LOG_NET, "P25, Parrot Grant Demand, peer = %u, srcId = %u, dstId = %u", peer.first, srcId, dstId);
m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, 0U, false);
m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength,
RTP_END_OF_CALL_SEQ, m_network->createStreamId(), false);
}
}
}
@ -1349,7 +1351,8 @@ void TagP25Data::write_TSDU(uint32_t peerId, lc::TSBK* tsbk)
uint32_t streamId = m_network->createStreamId();
if (peerId > 0U) {
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true);
m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength,
RTP_END_OF_CALL_SEQ, streamId, false);
} else {
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {
@ -1360,7 +1363,8 @@ void TagP25Data::write_TSDU(uint32_t peerId, lc::TSBK* tsbk)
m_network->m_frameQueue->flushQueue();
}
m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, true);
m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength,
RTP_END_OF_CALL_SEQ, streamId, true);
if (m_network->m_debug) {
LogDebug(LOG_NET, "P25, peer = %u, len = %u, streamId = %u",
peer.first, messageLength, streamId);

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2024-2025 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"

@ -4,7 +4,7 @@
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2024-2025 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
@ -1045,7 +1045,7 @@ bool P25PacketData::writeNetwork(uint32_t peerId, network::PeerNetwork* peerNet,
if (peerNet != nullptr) {
return peerNet->writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId);
} else {
return m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId, false, true);
return m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId, false);
}
}

@ -36,6 +36,10 @@ using namespace lookups;
// Global Variables
// ---------------------------------------------------------------------------
#ifndef SIGHUP
#define SIGHUP 1
#endif
int g_signal = 0;
bool g_calibrate = false;
bool g_setup = false;
@ -302,15 +306,15 @@ int main(int argc, char** argv)
delete host;
}
if (g_signal == 2)
if (g_signal == SIGINT)
::LogInfoEx(LOG_HOST, "[STOP] dvmhost:main SIGINT");
if (g_signal == 15)
if (g_signal == SIGTERM)
::LogInfoEx(LOG_HOST, "[STOP] dvmhost:main SIGTERM");
if (g_signal == 1)
if (g_signal == SIGHUP)
::LogInfoEx(LOG_HOST, "[RSTR] dvmhost:main SIGHUP");
} while (g_signal == 1);
} while (g_signal == SIGHUP);
::LogFinalise();
::ActivityLogFinalise();

@ -5,7 +5,7 @@
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2015,2016,2017 Jonathan Naylor, G4KLX
* Copyright (C) 2017-2024 Bryan Biedenkapp, N2PLL
* Copyright (C) 2017-2025 Bryan Biedenkapp, N2PLL
*
*/
#include "Defines.h"
@ -263,8 +263,25 @@ void Network::clock(uint32_t ms)
if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR) { // Encapsulated DMR data frame
if (m_enabled && m_dmrEnabled) {
uint32_t slotNo = (buffer[15U] & 0x80U) == 0x80U ? 2U : 1U;
if (m_debug) {
LogDebug(LOG_NET, "DMR Slot %u, peer = %u, len = %u, pktSeq = %u, streamId = %u",
slotNo, peerId, length, rtpHeader.getSequence(), streamId);
}
if (m_promiscuousPeer) {
m_rxDMRStreamId[slotNo] = streamId;
m_pktLastSeq = m_pktSeq;
}
else {
if (m_rxDMRStreamId[slotNo] == 0U) {
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxDMRStreamId[slotNo] = 0U;
}
else {
m_rxDMRStreamId[slotNo] = streamId;
}
m_pktLastSeq = m_pktSeq;
}
else {
@ -277,6 +294,16 @@ void Network::clock(uint32_t ms)
m_pktLastSeq = m_pktSeq;
}
if (m_rxDMRStreamId[slotNo] != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) {
//LogDebug(LOG_NET, "DMR Incorrect Stream; %u != %u", streamId, m_rxDMRStreamId[slotNo]);
break;
}
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxDMRStreamId[slotNo] = 0U;
}
}
}
if (m_debug)
@ -291,8 +318,24 @@ void Network::clock(uint32_t ms)
}
else if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_P25) { // Encapsulated P25 data frame
if (m_enabled && m_p25Enabled) {
if (m_debug) {
LogDebug(LOG_NET, "P25, peer = %u, len = %u, pktSeq = %u, streamId = %u",
peerId, length, rtpHeader.getSequence(), streamId);
}
if (m_promiscuousPeer) {
m_rxP25StreamId = streamId;
m_pktLastSeq = m_pktSeq;
}
else {
if (m_rxP25StreamId == 0U) {
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxP25StreamId = 0U;
}
else {
m_rxP25StreamId = streamId;
}
m_pktLastSeq = m_pktSeq;
}
else {
@ -305,6 +348,16 @@ void Network::clock(uint32_t ms)
m_pktLastSeq = m_pktSeq;
}
if (m_rxP25StreamId != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) {
//LogDebug(LOG_NET, "P25 Incorrect Stream; %u != %u", streamId, m_rxP25StreamId);
break;
}
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxP25StreamId = 0U;
}
}
}
if (m_debug)
@ -319,8 +372,24 @@ void Network::clock(uint32_t ms)
}
else if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN) { // Encapsulated NXDN data frame
if (m_enabled && m_nxdnEnabled) {
if (m_debug) {
LogDebug(LOG_NET, "NXDN, peer = %u, len = %u, pktSeq = %u, streamId = %u",
peerId, length, rtpHeader.getSequence(), streamId);
}
if (m_promiscuousPeer) {
m_rxNXDNStreamId = streamId;
m_pktLastSeq = m_pktSeq;
}
else {
if (m_rxNXDNStreamId == 0U) {
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxNXDNStreamId = 0U;
}
else {
m_rxNXDNStreamId = streamId;
}
m_pktLastSeq = m_pktSeq;
}
else {
@ -333,6 +402,16 @@ void Network::clock(uint32_t ms)
m_pktLastSeq = m_pktSeq;
}
if (m_rxNXDNStreamId != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) {
//LogDebug(LOG_NET, "NXDN Incorrect Stream; %u != %u", streamId, m_rxNXDNStreamId);
break;
}
if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) {
m_rxNXDNStreamId = 0U;
}
}
}
if (m_debug)

Loading…
Cancel
Save

Powered by TurnKey Linux.