diff --git a/build-windows.ps1 b/build-windows.ps1 new file mode 100644 index 00000000..e41ac4c1 --- /dev/null +++ b/build-windows.ps1 @@ -0,0 +1,23 @@ +param( + [string]$Generator = "Ninja", + [string]$BuildDir = "out\\build\\windows", + [string]$Config = "RelWithDebInfo" +) + +function Abort($msg) { + Write-Error $msg + exit 1 +} + +if (-not (Get-Command cmake -ErrorAction SilentlyContinue)) { Abort "cmake not found on PATH. Install CMake and retry." } +if ($Generator -eq "Ninja" -and -not (Get-Command ninja -ErrorAction SilentlyContinue)) { Abort "ninja not found on PATH. Install Ninja and retry." } + +Write-Host "Configuring (Generator=$Generator, BuildDir=$BuildDir, Config=$Config)" +cmake -S . -B $BuildDir -G $Generator -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=$Config +if ($LASTEXITCODE -ne 0) { Abort "CMake configure failed." } + +Write-Host "Building" +cmake --build $BuildDir --config $Config -- -v +if ($LASTEXITCODE -ne 0) { Abort "Build failed." } + +Write-Host "Build finished. Artifacts are in: $BuildDir" \ No newline at end of file diff --git a/docs/WINDOWS_BUILD.md b/docs/WINDOWS_BUILD.md new file mode 100644 index 00000000..5fc00413 --- /dev/null +++ b/docs/WINDOWS_BUILD.md @@ -0,0 +1,30 @@ +Windows build prerequisites and steps for dvmhost + +Prerequisites +- Visual Studio 2019/2022 (Desktop development with C++ workload) or at least the MSVC build tools. +- CMake 3.16 or newer +- Ninja (recommended generator) +- Git +- (Optional) OpenSSL for Windows if you need SSL; not required by default when building with `-DCOMPILE_WIN32=ON`. + +Quick steps +1. Open a Developer command prompt (e.g. "x64 Native Tools Command Prompt for VS 2022") or run the MSVC environment so compilers are on PATH. +2. Ensure `cmake` and `ninja` are on PATH. +3. From the repository root: + +```powershell +# create an out/build directory and configure +mkdir -p out\build\windows +cmake -S . -B out\build\windows -G Ninja -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo + +# build +cmake --build out\build\windows --config RelWithDebInfo +``` + +Notes +- The project provides `CMakeSettings.json` presets for Visual Studio Code/CMake Tools which enable `COMPILE_WIN32` and use Ninja. +- TUI support is disabled on Windows by design; some utilities that require ncurses will not be available when `COMPILE_WIN32=ON`. +- If you prefer Visual Studio IDE: open the CMake project in Visual Studio, select the provided configuration (see `CMakeSettings.json`) and build. +- If you need OpenSSL for Windows, install a binary distribution and set `-DOPENSSL_ROOT_DIR="C:/path/to/openssl"` when invoking `cmake`. + +If you want, run `.uild-windows.ps1` from a Developer PowerShell prompt — it will run the configure+build steps automatically. \ No newline at end of file diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 9e52e8b8..67563259 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -115,6 +115,7 @@ namespace network REPL_ACT_PEER_LIST = 0xA2U, //!< FNE Replication Active Peer List Transfer REPL_HA_PARAMS = 0xA3U, //!< FNE Replication HA Parameters + REPL_PATCH_STATUS = 0xA4U, //!< FNE Replication Patch Status Transfer NET_TREE_LIST = 0x00U, //!< FNE Network Tree List NET_TREE_DISC = 0x01U //!< FNE Network Tree Disconnect diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index cb85d22f..52f969d5 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -899,6 +899,7 @@ bool HostFNE::createPeerNetworks() network->setNetTreeDiscCallback(std::bind(&HostFNE::processNetworkTreeDisconnect, this, std::placeholders::_1, std::placeholders::_2)); network->setNotifyPeerReplicaCallback(std::bind(&HostFNE::processPeerReplicaNotify, this, std::placeholders::_1)); + network->setPatchStatusCallback(std::bind(&HostFNE::processPeerPatchStatus, this, std::placeholders::_1, std::placeholders::_2)); network->enable(enabled); if (enabled) { @@ -1194,3 +1195,12 @@ void HostFNE::processPeerReplicaNotify(network::PeerNetwork* peerNetwork) m_network->setPeerReplica(true); } } + +/* Processes peer patch status replication. */ + +void HostFNE::processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj) +{ + if (m_network != nullptr && peerNetwork != nullptr) { + m_network->processReplicatedPatchStatus(peerNetwork->getPeerId(), obj); + } +} diff --git a/src/fne/HostFNE.h b/src/fne/HostFNE.h index 83d38a4e..ef9d706b 100644 --- a/src/fne/HostFNE.h +++ b/src/fne/HostFNE.h @@ -17,6 +17,7 @@ #define __HOST_FNE_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/RadioIdLookup.h" #include "common/lookups/TalkgroupRulesLookup.h" #include "common/lookups/PeerListLookup.h" @@ -265,6 +266,12 @@ private: * @param peerNetwork Peer network instance. */ void processPeerReplicaNotify(network::PeerNetwork* peerNetwork); + /** + * @brief Processes peer patch status replication. + * @param peerNetwork Peer network instance. + * @param obj Patch status JSON payload. + */ + void processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj); }; #endif // __HOST_FNE_H__ diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp index 132ef484..eb5870dc 100644 --- a/src/fne/PatchStatusRegistry.cpp +++ b/src/fne/PatchStatusRegistry.cpp @@ -22,6 +22,8 @@ constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS; constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; +/* Initializes a new instance of the PatchStatusRegistry class. */ + PatchStatusRegistry::PatchStatusRegistry() : m_mutex(), m_revisionChanged(), @@ -34,6 +36,8 @@ PatchStatusRegistry::PatchStatusRegistry() : /* stub */ } +/* Configures the accepted TTL range for patch status records. */ + void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) { if (minTtlSeconds == 0U) @@ -47,6 +51,8 @@ void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlS m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); } +/* Publishes a complete patch snapshot for one console peer. */ + bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) { if (!request["peerId"].is()) { @@ -69,6 +75,9 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, if (request["peerName"].is()) incoming.peerName = request["peerName"].get(); + if (request["originFnePeerId"].is()) + incoming.originFnePeerId = request["originFnePeerId"].get(); + if (request["sequence"].is()) incoming.sequence = request["sequence"].get(); @@ -99,6 +108,14 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, { std::lock_guard guard(m_mutex); + auto existing = m_peerPatches.find(incoming.peerId); + if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence > incoming.sequence) { + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + return true; + } + if (incoming.patches.empty()) m_peerPatches.erase(incoming.peerId); else @@ -114,6 +131,8 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, return true; } +/* Removes all patch records associated with a console peer. */ + bool PatchStatusRegistry::removePeer(uint32_t peerId) { if (peerId == 0U) @@ -133,6 +152,8 @@ bool PatchStatusRegistry::removePeer(uint32_t peerId) return removed; } +/* Removes expired patch status records. */ + uint32_t PatchStatusRegistry::cleanupExpired() { uint32_t removed = 0U; @@ -162,6 +183,8 @@ uint32_t PatchStatusRegistry::cleanupExpired() return removed; } +/* Creates a complete JSON snapshot of the registry. */ + json::object PatchStatusRegistry::snapshot() { cleanupExpired(); @@ -170,6 +193,8 @@ json::object PatchStatusRegistry::snapshot() return snapshotLocked(); } +/* Waits for registry changes after the supplied revision. */ + json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) { waitMs = std::min(waitMs, MAX_WAIT_MS); @@ -185,36 +210,48 @@ json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_ return snapshotLocked(); } +/* Gets the current registry revision. */ + uint64_t PatchStatusRegistry::revision() const { std::lock_guard guard(m_mutex); return m_revision; } +/* Gets the configured default patch status TTL. */ + uint32_t PatchStatusRegistry::defaultTtlSeconds() const { std::lock_guard guard(m_mutex); return m_defaultTtlSeconds; } +/* Gets the configured minimum patch status TTL. */ + uint32_t PatchStatusRegistry::minTtlSeconds() const { std::lock_guard guard(m_mutex); return m_minTtlSeconds; } +/* Gets the configured maximum patch status TTL. */ + uint32_t PatchStatusRegistry::maxTtlSeconds() const { std::lock_guard guard(m_mutex); return m_maxTtlSeconds; } +/* Gets the current system time in milliseconds. */ + uint64_t PatchStatusRegistry::nowMs() { return std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); } +/* Normalizes a mode string for key generation. */ + std::string PatchStatusRegistry::normalizeMode(const std::string& mode) { std::string normalized = mode; @@ -224,6 +261,8 @@ std::string PatchStatusRegistry::normalizeMode(const std::string& mode) return normalized; } +/* Builds a stable talkgroup lookup key for a patch member. */ + std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) { std::ostringstream ss; @@ -231,6 +270,8 @@ std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) return ss.str(); } +/* Serializes a patch member to JSON. */ + json::object PatchStatusRegistry::memberToJson(const PatchMember& member) { json::object obj = json::object(); @@ -242,6 +283,8 @@ json::object PatchStatusRegistry::memberToJson(const PatchMember& member) return obj; } +/* Serializes a patch record to JSON. */ + json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) { json::object obj = json::object(); @@ -257,10 +300,13 @@ json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) return obj; } +/* Serializes a peer patch snapshot to JSON. */ + json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) { json::object obj = json::object(); obj["peerId"].set(peer.peerId); + obj["originFnePeerId"].set(peer.originFnePeerId); obj["peerName"].set(peer.peerName); obj["sequence"].set(peer.sequence); obj["updatedAt"].set(peer.updatedAt); @@ -274,6 +320,8 @@ json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& pe return obj; } +/* Parses one patch record from JSON. */ + bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const { if (obj["patchId"].is()) @@ -308,6 +356,8 @@ bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std: return true; } +/* Parses one patch member from JSON. */ + bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const { if (obj["system"].is()) @@ -343,12 +393,16 @@ bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, st return true; } +/* Clamps a TTL value into the configured TTL range. */ + uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const { std::lock_guard guard(m_mutex); return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); } +/* Creates a registry snapshot while the registry mutex is held. */ + json::object PatchStatusRegistry::snapshotLocked() const { json::object response = json::object(); @@ -365,6 +419,7 @@ json::object PatchStatusRegistry::snapshotLocked() const for (const PatchRecord& patch : peer.patches) { json::object patchObj = patchToJson(patch); patchObj["peerId"].set(peer.peerId); + patchObj["originFnePeerId"].set(peer.originFnePeerId); patchObj["peerName"].set(peer.peerName); patchObj["updatedAt"].set(peer.updatedAt); patchObj["expiresAt"].set(peer.expiresAt); @@ -378,6 +433,7 @@ json::object PatchStatusRegistry::snapshotLocked() const json::object tgPatch = json::object(); tgPatch["peerId"].set(peer.peerId); + tgPatch["originFnePeerId"].set(peer.originFnePeerId); tgPatch["peerName"].set(peer.peerName); tgPatch["patchId"].set(patch.patchId); tgPatch["active"].set(patch.active); @@ -397,6 +453,8 @@ json::object PatchStatusRegistry::snapshotLocked() const return response; } +/* Advances the registry revision while the registry mutex is held. */ + void PatchStatusRegistry::bumpRevisionLocked() { m_revision++; diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h index 0537e6cd..3c8ee02c 100644 --- a/src/fne/PatchStatusRegistry.h +++ b/src/fne/PatchStatusRegistry.h @@ -19,83 +19,220 @@ #include #include +#include #include #include -#include #include /** * @brief In-memory registry for console-advertised patch status. + * + * The registry owns all patch status records published by console peers and + * replicated from neighboring FNEs. All access to the backing storage is + * serialized through the registry mutex; callers receive JSON snapshots and + * never receive direct references or iterators into the registry storage. + * * @ingroup fne */ class HOST_SW_API PatchStatusRegistry { public: + /** + * @brief Default number of seconds before a patch status record expires. + */ static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; + /** + * @brief Minimum accepted patch status TTL in seconds. + */ static constexpr uint32_t MIN_TTL_SECONDS = 5U; + /** + * @brief Maximum accepted patch status TTL in seconds. + */ static constexpr uint32_t MAX_TTL_SECONDS = 300U; + /** + * @brief Maximum wait time for long-poll style change requests. + */ static constexpr uint32_t MAX_WAIT_MS = 30000U; + /** + * @brief Initializes a new instance of the PatchStatusRegistry class. + */ PatchStatusRegistry(); + /** + * @brief Finalizes an instance of the PatchStatusRegistry class. + */ ~PatchStatusRegistry() = default; + /** + * @brief Configures the accepted TTL range for patch status records. + * @param defaultTtlSeconds Default TTL used when a publish request omits a TTL. + * @param minTtlSeconds Minimum accepted TTL. + * @param maxTtlSeconds Maximum accepted TTL. + */ void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); + /** + * @brief Publishes a complete patch snapshot for one console peer. + * @param request JSON request containing peerId, peerName, optional sequence, optional ttlSeconds, and patches. + * @param response JSON registry snapshot returned after the publish is applied. + * @param errorMessage Validation error text populated when the request is invalid. + * @returns bool True, if the publish request was valid and applied, otherwise false. + */ bool publish(json::object& request, json::object& response, std::string& errorMessage); + /** + * @brief Removes all patch records associated with a console peer. + * @param peerId Console peer ID whose records should be removed. + * @returns bool True, if records were removed, otherwise false. + */ bool removePeer(uint32_t peerId); + /** + * @brief Removes expired patch status records. + * @returns uint32_t Number of peer records removed. + */ uint32_t cleanupExpired(); + /** + * @brief Creates a complete JSON snapshot of the registry. + * @returns json::object Registry snapshot. + */ json::object snapshot(); + /** + * @brief Waits for registry changes after the supplied revision. + * @param sinceRevision Revision already known to the caller. + * @param waitMs Maximum wait time in milliseconds. + * @returns json::object Registry snapshot after change or timeout. + */ json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); + /** + * @brief Gets the current registry revision. + * @returns uint64_t Registry revision number. + */ uint64_t revision() const; + /** + * @brief Gets the configured default patch status TTL. + * @returns uint32_t Default TTL in seconds. + */ uint32_t defaultTtlSeconds() const; + /** + * @brief Gets the configured minimum patch status TTL. + * @returns uint32_t Minimum TTL in seconds. + */ uint32_t minTtlSeconds() const; + /** + * @brief Gets the configured maximum patch status TTL. + * @returns uint32_t Maximum TTL in seconds. + */ uint32_t maxTtlSeconds() const; private: + /** + * @brief Represents a talkgroup member participating in a patch. + */ struct PatchMember { - std::string system; - std::string mode; - uint32_t tgid = 0U; - uint8_t slot = 0U; + std::string system; //!< System name for the member. + std::string mode; //!< Digital mode for the member. + uint32_t tgid = 0U; //!< Talkgroup ID for the member. + uint8_t slot = 0U; //!< Timeslot for the member, if applicable. }; + /** + * @brief Represents one active console patch. + */ struct PatchRecord { - std::string patchId; - bool active = true; - bool oneWay = false; - std::vector members; + std::string patchId; //!< Console-defined patch ID. + bool active = true; //!< Flag indicating whether the patch is active. + bool oneWay = false; //!< Flag indicating whether the patch is one-way. + std::vector members; //!< Talkgroup members in the patch. }; + /** + * @brief Represents a console peer's complete patch status snapshot. + */ struct PeerPatchSnapshot { - uint32_t peerId = 0U; - std::string peerName; - uint32_t sequence = 0U; - uint64_t updatedAt = 0U; - uint64_t expiresAt = 0U; - std::vector patches; + uint32_t peerId = 0U; //!< Console peer ID. + uint32_t originFnePeerId = 0U; //!< FNE peer ID where this status originated. + std::string peerName; //!< Console peer display name. + uint32_t sequence = 0U; //!< Console-defined sequence number. + uint64_t updatedAt = 0U; //!< Time this snapshot was accepted. + uint64_t expiresAt = 0U; //!< Time this snapshot expires. + std::vector patches; //!< Complete patch list for this console peer. }; + /** + * @brief Gets the current system time in milliseconds. + * @returns uint64_t Current time in milliseconds. + */ static uint64_t nowMs(); + /** + * @brief Normalizes a mode string for key generation. + * @param mode Mode string. + * @returns std::string Normalized mode string. + */ static std::string normalizeMode(const std::string& mode); + /** + * @brief Builds a stable talkgroup lookup key for a patch member. + * @param member Patch member. + * @returns std::string Talkgroup key. + */ static std::string buildTalkgroupKey(const PatchMember& member); + /** + * @brief Serializes a patch member to JSON. + * @param member Patch member. + * @returns json::object JSON patch member. + */ static json::object memberToJson(const PatchMember& member); + /** + * @brief Serializes a patch record to JSON. + * @param patch Patch record. + * @returns json::object JSON patch record. + */ static json::object patchToJson(const PatchRecord& patch); + /** + * @brief Serializes a peer patch snapshot to JSON. + * @param peer Peer patch snapshot. + * @returns json::object JSON peer patch snapshot. + */ static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + /** + * @brief Parses one patch record from JSON. + * @param obj JSON patch object. + * @param patch Parsed patch record. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; + /** + * @brief Parses one patch member from JSON. + * @param obj JSON patch member object. + * @param member Parsed patch member. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; + /** + * @brief Clamps a TTL value into the configured TTL range. + * @param ttlSeconds TTL in seconds. + * @returns uint32_t Clamped TTL in seconds. + */ uint32_t clampTtl(uint32_t ttlSeconds) const; + /** + * @brief Creates a registry snapshot while the registry mutex is held. + * @returns json::object Registry snapshot. + */ json::object snapshotLocked() const; + /** + * @brief Advances the registry revision while the registry mutex is held. + */ void bumpRevisionLocked(); - mutable std::mutex m_mutex; - std::condition_variable m_revisionChanged; - std::unordered_map m_peerPatches; - uint64_t m_revision; - uint32_t m_defaultTtlSeconds; - uint32_t m_minTtlSeconds; - uint32_t m_maxTtlSeconds; + mutable std::mutex m_mutex; //!< Mutex guarding registry state. + std::condition_variable m_revisionChanged; //!< Condition variable signaled when revision changes. + std::map m_peerPatches; //!< Peer patch snapshots keyed by console peer ID. + uint64_t m_revision; //!< Monotonic registry revision. + uint32_t m_defaultTtlSeconds; //!< Default TTL in seconds. + uint32_t m_minTtlSeconds; //!< Minimum TTL in seconds. + uint32_t m_maxTtlSeconds; //!< Maximum TTL in seconds. }; #endif // __FNE_PATCH_STATUS_REGISTRY_H__ diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index bc58a09f..c5702412 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -419,6 +419,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) // The authenticated peer identity is authoritative; do not allow spoofed peer IDs. reqObj["peerId"].set(pktPeerId); + reqObj["originFnePeerId"].set(network->m_peerId); if (!reqObj["peerName"].is() || reqObj["peerName"].get().empty()) reqObj["peerName"].set(connection->identity()); @@ -431,6 +432,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } network->writePatchStatusToConsoles(response); + network->replicatePatchStatus(reqObj); } else { network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -633,6 +635,89 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } } } + else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::REPL_PATCH_STATUS) { // Peer Replication Patch Status + if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { + FNEPeerConnection* connection = network->m_peers[peerId]; + if (connection != nullptr) { + std::string ip = udp::Socket::address(req->address); + + // validate peer (simple validation really) + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR && + connection->isReplica()) { + DECLARE_UINT8_ARRAY(rawPayload, req->length); + ::memcpy(rawPayload, req->buffer, req->length); + + if (mdNetwork->m_peerPatchStatusPkt.find(peerId) == mdNetwork->m_peerPatchStatusPkt.end()) { + mdNetwork->m_peerPatchStatusPkt.insert(peerId, MetadataNetwork::PacketBufferEntry()); + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + pkt.buffer = new PacketBuffer(true, "Peer Replication, Patch Status"); + pkt.streamId = streamId; + + pkt.locked = false; + } else { + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (!pkt.locked && pkt.streamId != streamId) { + LogError(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, stream ID mismatch, expected %u, got %u", peerId, + connection->identWithQualifier().c_str(), pkt.streamId, streamId); + pkt.buffer->clear(); + pkt.streamId = streamId; + } + + if (pkt.streamId != streamId) { + // otherwise drop the packet + break; + } + } + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (pkt.locked) { + while (pkt.locked) + Thread::sleep(1U); + } + + pkt.locked = true; + + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (pkt.buffer->decode(rawPayload, &decompressed, &decompressedLen)) { + mdNetwork->m_peerPatchStatusPkt.lock(); + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_REPL, "PEER %u (%s) error parsing patch status replication, %s", peerId, connection->identWithQualifier().c_str(), err.c_str()); + pkt.buffer->clear(); + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + break; + } + + network->processReplicatedPatchStatus(peerId, v.get()); + + pkt.buffer->clear(); + delete pkt.buffer; + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + } else { + pkt.locked = false; + } + } else { + network->writePeerNAK(peerId, 0U, TAG_PEER_REPLICA, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } break; case NET_FUNC::NET_TREE: diff --git a/src/fne/network/MetadataNetwork.h b/src/fne/network/MetadataNetwork.h index 62629a2e..dcc6dda4 100644 --- a/src/fne/network/MetadataNetwork.h +++ b/src/fne/network/MetadataNetwork.h @@ -117,6 +117,7 @@ namespace network bool locked; }; concurrent::unordered_map m_peerReplicaActPkt; + concurrent::unordered_map m_peerPatchStatusPkt; concurrent::unordered_map m_peerTreeListPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index b2d517cb..477ad985 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -48,6 +48,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_analogCallback(nullptr), m_netTreeDiscCallback(nullptr), m_peerReplicaCallback(nullptr), + m_patchStatusCallback(nullptr), m_masterPeerId(0U), m_pidLookup(nullptr), m_peerReplica(false), @@ -55,6 +56,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_tgidPkt(true, "Peer Replication, TGID List"), m_ridPkt(true, "Peer Replication, RID List"), m_pidPkt(true, "Peer Replication, PID List"), + m_patchStatusPkt(true, "Peer Replication, Patch Status"), m_threadPool(WORKER_CNT, "peer"), m_prevSpanningTreeChildren(0U), m_nakFallOver(false), @@ -233,6 +235,40 @@ bool PeerNetwork::writeHAParams(std::vector& haParams) return false; } +/* Writes a complete console patch status update upstream. */ + +bool PeerNetwork::writePatchStatus(json::object obj) +{ + if (!m_peerReplica) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u Peer Replication, Patch Status, blocks %u, streamId = %u", m_peerId, pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writeMaster({ NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId, true); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + // --------------------------------------------------------------------------- // Protected Class Members // --------------------------------------------------------------------------- @@ -462,6 +498,32 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco } break; + case NET_SUBFUNC::REPL_PATCH_STATUS: // Patch Status + { + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (m_patchStatusPkt.decode(data, &decompressed, &decompressedLen)) { + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_PEER, "PEER %u error parsing patch status replication, %s", m_peerId, err.c_str()); + m_patchStatusPkt.clear(); + delete[] decompressed; + break; + } + + if (m_patchStatusCallback != nullptr) + m_patchStatusCallback(this, v.get()); + + m_patchStatusPkt.clear(); + delete[] decompressed; + } + } + break; + default: break; } diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index 48fbf40b..74f3c8a4 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -17,6 +17,7 @@ #define __PEER_NETWORK_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/PeerListLookup.h" #include "common/network/Network.h" #include "common/network/PacketBuffer.h" @@ -138,9 +139,14 @@ namespace network void setNetTreeDiscCallback(std::function&& callback) { m_netTreeDiscCallback = callback; } /** * @brief Helper to set the peer replica notification callback. - * @param callback + * @param callback */ void setNotifyPeerReplicaCallback(std::function&& callback) { m_peerReplicaCallback = callback; } + /** + * @brief Helper to set the peer patch status callback. + * @param callback + */ + void setPatchStatusCallback(std::function&& callback) { m_patchStatusCallback = callback; } /** * @brief Writes a complete update of this CFNE's active peer list to the network. @@ -235,6 +241,26 @@ namespace network * @returns bool True, if list was sent, otherwise false. */ bool writeHAParams(std::vector& haParams); + /** + * @brief Writes a complete console patch status update upstream. + * \code{.unparsed} + * The patch status replication message is a JSON body, and is a packet + * buffer compressed message. + * + * { + * "type": "publish", + * "peerId": , + * "originFnePeerId": , + * "peerName": "", + * "sequence": , + * "ttlSeconds": , + * "patches": [] + * } + * \endcode + * @param obj JSON patch status publish payload. + * @returns bool True, if patch status was sent, otherwise false. + */ + bool writePatchStatus(json::object obj); /** * @brief Returns flag indicating whether or not this peer connection is peer replication enabled. @@ -298,6 +324,10 @@ namespace network * @brief Peer Replica Notification Callback. */ std::function m_peerReplicaCallback; + /** + * @brief Peer patch status callback. + */ + std::function m_patchStatusCallback; /** * @brief User overrideable handler that allows user code to process network packets not handled by this class. @@ -336,6 +366,7 @@ namespace network PacketBuffer m_tgidPkt; PacketBuffer m_ridPkt; PacketBuffer m_pidPkt; + PacketBuffer m_patchStatusPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index ab06bb22..16b23cf3 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -532,6 +532,27 @@ void TrafficNetwork::processNetworkTreeDisconnect(uint32_t peerId, uint32_t offe } } +/* Processes a replicated console patch status update. */ + +void TrafficNetwork::processReplicatedPatchStatus(uint32_t peerId, json::object obj) +{ + if (!m_patchStatusEnabled) + return; + + if (!obj["peerId"].is() || !obj["patches"].is()) + return; + + json::object response = json::object(); + std::string errorMessage; + if (!m_patchStatusRegistry.publish(obj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u invalid replicated patch status payload, %s", peerId, errorMessage.c_str()); + return; + } + + writePatchStatusToConsoles(response); + replicatePatchStatus(obj, peerId); +} + /* Helper to process an downstream peer In-Call Control message. */ void TrafficNetwork::processDownstreamInCallCtrl(network::NET_ICC::ENUM command, network::NET_SUBFUNC::ENUM subFunc, uint32_t dstId, @@ -2324,8 +2345,18 @@ void TrafficNetwork::erasePeer(uint32_t peerId) } // erase any console patch status records for this peer - if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) + if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) { + json::object clearPatchStatus = json::object(); + uint32_t originFnePeerId = m_peerId; + uint32_t ttlSeconds = m_patchStatusRegistry.defaultTtlSeconds(); + clearPatchStatus["type"].set("publish"); + clearPatchStatus["peerId"].set(peerId); + clearPatchStatus["originFnePeerId"].set(originFnePeerId); + clearPatchStatus["ttlSeconds"].set(ttlSeconds); + clearPatchStatus["patches"].set(json::array()); writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); + replicatePatchStatus(clearPatchStatus); + } // erase any HA parameters for this peer { @@ -2476,6 +2507,38 @@ void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t excep m_peers.shared_unlock(); } +/* Helper to replicate patch status state to neighboring FNE peers. */ + +void TrafficNetwork::replicatePatchStatus(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + obj["type"].set("publish"); + + if (m_host->m_peerNetworks.size() > 0U) { + for (auto peer : m_host->m_peerNetworks) { + if (peer.first == exceptPeerId) + continue; + if (peer.second != nullptr && peer.second->isEnabled() && peer.second->isReplica()) + peer.second->writePatchStatus(obj); + } + } + + m_peers.shared_lock(); + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !peer.second->isReplica()) + continue; + + writePatchStatusReplicationPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + /* Helper to serialize and queue a patch status transfer payload. */ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) @@ -2513,6 +2576,47 @@ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); } +/* Helper to serialize and queue a patch status replication payload. */ + +bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !connection->isReplica()) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, blocks %u, streamId = %u", connection->id(), + connection->identWithQualifier().c_str(), pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writePeer(connection->id(), m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + /* Helper to reset a peer connection. */ bool TrafficNetwork::resetPeer(uint32_t peerId) diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index f234df87..1a1a9ccc 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -237,6 +237,12 @@ namespace network * @param offendingPeerId Offending Peer ID. */ void processNetworkTreeDisconnect(uint32_t peerId, uint32_t offendingPeerId); + /** + * @brief Processes a replicated console patch status update. + * @param peerId Peer ID that delivered the replication update. + * @param obj Patch status JSON payload. + */ + void processReplicatedPatchStatus(uint32_t peerId, json::object obj); /** * @brief Helper to process an downstream peer In-Call Control message. @@ -298,6 +304,12 @@ namespace network * @param exceptPeerId Optional peer ID to skip. */ void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); + /** + * @brief Replicates patch status state to neighboring FNE peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void replicatePatchStatus(json::object obj, uint32_t exceptPeerId = 0U); /** * @brief Helper to reset a peer connection. @@ -827,6 +839,13 @@ namespace network * @returns bool True, if message was queued, otherwise false. */ bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); + /** + * @brief Serializes and queues a patch status replication payload. + * @param connection Destination neighbor connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj); /* ** Internal KMM Callback.