diff --git a/.gitignore b/.gitignore index 949061f..b081193 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,9 @@ reflector/urfd.* urfd inicheck dbutil +.devcontainer/ +/test_urfd.ini +/staging_urfd.ini +/pr_comment_nng.md +/pr_body_fix.md +/staging/ diff --git a/docs/nng.md b/docs/nng.md new file mode 100644 index 0000000..5503406 --- /dev/null +++ b/docs/nng.md @@ -0,0 +1,145 @@ +# NNG Event System Documentation + +This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON. + +## Architecture Overview + +The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream. + +```mermaid +graph TD + subgraph "urfd Core" + CR["CReflector"] + CC["CClients"] + CU["CUsers"] + PS["CPacketStream"] + end + + subgraph "Publishing Layer" + NP["g_NNGPublisher"] + end + + subgraph "Network" + ADDR["tcp://0.0.0.0:5555"] + end + + subgraph "External" + MT["Middle Tier / Dashboard"] + end + + %% Internal Flows + CC -- "client_connect / client_disconnect" --> NP + CU -- "hearing / closing" --> NP + CR -- "periodic state report" --> NP + PS -- "IsActive status" --> CR + + %% Network Flow + NP --> ADDR + ADDR -.-> MT +``` + +## Messaging Protocols + +Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure. + +### 1. State Broadcast (`state`) + +Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status. + +**Payload Structure:** + +```json +{ + "type": "state", + "Configure": { + "Key": "Value", + ... + }, + "Peers": [ + { + "Callsign": "XLX123", + "Modules": "ABC", + "Protocol": "D-Extra", + "ConnectTime": "2023-10-27T10:00:00Z" + } + ], + "Clients": [ + { + "Callsign": "N7TAE", + "OnModule": "A", + "Protocol": "DMR", + "ConnectTime": "2023-10-27T10:05:00Z" + } + ], + "Users": [ + { + "Callsign": "G4XYZ", + "Repeater": "GB3NB", + "OnModule": "B", + "ViaPeer": "XLX456", + "LastHeard": "2023-10-27T10:10:00Z" + } + ], + "ActiveTalkers": [ + { + "Module": "A", + "Callsign": "N7TAE" + } + ] +} +``` + +### 2. Client Connectivity (`client_connect` / `client_disconnect`) + +Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module. + +**Payload Structure:** + +```json +{ + "type": "client_connect", + "callsign": "N7TAE", + "ip": "1.2.3.4", + "protocol": "DMR", + "module": "A" +} +``` + +### 3. Voice Activity (`hearing`) + +Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector. + +**Payload Structure:** + +```json +{ + "type": "hearing", + "my": "G4XYZ", + "ur": "CQCQCQ", + "rpt1": "GB3NB", + "rpt2": "XLX123 A", + "module": "A", + "protocol": "M17" +} +``` + +### 4. Transmission End (`closing`) + +Triggered when a transmission stream is closed (user stops talking). + +**Payload Structure:** + +```json +{ + "type": "closing", + "my": "G4XYZ", + "module": "A", + "protocol": "M17" +} +``` + +## Middle Tier Design Considerations + +1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events. +2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds). +3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data. diff --git a/docs/nng_diagram.png b/docs/nng_diagram.png new file mode 100644 index 0000000..ec1347c Binary files /dev/null and b/docs/nng_diagram.png differ diff --git a/reflector/BMProtocol.cpp b/reflector/BMProtocol.cpp index 3f211b8..92942d1 100644 --- a/reflector/BMProtocol.cpp +++ b/reflector/BMProtocol.cpp @@ -368,7 +368,7 @@ void CBMProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, c // release g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer, EProtocol::bm); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/Clients.cpp b/reflector/Clients.cpp index 74e7b66..e5e22c3 100644 --- a/reflector/Clients.cpp +++ b/reflector/Clients.cpp @@ -43,14 +43,11 @@ CClients::~CClients() void CClients::AddClient(std::shared_ptr client) { // first check if client already exists - for ( auto it=begin(); it!=end(); it++ ) + for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ ) { if (*client == *(*it)) - // if found, just do nothing - // so *client keep pointing on a valid object - // on function return { - // delete new one + // if found, just do nothing return; } } @@ -63,13 +60,21 @@ void CClients::AddClient(std::shared_ptr client) std::cout << " on module " << client->GetReflectorModule(); } std::cout << std::endl; + + // dashboard event + nlohmann::json event; + event["type"] = "client_connect"; + event["callsign"] = client->GetCallsign().GetCS(); + event["ip"] = client->GetIp().GetAddress(); + event["protocol"] = client->GetProtocolName(); + event["module"] = std::string(1, client->GetReflectorModule()); + g_NNGPublisher.Publish(event); } void CClients::RemoveClient(std::shared_ptr client) { // look for the client - bool found = false; - for ( auto it=begin(); it!=end(); it++ ) + for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ ) { // compare object pointers if ( *it == client ) @@ -84,6 +89,16 @@ void CClients::RemoveClient(std::shared_ptr client) std::cout << " on module " << (*it)->GetReflectorModule(); } std::cout << std::endl; + + // dashboard event + nlohmann::json event; + event["type"] = "client_disconnect"; + event["callsign"] = (*it)->GetCallsign().GetCS(); + event["ip"] = (*it)->GetIp().GetAddress(); + event["protocol"] = (*it)->GetProtocolName(); + event["module"] = std::string(1, (*it)->GetReflectorModule()); + g_NNGPublisher.Publish(event); + m_Clients.erase(it); break; } diff --git a/reflector/Configure.cpp b/reflector/Configure.cpp index a452302..37a76a9 100644 --- a/reflector/Configure.cpp +++ b/reflector/Configure.cpp @@ -38,6 +38,7 @@ #define JBRANDMEISTER "Brandmeister" #define JCALLSIGN "Callsign" #define JCOUNTRY "Country" +#define JDASHBOARD "Dashboard" #define JDASHBOARDURL "DashboardUrl" #define JDCS "DCS" #define JDEFAULTID "DefaultId" @@ -123,6 +124,12 @@ CConfigure::CConfigure() { IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended); IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended); + + data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555"; + data[g_Keys.dashboard.interval] = 10U; + data[g_Keys.dashboard.enable] = false; + data[g_Keys.dashboard.debug] = false; + data[g_Keys.ysf.ysfreflectordb.id] = 0U; } bool CConfigure::ReadData(const std::string &path) @@ -184,6 +191,8 @@ bool CConfigure::ReadData(const std::string &path) section = ESection::ip; else if (0 == hname.compare(JTRANSCODER)) section = ESection::tc; + else if (0 == hname.compare(JDASHBOARD)) + section = ESection::dashboard; else if (0 == hname.compare(JMODULES)) section = ESection::modules; else if (0 == hname.compare(JDPLUS)) @@ -498,6 +507,18 @@ bool CConfigure::ReadData(const std::string &path) else badParam(key); break; + case ESection::dashboard: + if (0 == key.compare(JENABLE)) + data[g_Keys.dashboard.enable] = IS_TRUE(value[0]); + else if (0 == key.compare("NNGAddr")) + data[g_Keys.dashboard.nngaddr] = value; + else if (0 == key.compare("Interval")) + data[g_Keys.dashboard.interval] = getUnsigned(value, "Dashboard Interval", 1, 3600, 10); + else if (0 == key.compare("NNGDebug")) + data[g_Keys.dashboard.debug] = IS_TRUE(value[0]); + else + badParam(key); + break; default: std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl; } @@ -801,6 +822,10 @@ bool CConfigure::ReadData(const std::string &path) if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval)) checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]); } + // Dashboard section + isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval); + isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval); + isDefined(ErrorLevel::mild, JDASHBOARD, "Interval", g_Keys.dashboard.interval, rval); return rval; } diff --git a/reflector/Configure.h b/reflector/Configure.h index b76fa3e..9396cf3 100644 --- a/reflector/Configure.h +++ b/reflector/Configure.h @@ -25,7 +25,7 @@ enum class ErrorLevel { fatal, mild }; enum class ERefreshType { file, http, both }; -enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc }; +enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc, dashboard }; #define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1') diff --git a/reflector/DCSProtocol.cpp b/reflector/DCSProtocol.cpp index 29dc576..3821dac 100644 --- a/reflector/DCSProtocol.cpp +++ b/reflector/DCSProtocol.cpp @@ -208,7 +208,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dcs); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DExtraProtocol.cpp b/reflector/DExtraProtocol.cpp index 88ed7ef..32be8d2 100644 --- a/reflector/DExtraProtocol.cpp +++ b/reflector/DExtraProtocol.cpp @@ -351,7 +351,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr &Heade g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dextra); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DMRMMDVMProtocol.cpp b/reflector/DMRMMDVMProtocol.cpp index 5f10ba7..c36c921 100644 --- a/reflector/DMRMMDVMProtocol.cpp +++ b/reflector/DMRMMDVMProtocol.cpp @@ -335,7 +335,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr &Hea // update last heard if ( lastheard ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dmrmmdvm); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DMRPlusProtocol.cpp b/reflector/DMRPlusProtocol.cpp index fefc957..d257eb9 100644 --- a/reflector/DMRPlusProtocol.cpp +++ b/reflector/DMRPlusProtocol.cpp @@ -208,7 +208,7 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Head // release g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dmrplus); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DPlusProtocol.cpp b/reflector/DPlusProtocol.cpp index 14682fe..ce908f4 100644 --- a/reflector/DPlusProtocol.cpp +++ b/reflector/DPlusProtocol.cpp @@ -213,7 +213,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dplus); g_Reflector.ReleaseUsers(); } else diff --git a/reflector/G3Protocol.cpp b/reflector/G3Protocol.cpp index 8d5e24b..97c5b47 100644 --- a/reflector/G3Protocol.cpp +++ b/reflector/G3Protocol.cpp @@ -570,7 +570,7 @@ void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, c } // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::g3); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/GateKeeper.h b/reflector/GateKeeper.h index 619dd86..d6baabd 100644 --- a/reflector/GateKeeper.h +++ b/reflector/GateKeeper.h @@ -47,6 +47,7 @@ public: // authorizations bool MayLink(const CCallsign &, const CIp &, const EProtocol, char * = nullptr) const; bool MayTransmit(const CCallsign &, const CIp &, EProtocol = EProtocol::any, char = ' ') const; + const std::string ProtocolName(EProtocol) const; protected: // thread @@ -56,7 +57,6 @@ protected: bool IsNodeListedOk(const std::string &) const; bool IsPeerListedOk(const std::string &, char) const; bool IsPeerListedOk(const std::string &, const CIp &, char *) const; - const std::string ProtocolName(EProtocol) const; protected: // data diff --git a/reflector/Global.h b/reflector/Global.h index 0239507..e84af3c 100644 --- a/reflector/Global.h +++ b/reflector/Global.h @@ -23,6 +23,7 @@ #include "LookupYsf.h" #include "TCSocket.h" #include "JsonKeys.h" +#include "NNGPublisher.h" extern CReflector g_Reflector; extern CGateKeeper g_GateKeeper; @@ -33,3 +34,4 @@ extern CLookupNxdn g_LNid; extern CLookupYsf g_LYtr; extern SJsonKeys g_Keys; extern CTCServer g_TCServer; +extern CNNGPublisher g_NNGPublisher; diff --git a/reflector/JsonKeys.h b/reflector/JsonKeys.h index 64a86e9..d9a783f 100644 --- a/reflector/JsonKeys.h +++ b/reflector/JsonKeys.h @@ -71,4 +71,7 @@ struct SJsonKeys { struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; } files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" }; + + struct DASHBOARD { const std::string enable, nngaddr, interval, debug; } + dashboard { "DashboardEnable", "DashboardNNGAddr", "DashboardInterval", "NNGDebug" }; }; diff --git a/reflector/M17Protocol.cpp b/reflector/M17Protocol.cpp index d51dfbd..3ecce1f 100644 --- a/reflector/M17Protocol.cpp +++ b/reflector/M17Protocol.cpp @@ -23,6 +23,12 @@ #include "M17Packet.h" #include "Global.h" +//////////////////////////////////////////////////////////////////////////////////////// +// constructor +CM17Protocol::CM17Protocol() : CSEProtocol() +{ +} + //////////////////////////////////////////////////////////////////////////////////////// // operation @@ -209,7 +215,9 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + CCallsign reflectorCall = rpt2; + reflectorCall.SetCSModule(Header->GetRpt2Module()); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, reflectorCall, EProtocol::m17); g_Reflector.ReleaseUsers(); } } @@ -411,3 +419,15 @@ void CM17Protocol::EncodeM17Packet(SM17Frame &frame, const CDvHeaderPacket &Head frame.streamid = Header.GetStreamId(); // no host<--->network byte swapping since we never do any math on this value // the CRC will be set in HandleQueue, after lich.dest is set } + +bool CM17Protocol::EncodeDvHeaderPacket(const CDvHeaderPacket &packet, CBuffer &buffer) const +{ + packet.EncodeInterlinkPacket(buffer); + return true; +} + +bool CM17Protocol::EncodeDvFramePacket(const CDvFramePacket &packet, CBuffer &buffer) const +{ + packet.EncodeInterlinkPacket(buffer); + return true; +} diff --git a/reflector/Main.cpp b/reflector/Main.cpp index eb43f79..ac73544 100644 --- a/reflector/Main.cpp +++ b/reflector/Main.cpp @@ -33,6 +33,7 @@ CLookupDmr g_LDid; CLookupNxdn g_LNid; CLookupYsf g_LYtr; CTCServer g_TCServer; +CNNGPublisher g_NNGPublisher; //////////////////////////////////////////////////////////////////////////////////////// @@ -49,20 +50,26 @@ int main(int argc, char *argv[]) std::cout << "IPv4 binding address is '" << g_Configure.GetString(g_Keys.ip.ipv4bind) << "'" << std::endl; // remove pidfile - const std::string pidpath(g_Configure.GetString(g_Keys.files.pid)); + std::string pidpath = g_Configure.GetString(g_Keys.files.pid); const std::string callsign(g_Configure.GetString(g_Keys.names.callsign)); remove(pidpath.c_str()); // splash std::cout << "Starting " << callsign << " " << g_Version << std::endl; - // and let it run + // start everything if (g_Reflector.Start()) { std::cout << "Error starting reflector" << std::endl; return EXIT_FAILURE; } + // dashboard nng publisher + if (g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { + g_NNGPublisher.Start(g_Configure.GetString(g_Keys.dashboard.nngaddr)); + } + std::cout << "Reflector " << callsign << " started and listening" << std::endl; // write new pid file @@ -72,6 +79,7 @@ int main(int argc, char *argv[]) pause(); // wait for any signal + g_NNGPublisher.Stop(); g_Reflector.Stop(); std::cout << "Reflector stopped" << std::endl; diff --git a/reflector/Makefile b/reflector/Makefile index 3116642..3dd9778 100644 --- a/reflector/Makefile +++ b/reflector/Makefile @@ -32,7 +32,7 @@ else CFLAGS = -W -Werror -std=c++17 -MMD -MD endif -LDFLAGS=-pthread -lcurl +LDFLAGS=-pthread -lcurl -lnng ifeq ($(DHT), true) LDFLAGS += -lopendht diff --git a/reflector/NNGPublisher.cpp b/reflector/NNGPublisher.cpp new file mode 100644 index 0000000..9f87347 --- /dev/null +++ b/reflector/NNGPublisher.cpp @@ -0,0 +1,66 @@ +#include "NNGPublisher.h" +#include "Global.h" +#include + +CNNGPublisher::CNNGPublisher() + : m_started(false) +{ + m_sock.id = 0; +} + +CNNGPublisher::~CNNGPublisher() +{ + Stop(); +} + +bool CNNGPublisher::Start(const std::string &addr) +{ + std::lock_guard lock(m_mutex); + if (m_started) return true; + + int rv; + if ((rv = nng_pub0_open(&m_sock)) != 0) { + std::cerr << "NNG: Failed to open pub socket: " << nng_strerror(rv) << std::endl; + return false; + } + + if ((rv = nng_listen(m_sock, addr.c_str(), nullptr, 0)) != 0) { + std::cerr << "NNG: Failed to listen on " << addr << ": " << nng_strerror(rv) << std::endl; + nng_close(m_sock); + return false; + } + + m_started = true; + std::cout << "NNG: Publisher started at " << addr << std::endl; + return true; +} + +void CNNGPublisher::Stop() +{ + std::lock_guard lock(m_mutex); + if (!m_started) return; + + nng_close(m_sock); + m_started = false; + std::cout << "NNG: Publisher stopped" << std::endl; +} + +void CNNGPublisher::Publish(const nlohmann::json &event) +{ + std::lock_guard lock(m_mutex); + if (!m_started) return; + + if (m_sock.id == 0) { + std::cerr << "NNG debug: Cannot publish, socket not initialized." << std::endl; + return; + } + std::string msg = event.dump(); + if (g_Configure.GetBoolean(g_Keys.dashboard.debug)) + std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl; + int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK); + if (rv == 0) { + std::cout << "NNG: Published event: " << event["type"] << std::endl; + } else if (rv != NNG_EAGAIN) { + std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl; + } +} diff --git a/reflector/NNGPublisher.h b/reflector/NNGPublisher.h new file mode 100644 index 0000000..478fb20 --- /dev/null +++ b/reflector/NNGPublisher.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include +#include +#include + +class CNNGPublisher +{ +public: + CNNGPublisher(); + ~CNNGPublisher(); + + bool Start(const std::string &addr); + void Stop(); + + void Publish(const nlohmann::json &event); + +private: + nng_socket m_sock; + std::mutex m_mutex; + bool m_started; +}; diff --git a/reflector/NXDNProtocol.cpp b/reflector/NXDNProtocol.cpp index e5d2b0e..58638d2 100644 --- a/reflector/NXDNProtocol.cpp +++ b/reflector/NXDNProtocol.cpp @@ -235,7 +235,7 @@ void CNXDNProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // update last heard if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::nxdn); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/P25Protocol.cpp b/reflector/P25Protocol.cpp index 0c52204..011eec8 100644 --- a/reflector/P25Protocol.cpp +++ b/reflector/P25Protocol.cpp @@ -219,7 +219,7 @@ void CP25Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::p25); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 4c19087..3ec3413 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -276,6 +276,10 @@ void CReflector::CloseStream(std::shared_ptr stream) // notify //OnStreamClose(stream->GetUserCallsign()); + // dashboard event + GetUsers()->Closing(stream->GetUserCallsign(), GetStreamModule(stream), stream->GetOwnerClient()->GetProtocol()); + ReleaseUsers(); + std::cout << "Closing stream of module " << GetStreamModule(stream) << std::endl; } @@ -340,9 +344,13 @@ void CReflector::MaintenanceThread() if (g_Configure.Contains(g_Keys.files.json)) jsonpath.assign(g_Configure.GetString(g_Keys.files.json)); auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port); - - if (xmlpath.empty() && jsonpath.empty()) + if (xmlpath.empty() && jsonpath.empty() && !g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { return; // nothing to do + } + + unsigned int nngInterval = g_Configure.GetUnsigned(g_Keys.dashboard.interval); + unsigned int nngCounter = 0; while (keep_running) { @@ -383,6 +391,20 @@ void CReflector::MaintenanceThread() // and wait a bit and do something useful at the same time for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++) { + // NNG periodic state update + if (g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { + if (++nngCounter >= (nngInterval * 10)) + { + nngCounter = 0; + std::cout << "NNG debug: Periodic state broadcast..." << std::endl; + nlohmann::json state; + state["type"] = "state"; + JsonReport(state); + g_NNGPublisher.Publish(state); + } + } + if (tcport && g_TCServer.AnyAreClosed()) { if (g_TCServer.Accept()) @@ -391,6 +413,7 @@ void CReflector::MaintenanceThread() abort(); } } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -408,6 +431,16 @@ std::shared_ptr CReflector::GetStream(char module) return nullptr; } +bool CReflector::IsAnyStreamOpen() +{ + for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++) + { + if ( it->second->IsOpen() ) + return true; + } + return false; +} + bool CReflector::IsStreamOpen(const std::unique_ptr &DvHeader) { for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++) @@ -456,6 +489,18 @@ void CReflector::JsonReport(nlohmann::json &report) for (auto uid=users->begin(); uid!=users->end(); uid++) (*uid).JsonReport(report); ReleaseUsers(); + + report["ActiveTalkers"] = nlohmann::json::array(); + for (auto const& [module, stream] : m_Stream) + { + if (stream->IsOpen()) + { + nlohmann::json jactive; + jactive["Module"] = std::string(1, module); + jactive["Callsign"] = stream->GetUserCallsign().GetCS(); + report["ActiveTalkers"].push_back(jactive); + } + } } void CReflector::WriteXmlFile(std::ofstream &xmlFile) diff --git a/reflector/Reflector.h b/reflector/Reflector.h index dd260f3..52969f5 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -92,6 +92,7 @@ protected: // streams std::shared_ptr GetStream(char); + bool IsAnyStreamOpen(void); bool IsStreamOpen(const std::unique_ptr &); char GetStreamModule(std::shared_ptr); diff --git a/reflector/URFProtocol.cpp b/reflector/URFProtocol.cpp index 6a5bf3f..cdac493 100644 --- a/reflector/URFProtocol.cpp +++ b/reflector/URFProtocol.cpp @@ -411,7 +411,9 @@ void CURFProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // release g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer); + CCallsign xlx = rpt2; + xlx.SetCSModule(Header->GetRpt2Module()); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, xlx, EProtocol::urf); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/USRPProtocol.cpp b/reflector/USRPProtocol.cpp index fe794ae..4fa58c2 100644 --- a/reflector/USRPProtocol.cpp +++ b/reflector/USRPProtocol.cpp @@ -225,7 +225,7 @@ void CUSRPProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::usrp); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/Users.cpp b/reflector/Users.cpp index 31f4591..80b2faf 100644 --- a/reflector/Users.cpp +++ b/reflector/Users.cpp @@ -44,12 +44,12 @@ void CUsers::AddUser(const CUser &user) //////////////////////////////////////////////////////////////////////////////////////// // operation -void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2) +void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, EProtocol protocol) { - Hearing(my, rpt1, rpt2, g_Reflector.GetCallsign()); + Hearing(my, rpt1, rpt2, g_Reflector.GetCallsign(), protocol); } -void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, const CCallsign &xlx) +void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, const CCallsign &xlx, EProtocol protocol) { CUser heard(my, rpt1, rpt2, xlx); @@ -64,4 +64,26 @@ void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign } AddUser(heard); + + // dashboard event + nlohmann::json event; + event["type"] = "hearing"; + event["my"] = my.GetCS(); + event["ur"] = rpt1.GetCS(); + event["rpt1"] = rpt2.GetCS(); + event["rpt2"] = xlx.GetCS(); + event["module"] = std::string(1, xlx.GetCSModule()); + event["protocol"] = g_GateKeeper.ProtocolName(protocol); + g_NNGPublisher.Publish(event); +} + +void CUsers::Closing(const CCallsign &my, char module, EProtocol protocol) +{ + // dashboard event + nlohmann::json event; + event["type"] = "closing"; + event["my"] = my.GetCS(); + event["module"] = std::string(1, module); + event["protocol"] = g_GateKeeper.ProtocolName(protocol); + g_NNGPublisher.Publish(event); } diff --git a/reflector/Users.h b/reflector/Users.h index da8a680..9638ebd 100644 --- a/reflector/Users.h +++ b/reflector/Users.h @@ -22,6 +22,7 @@ #include #include "User.h" +#include "Defines.h" class CUsers { @@ -47,8 +48,9 @@ public: std::list::const_iterator cend() { return m_Users.cend(); } // operation - void Hearing(const CCallsign &, const CCallsign &, const CCallsign &); - void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, const CCallsign &); + void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol = EProtocol::none); + void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol); + void Closing(const CCallsign &, char module, EProtocol protocol); protected: // data diff --git a/reflector/YSFProtocol.cpp b/reflector/YSFProtocol.cpp index 4c8e6ab..83436c1 100644 --- a/reflector/YSFProtocol.cpp +++ b/reflector/YSFProtocol.cpp @@ -304,7 +304,7 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // update last heard if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::ysf); g_Reflector.ReleaseUsers(); } } @@ -489,7 +489,7 @@ bool CYsfProtocol::IsValidDvHeaderPacket(const CIp &Ip, const CYSFFICH &Fich, co sz[YSF_CALLSIGN_LENGTH] = 0; CCallsign rpt1 = CCallsign((const char *)sz); rpt1.SetCSModule(YSF_MODULE_ID); - CCallsign rpt2 = m_ReflectorCallsign; + CCallsign rpt2 = g_Reflector.GetCallsign(); // as YSF protocol does not provide a module-tranlatable // destid, set module to none and rely on OnDvHeaderPacketIn() // to later fill it with proper value @@ -542,7 +542,7 @@ bool CYsfProtocol::IsValidDvFramePacket(const CIp &Ip, const CYSFFICH &Fich, con sz[YSF_CALLSIGN_LENGTH] = 0; CCallsign rpt1 = CCallsign((const char *)sz); rpt1.SetCSModule(YSF_MODULE_ID); - CCallsign rpt2 = m_ReflectorCallsign; + CCallsign rpt2 = g_Reflector.GetCallsign(); rpt2.SetCSModule(' '); header = std::unique_ptr(new CDvHeaderPacket(csMY, CCallsign("CQCQCQ"), rpt1, rpt2, uiStreamId, Fich.getFN()));