Add patch registry service to fne

pull/121/head
Lorenzo L. Romero 4 weeks ago
parent 9a31910526
commit a802b22b7a

@ -120,6 +120,17 @@ master:
# spanning tree updates.)
spanningTreeFastReconnect: true
# Console patch status registry configuration.
patchStatus:
# Flag indicating whether or not console patch status publishing is enabled.
enabled: true
# Default TTL, in seconds, for console patch status updates that do not specify one.
defaultTtlSeconds: 15
# Minimum accepted TTL, in seconds, for console patch status updates.
minTtlSeconds: 5
# Maximum accepted TTL, in seconds, for console patch status updates.
maxTtlSeconds: 300
# Flag indicating whether or not peer pinging will be reported.
reportPeerPing: true

@ -69,6 +69,7 @@
#define TAG_TRANSFER_ACT_LOG "TRNSLOG"
#define TAG_TRANSFER_DIAG_LOG "TRNSDIAG"
#define TAG_TRANSFER_STATUS "TRNSSTS"
#define TAG_TRANSFER_PATCH_STATUS "TRNSPTCH"
#define TAG_ANNOUNCE "ANNC"
#define TAG_PEER_REPLICA "REPL"

@ -100,6 +100,7 @@ namespace network
TRANSFER_SUBFUNC_ACTIVITY = 0x01U, //!< Activity Log Transfer
TRANSFER_SUBFUNC_DIAG = 0x02U, //!< Diagnostic Log Transfer
TRANSFER_SUBFUNC_STATUS = 0x03U, //!< Status Transfer
TRANSFER_SUBFUNC_PATCH_STATUS = 0x04U, //!< Console Patch Status Transfer
ANNC_SUBFUNC_GRP_AFFIL = 0x00U, //!< Announce Group Affiliation
ANNC_SUBFUNC_UNIT_REG = 0x01U, //!< Announce Unit Registration

