FNE patch status registry replication. Add docs to new patch registry functions. Replace unordered map with mutex guarded map.

pull/121/head
Lorenzo L. Romero 3 weeks ago
parent a802b22b7a
commit 9c3601462f

@ -0,0 +1,23 @@
param(
[string]$Generator = "Ninja",
[string]$BuildDir = "out\\build\\windows",
[string]$Config = "RelWithDebInfo"
)
function Abort($msg) {
Write-Error $msg
exit 1
}
if (-not (Get-Command cmake -ErrorAction SilentlyContinue)) { Abort "cmake not found on PATH. Install CMake and retry." }
if ($Generator -eq "Ninja" -and -not (Get-Command ninja -ErrorAction SilentlyContinue)) { Abort "ninja not found on PATH. Install Ninja and retry." }
Write-Host "Configuring (Generator=$Generator, BuildDir=$BuildDir, Config=$Config)"
cmake -S . -B $BuildDir -G $Generator -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=$Config
if ($LASTEXITCODE -ne 0) { Abort "CMake configure failed." }
Write-Host "Building"
cmake --build $BuildDir --config $Config -- -v
if ($LASTEXITCODE -ne 0) { Abort "Build failed." }
Write-Host "Build finished. Artifacts are in: $BuildDir"

@ -0,0 +1,30 @@
Windows build prerequisites and steps for dvmhost
Prerequisites
- Visual Studio 2019/2022 (Desktop development with C++ workload) or at least the MSVC build tools.
- CMake 3.16 or newer
- Ninja (recommended generator)
- Git
- (Optional) OpenSSL for Windows if you need SSL; not required by default when building with `-DCOMPILE_WIN32=ON`.
Quick steps
1. Open a Developer command prompt (e.g. "x64 Native Tools Command Prompt for VS 2022") or run the MSVC environment so compilers are on PATH.
2. Ensure `cmake` and `ninja` are on PATH.
3. From the repository root:
```powershell
# create an out/build directory and configure
mkdir -p out\build\windows
cmake -S . -B out\build\windows -G Ninja -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo
# build
cmake --build out\build\windows --config RelWithDebInfo
```
Notes
- The project provides `CMakeSettings.json` presets for Visual Studio Code/CMake Tools which enable `COMPILE_WIN32` and use Ninja.
- TUI support is disabled on Windows by design; some utilities that require ncurses will not be available when `COMPILE_WIN32=ON`.
- If you prefer Visual Studio IDE: open the CMake project in Visual Studio, select the provided configuration (see `CMakeSettings.json`) and build.
- If you need OpenSSL for Windows, install a binary distribution and set `-DOPENSSL_ROOT_DIR="C:/path/to/openssl"` when invoking `cmake`.
If you want, run `.uild-windows.ps1` from a Developer PowerShell prompt — it will run the configure+build steps automatically.

@ -115,6 +115,7 @@ namespace network
REPL_ACT_PEER_LIST = 0xA2U, //!< FNE Replication Active Peer List Transfer REPL_ACT_PEER_LIST = 0xA2U, //!< FNE Replication Active Peer List Transfer
REPL_HA_PARAMS = 0xA3U, //!< FNE Replication HA Parameters REPL_HA_PARAMS = 0xA3U, //!< FNE Replication HA Parameters
REPL_PATCH_STATUS = 0xA4U, //!< FNE Replication Patch Status Transfer
NET_TREE_LIST = 0x00U, //!< FNE Network Tree List NET_TREE_LIST = 0x00U, //!< FNE Network Tree List
NET_TREE_DISC = 0x01U //!< FNE Network Tree Disconnect NET_TREE_DISC = 0x01U //!< FNE Network Tree Disconnect

@ -899,6 +899,7 @@ bool HostFNE::createPeerNetworks()
network->setNetTreeDiscCallback(std::bind(&HostFNE::processNetworkTreeDisconnect, this, std::placeholders::_1, std::placeholders::_2)); network->setNetTreeDiscCallback(std::bind(&HostFNE::processNetworkTreeDisconnect, this, std::placeholders::_1, std::placeholders::_2));
network->setNotifyPeerReplicaCallback(std::bind(&HostFNE::processPeerReplicaNotify, this, std::placeholders::_1)); network->setNotifyPeerReplicaCallback(std::bind(&HostFNE::processPeerReplicaNotify, this, std::placeholders::_1));
network->setPatchStatusCallback(std::bind(&HostFNE::processPeerPatchStatus, this, std::placeholders::_1, std::placeholders::_2));
network->enable(enabled); network->enable(enabled);
if (enabled) { if (enabled) {
@ -1194,3 +1195,12 @@ void HostFNE::processPeerReplicaNotify(network::PeerNetwork* peerNetwork)
m_network->setPeerReplica(true); m_network->setPeerReplica(true);
} }
} }
/* Processes peer patch status replication. */
void HostFNE::processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj)
{
if (m_network != nullptr && peerNetwork != nullptr) {
m_network->processReplicatedPatchStatus(peerNetwork->getPeerId(), obj);
}
}

