feat(nng): implement one-time first packet logging and periodic statistical aggregation for NNG events

pull/23/head
Dave Behnke 1 month ago
parent 2f7008abb1
commit e5095eb3d8

@ -1,6 +1,7 @@
#include "NNGPublisher.h"
#include "Global.h"
#include <iostream>
#include <sstream>
CNNGPublisher::CNNGPublisher()
: m_started(false)
@ -59,8 +60,27 @@ void CNNGPublisher::Publish(const nlohmann::json &event)
std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl;
int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK);
if (rv == 0) {
std::cout << "NNG: Published event: " << event["type"] << std::endl;
// Count event instead of logging
std::string type = event["type"];
m_EventCounts[type]++;
} else if (rv != NNG_EAGAIN) {
std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl;
}
}
std::string CNNGPublisher::GetAndClearStats()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_EventCounts.empty()) return "";
std::stringstream ss;
bool first = true;
for (const auto& kv : m_EventCounts)
{
if (!first) ss << ", ";
ss << "\"" << kv.first << "\": " << kv.second;
first = false;
}
m_EventCounts.clear();
return ss.str();
}

@ -3,6 +3,7 @@
#include <string>
#include <mutex>
#include <nlohmann/json.hpp>
#include <map>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
@ -17,8 +18,13 @@ public:
void Publish(const nlohmann::json &event);
std::string GetAndClearStats();
private:
nng_socket m_sock;
std::mutex m_mutex;
bool m_started;
// Event counters
std::map<std::string, int> m_EventCounts;
};

@ -397,13 +397,47 @@ void CReflector::MaintenanceThread()
if (++nngCounter >= (nngInterval * 10))
{
nngCounter = 0;
std::cout << "NNG debug: Periodic state broadcast..." << std::endl;
// Removed spammy log: std::cout << "NNG debug: Periodic state broadcast..." << std::endl;
nlohmann::json state;
state["type"] = "state";
JsonReport(state);
g_NNGPublisher.Publish(state);
}
// Log aggregated stats every ~2 minutes (assuming loop runs every 10s * XML_UPDATE_PERIOD=10 = 100s per cycle? No wait)
// XML_UPDATE_PERIOD is 10. Loop is XML_UPDATE_PERIOD * 10 = 100 iterations.
// Sleep is 100ms. So loop is 10s total.
// nngInterval default is 10s.
// Reflector.cpp loop logic is:
// while(keep_running) {
// Update XML/JSON
// for (10s) {
// update NNG state
// check TC
// sleep(100ms)
// }
// }
// So the outer loop runs every 10s.
// To get ~2 minutes, we can use a static counter in the outer loop or piggyback here.
// Let's use a static counter inside the loop or check 'i' (which resets every 10s).
// Easier: add a static counter to MaintenanceThread or verify nngCounter.
}
// New Aggregated Stats Logic
// Log every 1200 iterations (1200 * 100ms = 120s = 2 mins)
static int statsCounter = 0;
if (++statsCounter >= 1200) {
statsCounter = 0;
std::string nngStats = g_NNGPublisher.GetAndClearStats();
std::string tcStats = g_TCServer.GetAndClearStats();
if (!nngStats.empty() || !tcStats.empty()) {
std::cout << "Stats: ";
if (!nngStats.empty()) std::cout << "NNG [" << nngStats << "] ";
if (!tcStats.empty()) std::cout << "TCD [" << tcStats << "]";
std::cout << std::endl;
}
}
if (tcport && g_TCServer.AnyAreClosed())
{

@ -81,6 +81,18 @@ void CTCSocket::Dispatcher()
memcpy(&pkt, buf, sizeof(STCPacket));
nng_free(buf, sz);
// Log first packet from this module
if (m_SeenModules.find(pkt.module) == m_SeenModules.end())
{
std::cout << "NNG: Received first packet from module " << pkt.module << std::endl;
m_SeenModules.insert(pkt.module);
}
{
std::lock_guard<std::mutex> lock(m_StatsMutex);
m_PacketCounts[pkt.module]++;
}
if (m_ClientQueue)
{
// Client mode: everything goes to one queue
@ -247,3 +259,20 @@ void CTCClient::ReConnect()
{
// NNG handles reconnection automatically
}
std::string CTCSocket::GetAndClearStats()
{
std::lock_guard<std::mutex> lock(m_StatsMutex);
if (m_PacketCounts.empty()) return "";
std::stringstream ss;
bool first = true;
for (const auto& kv : m_PacketCounts)
{
if (!first) ss << ", ";
ss << kv.first << ": " << kv.second;
first = false;
}
m_PacketCounts.clear();
return ss.str();
}

@ -10,6 +10,8 @@
#include <condition_variable>
#include <map>
#include <atomic>
#include <set>
#include <sstream>
#include <nng/nng.h>
#include <nng/protocol/pair1/pair.h>
@ -57,6 +59,8 @@ public:
bool IsConnected(char module) const;
int GetFD(char module) const; // Legacy compat: returns 1 if connected, -1 if not
std::string GetAndClearStats();
protected:
nng_socket m_Sock;
@ -68,8 +72,16 @@ protected:
// Per-module input queues
std::map<char, std::shared_ptr<CTCPacketQueue>> m_Queues;
// Client queue (receives all)
// Client queue (receives all)
std::shared_ptr<CTCPacketQueue> m_ClientQueue;
// Track seen modules for logging
std::set<char> m_SeenModules;
// Packet counters
std::map<char, int> m_PacketCounts;
std::mutex m_StatsMutex;
void Dispatcher();
};

Loading…
Cancel
Save

Powered by TurnKey Linux.