@ -0,0 +1,405 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2026 DVMProject Authors
*
*/
#include "fne/PatchStatusRegistry.h"
#include "common/Log.h"
#include <algorithm>
#include <chrono>
#include <cctype>
#include <limits>
#include <sstream>
constexpr uint32_t PatchStatusRegistry::DEFAULT_TTL_SECONDS;
constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS;
constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS;
constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS;
PatchStatusRegistry::PatchStatusRegistry() :
m_mutex(),
m_revisionChanged(),
m_peerPatches(),
m_revision(0U),
m_defaultTtlSeconds(DEFAULT_TTL_SECONDS),
m_minTtlSeconds(MIN_TTL_SECONDS),
m_maxTtlSeconds(MAX_TTL_SECONDS)
{
/* stub */
}
void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds)
{
if (minTtlSeconds == 0U)
minTtlSeconds = MIN_TTL_SECONDS;
if (maxTtlSeconds < minTtlSeconds)
maxTtlSeconds = minTtlSeconds;
std::lock_guard<std::mutex> guard(m_mutex);
m_minTtlSeconds = minTtlSeconds;
m_maxTtlSeconds = maxTtlSeconds;
m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds));
}
bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage)
{
if (!request["peerId"].is<uint32_t>()) {
errorMessage = "peerId was not a valid integer";
return false;
}
if (!request["patches"].is<json::array>()) {
errorMessage = "patches was not a valid array";
return false;
}
PeerPatchSnapshot incoming;
incoming.peerId = request["peerId"].get<uint32_t>();
if (incoming.peerId == 0U) {
errorMessage = "peerId cannot be zero";
return false;
}
if (request["peerName"].is<std::string>())
incoming.peerName = request["peerName"].get<std::string>();
if (request["sequence"].is<uint32_t>())
incoming.sequence = request["sequence"].get<uint32_t>();
uint32_t ttlSeconds = defaultTtlSeconds();
if (request["ttlSeconds"].is<uint32_t>())
ttlSeconds = request["ttlSeconds"].get<uint32_t>();
ttlSeconds = clampTtl(ttlSeconds);
incoming.updatedAt = nowMs();
incoming.expiresAt = incoming.updatedAt + (static_cast<uint64_t>(ttlSeconds) * 1000U);
json::array patches = request["patches"].get<json::array>();
for (json::value& value : patches) {
if (!value.is<json::object>()) {
errorMessage = "patches contained a non-object entry";
return false;
}
json::object patchObj = value.get<json::object>();
PatchRecord patch;
if (!parsePatch(patchObj, patch, errorMessage))
return false;
incoming.patches.push_back(patch);
}
cleanupExpired();
{
std::lock_guard<std::mutex> guard(m_mutex);
if (incoming.patches.empty())
m_peerPatches.erase(incoming.peerId);
else
m_peerPatches[incoming.peerId] = incoming;
bumpRevisionLocked();
response = snapshotLocked();
response["acceptedPeerId"].set<uint32_t>(incoming.peerId);
response["ttlSeconds"].set<uint32_t>(ttlSeconds);
}
m_revisionChanged.notify_all();
return true;
}
bool PatchStatusRegistry::removePeer(uint32_t peerId)
{
if (peerId == 0U)
return false;
bool removed = false;
{
std::lock_guard<std::mutex> guard(m_mutex);
removed = m_peerPatches.erase(peerId) > 0U;
if (removed)
bumpRevisionLocked();
}
if (removed)
m_revisionChanged.notify_all();
return removed;
}
uint32_t PatchStatusRegistry::cleanupExpired()
{
uint32_t removed = 0U;
bool changed = false;
uint64_t now = nowMs();
{
std::lock_guard<std::mutex> guard(m_mutex);
for (auto it = m_peerPatches.begin(); it != m_peerPatches.end();) {
if (it->second.expiresAt <= now) {
it = m_peerPatches.erase(it);
removed++;
changed = true;
}
else {
++it;
}
}
if (changed)
bumpRevisionLocked();
}
if (changed)
m_revisionChanged.notify_all();
return removed;
}
json::object PatchStatusRegistry::snapshot()
{
cleanupExpired();
std::lock_guard<std::mutex> guard(m_mutex);
return snapshotLocked();
}
json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs)
{
waitMs = std::min(waitMs, MAX_WAIT_MS);
cleanupExpired();
std::unique_lock<std::mutex> lock(m_mutex);
if (waitMs > 0U && sinceRevision >= m_revision) {
m_revisionChanged.wait_for(lock, std::chrono::milliseconds(waitMs), [&]() {
return m_revision > sinceRevision;
});
}
return snapshotLocked();
}
uint64_t PatchStatusRegistry::revision() const
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_revision;
}
uint32_t PatchStatusRegistry::defaultTtlSeconds() const
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_defaultTtlSeconds;
}
uint32_t PatchStatusRegistry::minTtlSeconds() const
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_minTtlSeconds;
}
uint32_t PatchStatusRegistry::maxTtlSeconds() const
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_maxTtlSeconds;
}
uint64_t PatchStatusRegistry::nowMs()
{
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
std::string PatchStatusRegistry::normalizeMode(const std::string& mode)
{
std::string normalized = mode;
std::transform(normalized.begin(), normalized.end(), normalized.begin(), [](unsigned char c) {
return static_cast<char>(std::tolower(c));
});
return normalized;
}
std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member)
{
std::ostringstream ss;
ss << normalizeMode(member.mode) << ':' << member.tgid << ':' << static_cast<uint32_t>(member.slot);
return ss.str();
}
json::object PatchStatusRegistry::memberToJson(const PatchMember& member)
{
json::object obj = json::object();
obj["system"].set<std::string>(member.system);
obj["mode"].set<std::string>(member.mode);
obj["tgid"].set<uint32_t>(member.tgid);
obj["slot"].set<uint8_t>(member.slot);
obj["key"].set<std::string>(buildTalkgroupKey(member));
return obj;
}
json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch)
{
json::object obj = json::object();
obj["patchId"].set<std::string>(patch.patchId);
obj["active"].set<bool>(patch.active);
obj["oneWay"].set<bool>(patch.oneWay);
json::array members = json::array();
for (const PatchMember& member : patch.members)
members.push_back(json::value(memberToJson(member)));
obj["members"].set<json::array>(members);
return obj;
}
json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer)
{
json::object obj = json::object();
obj["peerId"].set<uint32_t>(peer.peerId);
obj["peerName"].set<std::string>(peer.peerName);
obj["sequence"].set<uint32_t>(peer.sequence);
obj["updatedAt"].set<uint64_t>(peer.updatedAt);
obj["expiresAt"].set<uint64_t>(peer.expiresAt);
json::array patches = json::array();
for (const PatchRecord& patch : peer.patches)
patches.push_back(json::value(patchToJson(patch)));
obj["patches"].set<json::array>(patches);
return obj;
}
bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const
{
if (obj["patchId"].is<std::string>())
patch.patchId = obj["patchId"].get<std::string>();
if (obj["active"].is<bool>())
patch.active = obj["active"].get<bool>();
if (obj["oneWay"].is<bool>())
patch.oneWay = obj["oneWay"].get<bool>();
if (!obj["members"].is<json::array>()) {
errorMessage = "patch members was not a valid array";
return false;
}
json::array members = obj["members"].get<json::array>();
for (json::value& value : members) {
if (!value.is<json::object>()) {
errorMessage = "patch members contained a non-object entry";
return false;
}
json::object memberObj = value.get<json::object>();
PatchMember member;
if (!parseMember(memberObj, member, errorMessage))
return false;
patch.members.push_back(member);
}
return true;
}
bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const
{
if (obj["system"].is<std::string>())
member.system = obj["system"].get<std::string>();
if (obj["mode"].is<std::string>())
member.mode = normalizeMode(obj["mode"].get<std::string>());
else
member.mode = "unknown";
if (!obj["tgid"].is<uint32_t>()) {
errorMessage = "patch member tgid was not a valid integer";
return false;
}
member.tgid = obj["tgid"].get<uint32_t>();
if (member.tgid == 0U) {
errorMessage = "patch member tgid cannot be zero";
return false;
}
if (obj["slot"].is<uint8_t>())
member.slot = obj["slot"].get<uint8_t>();
else if (obj["slot"].is<uint32_t>()) {
uint32_t slot = obj["slot"].get<uint32_t>();
if (slot > std::numeric_limits<uint8_t>::max()) {
errorMessage = "patch member slot was out of range";
return false;
}
member.slot = static_cast<uint8_t>(slot);
}
return true;
}
uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const
{
std::lock_guard<std::mutex> guard(m_mutex);
return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds));
}
json::object PatchStatusRegistry::snapshotLocked() const
{
json::object response = json::object();
response["revision"].set<uint64_t>(m_revision);
json::array peers = json::array();
json::array patches = json::array();
json::object byTalkgroup = json::object();
for (const auto& entry : m_peerPatches) {
const PeerPatchSnapshot& peer = entry.second;
peers.push_back(json::value(peerSnapshotToJson(peer)));
for (const PatchRecord& patch : peer.patches) {
json::object patchObj = patchToJson(patch);
patchObj["peerId"].set<uint32_t>(peer.peerId);
patchObj["peerName"].set<std::string>(peer.peerName);
patchObj["updatedAt"].set<uint64_t>(peer.updatedAt);
patchObj["expiresAt"].set<uint64_t>(peer.expiresAt);
patches.push_back(json::value(patchObj));
for (const PatchMember& member : patch.members) {
std::string key = buildTalkgroupKey(member);
json::array entries = json::array();
if (byTalkgroup[key].is<json::array>())
entries = byTalkgroup[key].get<json::array>();
json::object tgPatch = json::object();
tgPatch["peerId"].set<uint32_t>(peer.peerId);
tgPatch["peerName"].set<std::string>(peer.peerName);
tgPatch["patchId"].set<std::string>(patch.patchId);
tgPatch["active"].set<bool>(patch.active);
tgPatch["oneWay"].set<bool>(patch.oneWay);
tgPatch["updatedAt"].set<uint64_t>(peer.updatedAt);
tgPatch["expiresAt"].set<uint64_t>(peer.expiresAt);
tgPatch["member"].set<json::object>(memberToJson(member));
entries.push_back(json::value(tgPatch));
byTalkgroup[key].set<json::array>(entries);
}
}
}
response["peers"].set<json::array>(peers);
response["patches"].set<json::array>(patches);
response["byTalkgroup"].set<json::object>(byTalkgroup);
return response;
}
void PatchStatusRegistry::bumpRevisionLocked()
{
m_revision++;
if (m_revision == 0U)
m_revision = 1U;
}