@ -17,6 +17,7 @@
#define __HOST_FNE_H__ #define __HOST_FNE_H__
#include "Defines.h" #include "Defines.h"
#include "common/json/json.h"
#include "common/lookups/RadioIdLookup.h" #include "common/lookups/RadioIdLookup.h"
#include "common/lookups/TalkgroupRulesLookup.h" #include "common/lookups/TalkgroupRulesLookup.h"
#include "common/lookups/PeerListLookup.h" #include "common/lookups/PeerListLookup.h"
@ -265,6 +266,12 @@ private:
* @param peerNetwork Peer network instance. * @param peerNetwork Peer network instance.
*/ */
void processPeerReplicaNotify(network::PeerNetwork* peerNetwork); void processPeerReplicaNotify(network::PeerNetwork* peerNetwork);
/**
* @brief Processes peer patch status replication.
* @param peerNetwork Peer network instance.
* @param obj Patch status JSON payload.
*/
void processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj);
}; };
#endif // __HOST_FNE_H__ #endif // __HOST_FNE_H__

@ -22,6 +22,8 @@ constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS;
constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS;
constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS;
/* Initializes a new instance of the PatchStatusRegistry class. */
PatchStatusRegistry::PatchStatusRegistry() : PatchStatusRegistry::PatchStatusRegistry() :
m_mutex(), m_mutex(),
m_revisionChanged(), m_revisionChanged(),
@ -34,6 +36,8 @@ PatchStatusRegistry::PatchStatusRegistry() :
/* stub */ /* stub */
} }
/* Configures the accepted TTL range for patch status records. */
void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds)
{ {
if (minTtlSeconds == 0U) if (minTtlSeconds == 0U)
@ -47,6 +51,8 @@ void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlS
m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds));
} }
/* Publishes a complete patch snapshot for one console peer. */
bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage)
{ {
if (!request["peerId"].is<uint32_t>()) { if (!request["peerId"].is<uint32_t>()) {
@ -69,6 +75,9 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response,
if (request["peerName"].is<std::string>()) if (request["peerName"].is<std::string>())
incoming.peerName = request["peerName"].get<std::string>(); incoming.peerName = request["peerName"].get<std::string>();
if (request["originFnePeerId"].is<uint32_t>())
incoming.originFnePeerId = request["originFnePeerId"].get<uint32_t>();
if (request["sequence"].is<uint32_t>()) if (request["sequence"].is<uint32_t>())
incoming.sequence = request["sequence"].get<uint32_t>(); incoming.sequence = request["sequence"].get<uint32_t>();
@ -99,6 +108,14 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response,
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
auto existing = m_peerPatches.find(incoming.peerId);
if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence > incoming.sequence) {
response = snapshotLocked();
response["acceptedPeerId"].set<uint32_t>(incoming.peerId);
response["ttlSeconds"].set<uint32_t>(ttlSeconds);
return true;
}
if (incoming.patches.empty()) if (incoming.patches.empty())
m_peerPatches.erase(incoming.peerId); m_peerPatches.erase(incoming.peerId);
else else
@ -114,6 +131,8 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response,
return true; return true;
} }
/* Removes all patch records associated with a console peer. */
bool PatchStatusRegistry::removePeer(uint32_t peerId) bool PatchStatusRegistry::removePeer(uint32_t peerId)
{ {
if (peerId == 0U) if (peerId == 0U)
@ -133,6 +152,8 @@ bool PatchStatusRegistry::removePeer(uint32_t peerId)
return removed; return removed;
} }
/* Removes expired patch status records. */
uint32_t PatchStatusRegistry::cleanupExpired() uint32_t PatchStatusRegistry::cleanupExpired()
{ {
uint32_t removed = 0U; uint32_t removed = 0U;
@ -162,6 +183,8 @@ uint32_t PatchStatusRegistry::cleanupExpired()
return removed; return removed;
} }
/* Creates a complete JSON snapshot of the registry. */
json::object PatchStatusRegistry::snapshot() json::object PatchStatusRegistry::snapshot()
{ {
cleanupExpired(); cleanupExpired();
@ -170,6 +193,8 @@ json::object PatchStatusRegistry::snapshot()
return snapshotLocked(); return snapshotLocked();
} }
/* Waits for registry changes after the supplied revision. */
json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs)
{ {
waitMs = std::min(waitMs, MAX_WAIT_MS); waitMs = std::min(waitMs, MAX_WAIT_MS);
@ -185,36 +210,48 @@ json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_
return snapshotLocked(); return snapshotLocked();
} }
/* Gets the current registry revision. */
uint64_t PatchStatusRegistry::revision() const uint64_t PatchStatusRegistry::revision() const
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
return m_revision; return m_revision;
} }
/* Gets the configured default patch status TTL. */
uint32_t PatchStatusRegistry::defaultTtlSeconds() const uint32_t PatchStatusRegistry::defaultTtlSeconds() const
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
return m_defaultTtlSeconds; return m_defaultTtlSeconds;
} }
/* Gets the configured minimum patch status TTL. */
uint32_t PatchStatusRegistry::minTtlSeconds() const uint32_t PatchStatusRegistry::minTtlSeconds() const
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
return m_minTtlSeconds; return m_minTtlSeconds;
} }
/* Gets the configured maximum patch status TTL. */
uint32_t PatchStatusRegistry::maxTtlSeconds() const uint32_t PatchStatusRegistry::maxTtlSeconds() const
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
return m_maxTtlSeconds; return m_maxTtlSeconds;
} }
/* Gets the current system time in milliseconds. */
uint64_t PatchStatusRegistry::nowMs() uint64_t PatchStatusRegistry::nowMs()
{ {
return std::chrono::duration_cast<std::chrono::milliseconds>( return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count(); std::chrono::system_clock::now().time_since_epoch()).count();
} }
/* Normalizes a mode string for key generation. */
std::string PatchStatusRegistry::normalizeMode(const std::string& mode) std::string PatchStatusRegistry::normalizeMode(const std::string& mode)
{ {
std::string normalized = mode; std::string normalized = mode;
@ -224,6 +261,8 @@ std::string PatchStatusRegistry::normalizeMode(const std::string& mode)
return normalized; return normalized;
} }
/* Builds a stable talkgroup lookup key for a patch member. */
std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member)
{ {
std::ostringstream ss; std::ostringstream ss;
@ -231,6 +270,8 @@ std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member)
return ss.str(); return ss.str();
} }
/* Serializes a patch member to JSON. */
json::object PatchStatusRegistry::memberToJson(const PatchMember& member) json::object PatchStatusRegistry::memberToJson(const PatchMember& member)
{ {
json::object obj = json::object(); json::object obj = json::object();
@ -242,6 +283,8 @@ json::object PatchStatusRegistry::memberToJson(const PatchMember& member)
return obj; return obj;
} }
/* Serializes a patch record to JSON. */
json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch)
{ {
json::object obj = json::object(); json::object obj = json::object();
@ -257,10 +300,13 @@ json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch)
return obj; return obj;
} }
/* Serializes a peer patch snapshot to JSON. */
json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer)
{ {
json::object obj = json::object(); json::object obj = json::object();
obj["peerId"].set<uint32_t>(peer.peerId); obj["peerId"].set<uint32_t>(peer.peerId);
obj["originFnePeerId"].set<uint32_t>(peer.originFnePeerId);
obj["peerName"].set<std::string>(peer.peerName); obj["peerName"].set<std::string>(peer.peerName);
obj["sequence"].set<uint32_t>(peer.sequence); obj["sequence"].set<uint32_t>(peer.sequence);
obj["updatedAt"].set<uint64_t>(peer.updatedAt); obj["updatedAt"].set<uint64_t>(peer.updatedAt);
@ -274,6 +320,8 @@ json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& pe
return obj; return obj;
} }
/* Parses one patch record from JSON. */
bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const
{ {
if (obj["patchId"].is<std::string>()) if (obj["patchId"].is<std::string>())
@ -308,6 +356,8 @@ bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std:
return true; return true;
} }
/* Parses one patch member from JSON. */
bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const
{ {
if (obj["system"].is<std::string>()) if (obj["system"].is<std::string>())
@ -343,12 +393,16 @@ bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, st
return true; return true;
} }
/* Clamps a TTL value into the configured TTL range. */
uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const
{ {
std::lock_guard<std::mutex> guard(m_mutex); std::lock_guard<std::mutex> guard(m_mutex);
return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds));
} }
/* Creates a registry snapshot while the registry mutex is held. */
json::object PatchStatusRegistry::snapshotLocked() const json::object PatchStatusRegistry::snapshotLocked() const
{ {
json::object response = json::object(); json::object response = json::object();
@ -365,6 +419,7 @@ json::object PatchStatusRegistry::snapshotLocked() const
for (const PatchRecord& patch : peer.patches) { for (const PatchRecord& patch : peer.patches) {
json::object patchObj = patchToJson(patch); json::object patchObj = patchToJson(patch);
patchObj["peerId"].set<uint32_t>(peer.peerId); patchObj["peerId"].set<uint32_t>(peer.peerId);
patchObj["originFnePeerId"].set<uint32_t>(peer.originFnePeerId);
patchObj["peerName"].set<std::string>(peer.peerName); patchObj["peerName"].set<std::string>(peer.peerName);
patchObj["updatedAt"].set<uint64_t>(peer.updatedAt); patchObj["updatedAt"].set<uint64_t>(peer.updatedAt);
patchObj["expiresAt"].set<uint64_t>(peer.expiresAt); patchObj["expiresAt"].set<uint64_t>(peer.expiresAt);
@ -378,6 +433,7 @@ json::object PatchStatusRegistry::snapshotLocked() const
json::object tgPatch = json::object(); json::object tgPatch = json::object();
tgPatch["peerId"].set<uint32_t>(peer.peerId); tgPatch["peerId"].set<uint32_t>(peer.peerId);
tgPatch["originFnePeerId"].set<uint32_t>(peer.originFnePeerId);
tgPatch["peerName"].set<std::string>(peer.peerName); tgPatch["peerName"].set<std::string>(peer.peerName);
tgPatch["patchId"].set<std::string>(patch.patchId); tgPatch["patchId"].set<std::string>(patch.patchId);
tgPatch["active"].set<bool>(patch.active); tgPatch["active"].set<bool>(patch.active);
@ -397,6 +453,8 @@ json::object PatchStatusRegistry::snapshotLocked() const
return response; return response;
} }
/* Advances the registry revision while the registry mutex is held. */
void PatchStatusRegistry::bumpRevisionLocked() void PatchStatusRegistry::bumpRevisionLocked()
{ {
m_revision++; m_revision++;

@ -19,83 +19,220 @@
#include <condition_variable> #include <condition_variable>
#include <cstdint> #include <cstdint>
#include <map>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <unordered_map>
#include <vector> #include <vector>
/** /**
* @brief In-memory registry for console-advertised patch status. * @brief In-memory registry for console-advertised patch status.
*
* The registry owns all patch status records published by console peers and
* replicated from neighboring FNEs. All access to the backing storage is
* serialized through the registry mutex; callers receive JSON snapshots and
* never receive direct references or iterators into the registry storage.
*
* @ingroup fne * @ingroup fne
*/ */
class HOST_SW_API PatchStatusRegistry { class HOST_SW_API PatchStatusRegistry {
public: public:
/**
* @brief Default number of seconds before a patch status record expires.
*/
static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U;
/**
* @brief Minimum accepted patch status TTL in seconds.
*/
static constexpr uint32_t MIN_TTL_SECONDS = 5U; static constexpr uint32_t MIN_TTL_SECONDS = 5U;
/**
* @brief Maximum accepted patch status TTL in seconds.
*/
static constexpr uint32_t MAX_TTL_SECONDS = 300U; static constexpr uint32_t MAX_TTL_SECONDS = 300U;
/**
* @brief Maximum wait time for long-poll style change requests.
*/
static constexpr uint32_t MAX_WAIT_MS = 30000U; static constexpr uint32_t MAX_WAIT_MS = 30000U;
/**
* @brief Initializes a new instance of the PatchStatusRegistry class.
*/
PatchStatusRegistry(); PatchStatusRegistry();
/**
* @brief Finalizes an instance of the PatchStatusRegistry class.
*/
~PatchStatusRegistry() = default; ~PatchStatusRegistry() = default;
/**
* @brief Configures the accepted TTL range for patch status records.
* @param defaultTtlSeconds Default TTL used when a publish request omits a TTL.
* @param minTtlSeconds Minimum accepted TTL.
* @param maxTtlSeconds Maximum accepted TTL.
*/
void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds);
/**
* @brief Publishes a complete patch snapshot for one console peer.
* @param request JSON request containing peerId, peerName, optional sequence, optional ttlSeconds, and patches.
* @param response JSON registry snapshot returned after the publish is applied.
* @param errorMessage Validation error text populated when the request is invalid.
* @returns bool True, if the publish request was valid and applied, otherwise false.
*/
bool publish(json::object& request, json::object& response, std::string& errorMessage); bool publish(json::object& request, json::object& response, std::string& errorMessage);
/**
* @brief Removes all patch records associated with a console peer.
* @param peerId Console peer ID whose records should be removed.
* @returns bool True, if records were removed, otherwise false.
*/
bool removePeer(uint32_t peerId); bool removePeer(uint32_t peerId);
/**
* @brief Removes expired patch status records.
* @returns uint32_t Number of peer records removed.
*/
uint32_t cleanupExpired(); uint32_t cleanupExpired();
/**
* @brief Creates a complete JSON snapshot of the registry.
* @returns json::object Registry snapshot.
*/
json::object snapshot(); json::object snapshot();
/**
* @brief Waits for registry changes after the supplied revision.
* @param sinceRevision Revision already known to the caller.
* @param waitMs Maximum wait time in milliseconds.
* @returns json::object Registry snapshot after change or timeout.
*/
json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs);
/**
* @brief Gets the current registry revision.
* @returns uint64_t Registry revision number.
*/
uint64_t revision() const; uint64_t revision() const;
/**
* @brief Gets the configured default patch status TTL.
* @returns uint32_t Default TTL in seconds.
*/
uint32_t defaultTtlSeconds() const; uint32_t defaultTtlSeconds() const;
/**
* @brief Gets the configured minimum patch status TTL.
* @returns uint32_t Minimum TTL in seconds.
*/
uint32_t minTtlSeconds() const; uint32_t minTtlSeconds() const;
/**
* @brief Gets the configured maximum patch status TTL.
* @returns uint32_t Maximum TTL in seconds.
*/
uint32_t maxTtlSeconds() const; uint32_t maxTtlSeconds() const;
private: private:
/**
* @brief Represents a talkgroup member participating in a patch.
*/
struct PatchMember { struct PatchMember {
std::string system; std::string system; //!< System name for the member.
std::string mode; std::string mode; //!< Digital mode for the member.
uint32_t tgid = 0U; uint32_t tgid = 0U; //!< Talkgroup ID for the member.
uint8_t slot = 0U; uint8_t slot = 0U; //!< Timeslot for the member, if applicable.
}; };
/**
* @brief Represents one active console patch.
*/
struct PatchRecord { struct PatchRecord {
std::string patchId; std::string patchId; //!< Console-defined patch ID.
bool active = true; bool active = true; //!< Flag indicating whether the patch is active.
bool oneWay = false; bool oneWay = false; //!< Flag indicating whether the patch is one-way.
std::vector<PatchMember> members; std::vector<PatchMember> members; //!< Talkgroup members in the patch.
}; };
/**
* @brief Represents a console peer's complete patch status snapshot.
*/
struct PeerPatchSnapshot { struct PeerPatchSnapshot {
uint32_t peerId = 0U; uint32_t peerId = 0U; //!< Console peer ID.
std::string peerName; uint32_t originFnePeerId = 0U; //!< FNE peer ID where this status originated.
uint32_t sequence = 0U; std::string peerName; //!< Console peer display name.
uint64_t updatedAt = 0U; uint32_t sequence = 0U; //!< Console-defined sequence number.
uint64_t expiresAt = 0U; uint64_t updatedAt = 0U; //!< Time this snapshot was accepted.
std::vector<PatchRecord> patches; uint64_t expiresAt = 0U; //!< Time this snapshot expires.
std::vector<PatchRecord> patches; //!< Complete patch list for this console peer.
}; };
/**
* @brief Gets the current system time in milliseconds.
* @returns uint64_t Current time in milliseconds.
*/
static uint64_t nowMs(); static uint64_t nowMs();
/**
* @brief Normalizes a mode string for key generation.
* @param mode Mode string.
* @returns std::string Normalized mode string.
*/
static std::string normalizeMode(const std::string& mode); static std::string normalizeMode(const std::string& mode);
/**
* @brief Builds a stable talkgroup lookup key for a patch member.
* @param member Patch member.
* @returns std::string Talkgroup key.
*/
static std::string buildTalkgroupKey(const PatchMember& member); static std::string buildTalkgroupKey(const PatchMember& member);
/**
* @brief Serializes a patch member to JSON.
* @param member Patch member.
* @returns json::object JSON patch member.
*/
static json::object memberToJson(const PatchMember& member); static json::object memberToJson(const PatchMember& member);
/**
* @brief Serializes a patch record to JSON.
* @param patch Patch record.
* @returns json::object JSON patch record.
*/
static json::object patchToJson(const PatchRecord& patch); static json::object patchToJson(const PatchRecord& patch);
/**
* @brief Serializes a peer patch snapshot to JSON.
* @param peer Peer patch snapshot.
* @returns json::object JSON peer patch snapshot.
*/
static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer);
/**
* @brief Parses one patch record from JSON.
* @param obj JSON patch object.
* @param patch Parsed patch record.
* @param errorMessage Validation error text.
* @returns bool True, if parsed successfully, otherwise false.
*/
bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const;
/**
* @brief Parses one patch member from JSON.
* @param obj JSON patch member object.
* @param member Parsed patch member.
* @param errorMessage Validation error text.
* @returns bool True, if parsed successfully, otherwise false.
*/
bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const;
/**
* @brief Clamps a TTL value into the configured TTL range.
* @param ttlSeconds TTL in seconds.
* @returns uint32_t Clamped TTL in seconds.
*/
uint32_t clampTtl(uint32_t ttlSeconds) const; uint32_t clampTtl(uint32_t ttlSeconds) const;
/**
* @brief Creates a registry snapshot while the registry mutex is held.
* @returns json::object Registry snapshot.
*/
json::object snapshotLocked() const; json::object snapshotLocked() const;
/**
* @brief Advances the registry revision while the registry mutex is held.
*/
void bumpRevisionLocked(); void bumpRevisionLocked();
mutable std::mutex m_mutex; mutable std::mutex m_mutex; //!< Mutex guarding registry state.
std::condition_variable m_revisionChanged; std::condition_variable m_revisionChanged; //!< Condition variable signaled when revision changes.
std::unordered_map<uint32_t, PeerPatchSnapshot> m_peerPatches; std::map<uint32_t, PeerPatchSnapshot> m_peerPatches; //!< Peer patch snapshots keyed by console peer ID.
uint64_t m_revision; uint64_t m_revision; //!< Monotonic registry revision.
uint32_t m_defaultTtlSeconds; uint32_t m_defaultTtlSeconds; //!< Default TTL in seconds.
uint32_t m_minTtlSeconds; uint32_t m_minTtlSeconds; //!< Minimum TTL in seconds.
uint32_t m_maxTtlSeconds; uint32_t m_maxTtlSeconds; //!< Maximum TTL in seconds.
}; };
#endif // __FNE_PATCH_STATUS_REGISTRY_H__ #endif // __FNE_PATCH_STATUS_REGISTRY_H__

@ -419,6 +419,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req)
// The authenticated peer identity is authoritative; do not allow spoofed peer IDs. // The authenticated peer identity is authoritative; do not allow spoofed peer IDs.
reqObj["peerId"].set<uint32_t>(pktPeerId); reqObj["peerId"].set<uint32_t>(pktPeerId);
reqObj["originFnePeerId"].set<uint32_t>(network->m_peerId);
if (!reqObj["peerName"].is<std::string>() || reqObj["peerName"].get<std::string>().empty()) if (!reqObj["peerName"].is<std::string>() || reqObj["peerName"].get<std::string>().empty())
reqObj["peerName"].set<std::string>(connection->identity()); reqObj["peerName"].set<std::string>(connection->identity());
@ -431,6 +432,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req)
} }
network->writePatchStatusToConsoles(response); network->writePatchStatusToConsoles(response);
network->replicatePatchStatus(reqObj);
} }
else { else {
network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -633,6 +635,89 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req)
} }
} }
} }
else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::REPL_PATCH_STATUS) { // Peer Replication Patch Status
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR &&
connection->isReplica()) {
DECLARE_UINT8_ARRAY(rawPayload, req->length);
::memcpy(rawPayload, req->buffer, req->length);
if (mdNetwork->m_peerPatchStatusPkt.find(peerId) == mdNetwork->m_peerPatchStatusPkt.end()) {
mdNetwork->m_peerPatchStatusPkt.insert(peerId, MetadataNetwork::PacketBufferEntry());
MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId];
pkt.buffer = new PacketBuffer(true, "Peer Replication, Patch Status");
pkt.streamId = streamId;
pkt.locked = false;
} else {
MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId];
if (!pkt.locked && pkt.streamId != streamId) {
LogError(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, stream ID mismatch, expected %u, got %u", peerId,
connection->identWithQualifier().c_str(), pkt.streamId, streamId);
pkt.buffer->clear();
pkt.streamId = streamId;
}
if (pkt.streamId != streamId) {
// otherwise drop the packet
break;
}
}
MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId];
if (pkt.locked) {
while (pkt.locked)
Thread::sleep(1U);
}
pkt.locked = true;
uint32_t decompressedLen = 0U;
uint8_t* decompressed = nullptr;
if (pkt.buffer->decode(rawPayload, &decompressed, &decompressedLen)) {
mdNetwork->m_peerPatchStatusPkt.lock();
std::string payload(decompressed + 8U, decompressed + decompressedLen);
json::value v;
std::string err = json::parse(v, payload);
if (!err.empty() || !v.is<json::object>()) {
LogError(LOG_REPL, "PEER %u (%s) error parsing patch status replication, %s", peerId, connection->identWithQualifier().c_str(), err.c_str());
pkt.buffer->clear();
pkt.streamId = 0U;
if (decompressed != nullptr) {
delete[] decompressed;
}
mdNetwork->m_peerPatchStatusPkt.unlock();
mdNetwork->m_peerPatchStatusPkt.erase(peerId);
break;
}
network->processReplicatedPatchStatus(peerId, v.get<json::object>());
pkt.buffer->clear();
delete pkt.buffer;
pkt.streamId = 0U;
if (decompressed != nullptr) {
delete[] decompressed;
}
mdNetwork->m_peerPatchStatusPkt.unlock();
mdNetwork->m_peerPatchStatusPkt.erase(peerId);
} else {
pkt.locked = false;
}
} else {
network->writePeerNAK(peerId, 0U, TAG_PEER_REPLICA, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
break; break;
case NET_FUNC::NET_TREE: case NET_FUNC::NET_TREE:

@ -117,6 +117,7 @@ namespace network
bool locked; bool locked;
}; };
concurrent::unordered_map<uint32_t, PacketBufferEntry> m_peerReplicaActPkt; concurrent::unordered_map<uint32_t, PacketBufferEntry> m_peerReplicaActPkt;
concurrent::unordered_map<uint32_t, PacketBufferEntry> m_peerPatchStatusPkt;
concurrent::unordered_map<uint32_t, PacketBufferEntry> m_peerTreeListPkt; concurrent::unordered_map<uint32_t, PacketBufferEntry> m_peerTreeListPkt;
ThreadPool m_threadPool; ThreadPool m_threadPool;

