From 870c64320015ab2a20b2dbb6c9057a36615e5a48 Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Thu, 8 Jan 2026 21:34:50 -0500 Subject: [PATCH] very experimental fix for #111, along with additional enhancements to repeat parrot traffic to all VCs of a trunk site to ensure parrot traffic repeats; --- src/fne/network/FNENetwork.h | 2 +- src/fne/network/callhandler/TagP25Data.cpp | 60 ++++++++++++++++++++-- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index c78c2017..e2ea4274 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -340,7 +340,7 @@ namespace network concurrent::unordered_map m_peerReplicaPeers; typedef std::pair PeerAffiliationMapPair; concurrent::unordered_map m_peerAffiliations; - concurrent::unordered_map> m_ccPeerMap; + concurrent::shared_unordered_map> m_ccPeerMap; static std::timed_mutex s_keyQueueMutex; std::unordered_map m_peerReplicaKeyQueue; diff --git a/src/fne/network/callhandler/TagP25Data.cpp b/src/fne/network/callhandler/TagP25Data.cpp index 64f97c76..41bd7a36 100644 --- a/src/fne/network/callhandler/TagP25Data.cpp +++ b/src/fne/network/callhandler/TagP25Data.cpp @@ -755,6 +755,7 @@ bool TagP25Data::processGrantReq(uint32_t srcId, uint32_t dstId, bool unitToUnit } // repeat traffic to the connected peers + m_network->m_peers.shared_lock(); if (m_network->m_peers.size() > 0U) { for (auto peer : m_network->m_peers) { if (peerId != peer.first) { @@ -762,6 +763,7 @@ bool TagP25Data::processGrantReq(uint32_t srcId, uint32_t dstId, bool unitToUnit } } } + m_network->m_peers.shared_unlock(); return true; } @@ -820,8 +822,19 @@ void TagP25Data::playbackParrot() UInt8Array message = m_network->createP25_TDUMessage(messageLength, control, lsd, controlByte); if (message != nullptr) { if (m_network->m_parrotOnlyOriginating) { + uint32_t targetPeerId = pkt.peerId; + + // check if this peer is a VC peer (ccPeerId non-zero), and if so + // redirect the grant demand to its CC peer + m_network->m_peers.shared_lock(); + auto it = m_network->m_peers.find(pkt.peerId); + if (it != m_network->m_peers.end() && it->second->ccPeerId() > 0) { + targetPeerId = it->second->ccPeerId(); + } + m_network->m_peers.shared_unlock(); + LogInfoEx(LOG_P25, "Parrot Grant Demand, peer = %u, srcId = %u, dstId = %u", pkt.peerId, srcId, dstId); - m_network->writePeer(pkt.peerId, pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, + m_network->writePeer(targetPeerId, pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, m_network->createStreamId()); } else { // repeat traffic to the connected peers @@ -842,10 +855,47 @@ void TagP25Data::playbackParrot() m_lastParrotDstId = pkt.dstId; if (m_network->m_parrotOnlyOriginating) { - m_network->writePeer(pkt.peerId, pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, pkt.buffer, pkt.bufferLen, pkt.pktSeq, pkt.streamId); - if (m_network->m_debug) { - LogDebugEx(LOG_P25, "TagP25Data::playbackParrot()", "Parrot, dstPeer = %u, len = %u, pktSeq = %u, streamId = %u", - pkt.peerId, pkt.bufferLen, pkt.pktSeq, pkt.streamId); + uint32_t ccPeerId = 0U; + + // check if this peer is a VC peer (ccPeerId non-zero), and if so + // determine its CC peer ID + m_network->m_peers.shared_lock(); + auto it = m_network->m_peers.find(pkt.peerId); + if (it != m_network->m_peers.end() && it->second->ccPeerId() > 0) { + ccPeerId = it->second->ccPeerId(); + } + m_network->m_peers.shared_unlock(); + + // if we have a CC peer ID repeat the parrot traffic to all its connected VC peers + if (ccPeerId > 0U && m_network->m_ccPeerMap.find(ccPeerId) != m_network->m_ccPeerMap.end()) { + m_network->m_ccPeerMap.shared_lock(); + + // repeat traffic to the connected VC peers + uint32_t i = 0U; + udp::BufferQueue queue = udp::BufferQueue(); + + for (auto peer : m_network->m_ccPeerMap[ccPeerId]) { + // every MAX_QUEUED_PEER_MSGS peers flush the queue + if (i % MAX_QUEUED_PEER_MSGS == 0U) { + m_network->m_frameQueue->flushQueue(&queue); + } + + m_network->writePeerQueue(&queue, peer, pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, pkt.buffer, pkt.bufferLen, pkt.pktSeq, pkt.streamId); + if (m_network->m_debug) { + LogDebug(LOG_P25, "TagP25Data::playbackParrot()", "Parrot, dstPeer = %u, len = %u, pktSeq = %u, streamId = %u", + peer, pkt.bufferLen, pkt.pktSeq, pkt.streamId); + } + + i++; + } + m_network->m_frameQueue->flushQueue(&queue); + m_network->m_ccPeerMap.shared_unlock(); + } else { + m_network->writePeer(pkt.peerId, pkt.peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, pkt.buffer, pkt.bufferLen, pkt.pktSeq, pkt.streamId); + if (m_network->m_debug) { + LogDebugEx(LOG_P25, "TagP25Data::playbackParrot()", "Parrot, dstPeer = %u, len = %u, pktSeq = %u, streamId = %u", + pkt.peerId, pkt.bufferLen, pkt.pktSeq, pkt.streamId); + } } } else { // repeat traffic to the connected peers