@ -0,0 +1,101 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2026 DVMProject Authors
*
*/
/**
* @file PatchStatusRegistry.h
* @ingroup fne
*/
#if !defined(__FNE_PATCH_STATUS_REGISTRY_H__)
#define __FNE_PATCH_STATUS_REGISTRY_H__
#include "fne/Defines.h"
#include "common/json/json.h"
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
/**
* @brief In-memory registry for console-advertised patch status.
* @ingroup fne
*/
class HOST_SW_API PatchStatusRegistry {
public:
static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U;
static constexpr uint32_t MIN_TTL_SECONDS = 5U;
static constexpr uint32_t MAX_TTL_SECONDS = 300U;
static constexpr uint32_t MAX_WAIT_MS = 30000U;
PatchStatusRegistry();
~PatchStatusRegistry() = default;
void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds);
bool publish(json::object& request, json::object& response, std::string& errorMessage);
bool removePeer(uint32_t peerId);
uint32_t cleanupExpired();
json::object snapshot();
json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs);
uint64_t revision() const;
uint32_t defaultTtlSeconds() const;
uint32_t minTtlSeconds() const;
uint32_t maxTtlSeconds() const;
private:
struct PatchMember {
std::string system;
std::string mode;
uint32_t tgid = 0U;
uint8_t slot = 0U;
};
struct PatchRecord {
std::string patchId;
bool active = true;
bool oneWay = false;
std::vector<PatchMember> members;
};
struct PeerPatchSnapshot {
uint32_t peerId = 0U;
std::string peerName;
uint32_t sequence = 0U;
uint64_t updatedAt = 0U;
uint64_t expiresAt = 0U;
std::vector<PatchRecord> patches;
};
static uint64_t nowMs();
static std::string normalizeMode(const std::string& mode);
static std::string buildTalkgroupKey(const PatchMember& member);
static json::object memberToJson(const PatchMember& member);
static json::object patchToJson(const PatchRecord& patch);
static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer);
bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const;
bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const;
uint32_t clampTtl(uint32_t ttlSeconds) const;
json::object snapshotLocked() const;
void bumpRevisionLocked();
mutable std::mutex m_mutex;
std::condition_variable m_revisionChanged;
std::unordered_map<uint32_t, PeerPatchSnapshot> m_peerPatches;
uint64_t m_revision;
uint32_t m_defaultTtlSeconds;
uint32_t m_minTtlSeconds;
uint32_t m_maxTtlSeconds;
};
#endif // __FNE_PATCH_STATUS_REGISTRY_H__