@ -48,6 +48,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc
m_analogCallback(nullptr), m_analogCallback(nullptr),
m_netTreeDiscCallback(nullptr), m_netTreeDiscCallback(nullptr),
m_peerReplicaCallback(nullptr), m_peerReplicaCallback(nullptr),
m_patchStatusCallback(nullptr),
m_masterPeerId(0U), m_masterPeerId(0U),
m_pidLookup(nullptr), m_pidLookup(nullptr),
m_peerReplica(false), m_peerReplica(false),
@ -55,6 +56,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc
m_tgidPkt(true, "Peer Replication, TGID List"), m_tgidPkt(true, "Peer Replication, TGID List"),
m_ridPkt(true, "Peer Replication, RID List"), m_ridPkt(true, "Peer Replication, RID List"),
m_pidPkt(true, "Peer Replication, PID List"), m_pidPkt(true, "Peer Replication, PID List"),
m_patchStatusPkt(true, "Peer Replication, Patch Status"),
m_threadPool(WORKER_CNT, "peer"), m_threadPool(WORKER_CNT, "peer"),
m_prevSpanningTreeChildren(0U), m_prevSpanningTreeChildren(0U),
m_nakFallOver(false), m_nakFallOver(false),
@ -233,6 +235,40 @@ bool PeerNetwork::writeHAParams(std::vector<HAParameters>& haParams)
return false; return false;
} }
/* Writes a complete console patch status update upstream. */
bool PeerNetwork::writePatchStatus(json::object obj)
{
if (!m_peerReplica)
return false;
obj["type"].set<std::string>("publish");
json::value v = json::value(obj);
std::string json = std::string(v.serialize());
size_t len = json.length() + 9U;
DECLARE_CHAR_ARRAY(buffer, len);
::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U);
::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str());
PacketBuffer pkt(true, "Peer Replication, Patch Status");
pkt.encode((uint8_t*)buffer, len);
uint32_t streamId = createStreamId();
LogInfoEx(LOG_REPL, "PEER %u Peer Replication, Patch Status, blocks %u, streamId = %u", m_peerId, pkt.fragments.size(), streamId);
if (pkt.fragments.size() > 0U) {
for (auto frag : pkt.fragments) {
writeMaster({ NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS },
frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId, true);
Thread::sleep(60U); // pace block transmission
}
}
pkt.clear();
return true;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Protected Class Members // Protected Class Members
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -462,6 +498,32 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
} }
break; break;
case NET_SUBFUNC::REPL_PATCH_STATUS: // Patch Status
{
uint32_t decompressedLen = 0U;
uint8_t* decompressed = nullptr;
if (m_patchStatusPkt.decode(data, &decompressed, &decompressedLen)) {
std::string payload(decompressed + 8U, decompressed + decompressedLen);
json::value v;
std::string err = json::parse(v, payload);
if (!err.empty() || !v.is<json::object>()) {
LogError(LOG_PEER, "PEER %u error parsing patch status replication, %s", m_peerId, err.c_str());
m_patchStatusPkt.clear();
delete[] decompressed;
break;
}
if (m_patchStatusCallback != nullptr)
m_patchStatusCallback(this, v.get<json::object>());
m_patchStatusPkt.clear();
delete[] decompressed;
}
}
break;
default: default:
break; break;
} }

