diff --git a/src/bridge/BridgeMain.cpp b/src/bridge/BridgeMain.cpp index 0ca43790..ea03b492 100644 --- a/src/bridge/BridgeMain.cpp +++ b/src/bridge/BridgeMain.cpp @@ -32,6 +32,10 @@ using namespace lookups; // Global Variables // --------------------------------------------------------------------------- +#ifndef SIGHUP +#define SIGHUP 1 +#endif + int g_signal = 0; std::string g_progExe = std::string(__EXE_NAME__); std::string g_iniFile = std::string(DEFAULT_CONF_FILE); @@ -293,15 +297,15 @@ int main(int argc, char** argv) ret = bridge->run(); delete bridge; - if (g_signal == 2) + if (g_signal == SIGINT) ::LogInfoEx(LOG_HOST, "Exited on receipt of SIGINT"); - if (g_signal == 15) + if (g_signal == SIGTERM) ::LogInfoEx(LOG_HOST, "Exited on receipt of SIGTERM"); - if (g_signal == 1) + if (g_signal == SIGHUP) ::LogInfoEx(LOG_HOST, "Restarting on receipt of SIGHUP"); - } while (g_signal == 1); + } while (g_signal == SIGHUP); ::LogFinalise(); ::ActivityLogFinalise(); diff --git a/src/common/Defines.h b/src/common/Defines.h index 01190b25..56d6fb1e 100644 --- a/src/common/Defines.h +++ b/src/common/Defines.h @@ -109,7 +109,7 @@ typedef unsigned long long ulong64_t; #define VERSION_MAJOR "04" #define VERSION_MINOR "11" -#define VERSION_REV "F" +#define VERSION_REV "G" #define __NETVER__ "DVM_R" VERSION_MAJOR VERSION_REV VERSION_MINOR #define __VER__ VERSION_MAJOR "." VERSION_MINOR VERSION_REV " (R" VERSION_MAJOR VERSION_REV VERSION_MINOR " " __GIT_VER__ ")" diff --git a/src/common/network/BaseNetwork.cpp b/src/common/network/BaseNetwork.cpp index c34f4a76..3cdb384d 100644 --- a/src/common/network/BaseNetwork.cpp +++ b/src/common/network/BaseNetwork.cpp @@ -544,12 +544,7 @@ bool BaseNetwork::writeP25TDU(const p25::lc::LC& control, const p25::data::LowSp return false; } - uint16_t seq = pktSeq(resetSeq); - if (controlByte == 0x00U) { - seq = RTP_END_OF_CALL_SEQ; - } - - return writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, seq, m_p25StreamId); + return writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, m_p25StreamId); } /* Writes P25 TSDU frame data to the network. */ diff --git a/src/fne/FNEMain.cpp b/src/fne/FNEMain.cpp index 765c6cbb..7a7ba851 100644 --- a/src/fne/FNEMain.cpp +++ b/src/fne/FNEMain.cpp @@ -32,6 +32,10 @@ using namespace lookups; // Global Variables // --------------------------------------------------------------------------- +#ifndef SIGHUP +#define SIGHUP 1 +#endif + int g_signal = 0; std::string g_progExe = std::string(__EXE_NAME__); std::string g_iniFile = std::string(DEFAULT_CONF_FILE); @@ -209,15 +213,15 @@ int main(int argc, char** argv) ret = fne->run(); delete fne; - if (g_signal == 2) + if (g_signal == SIGINT) ::LogInfoEx(LOG_HOST, "[STOP] dvmfne:main SIGINT"); - if (g_signal == 15) + if (g_signal == SIGTERM) ::LogInfoEx(LOG_HOST, "[STOP] dvmfne:main SIGTERM"); - if (g_signal == 1) + if (g_signal == SIGHUP) ::LogInfoEx(LOG_HOST, "[RSTR] dvmfne:main SIGHUP"); - } while (g_signal == 1); + } while (g_signal == SIGHUP); ::LogFinalise(); ::ActivityLogFinalise(); diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index 95789346..f9818af8 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2023,2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2023,2024,2025 Bryan Biedenkapp, N2PLL * */ #include "Defines.h" @@ -972,7 +972,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork) if (ret) { uint32_t peerId = peerNetwork->getPeerId(); uint32_t slotNo = (data[15U] & 0x80U) == 0x80U ? 2U : 1U; - uint32_t streamId = peerNetwork->getDMRStreamId(slotNo); + uint32_t streamId = peerNetwork->getRxDMRStreamId(slotNo); m_network->dmrTrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true); } @@ -985,7 +985,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork) UInt8Array data = peerNetwork->readP25(ret, length); if (ret) { uint32_t peerId = peerNetwork->getPeerId(); - uint32_t streamId = peerNetwork->getP25StreamId(); + uint32_t streamId = peerNetwork->getRxP25StreamId(); m_network->p25TrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true); } @@ -998,7 +998,7 @@ void HostFNE::processPeer(network::PeerNetwork* peerNetwork) UInt8Array data = peerNetwork->readNXDN(ret, length); if (ret) { uint32_t peerId = peerNetwork->getPeerId(); - uint32_t streamId = peerNetwork->getNXDNStreamId(); + uint32_t streamId = peerNetwork->getRxNXDNStreamId(); m_network->nxdnTrafficHandler()->processFrame(data.get(), length, peerId, peerNetwork->pktLastSeq(), streamId, true); } diff --git a/src/fne/network/DiagNetwork.cpp b/src/fne/network/DiagNetwork.cpp index c623e41d..9ee6c16b 100644 --- a/src/fne/network/DiagNetwork.cpp +++ b/src/fne/network/DiagNetwork.cpp @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL * */ #include "fne/Defines.h" @@ -177,20 +177,17 @@ void* DiagNetwork::threadedNetworkRx(void* arg) if (connection != nullptr) { if (pktSeq == RTP_END_OF_CALL_SEQ) { - connection->pktLastSeq(pktSeq); - connection->pktNextSeq(0U); + connection->eraseStreamPktSeq(streamId); // attempt to erase packet sequence for the stream } else { - if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq()) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U))) { - LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(), - streamId, pktSeq, connection->pktNextSeq()); + if (connection->hasStreamPktSeq(streamId)) { + uint16_t currPkt = connection->getStreamPktSeq(streamId); + if ((pktSeq != currPkt) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) { + LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(), + streamId, pktSeq, currPkt); + } } - connection->currStreamId(streamId); - connection->pktLastSeq(pktSeq); - connection->pktNextSeq(pktSeq + 1); - if (connection->pktNextSeq() > (RTP_END_OF_CALL_SEQ - 1U)) { - connection->pktNextSeq(0U); - } + connection->incStreamPktSeq(streamId, pktSeq + 1U); } } @@ -255,10 +252,6 @@ void* DiagNetwork::threadedNetworkRx(void* arg) for (auto peer : network->m_peers) { if (peer.second != nullptr) { if (peer.second->isSysView()) { - uint32_t peerStreamId = peer.second->currStreamId(); - if (streamId == 0U) { - streamId = peerStreamId; - } sockaddr_storage addr = peer.second->socketStorage(); uint32_t addrLen = peer.second->sockStorageLen(); @@ -284,7 +277,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(pktPeerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(pktPeerId, streamId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -322,7 +315,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -341,10 +334,6 @@ void* DiagNetwork::threadedNetworkRx(void* arg) for (auto peer : network->m_peers) { if (peer.second != nullptr) { if (peer.second->isSysView()) { - uint32_t peerStreamId = peer.second->currStreamId(); - if (streamId == 0U) { - streamId = peerStreamId; - } sockaddr_storage addr = peer.second->socketStorage(); uint32_t addrLen = peer.second->sockStorageLen(); @@ -374,13 +363,13 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(pktPeerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(pktPeerId, streamId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else { - network->writePeerNAK(peerId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); + network->writePeerNAK(peerId, streamId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length); } } @@ -420,7 +409,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index 4c16dd79..b9a64c17 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL * */ #include "fne/Defines.h" @@ -403,8 +403,9 @@ void FNENetwork::close() uint8_t buffer[1U]; ::memset(buffer, 0x00U, 1U); + uint32_t streamId = createStreamId(); for (auto peer : m_peers) { - writePeer(peer.first, { NET_FUNC::MST_CLOSING, NET_SUBFUNC::NOP }, buffer, 1U, (uint16_t)0U, 0U); + writePeer(peer.first, { NET_FUNC::MST_CLOSING, NET_SUBFUNC::NOP }, buffer, 1U, RTP_END_OF_CALL_SEQ, streamId, false); } } @@ -459,21 +460,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) // only reset packet sequences if we're a PROTOCOL or RPTC function if ((req->fneHeader.getFunction() == NET_FUNC::PROTOCOL) || (req->fneHeader.getFunction() == NET_FUNC::RPTC)) { - connection->pktLastSeq(pktSeq); - connection->pktNextSeq(0U); + connection->eraseStreamPktSeq(streamId); // attempt to erase packet sequence for the stream } } else { - if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq()) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) { - LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(), - streamId, pktSeq, connection->pktNextSeq()); + if (connection->hasStreamPktSeq(streamId)) { + uint16_t currPkt = connection->getStreamPktSeq(streamId); + if ((pktSeq != currPkt) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) { + LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(), + streamId, pktSeq, currPkt); + } } - connection->currStreamId(streamId); - connection->pktLastSeq(pktSeq); - connection->pktNextSeq(pktSeq + 1); - if (connection->pktNextSeq() > (RTP_END_OF_CALL_SEQ - 1U)) { - connection->pktNextSeq(0U); - } + connection->incStreamPktSeq(streamId, pktSeq + 1U); } } @@ -510,7 +508,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) network->m_tagDMR->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { - network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } @@ -533,7 +531,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) network->m_tagP25->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { - network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } @@ -556,7 +554,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { - network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } @@ -588,9 +586,8 @@ void* FNENetwork::threadedNetworkRx(void* arg) FNEPeerConnection* connection = new FNEPeerConnection(peerId, req->address, req->addrLen); connection->lastPing(now); - connection->currStreamId(streamId); - network->setupRepeaterLogin(peerId, connection); + network->setupRepeaterLogin(peerId, streamId, connection); // check if the peer is in the peer ACL list if (network->m_peerListLookup->getACL()) { @@ -625,10 +622,9 @@ void* FNENetwork::threadedNetworkRx(void* arg) connection = new FNEPeerConnection(peerId, req->address, req->addrLen); connection->lastPing(now); - connection->currStreamId(streamId); network->erasePeerAffiliations(peerId); - network->setupRepeaterLogin(peerId, connection); + network->setupRepeaterLogin(peerId, streamId, connection); // check if the peer is in the peer ACL list if (network->m_peerListLookup->getACL()) { @@ -745,7 +741,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (validHash) { connection->connectionState(NET_STAT_WAITING_CONFIG); - network->writePeerACK(peerId); + network->writePeerACK(peerId, streamId); LogInfoEx(LOG_NET, "PEER %u RPTK ACK, completed the login exchange", peerId); network->m_peers[peerId] = connection; } @@ -822,7 +818,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) buffer[0U] = 0x80U; } - network->writePeerACK(peerId, buffer, 1U); + network->writePeerACK(peerId, streamId, buffer, 1U); LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId); json::object peerConfig = connection->config(); @@ -943,7 +939,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (dt < now) { LogInfoEx(LOG_NET, "PEER %u (%s) updating ACL list, dt = %u, now = %u", peerId, connection->identity().c_str(), dt, now); - if (connection->pktLastSeq() == RTP_END_OF_CALL_SEQ) { + if (connection->streamCount() <= 1) { network->peerACLUpdate(peerId); } connection->lastACLUpdate(now); @@ -963,7 +959,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) payload[7U] = (uint8_t)((now >> 0) & 0xFFU); network->m_peers[peerId] = connection; - network->writePeerCommand(peerId, { NET_FUNC::PONG, NET_SUBFUNC::NOP }, payload, 8U); + network->writePeerCommand(peerId, { NET_FUNC::PONG, NET_SUBFUNC::NOP }, payload, 8U, streamId, false); if (network->m_reportPeerPing) { LogInfoEx(LOG_NET, "PEER %u (%s) ping, pingsReceived = %u, lastPing = %u, now = %u", peerId, connection->identity().c_str(), @@ -971,7 +967,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_REPEATER_PING); + network->writePeerNAK(peerId, streamId, TAG_REPEATER_PING); } } } @@ -1001,7 +997,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (network->m_tagDMR != nullptr) { network->m_tagDMR->processGrantReq(srcId, dstId, slot, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { - network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; @@ -1010,7 +1006,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (network->m_tagP25 != nullptr) { network->m_tagP25->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { - network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; @@ -1019,18 +1015,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (network->m_tagNXDN != nullptr) { network->m_tagNXDN->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { - network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); + network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; default: - network->writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_ILLEGAL_PACKET); + network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown state for grant request from the peer", req->buffer, req->length); break; } } else { - network->writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1074,7 +1070,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1112,7 +1108,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1122,7 +1118,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) // main traffic port status transfers aren't supported for performance reasons } else { - network->writePeerNAK(peerId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); + network->writePeerNAK(peerId, streamId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length); } } @@ -1138,7 +1134,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) @@ -1161,7 +1157,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1174,7 +1170,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) @@ -1195,7 +1191,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1208,7 +1204,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) @@ -1229,7 +1225,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1242,7 +1238,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) @@ -1263,7 +1259,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1279,7 +1275,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } if (aff != nullptr) { @@ -1311,7 +1307,7 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -1356,13 +1352,13 @@ void* FNENetwork::threadedNetworkRx(void* arg) } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else { - network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_ILLEGAL_PACKET); + network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown announcement opcode from the peer", req->buffer, req->length); } } @@ -1545,7 +1541,7 @@ std::string FNENetwork::resolvePeerIdentity(uint32_t peerId) /* Helper to complete setting up a repeater login request. */ -void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection) +void FNENetwork::setupRepeaterLogin(uint32_t peerId, uint32_t streamId, FNEPeerConnection* connection) { std::uniform_int_distribution dist(DVM_RAND_MIN, DVM_RAND_MAX); connection->salt(dist(m_random)); @@ -1560,7 +1556,7 @@ void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connecti ::memset(salt, 0x00U, 4U); __SET_UINT32(connection->salt(), salt, 0U); - writePeerACK(peerId, salt, 4U); + writePeerACK(peerId, streamId, salt, 4U); LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId); } @@ -1607,26 +1603,24 @@ void* FNENetwork::threadedACLUpdate(void* arg) FNEPeerConnection* connection = network->m_peers[req->peerId]; if (connection != nullptr) { + uint32_t aclStreamId = network->createStreamId(); + // if the connection is an external peer, and peer is participating in peer link, // send the peer proper configuration data if (connection->isExternalPeer() && connection->isPeerLink()) { LogInfoEx(LOG_NET, "PEER %u (%s) sending Peer-Link ACL list updates", req->peerId, peerIdentity.c_str()); - network->writeWhitelistRIDs(req->peerId, true); - network->writeTGIDs(req->peerId, true); - - connection->pktLastSeq(RTP_END_OF_CALL_SEQ - 1U); - network->writePeerList(req->peerId); + network->writeWhitelistRIDs(req->peerId, aclStreamId, true); + network->writeTGIDs(req->peerId, aclStreamId, true); + network->writePeerList(req->peerId, aclStreamId); } else { LogInfoEx(LOG_NET, "PEER %u (%s) sending ACL list updates", req->peerId, peerIdentity.c_str()); - network->writeWhitelistRIDs(req->peerId, false); - network->writeBlacklistRIDs(req->peerId); - network->writeTGIDs(req->peerId, false); - - connection->pktLastSeq(RTP_END_OF_CALL_SEQ - 1U); - network->writeDeactiveTGIDs(req->peerId); + network->writeWhitelistRIDs(req->peerId, aclStreamId, false); + network->writeBlacklistRIDs(req->peerId, aclStreamId); + network->writeTGIDs(req->peerId, aclStreamId, false); + network->writeDeactiveTGIDs(req->peerId, aclStreamId); } } @@ -1638,7 +1632,7 @@ void* FNENetwork::threadedACLUpdate(void* arg) /* Helper to send the list of whitelisted RIDs to the specified peer. */ -void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer) +void FNENetwork::writeWhitelistRIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); @@ -1743,7 +1737,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer) offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_RID_LIST }, - payload, bufSize, 0U, false, true, true); + payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); @@ -1813,7 +1807,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer) } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_WL_RID }, - payload, bufSize, true); + payload, bufSize, streamId, true); } connection->lastPing(now); @@ -1822,7 +1816,7 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool isExternalPeer) /* Helper to send the list of whitelisted RIDs to the specified peer. */ -void FNENetwork::writeBlacklistRIDs(uint32_t peerId) +void FNENetwork::writeBlacklistRIDs(uint32_t peerId, uint32_t streamId) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); @@ -1887,7 +1881,7 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId) } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_BL_RID }, - payload, bufSize, true); + payload, bufSize, streamId, true); } connection->lastPing(now); @@ -1896,7 +1890,7 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId) /* Helper to send the list of active TGIDs to the specified peer. */ -void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer) +void FNENetwork::writeTGIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); @@ -2005,7 +1999,7 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer) offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_TALKGROUP_LIST }, - payload, bufSize, 0U, false, true, true); + payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); @@ -2086,12 +2080,12 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool isExternalPeer) } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_ACTIVE_TGS }, - payload, 4U + (tgidList.size() * 5U), true); + payload, 4U + (tgidList.size() * 5U), streamId, true); } /* Helper to send the list of deactivated TGIDs to the specified peer. */ -void FNENetwork::writeDeactiveTGIDs(uint32_t peerId) +void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, uint32_t streamId) { if (!m_tidLookup->sendTalkgroups()) { return; @@ -2147,12 +2141,12 @@ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId) } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_DEACTIVE_TGS }, - payload, 4U + (tgidList.size() * 5U), true); + payload, 4U + (tgidList.size() * 5U), streamId, true); } /* Helper to send the list of peers to the specified peer. */ -void FNENetwork::writePeerList(uint32_t peerId) +void FNENetwork::writePeerList(uint32_t peerId, uint32_t streamId) { if (!m_tidLookup->sendTalkgroups()) { return; @@ -2260,7 +2254,7 @@ void FNENetwork::writePeerList(uint32_t peerId) offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_PEER_LIST }, - payload, bufSize, 0U, false, true, true); + payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); @@ -2269,22 +2263,26 @@ void FNENetwork::writePeerList(uint32_t peerId) return; } -/* Helper to send a data message to the specified peer. */ +/* Helper to send a data message to the specified peer with a explicit packet sequence. */ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, - uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool directWrite) const + uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const { + if (streamId == 0U) { + LogError(LOG_NET, "PEER %u, trying to send data with a streamId of 0? BUGBUG", peerId); + } + auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { FNEPeerConnection* connection = m_peers.at(peerId); if (connection != nullptr) { - uint32_t peerStreamId = connection->currStreamId(); - if (streamId == 0U) { - streamId = peerStreamId; - } sockaddr_storage addr = connection->socketStorage(); uint32_t addrLen = connection->sockStorageLen(); + if (incPktSeq) { + pktSeq = connection->incStreamPktSeq(streamId, pktSeq); + } + if (directWrite) return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); else { @@ -2299,31 +2297,10 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const return false; } -/* Helper to send a data message to the specified peer. */ - -bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, - uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const -{ - auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); - if (it != m_peers.end()) { - FNEPeerConnection* connection = m_peers.at(peerId); - if (connection != nullptr) { - if (incPktSeq) { - connection->pktLastSeq(connection->pktLastSeq() + 1); - } - uint16_t pktSeq = connection->pktLastSeq(); - - return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly, directWrite); - } - } - - return false; -} - /* Helper to send a command message to the specified peer. */ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, - const uint8_t* data, uint32_t length, bool incPktSeq) const + const uint8_t* data, uint32_t length, uint32_t streamId, bool incPktSeq) const { assert(peerId > 0); @@ -2335,12 +2312,12 @@ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode } uint32_t len = length + 6U; - return writePeer(peerId, opcode, buffer, len, 0U, false, incPktSeq, true); + return writePeer(peerId, opcode, buffer, len, RTP_END_OF_CALL_SEQ, streamId, false, incPktSeq, true); } /* Helper to send a ACK response to the specified peer. */ -bool FNENetwork::writePeerACK(uint32_t peerId, const uint8_t* data, uint32_t length) +bool FNENetwork::writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* data, uint32_t length) { uint8_t buffer[DATA_PACKET_LENGTH]; ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); @@ -2351,7 +2328,8 @@ bool FNENetwork::writePeerACK(uint32_t peerId, const uint8_t* data, uint32_t len ::memcpy(buffer + 6U, data, length); } - return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, false, true); + return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, streamId, + false); } /* Helper to log a warning specifying which NAK reason is being sent a peer. */ @@ -2393,7 +2371,7 @@ void FNENetwork::logPeerNAKReason(uint32_t peerId, const char* tag, NET_CONN_NAK /* Helper to send a NAK response to the specified peer. */ -bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason) +bool FNENetwork::writePeerNAK(uint32_t peerId, uint32_t streamId, const char* tag, NET_CONN_NAK_REASON reason) { assert(peerId > 0); assert(tag != nullptr); @@ -2405,7 +2383,8 @@ bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REA __SET_UINT16B((uint16_t)reason, buffer, 10U); // Reason logPeerNAKReason(peerId, tag, reason); - return writePeer(peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, buffer, 10U, RTP_END_OF_CALL_SEQ, false, true); + return writePeer(peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, buffer, 10U, RTP_END_OF_CALL_SEQ, streamId, + false); } /* Helper to send a NAK response to the specified peer. */ diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index 8a2721f5..bd8f8630 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2023-2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL * */ /** @@ -103,7 +103,6 @@ namespace network FNEPeerConnection() : m_id(0U), m_ccPeerId(0U), - m_currStreamId(0U), m_socketStorage(), m_sockStorageLen(0U), m_address(), @@ -119,8 +118,8 @@ namespace network m_isSysView(false), m_isPeerLink(false), m_config(), - m_pktLastSeq(RTP_END_OF_CALL_SEQ), - m_pktNextSeq(1U) + m_streamSeqMutex(), + m_streamSeqNos() { /* stub */ } @@ -133,7 +132,6 @@ namespace network FNEPeerConnection(uint32_t id, sockaddr_storage& socketStorage, uint32_t sockStorageLen) : m_id(id), m_ccPeerId(0U), - m_currStreamId(0U), m_socketStorage(socketStorage), m_sockStorageLen(sockStorageLen), m_address(udp::Socket::address(socketStorage)), @@ -149,8 +147,8 @@ namespace network m_isSysView(false), m_isPeerLink(false), m_config(), - m_pktLastSeq(RTP_END_OF_CALL_SEQ), - m_pktNextSeq(1U) + m_streamSeqMutex(), + m_streamSeqNos() { assert(id > 0U); assert(sockStorageLen > 0U); @@ -158,6 +156,118 @@ namespace network assert(m_port > 0U); } + /** + * @brief Helper to return the current count of mapped RTP streams. + * @returns size_t + */ + size_t streamCount() + { + return m_streamSeqNos.size(); + } + + /** + * @brief Helper to determine if the stream ID has a stored RTP sequence. + * @param streamId Stream ID. + * @returns bool + */ + bool hasStreamPktSeq(uint64_t streamId) + { + bool ret = false; + m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60)); + + // determine if the stream has a current sequence no and return + { + auto it = m_streamSeqNos.find(streamId); + if (it == m_streamSeqNos.end()) { + ret = false; + } + else { + ret = true; + } + } + + m_streamSeqMutex.unlock(); + + return ret; + } + + /** + * @brief Helper to get the stored RTP sequence for the given stream ID. + * @param streamId Stream ID. + * @returns uint16_t + */ + uint16_t getStreamPktSeq(uint64_t streamId) + { + m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60)); + + // find the current sequence no and return + uint32_t pktSeq = 0U; + { + auto it = m_streamSeqNos.find(streamId); + if (it == m_streamSeqNos.end()) { + pktSeq = RTP_END_OF_CALL_SEQ; + } else { + pktSeq = m_streamSeqNos[streamId]; + } + } + + m_streamSeqMutex.unlock(); + + return pktSeq; + } + + /** + * @brief Helper to increment the stored RTP sequence for the given stream ID. + * @param streamId Stream ID. + * @param initialSeq Initial sequence number to set. + * @returns uint16_t + */ + uint16_t incStreamPktSeq(uint64_t streamId, uint16_t initialSeq) + { + m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60)); + + // find the current sequence no, increment and return + uint32_t pktSeq = 0U; + { + auto it = m_streamSeqNos.find(streamId); + if (it == m_streamSeqNos.end()) { + m_streamSeqNos.insert({streamId, initialSeq}); + } else { + pktSeq = m_streamSeqNos[streamId]; + + ++pktSeq; + if (pktSeq > RTP_END_OF_CALL_SEQ) { + pktSeq = 0U; + } + + m_streamSeqNos[streamId] = pktSeq; + } + } + + m_streamSeqMutex.unlock(); + + return pktSeq; + } + /** + * @brief Helper to erase the stored RTP sequence for the given stream ID. + * @param streamId Stream ID. + * @returns uint16_t + */ + void eraseStreamPktSeq(uint64_t streamId) + { + m_streamSeqMutex.try_lock_for(std::chrono::milliseconds(60)); + + // find the sequence no and erase + { + auto entry = m_streamSeqNos.find(streamId); + if (entry != m_streamSeqNos.end()) { + m_streamSeqNos.erase(streamId); + } + } + + m_streamSeqMutex.unlock(); + } + public: /** * @brief Peer ID. @@ -173,11 +283,6 @@ namespace network */ __PROPERTY_PLAIN(uint32_t, ccPeerId); - /** - * @brief Current Stream ID. - */ - __PROPERTY_PLAIN(uint32_t, currStreamId); - /** * @brief Unix socket storage containing the connected address. */ @@ -246,14 +351,9 @@ namespace network */ __PROPERTY_PLAIN(json::object, config); - /** - * @brief Last received RTP sequence. - */ - __PROPERTY_PLAIN(uint16_t, pktLastSeq); - /** - * @brief Calculated next RTP sequence. - */ - __PROPERTY_PLAIN(uint16_t, pktNextSeq); + private: + std::timed_mutex m_streamSeqMutex; + std::unordered_map m_streamSeqNos; }; // --------------------------------------------------------------------------- @@ -526,9 +626,10 @@ namespace network /** * @brief Helper to complete setting up a repeater login request. * @param peerId Peer ID. + * @param streamId Stream ID for the login sequence. * @param connection Instance of the FNEPeerConnection class. */ - void setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection); + void setupRepeaterLogin(uint32_t peerId, uint32_t streamId, FNEPeerConnection* connection); /** * @brief Helper to send the ACL lists to the specified peer in a separate thread. @@ -545,33 +646,38 @@ namespace network /** * @brief Helper to send the list of whitelisted RIDs to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. * @param sendISSI Flag indicating the RID transfer is to an external peer via ISSI. */ - void writeWhitelistRIDs(uint32_t peerId, bool sendISSI); + void writeWhitelistRIDs(uint32_t peerId, uint32_t streamId, bool sendISSI); /** * @brief Helper to send the list of blacklisted RIDs to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. */ - void writeBlacklistRIDs(uint32_t peerId); + void writeBlacklistRIDs(uint32_t peerId, uint32_t streamId); /** * @brief Helper to send the list of active TGIDs to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. * @param sendISSI Flag indicating the TGID transfer is to an external peer via ISSI. */ - void writeTGIDs(uint32_t peerId, bool sendISSI); + void writeTGIDs(uint32_t peerId, uint32_t streamId, bool sendISSI); /** * @brief Helper to send the list of deactivated TGIDs to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. */ - void writeDeactiveTGIDs(uint32_t peerId); + void writeDeactiveTGIDs(uint32_t peerId, uint32_t streamId); /** * @brief Helper to send the list of peers to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. */ - void writePeerList(uint32_t peerId); + void writePeerList(uint32_t peerId, uint32_t streamId); /** - * @brief Helper to send a data message to the specified peer. + * @brief Helper to send a data message to the specified peer with a explicit packet sequence. * @param peerId Peer ID. * @param opcode FNE network opcode pair. * @param[in] data Buffer containing message to send to peer. @@ -579,23 +685,11 @@ namespace network * @param pktSeq RTP packet sequence for this message. * @param streamId Stream ID for this message. * @param queueOnly Flag indicating this message should be queued for transmission. - * @param directWrite Flag indicating this message should be immediately directly written. - */ - bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, - uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool directWrite = false) const; - /** - * @brief Helper to send a data message to the specified peer. - * @param peerId Peer ID. - * @param opcode FNE network opcode pair. - * @param[in] data Buffer containing message to send to peer. - * @param length Length of buffer. - * @param streamId Stream ID for this message. - * @param queueOnly Flag indicating this message should be queued for transmission. * @param incPktSeq Flag indicating the message should increment the packet sequence after transmission. * @param directWrite Flag indicating this message should be immediately directly written. */ bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, - uint32_t streamId, bool queueOnly = false, bool incPktSeq = false, bool directWrite = false) const; + uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool incPktSeq = false, bool directWrite = false) const; /** * @brief Helper to send a command message to the specified peer. @@ -603,18 +697,20 @@ namespace network * @param opcode FNE network opcode pair. * @param[in] data Buffer containing message to send to peer. * @param length Length of buffer. + * @param streamId Stream ID for this message. * @param incPktSeq Flag indicating the message should increment the packet sequence after transmission. */ - bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data = nullptr, uint32_t length = 0U, - bool incPktSeq = false) const; + bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, + uint32_t streamId, bool incPktSeq) const; /** * @brief Helper to send a ACK response to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. * @param[in] data Buffer containing response data to send to peer. * @param length Length of buffer. */ - bool writePeerACK(uint32_t peerId, const uint8_t* data = nullptr, uint32_t length = 0U); + bool writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* data = nullptr, uint32_t length = 0U); /** * @brief Helper to log a warning specifying which NAK reason is being sent a peer. @@ -626,10 +722,11 @@ namespace network /** * @brief Helper to send a NAK response to the specified peer. * @param peerId Peer ID. + * @param streamId Stream ID for this message. * @param tag Tag. * @param reason NAK reason. */ - bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason = NET_CONN_NAK_GENERAL_FAILURE); + bool writePeerNAK(uint32_t peerId, uint32_t streamId, const char* tag, NET_CONN_NAK_REASON reason = NET_CONN_NAK_GENERAL_FAILURE); /** * @brief Helper to send a NAK response to the specified peer. * @param peerId Peer ID. diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index 5054daf5..a991cdc2 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -58,6 +58,20 @@ void PeerNetwork::setPeerLookups(lookups::PeerListLookup* pidLookup) m_pidLookup = pidLookup; } +/* Gets the received DMR stream ID. */ + +uint32_t PeerNetwork::getRxDMRStreamId(uint32_t slotNo) const +{ + assert(slotNo == 1U || slotNo == 2U); + + if (slotNo == 1U) { + return m_rxDMRStreamId[0U]; + } + else { + return m_rxDMRStreamId[1U]; + } +} + /* Checks if the passed peer ID is blocked from sending to this peer. */ bool PeerNetwork::checkBlockedPeer(uint32_t peerId) diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index 047fedb6..69a64340 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -63,6 +63,23 @@ namespace network */ void setPeerLookups(lookups::PeerListLookup* pidLookup); + /** + * @brief Gets the received DMR stream ID. + * @param slotNo DMR slot to get stream ID for. + * @return uint32_t Stream ID for the given DMR slot. + */ + uint32_t getRxDMRStreamId(uint32_t slotNo) const; + /** + * @brief Gets the received P25 stream ID. + * @return uint32_t Stream ID. + */ + uint32_t getRxP25StreamId() const { return m_rxP25StreamId; } + /** + * @brief Gets the received NXDN stream ID. + * @return uint32_t Stream ID. + */ + uint32_t getRxNXDNStreamId() const { return m_rxNXDNStreamId; } + /** * @brief Gets the blocked traffic peer ID table. * @returns std::vector List of peer IDs this peer network cannot send traffic to. diff --git a/src/fne/network/callhandler/TagDMRData.cpp b/src/fne/network/callhandler/TagDMRData.cpp index e926bb10..2fff63f5 100644 --- a/src/fne/network/callhandler/TagDMRData.cpp +++ b/src/fne/network/callhandler/TagDMRData.cpp @@ -982,7 +982,7 @@ void TagDMRData::write_CSBK(uint32_t peerId, uint8_t slot, lc::CSBK* csbk) } if (peerId > 0U) { - m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false); } else { // repeat traffic to the connected peers if (m_network->m_peers.size() > 0U) { diff --git a/src/fne/network/callhandler/TagNXDNData.cpp b/src/fne/network/callhandler/TagNXDNData.cpp index 996b0d18..368112a3 100644 --- a/src/fne/network/callhandler/TagNXDNData.cpp +++ b/src/fne/network/callhandler/TagNXDNData.cpp @@ -759,5 +759,5 @@ void TagNXDNData::write_Message(uint32_t peerId, lc::RCCH* rcch) } uint32_t streamId = m_network->createStreamId(); - m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false); } diff --git a/src/fne/network/callhandler/TagP25Data.cpp b/src/fne/network/callhandler/TagP25Data.cpp index 9f33d9d3..eaf9f61c 100644 --- a/src/fne/network/callhandler/TagP25Data.cpp +++ b/src/fne/network/callhandler/TagP25Data.cpp @@ -494,12 +494,14 @@ void TagP25Data::playbackParrot() if (message != nullptr) { if (m_network->m_parrotOnlyOriginating) { LogMessage(LOG_NET, "P25, Parrot Grant Demand, peer = %u, srcId = %u, dstId = %u", pkt.peerId, srcId, dstId); - m_network->writePeer(pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, 0U, false); + m_network->writePeer(pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, + RTP_END_OF_CALL_SEQ, m_network->createStreamId(), false); } else { // repeat traffic to the connected peers for (auto peer : m_network->m_peers) { LogMessage(LOG_NET, "P25, Parrot Grant Demand, peer = %u, srcId = %u, dstId = %u", peer.first, srcId, dstId); - m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, 0U, false); + m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, + RTP_END_OF_CALL_SEQ, m_network->createStreamId(), false); } } } @@ -1349,7 +1351,8 @@ void TagP25Data::write_TSDU(uint32_t peerId, lc::TSBK* tsbk) uint32_t streamId = m_network->createStreamId(); if (peerId > 0U) { - m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, + RTP_END_OF_CALL_SEQ, streamId, false); } else { // repeat traffic to the connected peers if (m_network->m_peers.size() > 0U) { @@ -1360,7 +1363,8 @@ void TagP25Data::write_TSDU(uint32_t peerId, lc::TSBK* tsbk) m_network->m_frameQueue->flushQueue(); } - m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, true); + m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, + RTP_END_OF_CALL_SEQ, streamId, true); if (m_network->m_debug) { LogDebug(LOG_NET, "P25, peer = %u, len = %u, streamId = %u", peer.first, messageLength, streamId); diff --git a/src/fne/network/callhandler/packetdata/DMRPacketData.cpp b/src/fne/network/callhandler/packetdata/DMRPacketData.cpp index 528eca05..8a351cc0 100644 --- a/src/fne/network/callhandler/packetdata/DMRPacketData.cpp +++ b/src/fne/network/callhandler/packetdata/DMRPacketData.cpp @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2024-2025 Bryan Biedenkapp, N2PLL * */ #include "fne/Defines.h" diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.cpp b/src/fne/network/callhandler/packetdata/P25PacketData.cpp index 6a0b860b..23f69e74 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.cpp +++ b/src/fne/network/callhandler/packetdata/P25PacketData.cpp @@ -4,7 +4,7 @@ * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * Copyright (C) 2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2024-2025 Bryan Biedenkapp, N2PLL * */ #include "fne/Defines.h" @@ -1045,7 +1045,7 @@ bool P25PacketData::writeNetwork(uint32_t peerId, network::PeerNetwork* peerNet, if (peerNet != nullptr) { return peerNet->writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId); } else { - return m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId, false, true); + return m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId, false); } } diff --git a/src/host/HostMain.cpp b/src/host/HostMain.cpp index f5bd93fa..d3945669 100644 --- a/src/host/HostMain.cpp +++ b/src/host/HostMain.cpp @@ -36,6 +36,10 @@ using namespace lookups; // Global Variables // --------------------------------------------------------------------------- +#ifndef SIGHUP +#define SIGHUP 1 +#endif + int g_signal = 0; bool g_calibrate = false; bool g_setup = false; @@ -302,15 +306,15 @@ int main(int argc, char** argv) delete host; } - if (g_signal == 2) + if (g_signal == SIGINT) ::LogInfoEx(LOG_HOST, "[STOP] dvmhost:main SIGINT"); - if (g_signal == 15) + if (g_signal == SIGTERM) ::LogInfoEx(LOG_HOST, "[STOP] dvmhost:main SIGTERM"); - if (g_signal == 1) + if (g_signal == SIGHUP) ::LogInfoEx(LOG_HOST, "[RSTR] dvmhost:main SIGHUP"); - } while (g_signal == 1); + } while (g_signal == SIGHUP); ::LogFinalise(); ::ActivityLogFinalise(); diff --git a/src/host/network/Network.cpp b/src/host/network/Network.cpp index 2a79bbb3..cba5624f 100644 --- a/src/host/network/Network.cpp +++ b/src/host/network/Network.cpp @@ -5,7 +5,7 @@ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * Copyright (C) 2015,2016,2017 Jonathan Naylor, G4KLX - * Copyright (C) 2017-2024 Bryan Biedenkapp, N2PLL + * Copyright (C) 2017-2025 Bryan Biedenkapp, N2PLL * */ #include "Defines.h" @@ -263,20 +263,47 @@ void Network::clock(uint32_t ms) if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR) { // Encapsulated DMR data frame if (m_enabled && m_dmrEnabled) { uint32_t slotNo = (buffer[15U] & 0x80U) == 0x80U ? 2U : 1U; - if (m_rxDMRStreamId[slotNo] == 0U) { + + if (m_debug) { + LogDebug(LOG_NET, "DMR Slot %u, peer = %u, len = %u, pktSeq = %u, streamId = %u", + slotNo, peerId, length, rtpHeader.getSequence(), streamId); + } + + if (m_promiscuousPeer) { m_rxDMRStreamId[slotNo] = streamId; m_pktLastSeq = m_pktSeq; } else { - if (m_rxDMRStreamId[slotNo] == streamId) { - if (m_pktSeq != 0U && m_pktLastSeq != 0U) { - if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { - LogWarning(LOG_NET, "DMR Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); - } + if (m_rxDMRStreamId[slotNo] == 0U) { + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxDMRStreamId[slotNo] = 0U; } - + else { + m_rxDMRStreamId[slotNo] = streamId; + } + m_pktLastSeq = m_pktSeq; } + else { + if (m_rxDMRStreamId[slotNo] == streamId) { + if (m_pktSeq != 0U && m_pktLastSeq != 0U) { + if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { + LogWarning(LOG_NET, "DMR Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); + } + } + + m_pktLastSeq = m_pktSeq; + } + + if (m_rxDMRStreamId[slotNo] != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) { + //LogDebug(LOG_NET, "DMR Incorrect Stream; %u != %u", streamId, m_rxDMRStreamId[slotNo]); + break; + } + + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxDMRStreamId[slotNo] = 0U; + } + } } if (m_debug) @@ -291,20 +318,46 @@ void Network::clock(uint32_t ms) } else if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_P25) { // Encapsulated P25 data frame if (m_enabled && m_p25Enabled) { - if (m_rxP25StreamId == 0U) { + if (m_debug) { + LogDebug(LOG_NET, "P25, peer = %u, len = %u, pktSeq = %u, streamId = %u", + peerId, length, rtpHeader.getSequence(), streamId); + } + + if (m_promiscuousPeer) { m_rxP25StreamId = streamId; m_pktLastSeq = m_pktSeq; } else { - if (m_rxP25StreamId == streamId) { - if (m_pktSeq != 0U && m_pktLastSeq != 0U) { - if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { - LogWarning(LOG_NET, "P25 Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); - } + if (m_rxP25StreamId == 0U) { + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxP25StreamId = 0U; } - + else { + m_rxP25StreamId = streamId; + } + m_pktLastSeq = m_pktSeq; } + else { + if (m_rxP25StreamId == streamId) { + if (m_pktSeq != 0U && m_pktLastSeq != 0U) { + if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { + LogWarning(LOG_NET, "P25 Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); + } + } + + m_pktLastSeq = m_pktSeq; + } + + if (m_rxP25StreamId != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) { + //LogDebug(LOG_NET, "P25 Incorrect Stream; %u != %u", streamId, m_rxP25StreamId); + break; + } + + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxP25StreamId = 0U; + } + } } if (m_debug) @@ -319,20 +372,46 @@ void Network::clock(uint32_t ms) } else if (fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN) { // Encapsulated NXDN data frame if (m_enabled && m_nxdnEnabled) { - if (m_rxNXDNStreamId == 0U) { + if (m_debug) { + LogDebug(LOG_NET, "NXDN, peer = %u, len = %u, pktSeq = %u, streamId = %u", + peerId, length, rtpHeader.getSequence(), streamId); + } + + if (m_promiscuousPeer) { m_rxNXDNStreamId = streamId; m_pktLastSeq = m_pktSeq; } else { - if (m_rxNXDNStreamId == streamId) { - if (m_pktSeq != 0U && m_pktLastSeq != 0U) { - if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { - LogWarning(LOG_NET, "NXDN Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); - } + if (m_rxNXDNStreamId == 0U) { + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxNXDNStreamId = 0U; } - + else { + m_rxNXDNStreamId = streamId; + } + m_pktLastSeq = m_pktSeq; } + else { + if (m_rxNXDNStreamId == streamId) { + if (m_pktSeq != 0U && m_pktLastSeq != 0U) { + if (m_pktSeq >= 1U && ((m_pktSeq != m_pktLastSeq + 1) && (m_pktSeq - 1 != m_pktLastSeq + 1))) { + LogWarning(LOG_NET, "NXDN Stream %u out-of-sequence; %u != %u", streamId, m_pktSeq, m_pktLastSeq + 1); + } + } + + m_pktLastSeq = m_pktSeq; + } + + if (m_rxNXDNStreamId != streamId && (rtpHeader.getSequence() != RTP_END_OF_CALL_SEQ)) { + //LogDebug(LOG_NET, "NXDN Incorrect Stream; %u != %u", streamId, m_rxNXDNStreamId); + break; + } + + if (rtpHeader.getSequence() == RTP_END_OF_CALL_SEQ) { + m_rxNXDNStreamId = 0U; + } + } } if (m_debug)