diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index 2fb1c119..5c221faf 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -120,6 +120,17 @@ master: # spanning tree updates.) spanningTreeFastReconnect: true + # Console patch status registry configuration. + patchStatus: + # Flag indicating whether or not console patch status publishing is enabled. + enabled: true + # Default TTL, in seconds, for console patch status updates that do not specify one. + defaultTtlSeconds: 15 + # Minimum accepted TTL, in seconds, for console patch status updates. + minTtlSeconds: 5 + # Maximum accepted TTL, in seconds, for console patch status updates. + maxTtlSeconds: 300 + # Flag indicating whether or not peer pinging will be reported. reportPeerPing: true diff --git a/src/common/network/BaseNetwork.h b/src/common/network/BaseNetwork.h index e6538f00..a4023011 100644 --- a/src/common/network/BaseNetwork.h +++ b/src/common/network/BaseNetwork.h @@ -69,6 +69,7 @@ #define TAG_TRANSFER_ACT_LOG "TRNSLOG" #define TAG_TRANSFER_DIAG_LOG "TRNSDIAG" #define TAG_TRANSFER_STATUS "TRNSSTS" +#define TAG_TRANSFER_PATCH_STATUS "TRNSPTCH" #define TAG_ANNOUNCE "ANNC" #define TAG_PEER_REPLICA "REPL" diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 3b842276..9e52e8b8 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -100,6 +100,7 @@ namespace network TRANSFER_SUBFUNC_ACTIVITY = 0x01U, //!< Activity Log Transfer TRANSFER_SUBFUNC_DIAG = 0x02U, //!< Diagnostic Log Transfer TRANSFER_SUBFUNC_STATUS = 0x03U, //!< Status Transfer + TRANSFER_SUBFUNC_PATCH_STATUS = 0x04U, //!< Console Patch Status Transfer ANNC_SUBFUNC_GRP_AFFIL = 0x00U, //!< Announce Group Affiliation ANNC_SUBFUNC_UNIT_REG = 0x01U, //!< Announce Unit Registration @@ -215,4 +216,4 @@ namespace network } // namespace frame } // namespace network -#endif // __RTP_FNE_HEADER_H__ \ No newline at end of file +#endif // __RTP_FNE_HEADER_H__ diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp new file mode 100644 index 00000000..132ef484 --- /dev/null +++ b/src/fne/PatchStatusRegistry.cpp @@ -0,0 +1,405 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +#include "fne/PatchStatusRegistry.h" + +#include "common/Log.h" + +#include +#include +#include +#include +#include + +constexpr uint32_t PatchStatusRegistry::DEFAULT_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; + +PatchStatusRegistry::PatchStatusRegistry() : + m_mutex(), + m_revisionChanged(), + m_peerPatches(), + m_revision(0U), + m_defaultTtlSeconds(DEFAULT_TTL_SECONDS), + m_minTtlSeconds(MIN_TTL_SECONDS), + m_maxTtlSeconds(MAX_TTL_SECONDS) +{ + /* stub */ +} + +void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) +{ + if (minTtlSeconds == 0U) + minTtlSeconds = MIN_TTL_SECONDS; + if (maxTtlSeconds < minTtlSeconds) + maxTtlSeconds = minTtlSeconds; + + std::lock_guard guard(m_mutex); + m_minTtlSeconds = minTtlSeconds; + m_maxTtlSeconds = maxTtlSeconds; + m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); +} + +bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) +{ + if (!request["peerId"].is()) { + errorMessage = "peerId was not a valid integer"; + return false; + } + + if (!request["patches"].is()) { + errorMessage = "patches was not a valid array"; + return false; + } + + PeerPatchSnapshot incoming; + incoming.peerId = request["peerId"].get(); + if (incoming.peerId == 0U) { + errorMessage = "peerId cannot be zero"; + return false; + } + + if (request["peerName"].is()) + incoming.peerName = request["peerName"].get(); + + if (request["sequence"].is()) + incoming.sequence = request["sequence"].get(); + + uint32_t ttlSeconds = defaultTtlSeconds(); + if (request["ttlSeconds"].is()) + ttlSeconds = request["ttlSeconds"].get(); + ttlSeconds = clampTtl(ttlSeconds); + + incoming.updatedAt = nowMs(); + incoming.expiresAt = incoming.updatedAt + (static_cast(ttlSeconds) * 1000U); + + json::array patches = request["patches"].get(); + for (json::value& value : patches) { + if (!value.is()) { + errorMessage = "patches contained a non-object entry"; + return false; + } + + json::object patchObj = value.get(); + PatchRecord patch; + if (!parsePatch(patchObj, patch, errorMessage)) + return false; + + incoming.patches.push_back(patch); + } + + cleanupExpired(); + + { + std::lock_guard guard(m_mutex); + if (incoming.patches.empty()) + m_peerPatches.erase(incoming.peerId); + else + m_peerPatches[incoming.peerId] = incoming; + bumpRevisionLocked(); + + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + } + + m_revisionChanged.notify_all(); + return true; +} + +bool PatchStatusRegistry::removePeer(uint32_t peerId) +{ + if (peerId == 0U) + return false; + + bool removed = false; + { + std::lock_guard guard(m_mutex); + removed = m_peerPatches.erase(peerId) > 0U; + if (removed) + bumpRevisionLocked(); + } + + if (removed) + m_revisionChanged.notify_all(); + + return removed; +} + +uint32_t PatchStatusRegistry::cleanupExpired() +{ + uint32_t removed = 0U; + bool changed = false; + uint64_t now = nowMs(); + + { + std::lock_guard guard(m_mutex); + for (auto it = m_peerPatches.begin(); it != m_peerPatches.end();) { + if (it->second.expiresAt <= now) { + it = m_peerPatches.erase(it); + removed++; + changed = true; + } + else { + ++it; + } + } + + if (changed) + bumpRevisionLocked(); + } + + if (changed) + m_revisionChanged.notify_all(); + + return removed; +} + +json::object PatchStatusRegistry::snapshot() +{ + cleanupExpired(); + + std::lock_guard guard(m_mutex); + return snapshotLocked(); +} + +json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) +{ + waitMs = std::min(waitMs, MAX_WAIT_MS); + cleanupExpired(); + + std::unique_lock lock(m_mutex); + if (waitMs > 0U && sinceRevision >= m_revision) { + m_revisionChanged.wait_for(lock, std::chrono::milliseconds(waitMs), [&]() { + return m_revision > sinceRevision; + }); + } + + return snapshotLocked(); +} + +uint64_t PatchStatusRegistry::revision() const +{ + std::lock_guard guard(m_mutex); + return m_revision; +} + +uint32_t PatchStatusRegistry::defaultTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_defaultTtlSeconds; +} + +uint32_t PatchStatusRegistry::minTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_minTtlSeconds; +} + +uint32_t PatchStatusRegistry::maxTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_maxTtlSeconds; +} + +uint64_t PatchStatusRegistry::nowMs() +{ + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +std::string PatchStatusRegistry::normalizeMode(const std::string& mode) +{ + std::string normalized = mode; + std::transform(normalized.begin(), normalized.end(), normalized.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return normalized; +} + +std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) +{ + std::ostringstream ss; + ss << normalizeMode(member.mode) << ':' << member.tgid << ':' << static_cast(member.slot); + return ss.str(); +} + +json::object PatchStatusRegistry::memberToJson(const PatchMember& member) +{ + json::object obj = json::object(); + obj["system"].set(member.system); + obj["mode"].set(member.mode); + obj["tgid"].set(member.tgid); + obj["slot"].set(member.slot); + obj["key"].set(buildTalkgroupKey(member)); + return obj; +} + +json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) +{ + json::object obj = json::object(); + obj["patchId"].set(patch.patchId); + obj["active"].set(patch.active); + obj["oneWay"].set(patch.oneWay); + + json::array members = json::array(); + for (const PatchMember& member : patch.members) + members.push_back(json::value(memberToJson(member))); + obj["members"].set(members); + + return obj; +} + +json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) +{ + json::object obj = json::object(); + obj["peerId"].set(peer.peerId); + obj["peerName"].set(peer.peerName); + obj["sequence"].set(peer.sequence); + obj["updatedAt"].set(peer.updatedAt); + obj["expiresAt"].set(peer.expiresAt); + + json::array patches = json::array(); + for (const PatchRecord& patch : peer.patches) + patches.push_back(json::value(patchToJson(patch))); + obj["patches"].set(patches); + + return obj; +} + +bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const +{ + if (obj["patchId"].is()) + patch.patchId = obj["patchId"].get(); + + if (obj["active"].is()) + patch.active = obj["active"].get(); + + if (obj["oneWay"].is()) + patch.oneWay = obj["oneWay"].get(); + + if (!obj["members"].is()) { + errorMessage = "patch members was not a valid array"; + return false; + } + + json::array members = obj["members"].get(); + for (json::value& value : members) { + if (!value.is()) { + errorMessage = "patch members contained a non-object entry"; + return false; + } + + json::object memberObj = value.get(); + PatchMember member; + if (!parseMember(memberObj, member, errorMessage)) + return false; + + patch.members.push_back(member); + } + + return true; +} + +bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const +{ + if (obj["system"].is()) + member.system = obj["system"].get(); + + if (obj["mode"].is()) + member.mode = normalizeMode(obj["mode"].get()); + else + member.mode = "unknown"; + + if (!obj["tgid"].is()) { + errorMessage = "patch member tgid was not a valid integer"; + return false; + } + + member.tgid = obj["tgid"].get(); + if (member.tgid == 0U) { + errorMessage = "patch member tgid cannot be zero"; + return false; + } + + if (obj["slot"].is()) + member.slot = obj["slot"].get(); + else if (obj["slot"].is()) { + uint32_t slot = obj["slot"].get(); + if (slot > std::numeric_limits::max()) { + errorMessage = "patch member slot was out of range"; + return false; + } + member.slot = static_cast(slot); + } + + return true; +} + +uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const +{ + std::lock_guard guard(m_mutex); + return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); +} + +json::object PatchStatusRegistry::snapshotLocked() const +{ + json::object response = json::object(); + response["revision"].set(m_revision); + + json::array peers = json::array(); + json::array patches = json::array(); + json::object byTalkgroup = json::object(); + + for (const auto& entry : m_peerPatches) { + const PeerPatchSnapshot& peer = entry.second; + peers.push_back(json::value(peerSnapshotToJson(peer))); + + for (const PatchRecord& patch : peer.patches) { + json::object patchObj = patchToJson(patch); + patchObj["peerId"].set(peer.peerId); + patchObj["peerName"].set(peer.peerName); + patchObj["updatedAt"].set(peer.updatedAt); + patchObj["expiresAt"].set(peer.expiresAt); + patches.push_back(json::value(patchObj)); + + for (const PatchMember& member : patch.members) { + std::string key = buildTalkgroupKey(member); + json::array entries = json::array(); + if (byTalkgroup[key].is()) + entries = byTalkgroup[key].get(); + + json::object tgPatch = json::object(); + tgPatch["peerId"].set(peer.peerId); + tgPatch["peerName"].set(peer.peerName); + tgPatch["patchId"].set(patch.patchId); + tgPatch["active"].set(patch.active); + tgPatch["oneWay"].set(patch.oneWay); + tgPatch["updatedAt"].set(peer.updatedAt); + tgPatch["expiresAt"].set(peer.expiresAt); + tgPatch["member"].set(memberToJson(member)); + entries.push_back(json::value(tgPatch)); + byTalkgroup[key].set(entries); + } + } + } + + response["peers"].set(peers); + response["patches"].set(patches); + response["byTalkgroup"].set(byTalkgroup); + return response; +} + +void PatchStatusRegistry::bumpRevisionLocked() +{ + m_revision++; + if (m_revision == 0U) + m_revision = 1U; +} diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h new file mode 100644 index 00000000..0537e6cd --- /dev/null +++ b/src/fne/PatchStatusRegistry.h @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +/** + * @file PatchStatusRegistry.h + * @ingroup fne + */ +#if !defined(__FNE_PATCH_STATUS_REGISTRY_H__) +#define __FNE_PATCH_STATUS_REGISTRY_H__ + +#include "fne/Defines.h" +#include "common/json/json.h" + +#include +#include +#include +#include +#include +#include + +/** + * @brief In-memory registry for console-advertised patch status. + * @ingroup fne + */ +class HOST_SW_API PatchStatusRegistry { +public: + static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; + static constexpr uint32_t MIN_TTL_SECONDS = 5U; + static constexpr uint32_t MAX_TTL_SECONDS = 300U; + static constexpr uint32_t MAX_WAIT_MS = 30000U; + + PatchStatusRegistry(); + ~PatchStatusRegistry() = default; + + void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); + + bool publish(json::object& request, json::object& response, std::string& errorMessage); + bool removePeer(uint32_t peerId); + uint32_t cleanupExpired(); + + json::object snapshot(); + json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); + + uint64_t revision() const; + uint32_t defaultTtlSeconds() const; + uint32_t minTtlSeconds() const; + uint32_t maxTtlSeconds() const; + +private: + struct PatchMember { + std::string system; + std::string mode; + uint32_t tgid = 0U; + uint8_t slot = 0U; + }; + + struct PatchRecord { + std::string patchId; + bool active = true; + bool oneWay = false; + std::vector members; + }; + + struct PeerPatchSnapshot { + uint32_t peerId = 0U; + std::string peerName; + uint32_t sequence = 0U; + uint64_t updatedAt = 0U; + uint64_t expiresAt = 0U; + std::vector patches; + }; + + static uint64_t nowMs(); + static std::string normalizeMode(const std::string& mode); + static std::string buildTalkgroupKey(const PatchMember& member); + static json::object memberToJson(const PatchMember& member); + static json::object patchToJson(const PatchRecord& patch); + static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + + bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; + bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; + uint32_t clampTtl(uint32_t ttlSeconds) const; + json::object snapshotLocked() const; + void bumpRevisionLocked(); + + mutable std::mutex m_mutex; + std::condition_variable m_revisionChanged; + std::unordered_map m_peerPatches; + uint64_t m_revision; + uint32_t m_defaultTtlSeconds; + uint32_t m_minTtlSeconds; + uint32_t m_maxTtlSeconds; +}; + +#endif // __FNE_PATCH_STATUS_REGISTRY_H__ diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index 560faaeb..bc58a09f 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -12,6 +12,7 @@ #include "common/Log.h" #include "common/Utils.h" #include "network/MetadataNetwork.h" +#include "common/json/json.h" #include "fne/ActivityLog.h" #include "HostFNE.h" @@ -375,6 +376,69 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } break; + case NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS: // Console Patch Status Transfer + { + if (pktPeerId > 0 && validPeerId) { + FNEPeerConnection* connection = network->m_peers[pktPeerId]; + if (connection != nullptr) { + if (!network->patchStatusEnabled()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + break; + } + + std::string ip = udp::Socket::address(req->address); + + // Only authenticated console peers may publish or request patch registry state. + if (req->length <= 11U) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) { + DECLARE_UINT8_ARRAY(rawPayload, req->length - 11U); + ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); + std::string payload(rawPayload, rawPayload + (req->length - 11U)); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + json::object reqObj = v.get(); + std::string type = "snapshot"; + if (reqObj["type"].is()) + type = reqObj["type"].get(); + + if (type == "request") { + json::object snapshot = network->patchStatusRegistry().snapshot(); + network->writePatchStatusToPeer(pktPeerId, snapshot); + break; + } + + // The authenticated peer identity is authoritative; do not allow spoofed peer IDs. + reqObj["peerId"].set(pktPeerId); + if (!reqObj["peerName"].is() || reqObj["peerName"].get().empty()) + reqObj["peerName"].set(connection->identity()); + + json::object response = json::object(); + std::string errorMessage; + if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u (%s) invalid patch status payload, %s", pktPeerId, connection->identWithQualifier().c_str(), errorMessage.c_str()); + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + network->writePatchStatusToConsoles(response); + } + else { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } + break; default: network->writePeerNAK(peerId, network->createStreamId(), TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("Unknown transfer opcode from the peer", req->buffer, req->length); diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index 176a80ae..ab06bb22 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -109,6 +109,8 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16 m_maintainenceTimer(1000U, pingTime), m_updateLookupTimer(1000U, (updateLookupTime * 60U)), m_haUpdateTimer(1000U, FIXED_HA_UPDATE_INTERVAL), + m_patchStatusRegistry(), + m_patchStatusEnabled(true), m_softConnLimit(0U), m_enableSpanningTree(true), m_logSpanningTreeChanges(false), @@ -229,6 +231,13 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) m_logSpanningTreeChanges = conf["logSpanningTreeChanges"].as(false); m_spanningTreeFastReconnect = conf["spanningTreeFastReconnect"].as(true); + yaml::Node patchStatusConf = conf["patchStatus"]; + m_patchStatusEnabled = patchStatusConf["enabled"].as(true); + uint32_t patchStatusDefaultTtl = patchStatusConf["defaultTtlSeconds"].as(PatchStatusRegistry::DEFAULT_TTL_SECONDS); + uint32_t patchStatusMinTtl = patchStatusConf["minTtlSeconds"].as(PatchStatusRegistry::MIN_TTL_SECONDS); + uint32_t patchStatusMaxTtl = patchStatusConf["maxTtlSeconds"].as(PatchStatusRegistry::MAX_TTL_SECONDS); + m_patchStatusRegistry.configure(patchStatusDefaultTtl, patchStatusMinTtl, patchStatusMaxTtl); + // always force disable ADJ_STS_BCAST to neighbor FNE peers if the all option // is enabled if (m_disallowAdjStsBcast) { @@ -350,6 +359,12 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) LogInfo(" Enable Peer Spanning Tree: %s", m_enableSpanningTree ? "yes" : "no"); LogInfo(" Log Spanning Tree Changes: %s", m_logSpanningTreeChanges ? "yes" : "no"); LogInfo(" Spanning Tree Allow Fast Reconnect: %s", m_spanningTreeFastReconnect ? "yes" : "no"); + LogInfo(" Console Patch Status Enabled: %s", m_patchStatusEnabled ? "yes" : "no"); + if (m_patchStatusEnabled) { + LogInfo(" Console Patch Status Default TTL: %us", m_patchStatusRegistry.defaultTtlSeconds()); + LogInfo(" Console Patch Status Minimum TTL: %us", m_patchStatusRegistry.minTtlSeconds()); + LogInfo(" Console Patch Status Maximum TTL: %us", m_patchStatusRegistry.maxTtlSeconds()); + } LogInfo(" Disable adjacent site broadcasts to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); if (m_disallowAdjStsBcast) { LogWarning(LOG_MASTER, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); @@ -636,6 +651,8 @@ void TrafficNetwork::clock(uint32_t ms) } // cleanup possibly stale data calls + if (m_patchStatusEnabled && m_patchStatusRegistry.cleanupExpired() > 0U) + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); m_tagDMR->packetData()->cleanupStale(); m_tagP25->packetData()->cleanupStale(); @@ -1571,6 +1588,8 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req) // spin up a thread and send metadata over to peer network->peerMetadataUpdate(peerId); + if (network->m_patchStatusEnabled && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) + network->writePatchStatusToPeer(peerId, network->patchStatusRegistry().snapshot()); } } } @@ -2304,6 +2323,10 @@ void TrafficNetwork::erasePeer(uint32_t peerId) } } + // erase any console patch status records for this peer + if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); + // erase any HA parameters for this peer { auto it = std::find_if(m_peerReplicaHAParams.begin(), m_peerReplicaHAParams.end(), [&](auto& x) { return x.peerId == peerId; }); @@ -2408,6 +2431,88 @@ json::object TrafficNetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *c return peerObj; } +/* Helper to send patch status state to one console peer. */ + +bool TrafficNetwork::writePatchStatusToPeer(uint32_t peerId, json::object obj) +{ + if (peerId == 0U) + return false; + if (!m_patchStatusEnabled) + return false; + + bool ret = false; + m_peers.shared_lock(); + auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); + if (it != m_peers.end() && it->second != nullptr) + ret = writePatchStatusPayload(it->second, obj); + m_peers.shared_unlock(); + + return ret; +} + +/* Helper to broadcast patch status state to connected console peers. */ + +void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + m_peers.shared_lock(); + if (m_peers.size() == 0U) { + m_peers.shared_unlock(); + return; + } + + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_CONSOLE) + continue; + + writePatchStatusPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + +/* Helper to serialize and queue a patch status transfer payload. */ + +bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_CONSOLE) + return false; + + obj["type"].set("registry"); + json::value v = json::value(obj); + std::string payload = std::string(v.serialize()); + uint32_t len = static_cast(payload.length()); + if ((len + 11U) > DATA_PACKET_LENGTH) { + LogError(LOG_MASTER, "PEER %u (%s) patch status registry payload too large, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + return false; + } + + uint8_t buffer[DATA_PACKET_LENGTH]; + ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); + ::memcpy(buffer + 11U, payload.c_str(), len); + + sockaddr_storage addr = connection->socketStorage(); + uint32_t addrLen = connection->sockStorageLen(); + + if (m_debug) { + LogDebug(LOG_MASTER, "PEER %u (%s) sending patch status registry, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + } + + return m_frameQueue->write(buffer, len + 11U, createStreamId(), connection->id(), m_peerId, + { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); +} + /* Helper to reset a peer connection. */ bool TrafficNetwork::resetPeer(uint32_t peerId) diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index 27d7f4d3..f234df87 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -41,6 +41,7 @@ #include "fne/network/FNEPeerConnection.h" #include "fne/network/SpanningTree.h" #include "fne/network/HAParameters.h" +#include "fne/PatchStatusRegistry.h" #include "fne/CryptoContainer.h" #include @@ -274,6 +275,29 @@ namespace network * @return json::object */ json::object fneConnObject(uint32_t peerId, FNEPeerConnection* conn); + /** + * @brief Gets the console patch status registry. + * @return PatchStatusRegistry& Patch status registry. + */ + PatchStatusRegistry& patchStatusRegistry() { return m_patchStatusRegistry; } + /** + * @brief Flag indicating whether console patch status handling is enabled. + * @returns bool True, if enabled. + */ + bool patchStatusEnabled() const { return m_patchStatusEnabled; } + /** + * @brief Sends patch status registry state to one console peer. + * @param peerId Destination peer ID. + * @param obj Patch status JSON payload. + * @returns bool True, if the message was queued, otherwise false. + */ + bool writePatchStatusToPeer(uint32_t peerId, json::object obj); + /** + * @brief Broadcasts patch status registry state to connected console peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); /** * @brief Helper to reset a peer connection. @@ -359,6 +383,9 @@ namespace network Timer m_updateLookupTimer; Timer m_haUpdateTimer; + PatchStatusRegistry m_patchStatusRegistry; + bool m_patchStatusEnabled; + uint32_t m_softConnLimit; bool m_enableSpanningTree; @@ -793,6 +820,14 @@ namespace network */ bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen); + /** + * @brief Serializes and queues a patch status transfer payload. + * @param connection Destination connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); + /* ** Internal KMM Callback. */