From e5095eb3d8f2a06613392ba579d0f8d465dcd4b0 Mon Sep 17 00:00:00 2001 From: Dave Behnke <916775+dbehnke@users.noreply.github.com> Date: Sat, 27 Dec 2025 13:02:40 -0500 Subject: [PATCH] feat(nng): implement one-time first packet logging and periodic statistical aggregation for NNG events --- reflector/NNGPublisher.cpp | 22 +++++++++++++++++++++- reflector/NNGPublisher.h | 6 ++++++ reflector/Reflector.cpp | 36 +++++++++++++++++++++++++++++++++++- reflector/TCSocket.cpp | 29 +++++++++++++++++++++++++++++ reflector/TCSocket.h | 12 ++++++++++++ 5 files changed, 103 insertions(+), 2 deletions(-) diff --git a/reflector/NNGPublisher.cpp b/reflector/NNGPublisher.cpp index 9f87347..fbaf1ca 100644 --- a/reflector/NNGPublisher.cpp +++ b/reflector/NNGPublisher.cpp @@ -1,6 +1,7 @@ #include "NNGPublisher.h" #include "Global.h" #include +#include CNNGPublisher::CNNGPublisher() : m_started(false) @@ -59,8 +60,27 @@ void CNNGPublisher::Publish(const nlohmann::json &event) std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl; int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK); if (rv == 0) { - std::cout << "NNG: Published event: " << event["type"] << std::endl; + // Count event instead of logging + std::string type = event["type"]; + m_EventCounts[type]++; } else if (rv != NNG_EAGAIN) { std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl; } } + +std::string CNNGPublisher::GetAndClearStats() +{ + std::lock_guard lock(m_mutex); + if (m_EventCounts.empty()) return ""; + + std::stringstream ss; + bool first = true; + for (const auto& kv : m_EventCounts) + { + if (!first) ss << ", "; + ss << "\"" << kv.first << "\": " << kv.second; + first = false; + } + m_EventCounts.clear(); + return ss.str(); +} diff --git a/reflector/NNGPublisher.h b/reflector/NNGPublisher.h index 478fb20..f9b6f80 100644 --- a/reflector/NNGPublisher.h +++ b/reflector/NNGPublisher.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -17,8 +18,13 @@ public: void Publish(const nlohmann::json &event); + std::string GetAndClearStats(); + private: nng_socket m_sock; std::mutex m_mutex; bool m_started; + + // Event counters + std::map m_EventCounts; }; diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 3ec3413..60f9061 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -397,13 +397,47 @@ void CReflector::MaintenanceThread() if (++nngCounter >= (nngInterval * 10)) { nngCounter = 0; - std::cout << "NNG debug: Periodic state broadcast..." << std::endl; + // Removed spammy log: std::cout << "NNG debug: Periodic state broadcast..." << std::endl; nlohmann::json state; state["type"] = "state"; JsonReport(state); g_NNGPublisher.Publish(state); } + + // Log aggregated stats every ~2 minutes (assuming loop runs every 10s * XML_UPDATE_PERIOD=10 = 100s per cycle? No wait) + // XML_UPDATE_PERIOD is 10. Loop is XML_UPDATE_PERIOD * 10 = 100 iterations. + // Sleep is 100ms. So loop is 10s total. + // nngInterval default is 10s. + // Reflector.cpp loop logic is: + // while(keep_running) { + // Update XML/JSON + // for (10s) { + // update NNG state + // check TC + // sleep(100ms) + // } + // } + // So the outer loop runs every 10s. + // To get ~2 minutes, we can use a static counter in the outer loop or piggyback here. + // Let's use a static counter inside the loop or check 'i' (which resets every 10s). + // Easier: add a static counter to MaintenanceThread or verify nngCounter. } + + // New Aggregated Stats Logic + // Log every 1200 iterations (1200 * 100ms = 120s = 2 mins) + static int statsCounter = 0; + if (++statsCounter >= 1200) { + statsCounter = 0; + std::string nngStats = g_NNGPublisher.GetAndClearStats(); + std::string tcStats = g_TCServer.GetAndClearStats(); + + if (!nngStats.empty() || !tcStats.empty()) { + std::cout << "Stats: "; + if (!nngStats.empty()) std::cout << "NNG [" << nngStats << "] "; + if (!tcStats.empty()) std::cout << "TCD [" << tcStats << "]"; + std::cout << std::endl; + } + } if (tcport && g_TCServer.AnyAreClosed()) { diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 1ea498b..485369b 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -81,6 +81,18 @@ void CTCSocket::Dispatcher() memcpy(&pkt, buf, sizeof(STCPacket)); nng_free(buf, sz); + // Log first packet from this module + if (m_SeenModules.find(pkt.module) == m_SeenModules.end()) + { + std::cout << "NNG: Received first packet from module " << pkt.module << std::endl; + m_SeenModules.insert(pkt.module); + } + + { + std::lock_guard lock(m_StatsMutex); + m_PacketCounts[pkt.module]++; + } + if (m_ClientQueue) { // Client mode: everything goes to one queue @@ -247,3 +259,20 @@ void CTCClient::ReConnect() { // NNG handles reconnection automatically } + +std::string CTCSocket::GetAndClearStats() +{ + std::lock_guard lock(m_StatsMutex); + if (m_PacketCounts.empty()) return ""; + + std::stringstream ss; + bool first = true; + for (const auto& kv : m_PacketCounts) + { + if (!first) ss << ", "; + ss << kv.first << ": " << kv.second; + first = false; + } + m_PacketCounts.clear(); + return ss.str(); +} diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index 68c7b71..86ebf98 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include @@ -57,6 +59,8 @@ public: bool IsConnected(char module) const; int GetFD(char module) const; // Legacy compat: returns 1 if connected, -1 if not + + std::string GetAndClearStats(); protected: nng_socket m_Sock; @@ -68,8 +72,16 @@ protected: // Per-module input queues std::map> m_Queues; // Client queue (receives all) + // Client queue (receives all) std::shared_ptr m_ClientQueue; + // Track seen modules for logging + std::set m_SeenModules; + + // Packet counters + std::map m_PacketCounts; + std::mutex m_StatsMutex; + void Dispatcher(); };