diff --git a/docs/nng.md b/docs/nng.md new file mode 100644 index 0000000..b669a8e --- /dev/null +++ b/docs/nng.md @@ -0,0 +1,129 @@ +# NNG Event System Documentation + +This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON. + +## Architecture Overview + +The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream. + +```mermaid +graph TD + subgraph "urfd Core" + CR["CReflector"] + CC["CClients"] + CU["CUsers"] + PS["CPacketStream"] + end + + subgraph "Publishing Layer" + NP["g_NNGPublisher"] + end + + subgraph "Network" + ADDR["tcp://0.0.0.0:5555"] + end + + subgraph "External" + MT["Middle Tier / Dashboard"] + end + + %% Internal Flows + CC -- "client_connect / client_disconnect" --> NP + CU -- "hearing (activity)" --> NP + CR -- "periodic state report" --> NP + PS -- "IsActive status" --> CR + + %% Network Flow + NP --> ADDR + ADDR -.-> MT +``` + +## Messaging Protocols + +Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure. + +### 1. State Broadcast (`state`) + +Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status. + +**Payload Structure:** + +```json +{ + "type": "state", + "Configure": { + "Key": "Value", + ... + }, + "Peers": [ + { + "Callsign": "XLX123", + "Modules": "ABC", + "Protocol": "D-Extra", + "ConnectTime": "2023-10-27T10:00:00Z" + } + ], + "Clients": [ + { + "Callsign": "N7TAE", + "OnModule": "A", + "Protocol": "DMR", + "ConnectTime": "2023-10-27T10:05:00Z" + } + ], + "Users": [ + { + "Callsign": "G4XYZ", + "Repeater": "GB3NB", + "OnModule": "B", + "ViaPeer": "XLX456", + "LastHeard": "2023-10-27T10:10:00Z" + } + ], + "ActiveTalkers": [ + { + "Module": "A", + "Callsign": "N7TAE" + } + ] +} +``` + +### 2. Client Connectivity (`client_connect` / `client_disconnect`) + +Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module. + +**Payload Structure:** + +```json +{ + "type": "client_connect", + "callsign": "N7TAE", + "ip": "1.2.3.4", + "protocol": "DMR", + "module": "A" +} +``` + +### 3. Voice Activity (`hearing`) + +Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector. + +**Payload Structure:** + +```json +{ + "type": "hearing", + "my": "G4XYZ", + "ur": "CQCQCQ", + "rpt1": "GB3NB", + "rpt2": "XLX123 A", + "module": "A" +} +``` + +## Middle Tier Design Considerations + +1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events. +2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds). +3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data. diff --git a/docs/nng_diagram.png b/docs/nng_diagram.png new file mode 100644 index 0000000..ec1347c Binary files /dev/null and b/docs/nng_diagram.png differ diff --git a/reflector/Configure.cpp b/reflector/Configure.cpp index 2614217..dac5750 100644 --- a/reflector/Configure.cpp +++ b/reflector/Configure.cpp @@ -127,7 +127,9 @@ CConfigure::CConfigure() IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended); data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555"; + data[g_Keys.dashboard.interval] = 10U; data[g_Keys.dashboard.enable] = false; + data[g_Keys.ysf.ysfreflectordb.id] = 0U; } bool CConfigure::ReadData(const std::string &path) @@ -520,6 +522,8 @@ bool CConfigure::ReadData(const std::string &path) data[g_Keys.dashboard.enable] = IS_TRUE(value[0]); else if (0 == key.compare("NNGAddr")) data[g_Keys.dashboard.nngaddr] = value; + else if (0 == key.compare("Interval")) + data[g_Keys.dashboard.interval] = getUnsigned(value, "Dashboard Interval", 1, 3600, 10); else badParam(key); break; @@ -829,6 +833,7 @@ bool CConfigure::ReadData(const std::string &path) // Dashboard section isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval); isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval); + isDefined(ErrorLevel::mild, JDASHBOARD, "Interval", g_Keys.dashboard.interval, rval); return rval; } diff --git a/reflector/JsonKeys.h b/reflector/JsonKeys.h index 1ed145d..469391f 100644 --- a/reflector/JsonKeys.h +++ b/reflector/JsonKeys.h @@ -75,6 +75,6 @@ struct SJsonKeys { struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; } files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" }; - struct DASHBOARD { const std::string enable, nngaddr; } - dashboard { "DashboardEnable", "DashboardNNGAddr" }; + struct DASHBOARD { const std::string enable, nngaddr, interval; } + dashboard { "DashboardEnable", "DashboardNNGAddr", "DashboardInterval" }; }; diff --git a/reflector/NNGPublisher.cpp b/reflector/NNGPublisher.cpp index dba5e00..5c02ab2 100644 --- a/reflector/NNGPublisher.cpp +++ b/reflector/NNGPublisher.cpp @@ -49,9 +49,16 @@ void CNNGPublisher::Publish(const nlohmann::json &event) std::lock_guard lock(m_mutex); if (!m_started) return; + if (m_sock.id == 0) { + std::cerr << "NNG debug: Cannot publish, socket not initialized." << std::endl; + return; + } std::string msg = event.dump(); + std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl; int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK); - if (rv != 0 && rv != NNG_EAGAIN) { + if (rv == 0) { + std::cout << "NNG: Published event: " << event["type"] << std::endl; + } else if (rv != NNG_EAGAIN) { std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl; } } diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 4c19087..5ffec10 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -340,9 +340,13 @@ void CReflector::MaintenanceThread() if (g_Configure.Contains(g_Keys.files.json)) jsonpath.assign(g_Configure.GetString(g_Keys.files.json)); auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port); - - if (xmlpath.empty() && jsonpath.empty()) + if (xmlpath.empty() && jsonpath.empty() && !g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { return; // nothing to do + } + + unsigned int nngInterval = g_Configure.GetUnsigned(g_Keys.dashboard.interval); + unsigned int nngCounter = 0; while (keep_running) { @@ -383,6 +387,20 @@ void CReflector::MaintenanceThread() // and wait a bit and do something useful at the same time for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++) { + // NNG periodic state update + if (g_Configure.GetBoolean(g_Keys.dashboard.enable)) + { + if (++nngCounter >= (nngInterval * 10)) + { + nngCounter = 0; + std::cout << "NNG debug: Periodic state broadcast..." << std::endl; + nlohmann::json state; + state["type"] = "state"; + JsonReport(state); + g_NNGPublisher.Publish(state); + } + } + if (tcport && g_TCServer.AnyAreClosed()) { if (g_TCServer.Accept()) @@ -391,6 +409,7 @@ void CReflector::MaintenanceThread() abort(); } } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -408,6 +427,16 @@ std::shared_ptr CReflector::GetStream(char module) return nullptr; } +bool CReflector::IsAnyStreamOpen() +{ + for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++) + { + if ( it->second->IsOpen() ) + return true; + } + return false; +} + bool CReflector::IsStreamOpen(const std::unique_ptr &DvHeader) { for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++) @@ -456,6 +485,18 @@ void CReflector::JsonReport(nlohmann::json &report) for (auto uid=users->begin(); uid!=users->end(); uid++) (*uid).JsonReport(report); ReleaseUsers(); + + report["ActiveTalkers"] = nlohmann::json::array(); + for (auto const& [module, stream] : m_Stream) + { + if (stream->IsOpen()) + { + nlohmann::json jactive; + jactive["Module"] = std::string(1, module); + jactive["Callsign"] = stream->GetUserCallsign().GetCS(); + report["ActiveTalkers"].push_back(jactive); + } + } } void CReflector::WriteXmlFile(std::ofstream &xmlFile) diff --git a/reflector/Reflector.h b/reflector/Reflector.h index dd260f3..52969f5 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -92,6 +92,7 @@ protected: // streams std::shared_ptr GetStream(char); + bool IsAnyStreamOpen(void); bool IsStreamOpen(const std::unique_ptr &); char GetStreamModule(std::shared_ptr);