From 1120e64b684595ecfa7f79a8c54b637244d5cbfb Mon Sep 17 00:00:00 2001 From: Dave Behnke <916775+dbehnke@users.noreply.github.com> Date: Wed, 24 Dec 2025 15:19:13 -0500 Subject: [PATCH] Implement non-blocking NNG Publisher for real-time dashboard events --- reflector/Clients.cpp | 29 ++++++++++++++----- reflector/Configure.cpp | 17 ++++++++++++ reflector/Configure.h | 2 +- reflector/Global.h | 2 ++ reflector/JsonKeys.h | 3 ++ reflector/Main.cpp | 12 ++++++-- reflector/Makefile | 2 +- reflector/NNGPublisher.cpp | 57 ++++++++++++++++++++++++++++++++++++++ reflector/NNGPublisher.h | 24 ++++++++++++++++ reflector/Users.cpp | 10 +++++++ 10 files changed, 147 insertions(+), 11 deletions(-) create mode 100644 reflector/NNGPublisher.cpp create mode 100644 reflector/NNGPublisher.h diff --git a/reflector/Clients.cpp b/reflector/Clients.cpp index 74e7b66..e5e22c3 100644 --- a/reflector/Clients.cpp +++ b/reflector/Clients.cpp @@ -43,14 +43,11 @@ CClients::~CClients() void CClients::AddClient(std::shared_ptr client) { // first check if client already exists - for ( auto it=begin(); it!=end(); it++ ) + for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ ) { if (*client == *(*it)) - // if found, just do nothing - // so *client keep pointing on a valid object - // on function return { - // delete new one + // if found, just do nothing return; } } @@ -63,13 +60,21 @@ void CClients::AddClient(std::shared_ptr client) std::cout << " on module " << client->GetReflectorModule(); } std::cout << std::endl; + + // dashboard event + nlohmann::json event; + event["type"] = "client_connect"; + event["callsign"] = client->GetCallsign().GetCS(); + event["ip"] = client->GetIp().GetAddress(); + event["protocol"] = client->GetProtocolName(); + event["module"] = std::string(1, client->GetReflectorModule()); + g_NNGPublisher.Publish(event); } void CClients::RemoveClient(std::shared_ptr client) { // look for the client - bool found = false; - for ( auto it=begin(); it!=end(); it++ ) + for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ ) { // compare object pointers if ( *it == client ) @@ -84,6 +89,16 @@ void CClients::RemoveClient(std::shared_ptr client) std::cout << " on module " << (*it)->GetReflectorModule(); } std::cout << std::endl; + + // dashboard event + nlohmann::json event; + event["type"] = "client_disconnect"; + event["callsign"] = (*it)->GetCallsign().GetCS(); + event["ip"] = (*it)->GetIp().GetAddress(); + event["protocol"] = (*it)->GetProtocolName(); + event["module"] = std::string(1, (*it)->GetReflectorModule()); + g_NNGPublisher.Publish(event); + m_Clients.erase(it); break; } diff --git a/reflector/Configure.cpp b/reflector/Configure.cpp index 445e953..d391c88 100644 --- a/reflector/Configure.cpp +++ b/reflector/Configure.cpp @@ -38,6 +38,7 @@ #define JBRANDMEISTER "Brandmeister" #define JCALLSIGN "Callsign" #define JCOUNTRY "Country" +#define JDASHBOARD "Dashboard" #define JDASHBOARDURL "DashboardUrl" #define JDCS "DCS" #define JDEFAULTID "DefaultId" @@ -122,6 +123,9 @@ CConfigure::CConfigure() { IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended); IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended); + + data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555"; + data[g_Keys.dashboard.enable] = false; } bool CConfigure::ReadData(const std::string &path) @@ -183,6 +187,8 @@ bool CConfigure::ReadData(const std::string &path) section = ESection::ip; else if (0 == hname.compare(JTRANSCODER)) section = ESection::tc; + else if (0 == hname.compare(JDASHBOARD)) + section = ESection::dashboard; else if (0 == hname.compare(JMODULES)) section = ESection::modules; else if (0 == hname.compare(JDPLUS)) @@ -495,6 +501,14 @@ bool CConfigure::ReadData(const std::string &path) else badParam(key); break; + case ESection::dashboard: + if (0 == key.compare(JENABLE)) + data[g_Keys.dashboard.enable] = IS_TRUE(value[0]); + else if (0 == key.compare("NNGAddr")) + data[g_Keys.dashboard.nngaddr] = value; + else + badParam(key); + break; default: std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl; } @@ -797,6 +811,9 @@ bool CConfigure::ReadData(const std::string &path) if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval)) checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]); } + // Dashboard section + isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval); + isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval); return rval; } diff --git a/reflector/Configure.h b/reflector/Configure.h index b76fa3e..9396cf3 100644 --- a/reflector/Configure.h +++ b/reflector/Configure.h @@ -25,7 +25,7 @@ enum class ErrorLevel { fatal, mild }; enum class ERefreshType { file, http, both }; -enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc }; +enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc, dashboard }; #define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1') diff --git a/reflector/Global.h b/reflector/Global.h index 0239507..e84af3c 100644 --- a/reflector/Global.h +++ b/reflector/Global.h @@ -23,6 +23,7 @@ #include "LookupYsf.h" #include "TCSocket.h" #include "JsonKeys.h" +#include "NNGPublisher.h" extern CReflector g_Reflector; extern CGateKeeper g_GateKeeper; @@ -33,3 +34,4 @@ extern CLookupNxdn g_LNid; extern CLookupYsf g_LYtr; extern SJsonKeys g_Keys; extern CTCServer g_TCServer; +extern CNNGPublisher g_NNGPublisher; diff --git a/reflector/JsonKeys.h b/reflector/JsonKeys.h index 72cfc0d..d04985c 100644 --- a/reflector/JsonKeys.h +++ b/reflector/JsonKeys.h @@ -71,4 +71,7 @@ struct SJsonKeys { struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; } files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" }; + + struct DASHBOARD { const std::string enable, nngaddr; } + dashboard { "DashboardEnable", "DashboardNNGAddr" }; }; diff --git a/reflector/Main.cpp b/reflector/Main.cpp index eb43f79..ac73544 100644 --- a/reflector/Main.cpp +++ b/reflector/Main.cpp @@ -33,6 +33,7 @@ CLookupDmr g_LDid; CLookupNxdn g_LNid; CLookupYsf g_LYtr; CTCServer g_TCServer; +CNNGPublisher g_NNGPublisher; //////////////////////////////////////////////////////////////////////////////////////// @@ -49,20 +50,26 @@ int main(int argc, char *argv[]) std::cout << "IPv4 binding address is '" << g_Configure.GetString(g_Keys.ip.ipv4bind) << "'" << std::endl; // remove pidfile - const std::string pidpath(g_Configure.GetString(g_Keys.files.pid)); + std::string pidpath = g_Configure.GetString(g_Keys.files.pid); const std::string callsign(g_Configure.GetString(g_Keys.names.callsign)); remove(pidpath.c_str()); // splash std::cout << "Starting " << callsign << " " << g_Version << std::endl; - // and let it run + // start everything if (g_Reflector.Start()) { std::cout << "Error starting reflector" << std::endl; return EXIT_FAILURE; } + // dashboard nng publisher + if (g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { + g_NNGPublisher.Start(g_Configure.GetString(g_Keys.dashboard.nngaddr)); + } + std::cout << "Reflector " << callsign << " started and listening" << std::endl; // write new pid file @@ -72,6 +79,7 @@ int main(int argc, char *argv[]) pause(); // wait for any signal + g_NNGPublisher.Stop(); g_Reflector.Stop(); std::cout << "Reflector stopped" << std::endl; diff --git a/reflector/Makefile b/reflector/Makefile index 3116642..3dd9778 100644 --- a/reflector/Makefile +++ b/reflector/Makefile @@ -32,7 +32,7 @@ else CFLAGS = -W -Werror -std=c++17 -MMD -MD endif -LDFLAGS=-pthread -lcurl +LDFLAGS=-pthread -lcurl -lnng ifeq ($(DHT), true) LDFLAGS += -lopendht diff --git a/reflector/NNGPublisher.cpp b/reflector/NNGPublisher.cpp new file mode 100644 index 0000000..dba5e00 --- /dev/null +++ b/reflector/NNGPublisher.cpp @@ -0,0 +1,57 @@ +#include "NNGPublisher.h" +#include + +CNNGPublisher::CNNGPublisher() + : m_started(false) +{ + m_sock.id = 0; +} + +CNNGPublisher::~CNNGPublisher() +{ + Stop(); +} + +bool CNNGPublisher::Start(const std::string &addr) +{ + std::lock_guard lock(m_mutex); + if (m_started) return true; + + int rv; + if ((rv = nng_pub0_open(&m_sock)) != 0) { + std::cerr << "NNG: Failed to open pub socket: " << nng_strerror(rv) << std::endl; + return false; + } + + if ((rv = nng_listen(m_sock, addr.c_str(), nullptr, 0)) != 0) { + std::cerr << "NNG: Failed to listen on " << addr << ": " << nng_strerror(rv) << std::endl; + nng_close(m_sock); + return false; + } + + m_started = true; + std::cout << "NNG: Publisher started at " << addr << std::endl; + return true; +} + +void CNNGPublisher::Stop() +{ + std::lock_guard lock(m_mutex); + if (!m_started) return; + + nng_close(m_sock); + m_started = false; + std::cout << "NNG: Publisher stopped" << std::endl; +} + +void CNNGPublisher::Publish(const nlohmann::json &event) +{ + std::lock_guard lock(m_mutex); + if (!m_started) return; + + std::string msg = event.dump(); + int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK); + if (rv != 0 && rv != NNG_EAGAIN) { + std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl; + } +} diff --git a/reflector/NNGPublisher.h b/reflector/NNGPublisher.h new file mode 100644 index 0000000..478fb20 --- /dev/null +++ b/reflector/NNGPublisher.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include +#include +#include + +class CNNGPublisher +{ +public: + CNNGPublisher(); + ~CNNGPublisher(); + + bool Start(const std::string &addr); + void Stop(); + + void Publish(const nlohmann::json &event); + +private: + nng_socket m_sock; + std::mutex m_mutex; + bool m_started; +}; diff --git a/reflector/Users.cpp b/reflector/Users.cpp index 31f4591..2d655d6 100644 --- a/reflector/Users.cpp +++ b/reflector/Users.cpp @@ -64,4 +64,14 @@ void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign } AddUser(heard); + + // dashboard event + nlohmann::json event; + event["type"] = "hearing"; + event["my"] = my.GetCS(); + event["ur"] = rpt1.GetCS(); + event["rpt1"] = rpt2.GetCS(); + event["rpt2"] = xlx.GetCS(); + event["module"] = std::string(1, xlx.GetCSModule()); + g_NNGPublisher.Publish(event); }