refactor: implement active talker state and always-on periodic NNG broadcast

pull/20/head
Dave Behnke 1 month ago
parent 1120e64b68
commit ba6a5dfcfa

@ -0,0 +1,129 @@
# NNG Event System Documentation
This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON.
## Architecture Overview
The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream.
```mermaid
graph TD
subgraph "urfd Core"
CR["CReflector"]
CC["CClients"]
CU["CUsers"]
PS["CPacketStream"]
end
subgraph "Publishing Layer"
NP["g_NNGPublisher"]
end
subgraph "Network"
ADDR["tcp://0.0.0.0:5555"]
end
subgraph "External"
MT["Middle Tier / Dashboard"]
end
%% Internal Flows
CC -- "client_connect / client_disconnect" --> NP
CU -- "hearing (activity)" --> NP
CR -- "periodic state report" --> NP
PS -- "IsActive status" --> CR
%% Network Flow
NP --> ADDR
ADDR -.-> MT
```
## Messaging Protocols
Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure.
### 1. State Broadcast (`state`)
Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status.
**Payload Structure:**
```json
{
"type": "state",
"Configure": {
"Key": "Value",
...
},
"Peers": [
{
"Callsign": "XLX123",
"Modules": "ABC",
"Protocol": "D-Extra",
"ConnectTime": "2023-10-27T10:00:00Z"
}
],
"Clients": [
{
"Callsign": "N7TAE",
"OnModule": "A",
"Protocol": "DMR",
"ConnectTime": "2023-10-27T10:05:00Z"
}
],
"Users": [
{
"Callsign": "G4XYZ",
"Repeater": "GB3NB",
"OnModule": "B",
"ViaPeer": "XLX456",
"LastHeard": "2023-10-27T10:10:00Z"
}
],
"ActiveTalkers": [
{
"Module": "A",
"Callsign": "N7TAE"
}
]
}
```
### 2. Client Connectivity (`client_connect` / `client_disconnect`)
Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module.
**Payload Structure:**
```json
{
"type": "client_connect",
"callsign": "N7TAE",
"ip": "1.2.3.4",
"protocol": "DMR",
"module": "A"
}
```
### 3. Voice Activity (`hearing`)
Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector.
**Payload Structure:**
```json
{
"type": "hearing",
"my": "G4XYZ",
"ur": "CQCQCQ",
"rpt1": "GB3NB",
"rpt2": "XLX123 A",
"module": "A"
}
```
## Middle Tier Design Considerations
1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events.
2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds).
3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data.

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

@ -125,7 +125,9 @@ CConfigure::CConfigure()
IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended);
data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555";
data[g_Keys.dashboard.interval] = 10U;
data[g_Keys.dashboard.enable] = false;
data[g_Keys.ysf.ysfreflectordb.id] = 0U;
}
bool CConfigure::ReadData(const std::string &path)
@ -506,6 +508,8 @@ bool CConfigure::ReadData(const std::string &path)
data[g_Keys.dashboard.enable] = IS_TRUE(value[0]);
else if (0 == key.compare("NNGAddr"))
data[g_Keys.dashboard.nngaddr] = value;
else if (0 == key.compare("Interval"))
data[g_Keys.dashboard.interval] = getUnsigned(value, "Dashboard Interval", 1, 3600, 10);
else
badParam(key);
break;
@ -814,6 +818,7 @@ bool CConfigure::ReadData(const std::string &path)
// Dashboard section
isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "Interval", g_Keys.dashboard.interval, rval);
return rval;
}

@ -72,6 +72,6 @@ struct SJsonKeys {
struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; }
files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" };
struct DASHBOARD { const std::string enable, nngaddr; }
dashboard { "DashboardEnable", "DashboardNNGAddr" };
struct DASHBOARD { const std::string enable, nngaddr, interval; }
dashboard { "DashboardEnable", "DashboardNNGAddr", "DashboardInterval" };
};

@ -49,9 +49,16 @@ void CNNGPublisher::Publish(const nlohmann::json &event)
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_started) return;
if (m_sock.id == 0) {
std::cerr << "NNG debug: Cannot publish, socket not initialized." << std::endl;
return;
}
std::string msg = event.dump();
std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl;
int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK);
if (rv != 0 && rv != NNG_EAGAIN) {
if (rv == 0) {
std::cout << "NNG: Published event: " << event["type"] << std::endl;
} else if (rv != NNG_EAGAIN) {
std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl;
}
}

@ -340,9 +340,13 @@ void CReflector::MaintenanceThread()
if (g_Configure.Contains(g_Keys.files.json))
jsonpath.assign(g_Configure.GetString(g_Keys.files.json));
auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port);
if (xmlpath.empty() && jsonpath.empty())
if (xmlpath.empty() && jsonpath.empty() && !g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
return; // nothing to do
}
unsigned int nngInterval = g_Configure.GetUnsigned(g_Keys.dashboard.interval);
unsigned int nngCounter = 0;
while (keep_running)
{
@ -383,6 +387,20 @@ void CReflector::MaintenanceThread()
// and wait a bit and do something useful at the same time
for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++)
{
// NNG periodic state update
if (g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
if (++nngCounter >= (nngInterval * 10))
{
nngCounter = 0;
std::cout << "NNG debug: Periodic state broadcast..." << std::endl;
nlohmann::json state;
state["type"] = "state";
JsonReport(state);
g_NNGPublisher.Publish(state);
}
}
if (tcport && g_TCServer.AnyAreClosed())
{
if (g_TCServer.Accept())
@ -391,6 +409,7 @@ void CReflector::MaintenanceThread()
abort();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@ -408,6 +427,16 @@ std::shared_ptr<CPacketStream> CReflector::GetStream(char module)
return nullptr;
}
bool CReflector::IsAnyStreamOpen()
{
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
{
if ( it->second->IsOpen() )
return true;
}
return false;
}
bool CReflector::IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &DvHeader)
{
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
@ -456,6 +485,18 @@ void CReflector::JsonReport(nlohmann::json &report)
for (auto uid=users->begin(); uid!=users->end(); uid++)
(*uid).JsonReport(report);
ReleaseUsers();
report["ActiveTalkers"] = nlohmann::json::array();
for (auto const& [module, stream] : m_Stream)
{
if (stream->IsOpen())
{
nlohmann::json jactive;
jactive["Module"] = std::string(1, module);
jactive["Callsign"] = stream->GetUserCallsign().GetCS();
report["ActiveTalkers"].push_back(jactive);
}
}
}
void CReflector::WriteXmlFile(std::ofstream &xmlFile)

@ -92,6 +92,7 @@ protected:
// streams
std::shared_ptr<CPacketStream> GetStream(char);
bool IsAnyStreamOpen(void);
bool IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &);
char GetStreamModule(std::shared_ptr<CPacketStream>);

Loading…
Cancel
Save

Powered by TurnKey Linux.