From f40e6c6af0d02a1469c0899624ade61a5b399921 Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Tue, 22 Oct 2024 21:37:48 -0400 Subject: [PATCH] Merge Peer Link Enhancement Changes (#73) * add support for Peer-Link CFNEs to pass activity and peer status messages to the prime/master CFNE; add support for Peer-Link CFNEs to transmit their currently active peer list to the prime/master CFNE; add support on the master/prime CFNE to support reporting Peer-Link peer IDs in the /peer/query REST API request; add support to pass affiliation updates from Peer-Link CFNEs to the prime/master CFNE (Note: this passing does not preserve the original peer ID the affiliation came from, from the prime/master perspective, currently, the affiliation will appear as if it came from the Peer-Link CFNE); correct a bug with buffer overflow for Peer-Link configuration transfers; * hide debug messages; * remove debug print; --- src/common/network/BaseNetwork.cpp | 9 +- src/common/network/BaseNetwork.h | 4 +- src/common/network/RTPFNEHeader.h | 4 +- src/fne/HostFNE.h | 1 + src/fne/network/DiagNetwork.cpp | 112 ++++++++++++++++-- src/fne/network/FNENetwork.cpp | 177 ++++++++++++++++++++++++++--- src/fne/network/FNENetwork.h | 16 +++ src/fne/network/PeerNetwork.cpp | 57 +++++++++- src/fne/network/PeerNetwork.h | 14 +++ src/fne/network/RESTAPI.cpp | 48 +++----- src/sysview/NodeStatusWnd.h | 13 ++- src/sysview/PeerListWnd.h | 14 ++- 12 files changed, 393 insertions(+), 76 deletions(-) diff --git a/src/common/network/BaseNetwork.cpp b/src/common/network/BaseNetwork.cpp index d0ccf1af..f5235b39 100644 --- a/src/common/network/BaseNetwork.cpp +++ b/src/common/network/BaseNetwork.cpp @@ -335,8 +335,11 @@ uint32_t BaseNetwork::getDMRStreamId(uint32_t slotNo) const /* Helper to send a data message to the master. */ bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, - bool queueOnly, bool useAlternatePort) + bool queueOnly, bool useAlternatePort, uint32_t peerId) { + if (peerId == 0U) + peerId = m_peerId; + if (useAlternatePort) { sockaddr_storage addr; uint32_t addrLen; @@ -346,14 +349,14 @@ bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data if (udp::Socket::lookup(address, port, addr, addrLen) == 0) { if (!queueOnly) - return m_frameQueue->write(data, length, streamId, m_peerId, m_peerId, opcode, pktSeq, addr, addrLen); + return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); else m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, addr, addrLen); } } else { if (!queueOnly) - return m_frameQueue->write(data, length, streamId, m_peerId, m_peerId, opcode, pktSeq, m_addr, m_addrLen); + return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, m_addr, m_addrLen); else m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, m_addr, m_addrLen); } diff --git a/src/common/network/BaseNetwork.h b/src/common/network/BaseNetwork.h index a8b9178c..c3a6b26e 100644 --- a/src/common/network/BaseNetwork.h +++ b/src/common/network/BaseNetwork.h @@ -66,6 +66,7 @@ #define TAG_TRANSFER_STATUS "TRNSSTS" #define TAG_ANNOUNCE "ANNC" +#define TAG_PEER_LINK "PRLNK" namespace network { @@ -311,10 +312,11 @@ namespace network * @param streamId Stream ID. * @param queueOnly Flag indicating this message should be queued instead of send immediately. * @param useAlternatePort Flag indicating the message shuold be sent using the alternate port (mainly for activity and diagnostics). + * @param peerId If non-zero, overrides the peer ID sent in the packet to the master. * @returns bool True, if message was sent, otherwise false. */ bool writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, - uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool useAlternatePort = false); + uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool useAlternatePort = false, uint32_t peerId = 0U); // Digital Mobile Radio /** diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 4fd49d8a..0892b689 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -105,7 +105,9 @@ namespace network PL_TALKGROUP_LIST = 0x00U, //! FNE Peer-Link Talkgroup Transfer PL_RID_LIST = 0x01U, //! FNE Peer-Link Radio ID Transfer - PL_PEER_LIST = 0x02U //! FNE Peer-Link Peer List Transfer + PL_PEER_LIST = 0x02U, //! FNE Peer-Link Peer List Transfer + + PL_ACT_PEER_LIST = 0xA2U, //! FNE Peer-Link Active Peer List Transfer }; }; diff --git a/src/fne/HostFNE.h b/src/fne/HostFNE.h index 919c99c8..e86ea372 100644 --- a/src/fne/HostFNE.h +++ b/src/fne/HostFNE.h @@ -81,6 +81,7 @@ private: yaml::Node m_conf; friend class network::FNENetwork; + friend class network::DiagNetwork; friend class network::callhandler::TagDMRData; friend class network::callhandler::packetdata::DMRPacketData; friend class network::callhandler::TagP25Data; diff --git a/src/fne/network/DiagNetwork.cpp b/src/fne/network/DiagNetwork.cpp index 5dac7e02..c623e41d 100644 --- a/src/fne/network/DiagNetwork.cpp +++ b/src/fne/network/DiagNetwork.cpp @@ -8,6 +8,7 @@ * */ #include "fne/Defines.h" +#include "common/zlib/zlib.h" #include "common/Log.h" #include "common/Utils.h" #include "network/DiagNetwork.h" @@ -200,10 +201,31 @@ void* DiagNetwork::threadedNetworkRx(void* arg) switch (req->fneHeader.getFunction()) { case NET_FUNC::TRANSFER: { + // resolve peer ID (used for Activity Log and Status Transfer) + bool validPeerId = false; + uint32_t pktPeerId = 0U; + if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { + validPeerId = true; + pktPeerId = peerId; + } else { + if (peerId > 0) { + // this could be a peer-link transfer -- in which case, we need to check the SSRC of the packet not the peer ID + if (network->m_peers.find(req->rtpHeader.getSSRC()) != network->m_peers.end()) { + FNEPeerConnection* connection = network->m_peers[req->rtpHeader.getSSRC()]; + if (connection != nullptr) { + if (connection->isExternalPeer() && connection->isPeerLink()) { + validPeerId = true; + pktPeerId = req->rtpHeader.getSSRC(); + } + } + } + } + } + if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer if (network->m_allowActivityTransfer) { - if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { - FNEPeerConnection* connection = network->m_peers[peerId]; + if (pktPeerId > 0 && validPeerId) { + FNEPeerConnection* connection = network->m_peers[pktPeerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); @@ -215,20 +237,20 @@ void* DiagNetwork::threadedNetworkRx(void* arg) ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); std::string payload(rawPayload, rawPayload + (req->length - 11U)); - ::ActivityLog("%.9u (%8s) %s", peerId, connection->identity().c_str(), payload.c_str()); + ::ActivityLog("%.9u (%8s) %s", pktPeerId, connection->identity().c_str(), payload.c_str()); // report activity log to InfluxDB if (network->m_enableInfluxDB) { influxdb::QueryBuilder() .meas("activity") - .tag("peerId", std::to_string(peerId)) + .tag("peerId", std::to_string(pktPeerId)) .field("identity", connection->identity()) .field("msg", payload) .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) .request(network->m_influxServer); } - // repeat traffic to the connected peers + // repeat traffic to the connected SysView peers if (network->m_peers.size() > 0U) { for (auto peer : network->m_peers) { if (peer.second != nullptr) { @@ -240,7 +262,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg) sockaddr_storage addr = peer.second->socketStorage(); uint32_t addrLen = peer.second->sockStorageLen(); - network->m_frameQueue->write(req->buffer, req->length, streamId, peerId, network->m_peerId, + network->m_frameQueue->write(req->buffer, req->length, streamId, pktPeerId, network->m_peerId, { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY }, RTP_END_OF_CALL_SEQ, addr, addrLen); } } else { @@ -248,9 +270,21 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } } + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY }, + req->buffer, req->length, RTP_END_OF_CALL_SEQ, streamId, false, true, pktPeerId); + } + } + } + } } else { - network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(pktPeerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -295,14 +329,15 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS) { // Peer Status Transfer - if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { - FNEPeerConnection* connection = network->m_peers[peerId]; + if (pktPeerId > 0 && validPeerId) { + FNEPeerConnection* connection = network->m_peers[pktPeerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { if (network->m_peers.size() > 0U) { + // attempt to repeat status traffic to SysView clients for (auto peer : network->m_peers) { if (peer.second != nullptr) { if (peer.second->isSysView()) { @@ -315,19 +350,31 @@ void* DiagNetwork::threadedNetworkRx(void* arg) if (network->m_debug) { LogDebug(LOG_NET, "SysView, srcPeer = %u, dstPeer = %u, peer status message, len = %u", - peerId, peer.first, req->length); + pktPeerId, peer.first, req->length); } - network->m_frameQueue->write(req->buffer, req->length, streamId, peerId, network->m_peerId, + network->m_frameQueue->write(req->buffer, req->length, streamId, pktPeerId, network->m_peerId, { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); } } else { continue; } } + + // attempt to repeat status traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS }, + req->buffer, req->length, RTP_END_OF_CALL_SEQ, streamId, false, true, pktPeerId); + } + } + } + } } } else { - network->writePeerNAK(peerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + network->writePeerNAK(pktPeerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } @@ -339,6 +386,47 @@ void* DiagNetwork::threadedNetworkRx(void* arg) } break; + case NET_FUNC::PEER_LINK: + if (req->fneHeader.getSubFunction() == NET_SUBFUNC::PL_ACT_PEER_LIST) { // Peer-Link Active Peer List + if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { + FNEPeerConnection* connection = network->m_peers[peerId]; + if (connection != nullptr) { + std::string ip = udp::Socket::address(req->address); + + // validate peer (simple validation really) + if (connection->connected() && connection->address() == ip && connection->isExternalPeer() && + connection->isPeerLink()) { + UInt8Array __rawPayload = std::make_unique(req->length - 8U); + uint8_t* rawPayload = __rawPayload.get(); + ::memset(rawPayload, 0x00U, req->length - 8U); + ::memcpy(rawPayload, req->buffer + 8U, req->length - 8U); + std::string payload(rawPayload, rawPayload + (req->length - 8U)); + + // parse JSON body + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty()) { + break; + } + else { + // ensure parsed JSON is an array + if (!v.is()) { + break; + } + else { + json::array arr = v.get(); + network->m_peerLinkPeers[peerId] = arr; + } + } + } + else { + network->writePeerNAK(peerId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } + break; + default: // diagostic network ignores unknowns for everything else... break; diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index 94ee5432..8c2ff3ca 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -72,6 +72,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port, m_peerListLookup(nullptr), m_status(NET_STAT_INVALID), m_peers(), + m_peerLinkPeers(), m_peerAffiliations(), m_ccPeerMap(), m_maintainenceTimer(1000U, pingTime), @@ -304,6 +305,31 @@ void FNENetwork::clock(uint32_t ms) m_frameQueue->clearTimestamps(); } + // send active peer list to Peer-Link masters + if (m_host->m_peerNetworks.size() > 0) { + for (auto peer : m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + if (m_peers.size() > 0) { + json::array peers = json::array(); + for (auto entry : m_peers) { + uint32_t peerId = entry.first; + network::FNEPeerConnection* peerConn = entry.second; + if (peerConn != nullptr) { + json::object peerObj = fneConnObject(peerId, peerConn); + uint32_t peerNetPeerId = peer.second->getPeerId(); + peerObj["parentPeerId"].set(peerNetPeerId); + peers.push_back(json::value(peerObj)); + } + } + + peer.second->writePeerLinkPeers(&peers); + } + } + } + } + } + m_maintainenceTimer.start(); } @@ -786,6 +812,21 @@ void* FNENetwork::threadedNetworkRx(void* arg) connection->isExternalPeer(external); if (external) LogInfoEx(LOG_NET, "PEER %u reports external peer", peerId); + + // check if the peer is participating in peer link + lookups::PeerId peerEntry = network->m_peerListLookup->find(req->peerId); + if (!peerEntry.peerDefault()) { + if (peerEntry.peerLink()) { + if (network->m_host->m_useAlternatePortForDiagnostics) { + connection->isPeerLink(true); + if (external) + LogInfoEx(LOG_NET, "PEER %u configured for Peer-Link", peerId); + } else { + LogError(LOG_NET, "PEER %u, Peer-Link operations *require* the alternate diagnostics port option to be enabled.", peerId); + LogError(LOG_NET, "PEER %u, will not receive Peer-Link ACL updates.", peerId); + } + } + } } // is the peer reporting it is a conventional peer? @@ -1080,6 +1121,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) uint32_t dstId = __GET_UINT16(req->buffer, 3U); // Destination Address aff->groupUnaff(srcId); aff->groupAff(srcId, dstId); + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_AFFIL }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } else { network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -1102,6 +1155,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->unitReg(srcId); + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_REG }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } else { network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -1124,6 +1189,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->unitDereg(srcId); + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_DEREG }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } else { network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -1146,6 +1223,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->groupUnaff(srcId); + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_UNAFFIL }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } else { network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -1181,6 +1270,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) offs += 8U; } LogMessage(LOG_NET, "PEER %u (%s) announced %u affiliations", peerId, connection->identity().c_str(), len); + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_AFFILS }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } } else { @@ -1215,6 +1316,18 @@ void* FNENetwork::threadedNetworkRx(void* arg) } LogMessage(LOG_NET, "PEER %u (%s) announced %u VCs", peerId, connection->identity().c_str(), len); network->m_ccPeerMap[peerId] = vcPeers; + + // attempt to repeat traffic to Peer-Link masters + if (network->m_host->m_peerNetworks.size() > 0) { + for (auto peer : network->m_host->m_peerNetworks) { + if (peer.second != nullptr) { + if (peer.second->isEnabled() && peer.second->isPeerLink()) { + peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_SITE_VC }, + req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); + } + } + } + } } else { network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -1300,7 +1413,6 @@ bool FNENetwork::erasePeer(uint32_t peerId) auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { m_peers.erase(peerId); - return true; } } @@ -1309,11 +1421,59 @@ bool FNENetwork::erasePeer(uint32_t peerId) auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; }); if (it != m_ccPeerMap.end()) { m_ccPeerMap.erase(peerId); - return true; } } - return false; + // erase any Peer-Link entries for this peer + { + auto it = std::find_if(m_peerLinkPeers.begin(), m_peerLinkPeers.end(), [&](auto x) { return x.first == peerId; }); + if (it != m_peerLinkPeers.end()) { + m_peerLinkPeers.erase(peerId); + } + } + + return true; +} + + +/* Helper to create a JSON representation of a FNE peer connection. */ + +json::object FNENetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *conn) +{ + json::object peerObj = json::object(); + peerObj["peerId"].set(peerId); + + std::string address = conn->address(); + peerObj["address"].set(address); + uint16_t port = conn->port(); + peerObj["port"].set(port); + bool connected = conn->connected(); + peerObj["connected"].set(connected); + uint32_t connectionState = (uint32_t)conn->connectionState(); + peerObj["connectionState"].set(connectionState); + uint32_t pingsReceived = conn->pingsReceived(); + peerObj["pingsReceived"].set(pingsReceived); + uint64_t lastPing = conn->lastPing(); + peerObj["lastPing"].set(lastPing); + uint32_t ccPeerId = conn->ccPeerId(); + peerObj["controlChannel"].set(ccPeerId); + + json::object peerConfig = conn->config(); + if (peerConfig["rcon"].is()) + peerConfig.erase("rcon"); + peerObj["config"].set(peerConfig); + + json::array voiceChannels = json::array(); + auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; }); + if (it != m_ccPeerMap.end()) { + std::vector vcPeers = m_ccPeerMap[peerId]; + for (uint32_t vcEntry : vcPeers) { + voiceChannels.push_back(json::value((double)vcEntry)); + } + } + peerObj["voiceChannels"].set(voiceChannels); + + return peerObj; } /* Helper to reset a peer connection. */ @@ -1419,20 +1579,11 @@ void* FNENetwork::threadedACLUpdate(void* arg) std::string peerIdentity = network->resolvePeerIdentity(req->peerId); - // check if the peer is participating in peer link - bool peerLink = false; - lookups::PeerId peerEntry = network->m_peerListLookup->find(req->peerId); - if (!peerEntry.peerDefault()) { - if (peerEntry.peerLink()) { - peerLink = true; - } - } - FNEPeerConnection* connection = network->m_peers[req->peerId]; if (connection != nullptr) { // if the connection is an external peer, and peer is participating in peer link, // send the peer proper configuration data - if (connection->isExternalPeer() && peerLink) { + if (connection->isExternalPeer() && connection->isPeerLink()) { LogInfoEx(LOG_NET, "PEER %u (%s) sending Peer-Link ACL list updates", req->peerId, peerIdentity.c_str()); network->writeWhitelistRIDs(req->peerId, true); diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index 29033961..779c8558 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -117,6 +117,7 @@ namespace network m_isExternalPeer(false), m_isConventionalPeer(false), m_isSysView(false), + m_isPeerLink(false), m_config(), m_pktLastSeq(RTP_END_OF_CALL_SEQ), m_pktNextSeq(1U) @@ -146,6 +147,7 @@ namespace network m_isExternalPeer(false), m_isConventionalPeer(false), m_isSysView(false), + m_isPeerLink(false), m_config(), m_pktLastSeq(RTP_END_OF_CALL_SEQ), m_pktNextSeq(1U) @@ -234,6 +236,11 @@ namespace network */ __PROPERTY_PLAIN(bool, isSysView); + /** + * @brief Flag indicating this connection is from an external peer that is peer link enabled. + */ + __PROPERTY_PLAIN(bool, isPeerLink); + /** * @brief JSON objecting containing peer configuration information. */ @@ -382,6 +389,14 @@ namespace network */ void close() override; + /** + * @brief Helper to create a JSON representation of a FNE peer connection. + * @param peerId Peer ID. + * @param conn FNE Peer Connection. + * @return json::object + */ + json::object fneConnObject(uint32_t peerId, FNEPeerConnection *conn); + /** * @brief Helper to reset a peer connection. * @param peerId Peer ID to reset. @@ -426,6 +441,7 @@ namespace network static std::mutex m_peerMutex; typedef std::pair PeerMapPair; std::unordered_map m_peers; + std::unordered_map m_peerLinkPeers; typedef std::pair PeerAffiliationMapPair; std::unordered_map m_peerAffiliations; std::unordered_map> m_ccPeerMap; diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index adc8eafe..8c75f8fb 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -32,6 +32,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup, saveLookup), m_blockTrafficToTable(), m_pidLookup(nullptr), + m_peerLink(false), m_tgidCompressedSize(0U), m_tgidSize(0U), m_tgidBuffer(nullptr), @@ -71,6 +72,32 @@ bool PeerNetwork::checkBlockedPeer(uint32_t peerId) return false; } +/* Writes a complete update of this CFNE's active peer list to the network. */ + +bool PeerNetwork::writePeerLinkPeers(json::array* peerList) +{ + if (peerList == nullptr) + return false; + if (peerList->size() == 0) + return false; + + if (peerList->size() > 0 && m_peerLink) { + json::value v = json::value(*peerList); + std::string json = std::string(v.serialize()); + + CharArray __buffer = std::make_unique(json.length() + 9U); + char* buffer = __buffer.get(); + + ::memcpy(buffer + 0U, TAG_PEER_LINK, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + return writeMaster({ NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_ACT_PEER_LIST }, + (uint8_t*)buffer, json.length() + 8U, RTP_END_OF_CALL_SEQ, createStreamId(), false, true); + } + + return false; +} + // --------------------------------------------------------------------------- // Protected Class Members // --------------------------------------------------------------------------- @@ -95,7 +122,10 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco if (m_tgidBuffer != nullptr) delete[] m_tgidBuffer; - m_tgidBuffer = new uint8_t[m_tgidSize]; + if (m_tgidSize < PEER_LINK_BLOCK_SIZE) + m_tgidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U]; + else + m_tgidBuffer = new uint8_t[m_tgidSize + 1U]; } if (m_tgidBuffer != nullptr) { @@ -166,7 +196,7 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco // store to file std::unique_ptr __str = std::make_unique(decompressedLen + 1U); char* str = __str.get(); - ::memcpy(str, decompressed, decompressedLen + 1U); + ::memcpy(str, decompressed, decompressedLen); str[decompressedLen] = 0; // null termination std::string filename = "/tmp/talkgroup_rules.yml"; @@ -183,6 +213,9 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco m_tidLookup->filename(filename); m_tidLookup->reload(); + // flag this peer as Peer-Link enabled + m_peerLink = true; + // cleanup temporary file ::remove(filename.c_str()); } @@ -214,7 +247,10 @@ tid_lookup_cleanup: if (m_ridBuffer != nullptr) delete[] m_ridBuffer; - m_ridBuffer = new uint8_t[m_ridSize]; + if (m_ridSize < PEER_LINK_BLOCK_SIZE) + m_ridBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U]; + else + m_ridBuffer = new uint8_t[m_ridSize + 1U]; } if (m_ridBuffer != nullptr) { @@ -285,7 +321,7 @@ tid_lookup_cleanup: // store to file std::unique_ptr __str = std::make_unique(decompressedLen + 1U); char* str = __str.get(); - ::memcpy(str, decompressed, decompressedLen + 1U); + ::memcpy(str, decompressed, decompressedLen); str[decompressedLen] = 0; // null termination std::string filename = "/tmp/rid_acl.dat"; @@ -302,6 +338,9 @@ tid_lookup_cleanup: m_ridLookup->filename(filename); m_ridLookup->reload(); + // flag this peer as Peer-Link enabled + m_peerLink = true; + // cleanup temporary file ::remove(filename.c_str()); } @@ -333,7 +372,10 @@ rid_lookup_cleanup: if (m_pidBuffer != nullptr) delete[] m_pidBuffer; - m_pidBuffer = new uint8_t[m_pidSize]; + if (m_pidSize < PEER_LINK_BLOCK_SIZE) + m_pidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U]; + else + m_pidBuffer = new uint8_t[m_pidSize + 1U]; } if (m_pidBuffer != nullptr) { @@ -404,7 +446,7 @@ rid_lookup_cleanup: // store to file std::unique_ptr __str = std::make_unique(decompressedLen + 1U); char* str = __str.get(); - ::memcpy(str, decompressed, decompressedLen + 1U); + ::memcpy(str, decompressed, decompressedLen); str[decompressedLen] = 0; // null termination std::string filename = "/tmp/peer_list.dat"; @@ -421,6 +463,9 @@ rid_lookup_cleanup: m_pidLookup->filename(filename); m_pidLookup->reload(); + // flag this peer as Peer-Link enabled + m_peerLink = true; + // cleanup temporary file ::remove(filename.c_str()); } diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index d60d572b..047fedb6 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -79,6 +79,19 @@ namespace network */ bool checkBlockedPeer(uint32_t peerId); + /** + * @brief Writes a complete update of this CFNE's active peer list to the network. + * @param peerList List of active peers. + * @returns bool True, if list was sent, otherwise false. + */ + bool writePeerLinkPeers(json::array* peerList); + + /** + * @brief Returns flag indicating whether or not this peer connection is Peer-Link enabled. + * @returns bool True, if Peer-Link enabled, otherwise false. + */ + bool isPeerLink() const { return m_peerLink; } + protected: std::vector m_blockTrafficToTable; @@ -101,6 +114,7 @@ namespace network private: lookups::PeerListLookup* m_pidLookup; + bool m_peerLink; uint32_t m_tgidCompressedSize; uint32_t m_tgidSize; diff --git a/src/fne/network/RESTAPI.cpp b/src/fne/network/RESTAPI.cpp index af15c346..62491805 100644 --- a/src/fne/network/RESTAPI.cpp +++ b/src/fne/network/RESTAPI.cpp @@ -821,39 +821,7 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl LogDebug(LOG_REST, "Preparing Peer %u (%s) for REST API query", peerId, peer->address().c_str()); } - json::object peerObj = json::object(); - peerObj["peerId"].set(peerId); - - std::string address = peer->address(); - peerObj["address"].set(address); - uint16_t port = peer->port(); - peerObj["port"].set(port); - bool connected = peer->connected(); - peerObj["connected"].set(connected); - uint32_t connectionState = (uint32_t)peer->connectionState(); - peerObj["connectionState"].set(connectionState); - uint32_t pingsReceived = peer->pingsReceived(); - peerObj["pingsReceived"].set(pingsReceived); - uint64_t lastPing = peer->lastPing(); - peerObj["lastPing"].set(lastPing); - uint32_t ccPeerId = peer->ccPeerId(); - peerObj["controlChannel"].set(ccPeerId); - - json::object peerConfig = peer->config(); - if (peerConfig["rcon"].is()) - peerConfig.erase("rcon"); - peerObj["config"].set(peerConfig); - - json::array voiceChannels = json::array(); - auto it = std::find_if(m_network->m_ccPeerMap.begin(), m_network->m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; }); - if (it != m_network->m_ccPeerMap.end()) { - std::vector vcPeers = m_network->m_ccPeerMap[peerId]; - for (uint32_t vcEntry : vcPeers) { - voiceChannels.push_back(json::value((double)vcEntry)); - } - } - peerObj["voiceChannels"].set(voiceChannels); - + json::object peerObj = m_network->fneConnObject(peerId, peer); peers.push_back(json::value(peerObj)); } } @@ -861,6 +829,20 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl else { LogDebug(LOG_REST, "No peers connected to this FNE"); } + + // report any Peer-Link reported peers + if (m_network->m_peerLinkPeers.size() > 0) { + for (auto entry : m_network->m_peerLinkPeers) { + json::array peerObjs = entry.second; + if (entry.second.size() > 0) { + for (auto linkEntry : entry.second) { + if (linkEntry.is()) { + peers.push_back(json::value(linkEntry)); + } + } + } + } + } } else { LogDebug(LOG_REST, "Network not set up, no peers to return"); diff --git a/src/sysview/NodeStatusWnd.h b/src/sysview/NodeStatusWnd.h index a09da828..e57138b0 100644 --- a/src/sysview/NodeStatusWnd.h +++ b/src/sysview/NodeStatusWnd.h @@ -454,17 +454,21 @@ public: const auto& rootWidget = getRootWidget(); std::map peerStatus(getNetwork()->peerStatus.begin(), getNetwork()->peerStatus.end()); for (auto entry : peerStatus) { + uint32_t peerId = entry.first; + json::object peerObj = entry.second; + if (peerObj["peerId"].is()) + peerId = peerObj["peerId"].get(); + auto it = std::find_if(m_nodes.begin(), m_nodes.end(), [&](NodeStatusWidget* wdgt) { - if (wdgt->peerId == entry.first && wdgt->uniqueId == (int32_t)entry.first) + if (wdgt->peerId == peerId && wdgt->uniqueId == (int32_t)peerId) return true; return false; }); if (it == m_nodes.end()) { json::object peerObj = entry.second; - addNode(entry.first, peerObj); + addNode(peerId, peerObj); - uint32_t peerId = entry.first; uint8_t channelId = peerObj["channelId"].get(); uint32_t channelNo = peerObj["channelNo"].get(); @@ -538,9 +542,8 @@ public: } else { NodeStatusWidget* wdgt = *it; json::object peerObj = entry.second; - updateNode(wdgt, entry.first, peerObj); + updateNode(wdgt, peerId, peerObj); - uint32_t peerId = entry.first; uint8_t channelId = peerObj["channelId"].get(); uint32_t channelNo = peerObj["channelNo"].get(); diff --git a/src/sysview/PeerListWnd.h b/src/sysview/PeerListWnd.h index f60eba20..475f4b95 100644 --- a/src/sysview/PeerListWnd.h +++ b/src/sysview/PeerListWnd.h @@ -138,6 +138,10 @@ public: uint32_t pingsReceived = (uint32_t)peerObj["pingsReceived"].getDefault(0U); uint64_t lastPing = (uint64_t)peerObj["lastPing"].getDefault(0U); + uint32_t parentPeerId = 0U; + if (peerObj["parentPeerId"].is()) + parentPeerId = (uint32_t)peerObj["parentPeerId"].getDefault(0U); + uint32_t ccPeerId = (uint32_t)peerObj["controlChannel"].getDefault(0U); json::array voiceChannels = peerObj["voiceChannels"].get(); @@ -183,16 +187,20 @@ public: std::ostringstream peerOss; peerOss << std::setw(9) << std::setfill('0') << peerId; + // pad peer IDs properly + std::ostringstream parentPeerOss; + parentPeerOss << std::setw(9) << std::setfill('0') << parentPeerId; + // pad peer IDs properly std::ostringstream ccPeerOss; ccPeerOss << std::setw(9) << std::setfill('0') << ccPeerId; // build list view entry - const std::array columns = { + const std::array columns = { peerOss.str(), identity, software, peerAddress, std::to_string(port), - ccPeerOss.str(), + parentPeerOss.str(), ccPeerOss.str(), std::to_string(voiceChannelPeers.size()), (connected) ? "X" : "", strConnState, @@ -256,6 +264,7 @@ private: m_listView.addColumn("Software", 15); m_listView.addColumn("IP Address", 15); m_listView.addColumn("Port", 8); + m_listView.addColumn("Link Peer ID", 10); m_listView.addColumn("CC Peer ID", 10); m_listView.addColumn("VC Count", 8); m_listView.addColumn("Connected", 5); @@ -270,6 +279,7 @@ private: m_listView.setColumnAlignment(1, finalcut::Align::Right); m_listView.setColumnAlignment(4, finalcut::Align::Right); m_listView.setColumnAlignment(6, finalcut::Align::Center); + m_listView.setColumnAlignment(7, finalcut::Align::Center); // set type of sorting m_listView.setColumnSortType(1, finalcut::SortType::Name);