@ -17,6 +17,7 @@
#define __PEER_NETWORK_H__ #define __PEER_NETWORK_H__
#include "Defines.h" #include "Defines.h"
#include "common/json/json.h"
#include "common/lookups/PeerListLookup.h" #include "common/lookups/PeerListLookup.h"
#include "common/network/Network.h" #include "common/network/Network.h"
#include "common/network/PacketBuffer.h" #include "common/network/PacketBuffer.h"
@ -138,9 +139,14 @@ namespace network
void setNetTreeDiscCallback(std::function<void(PeerNetwork*, const uint32_t offendingPeerId)>&& callback) { m_netTreeDiscCallback = callback; } void setNetTreeDiscCallback(std::function<void(PeerNetwork*, const uint32_t offendingPeerId)>&& callback) { m_netTreeDiscCallback = callback; }
/** /**
* @brief Helper to set the peer replica notification callback. * @brief Helper to set the peer replica notification callback.
* @param callback * @param callback
*/ */
void setNotifyPeerReplicaCallback(std::function<void(PeerNetwork*)>&& callback) { m_peerReplicaCallback = callback; } void setNotifyPeerReplicaCallback(std::function<void(PeerNetwork*)>&& callback) { m_peerReplicaCallback = callback; }
/**
* @brief Helper to set the peer patch status callback.
* @param callback
*/
void setPatchStatusCallback(std::function<void(PeerNetwork*, json::object)>&& callback) { m_patchStatusCallback = callback; }
/** /**
* @brief Writes a complete update of this CFNE's active peer list to the network. * @brief Writes a complete update of this CFNE's active peer list to the network.
@ -235,6 +241,26 @@ namespace network
* @returns bool True, if list was sent, otherwise false. * @returns bool True, if list was sent, otherwise false.
*/ */
bool writeHAParams(std::vector<HAParameters>& haParams); bool writeHAParams(std::vector<HAParameters>& haParams);
/**
* @brief Writes a complete console patch status update upstream.
* \code{.unparsed}
* The patch status replication message is a JSON body, and is a packet
* buffer compressed message.
*
* {
* "type": "publish",
* "peerId": <Console Peer ID>,
* "originFnePeerId": <Originating FNE Peer ID>,
* "peerName": "<Console Peer Name>",
* "sequence": <Console Sequence>,
* "ttlSeconds": <TTL Seconds>,
* "patches": []
* }
* \endcode
* @param obj JSON patch status publish payload.
* @returns bool True, if patch status was sent, otherwise false.
*/
bool writePatchStatus(json::object obj);
/** /**
* @brief Returns flag indicating whether or not this peer connection is peer replication enabled. * @brief Returns flag indicating whether or not this peer connection is peer replication enabled.
@ -298,6 +324,10 @@ namespace network
* @brief Peer Replica Notification Callback. * @brief Peer Replica Notification Callback.
*/ */
std::function<void(PeerNetwork* peer)> m_peerReplicaCallback; std::function<void(PeerNetwork* peer)> m_peerReplicaCallback;
/**
* @brief Peer patch status callback.
*/
std::function<void(PeerNetwork* peer, json::object obj)> m_patchStatusCallback;
/** /**
* @brief User overrideable handler that allows user code to process network packets not handled by this class. * @brief User overrideable handler that allows user code to process network packets not handled by this class.
@ -336,6 +366,7 @@ namespace network
PacketBuffer m_tgidPkt; PacketBuffer m_tgidPkt;
PacketBuffer m_ridPkt; PacketBuffer m_ridPkt;
PacketBuffer m_pidPkt; PacketBuffer m_pidPkt;
PacketBuffer m_patchStatusPkt;
ThreadPool m_threadPool; ThreadPool m_threadPool;

@ -532,6 +532,27 @@ void TrafficNetwork::processNetworkTreeDisconnect(uint32_t peerId, uint32_t offe
} }
} }
/* Processes a replicated console patch status update. */
void TrafficNetwork::processReplicatedPatchStatus(uint32_t peerId, json::object obj)
{
if (!m_patchStatusEnabled)
return;
if (!obj["peerId"].is<uint32_t>() || !obj["patches"].is<json::array>())
return;
json::object response = json::object();
std::string errorMessage;
if (!m_patchStatusRegistry.publish(obj, response, errorMessage)) {
LogWarning(LOG_MASTER, "PEER %u invalid replicated patch status payload, %s", peerId, errorMessage.c_str());
return;
}
writePatchStatusToConsoles(response);
replicatePatchStatus(obj, peerId);
}
/* Helper to process an downstream peer In-Call Control message. */ /* Helper to process an downstream peer In-Call Control message. */
void TrafficNetwork::processDownstreamInCallCtrl(network::NET_ICC::ENUM command, network::NET_SUBFUNC::ENUM subFunc, uint32_t dstId, void TrafficNetwork::processDownstreamInCallCtrl(network::NET_ICC::ENUM command, network::NET_SUBFUNC::ENUM subFunc, uint32_t dstId,
@ -2324,8 +2345,18 @@ void TrafficNetwork::erasePeer(uint32_t peerId)
} }
// erase any console patch status records for this peer // erase any console patch status records for this peer
if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) {
json::object clearPatchStatus = json::object();
uint32_t originFnePeerId = m_peerId;
uint32_t ttlSeconds = m_patchStatusRegistry.defaultTtlSeconds();
clearPatchStatus["type"].set<std::string>("publish");
clearPatchStatus["peerId"].set<uint32_t>(peerId);
clearPatchStatus["originFnePeerId"].set<uint32_t>(originFnePeerId);
clearPatchStatus["ttlSeconds"].set<uint32_t>(ttlSeconds);
clearPatchStatus["patches"].set<json::array>(json::array());
writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); writePatchStatusToConsoles(m_patchStatusRegistry.snapshot());
replicatePatchStatus(clearPatchStatus);
}
// erase any HA parameters for this peer // erase any HA parameters for this peer
{ {
@ -2476,6 +2507,38 @@ void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t excep
m_peers.shared_unlock(); m_peers.shared_unlock();
} }
/* Helper to replicate patch status state to neighboring FNE peers. */
void TrafficNetwork::replicatePatchStatus(json::object obj, uint32_t exceptPeerId)
{
if (!m_patchStatusEnabled)
return;
obj["type"].set<std::string>("publish");
if (m_host->m_peerNetworks.size() > 0U) {
for (auto peer : m_host->m_peerNetworks) {
if (peer.first == exceptPeerId)
continue;
if (peer.second != nullptr && peer.second->isEnabled() && peer.second->isReplica())
peer.second->writePatchStatus(obj);
}
}
m_peers.shared_lock();
for (auto peer : m_peers) {
if (peer.first == exceptPeerId)
continue;
if (peer.second == nullptr)
continue;
if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !peer.second->isReplica())
continue;
writePatchStatusReplicationPayload(peer.second, obj);
}
m_peers.shared_unlock();
}
/* Helper to serialize and queue a patch status transfer payload. */ /* Helper to serialize and queue a patch status transfer payload. */
bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj)
@ -2513,6 +2576,47 @@ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json
{ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen);
} }
/* Helper to serialize and queue a patch status replication payload. */
bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj)
{
if (connection == nullptr)
return false;
if (!m_patchStatusEnabled)
return false;
if (!connection->connected())
return false;
if (connection->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !connection->isReplica())
return false;
obj["type"].set<std::string>("publish");
json::value v = json::value(obj);
std::string json = std::string(v.serialize());
size_t len = json.length() + 9U;
DECLARE_CHAR_ARRAY(buffer, len);
::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U);
::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str());
PacketBuffer pkt(true, "Peer Replication, Patch Status");
pkt.encode((uint8_t*)buffer, len);
uint32_t streamId = createStreamId();
LogInfoEx(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, blocks %u, streamId = %u", connection->id(),
connection->identWithQualifier().c_str(), pkt.fragments.size(), streamId);
if (pkt.fragments.size() > 0U) {
for (auto frag : pkt.fragments) {
writePeer(connection->id(), m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS },
frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId);
Thread::sleep(60U); // pace block transmission
}
}
pkt.clear();
return true;
}
/* Helper to reset a peer connection. */ /* Helper to reset a peer connection. */
bool TrafficNetwork::resetPeer(uint32_t peerId) bool TrafficNetwork::resetPeer(uint32_t peerId)