@ -12,6 +12,7 @@
#include "common/Log.h"
#include "common/Utils.h"
#include "network/MetadataNetwork.h"
#include "common/json/json.h"
#include "fne/ActivityLog.h"
#include "HostFNE.h"
@ -375,6 +376,69 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req)
}
break;
case NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS: // Console Patch Status Transfer
{
if (pktPeerId > 0 && validPeerId) {
FNEPeerConnection* connection = network->m_peers[pktPeerId];
if (connection != nullptr) {
if (!network->patchStatusEnabled()) {
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
break;
}
std::string ip = udp::Socket::address(req->address);
// Only authenticated console peers may publish or request patch registry state.
if (req->length <= 11U) {
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET);
break;
}
if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) {
DECLARE_UINT8_ARRAY(rawPayload, req->length - 11U);
::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
std::string payload(rawPayload, rawPayload + (req->length - 11U));
json::value v;
std::string err = json::parse(v, payload);
if (!err.empty() || !v.is<json::object>()) {
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET);
break;
}
json::object reqObj = v.get<json::object>();
std::string type = "snapshot";
if (reqObj["type"].is<std::string>())
type = reqObj["type"].get<std::string>();
if (type == "request") {
json::object snapshot = network->patchStatusRegistry().snapshot();
network->writePatchStatusToPeer(pktPeerId, snapshot);
break;
}
// The authenticated peer identity is authoritative; do not allow spoofed peer IDs.
reqObj["peerId"].set<uint32_t>(pktPeerId);
if (!reqObj["peerName"].is<std::string>() || reqObj["peerName"].get<std::string>().empty())
reqObj["peerName"].set<std::string>(connection->identity());
json::object response = json::object();
std::string errorMessage;
if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage)) {
LogWarning(LOG_MASTER, "PEER %u (%s) invalid patch status payload, %s", pktPeerId, connection->identWithQualifier().c_str(), errorMessage.c_str());
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET);
break;
}
network->writePatchStatusToConsoles(response);
}
else {
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
break;
default:
network->writePeerNAK(peerId, network->createStreamId(), TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET);
Utils::dump("Unknown transfer opcode from the peer", req->buffer, req->length);

@ -109,6 +109,8 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16
m_maintainenceTimer(1000U, pingTime),
m_updateLookupTimer(1000U, (updateLookupTime * 60U)),
m_haUpdateTimer(1000U, FIXED_HA_UPDATE_INTERVAL),
m_patchStatusRegistry(),
m_patchStatusEnabled(true),
m_softConnLimit(0U),
m_enableSpanningTree(true),
m_logSpanningTreeChanges(false),
@ -229,6 +231,13 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions)
m_logSpanningTreeChanges = conf["logSpanningTreeChanges"].as<bool>(false);
m_spanningTreeFastReconnect = conf["spanningTreeFastReconnect"].as<bool>(true);
yaml::Node patchStatusConf = conf["patchStatus"];
m_patchStatusEnabled = patchStatusConf["enabled"].as<bool>(true);
uint32_t patchStatusDefaultTtl = patchStatusConf["defaultTtlSeconds"].as<uint32_t>(PatchStatusRegistry::DEFAULT_TTL_SECONDS);
uint32_t patchStatusMinTtl = patchStatusConf["minTtlSeconds"].as<uint32_t>(PatchStatusRegistry::MIN_TTL_SECONDS);
uint32_t patchStatusMaxTtl = patchStatusConf["maxTtlSeconds"].as<uint32_t>(PatchStatusRegistry::MAX_TTL_SECONDS);
m_patchStatusRegistry.configure(patchStatusDefaultTtl, patchStatusMinTtl, patchStatusMaxTtl);
// always force disable ADJ_STS_BCAST to neighbor FNE peers if the all option
// is enabled
if (m_disallowAdjStsBcast) {
@ -350,6 +359,12 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions)
LogInfo(" Enable Peer Spanning Tree: %s", m_enableSpanningTree ? "yes" : "no");
LogInfo(" Log Spanning Tree Changes: %s", m_logSpanningTreeChanges ? "yes" : "no");
LogInfo(" Spanning Tree Allow Fast Reconnect: %s", m_spanningTreeFastReconnect ? "yes" : "no");
LogInfo(" Console Patch Status Enabled: %s", m_patchStatusEnabled ? "yes" : "no");
if (m_patchStatusEnabled) {
LogInfo(" Console Patch Status Default TTL: %us", m_patchStatusRegistry.defaultTtlSeconds());
LogInfo(" Console Patch Status Minimum TTL: %us", m_patchStatusRegistry.minTtlSeconds());
LogInfo(" Console Patch Status Maximum TTL: %us", m_patchStatusRegistry.maxTtlSeconds());
}
LogInfo(" Disable adjacent site broadcasts to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no");
if (m_disallowAdjStsBcast) {
LogWarning(LOG_MASTER, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!");
@ -636,6 +651,8 @@ void TrafficNetwork::clock(uint32_t ms)
}
// cleanup possibly stale data calls
if (m_patchStatusEnabled && m_patchStatusRegistry.cleanupExpired() > 0U)
writePatchStatusToConsoles(m_patchStatusRegistry.snapshot());
m_tagDMR->packetData()->cleanupStale();
m_tagP25->packetData()->cleanupStale();
@ -1571,6 +1588,8 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
// spin up a thread and send metadata over to peer
network->peerMetadataUpdate(peerId);
if (network->m_patchStatusEnabled && connection->peerClass() == PEER_CONN_CLASS_CONSOLE)
network->writePatchStatusToPeer(peerId, network->patchStatusRegistry().snapshot());
}
}
}
@ -2304,6 +2323,10 @@ void TrafficNetwork::erasePeer(uint32_t peerId)
}
}
// erase any console patch status records for this peer
if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId))
writePatchStatusToConsoles(m_patchStatusRegistry.snapshot());
// erase any HA parameters for this peer
{
auto it = std::find_if(m_peerReplicaHAParams.begin(), m_peerReplicaHAParams.end(), [&](auto& x) { return x.peerId == peerId; });
@ -2408,6 +2431,88 @@ json::object TrafficNetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *c
return peerObj;
}
/* Helper to send patch status state to one console peer. */
bool TrafficNetwork::writePatchStatusToPeer(uint32_t peerId, json::object obj)
{
if (peerId == 0U)
return false;
if (!m_patchStatusEnabled)
return false;
bool ret = false;
m_peers.shared_lock();
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end() && it->second != nullptr)
ret = writePatchStatusPayload(it->second, obj);
m_peers.shared_unlock();
return ret;
}
/* Helper to broadcast patch status state to connected console peers. */
void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId)
{
if (!m_patchStatusEnabled)
return;
m_peers.shared_lock();
if (m_peers.size() == 0U) {
m_peers.shared_unlock();
return;
}
for (auto peer : m_peers) {
if (peer.first == exceptPeerId)
continue;
if (peer.second == nullptr)
continue;
if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_CONSOLE)
continue;
writePatchStatusPayload(peer.second, obj);
}
m_peers.shared_unlock();
}
/* Helper to serialize and queue a patch status transfer payload. */
bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj)
{
if (connection == nullptr)
return false;
if (!m_patchStatusEnabled)
return false;
if (!connection->connected())
return false;
if (connection->peerClass() != PEER_CONN_CLASS_CONSOLE)
return false;
obj["type"].set<std::string>("registry");
json::value v = json::value(obj);
std::string payload = std::string(v.serialize());
uint32_t len = static_cast<uint32_t>(payload.length());
if ((len + 11U) > DATA_PACKET_LENGTH) {
LogError(LOG_MASTER, "PEER %u (%s) patch status registry payload too large, len = %u", connection->id(), connection->identWithQualifier().c_str(), len);
return false;
}
uint8_t buffer[DATA_PACKET_LENGTH];
::memset(buffer, 0x00U, DATA_PACKET_LENGTH);
::memcpy(buffer + 11U, payload.c_str(), len);
sockaddr_storage addr = connection->socketStorage();
uint32_t addrLen = connection->sockStorageLen();
if (m_debug) {
LogDebug(LOG_MASTER, "PEER %u (%s) sending patch status registry, len = %u", connection->id(), connection->identWithQualifier().c_str(), len);
}
return m_frameQueue->write(buffer, len + 11U, createStreamId(), connection->id(), m_peerId,
{ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen);
}
/* Helper to reset a peer connection. */
bool TrafficNetwork::resetPeer(uint32_t peerId)

@ -41,6 +41,7 @@
#include "fne/network/FNEPeerConnection.h"
#include "fne/network/SpanningTree.h"
#include "fne/network/HAParameters.h"
#include "fne/PatchStatusRegistry.h"
#include "fne/CryptoContainer.h"
#include <string>
@ -274,6 +275,29 @@ namespace network
* @return json::object
*/
json::object fneConnObject(uint32_t peerId, FNEPeerConnection* conn);
/**
* @brief Gets the console patch status registry.
* @return PatchStatusRegistry& Patch status registry.
*/
PatchStatusRegistry& patchStatusRegistry() { return m_patchStatusRegistry; }
/**
* @brief Flag indicating whether console patch status handling is enabled.
* @returns bool True, if enabled.
*/
bool patchStatusEnabled() const { return m_patchStatusEnabled; }
/**
* @brief Sends patch status registry state to one console peer.
* @param peerId Destination peer ID.
* @param obj Patch status JSON payload.
* @returns bool True, if the message was queued, otherwise false.
*/
bool writePatchStatusToPeer(uint32_t peerId, json::object obj);
/**
* @brief Broadcasts patch status registry state to connected console peers.
* @param obj Patch status JSON payload.
* @param exceptPeerId Optional peer ID to skip.
*/
void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U);
/**
* @brief Helper to reset a peer connection.
@ -359,6 +383,9 @@ namespace network
Timer m_updateLookupTimer;
Timer m_haUpdateTimer;
PatchStatusRegistry m_patchStatusRegistry;
bool m_patchStatusEnabled;
uint32_t m_softConnLimit;
bool m_enableSpanningTree;
@ -793,6 +820,14 @@ namespace network
*/
bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen);
/**
* @brief Serializes and queues a patch status transfer payload.
* @param connection Destination connection.
* @param obj Patch status JSON payload.
* @returns bool True, if message was queued, otherwise false.
*/
bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj);
/*
** Internal KMM Callback.
*/

Loading…
Cancel
Save

Powered by TurnKey Linux.