@ -237,6 +237,12 @@ namespace network
* @param offendingPeerId Offending Peer ID. * @param offendingPeerId Offending Peer ID.
*/ */
void processNetworkTreeDisconnect(uint32_t peerId, uint32_t offendingPeerId); void processNetworkTreeDisconnect(uint32_t peerId, uint32_t offendingPeerId);
/**
* @brief Processes a replicated console patch status update.
* @param peerId Peer ID that delivered the replication update.
* @param obj Patch status JSON payload.
*/
void processReplicatedPatchStatus(uint32_t peerId, json::object obj);
/** /**
* @brief Helper to process an downstream peer In-Call Control message. * @brief Helper to process an downstream peer In-Call Control message.
@ -298,6 +304,12 @@ namespace network
* @param exceptPeerId Optional peer ID to skip. * @param exceptPeerId Optional peer ID to skip.
*/ */
void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U);
/**
* @brief Replicates patch status state to neighboring FNE peers.
* @param obj Patch status JSON payload.
* @param exceptPeerId Optional peer ID to skip.
*/
void replicatePatchStatus(json::object obj, uint32_t exceptPeerId = 0U);
/** /**
* @brief Helper to reset a peer connection. * @brief Helper to reset a peer connection.
@ -827,6 +839,13 @@ namespace network
* @returns bool True, if message was queued, otherwise false. * @returns bool True, if message was queued, otherwise false.
*/ */
bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj);
/**
* @brief Serializes and queues a patch status replication payload.
* @param connection Destination neighbor connection.
* @param obj Patch status JSON payload.
* @returns bool True, if message was queued, otherwise false.
*/
bool writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj);
/* /*
** Internal KMM Callback. ** Internal KMM Callback.

Loading…
Cancel
Save

Powered by TurnKey Linux.