pull/20/merge
Dave Behnke 1 month ago committed by GitHub
commit a8016763ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

6
.gitignore vendored

@ -5,3 +5,9 @@ reflector/urfd.*
urfd
inicheck
dbutil
.devcontainer/
/test_urfd.ini
/staging_urfd.ini
/pr_comment_nng.md
/pr_body_fix.md
/staging/

@ -0,0 +1,145 @@
# NNG Event System Documentation
This document describes the real-time event system in `urfd`, which uses NNG (nanomsg next gen) to broadcast system state and activity as JSON.
## Architecture Overview
The `urfd` reflector acts as an NNG **Publisher** (PUB). Any number of subscribers (e.g., a middle-tier service or dashboard) can connect as **Subscribers** (SUB) to receive the event stream.
```mermaid
graph TD
subgraph "urfd Core"
CR["CReflector"]
CC["CClients"]
CU["CUsers"]
PS["CPacketStream"]
end
subgraph "Publishing Layer"
NP["g_NNGPublisher"]
end
subgraph "Network"
ADDR["tcp://0.0.0.0:5555"]
end
subgraph "External"
MT["Middle Tier / Dashboard"]
end
%% Internal Flows
CC -- "client_connect / client_disconnect" --> NP
CU -- "hearing / closing" --> NP
CR -- "periodic state report" --> NP
PS -- "IsActive status" --> CR
%% Network Flow
NP --> ADDR
ADDR -.-> MT
```
## Messaging Protocols
Events are sent as serialized JSON strings. Each message contains a `type` field to identify the payload structure.
### 1. State Broadcast (`state`)
Sent periodically based on `DashboardInterval` (default 10s). It provides a full snapshot of the reflector's configuration and status.
**Payload Structure:**
```json
{
"type": "state",
"Configure": {
"Key": "Value",
...
},
"Peers": [
{
"Callsign": "XLX123",
"Modules": "ABC",
"Protocol": "D-Extra",
"ConnectTime": "2023-10-27T10:00:00Z"
}
],
"Clients": [
{
"Callsign": "N7TAE",
"OnModule": "A",
"Protocol": "DMR",
"ConnectTime": "2023-10-27T10:05:00Z"
}
],
"Users": [
{
"Callsign": "G4XYZ",
"Repeater": "GB3NB",
"OnModule": "B",
"ViaPeer": "XLX456",
"LastHeard": "2023-10-27T10:10:00Z"
}
],
"ActiveTalkers": [
{
"Module": "A",
"Callsign": "N7TAE"
}
]
}
```
### 2. Client Connectivity (`client_connect` / `client_disconnect`)
Triggered immediately when a client (Repeater, Hotspot, or Mobile App) links or unlinks from a module.
**Payload Structure:**
```json
{
"type": "client_connect",
"callsign": "N7TAE",
"ip": "1.2.3.4",
"protocol": "DMR",
"module": "A"
}
```
### 3. Voice Activity (`hearing`)
Triggered when the reflector "hears" an active transmission. This event is sent for every "tick" or heartbeat of voice activity processed by the reflector.
**Payload Structure:**
```json
{
"type": "hearing",
"my": "G4XYZ",
"ur": "CQCQCQ",
"rpt1": "GB3NB",
"rpt2": "XLX123 A",
"module": "A",
"protocol": "M17"
}
```
### 4. Transmission End (`closing`)
Triggered when a transmission stream is closed (user stops talking).
**Payload Structure:**
```json
{
"type": "closing",
"my": "G4XYZ",
"module": "A",
"protocol": "M17"
}
```
## Middle Tier Design Considerations
1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events.
2. **Active Talkers**: The `ActiveTalkers` array in the `state` message identifies who is currently keyed up. Real-time transitions (start/stop) are driven by the `hearing` events and the absence of such events over a timeout (typically 2-3 seconds).
3. **Deduplication**: The `state` report is a snapshot. If the middle-tier is already tracking events, it can use the `state` report to "re-base" its state and clear out stale data.

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

@ -368,7 +368,7 @@ void CBMProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
// release
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer, EProtocol::bm);
g_Reflector.ReleaseUsers();
}
}

@ -43,14 +43,11 @@ CClients::~CClients()
void CClients::AddClient(std::shared_ptr<CClient> client)
{
// first check if client already exists
for ( auto it=begin(); it!=end(); it++ )
for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
{
if (*client == *(*it))
// if found, just do nothing
// so *client keep pointing on a valid object
// on function return
{
// delete new one
// if found, just do nothing
return;
}
}
@ -63,13 +60,21 @@ void CClients::AddClient(std::shared_ptr<CClient> client)
std::cout << " on module " << client->GetReflectorModule();
}
std::cout << std::endl;
// dashboard event
nlohmann::json event;
event["type"] = "client_connect";
event["callsign"] = client->GetCallsign().GetCS();
event["ip"] = client->GetIp().GetAddress();
event["protocol"] = client->GetProtocolName();
event["module"] = std::string(1, client->GetReflectorModule());
g_NNGPublisher.Publish(event);
}
void CClients::RemoveClient(std::shared_ptr<CClient> client)
{
// look for the client
bool found = false;
for ( auto it=begin(); it!=end(); it++ )
for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
{
// compare object pointers
if ( *it == client )
@ -84,6 +89,16 @@ void CClients::RemoveClient(std::shared_ptr<CClient> client)
std::cout << " on module " << (*it)->GetReflectorModule();
}
std::cout << std::endl;
// dashboard event
nlohmann::json event;
event["type"] = "client_disconnect";
event["callsign"] = (*it)->GetCallsign().GetCS();
event["ip"] = (*it)->GetIp().GetAddress();
event["protocol"] = (*it)->GetProtocolName();
event["module"] = std::string(1, (*it)->GetReflectorModule());
g_NNGPublisher.Publish(event);
m_Clients.erase(it);
break;
}

@ -38,6 +38,7 @@
#define JBRANDMEISTER "Brandmeister"
#define JCALLSIGN "Callsign"
#define JCOUNTRY "Country"
#define JDASHBOARD "Dashboard"
#define JDASHBOARDURL "DashboardUrl"
#define JDCS "DCS"
#define JDEFAULTID "DefaultId"
@ -123,6 +124,12 @@ CConfigure::CConfigure()
{
IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended);
IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended);
data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555";
data[g_Keys.dashboard.interval] = 10U;
data[g_Keys.dashboard.enable] = false;
data[g_Keys.dashboard.debug] = false;
data[g_Keys.ysf.ysfreflectordb.id] = 0U;
}
bool CConfigure::ReadData(const std::string &path)
@ -184,6 +191,8 @@ bool CConfigure::ReadData(const std::string &path)
section = ESection::ip;
else if (0 == hname.compare(JTRANSCODER))
section = ESection::tc;
else if (0 == hname.compare(JDASHBOARD))
section = ESection::dashboard;
else if (0 == hname.compare(JMODULES))
section = ESection::modules;
else if (0 == hname.compare(JDPLUS))
@ -498,6 +507,18 @@ bool CConfigure::ReadData(const std::string &path)
else
badParam(key);
break;
case ESection::dashboard:
if (0 == key.compare(JENABLE))
data[g_Keys.dashboard.enable] = IS_TRUE(value[0]);
else if (0 == key.compare("NNGAddr"))
data[g_Keys.dashboard.nngaddr] = value;
else if (0 == key.compare("Interval"))
data[g_Keys.dashboard.interval] = getUnsigned(value, "Dashboard Interval", 1, 3600, 10);
else if (0 == key.compare("NNGDebug"))
data[g_Keys.dashboard.debug] = IS_TRUE(value[0]);
else
badParam(key);
break;
default:
std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl;
}
@ -801,6 +822,10 @@ bool CConfigure::ReadData(const std::string &path)
if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval))
checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]);
}
// Dashboard section
isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "Interval", g_Keys.dashboard.interval, rval);
return rval;
}

@ -25,7 +25,7 @@
enum class ErrorLevel { fatal, mild };
enum class ERefreshType { file, http, both };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc, dashboard };
#define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1')

@ -208,7 +208,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dcs);
g_Reflector.ReleaseUsers();
}
}

@ -351,7 +351,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Heade
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dextra);
g_Reflector.ReleaseUsers();
}
}

@ -335,7 +335,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Hea
// update last heard
if ( lastheard )
{
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dmrmmdvm);
g_Reflector.ReleaseUsers();
}
}

@ -208,7 +208,7 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Head
// release
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dmrplus);
g_Reflector.ReleaseUsers();
}
}

@ -213,7 +213,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dplus);
g_Reflector.ReleaseUsers();
}
else

@ -570,7 +570,7 @@ void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
}
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::g3);
g_Reflector.ReleaseUsers();
}
}

@ -47,6 +47,7 @@ public:
// authorizations
bool MayLink(const CCallsign &, const CIp &, const EProtocol, char * = nullptr) const;
bool MayTransmit(const CCallsign &, const CIp &, EProtocol = EProtocol::any, char = ' ') const;
const std::string ProtocolName(EProtocol) const;
protected:
// thread
@ -56,7 +57,6 @@ protected:
bool IsNodeListedOk(const std::string &) const;
bool IsPeerListedOk(const std::string &, char) const;
bool IsPeerListedOk(const std::string &, const CIp &, char *) const;
const std::string ProtocolName(EProtocol) const;
protected:
// data

@ -23,6 +23,7 @@
#include "LookupYsf.h"
#include "TCSocket.h"
#include "JsonKeys.h"
#include "NNGPublisher.h"
extern CReflector g_Reflector;
extern CGateKeeper g_GateKeeper;
@ -33,3 +34,4 @@ extern CLookupNxdn g_LNid;
extern CLookupYsf g_LYtr;
extern SJsonKeys g_Keys;
extern CTCServer g_TCServer;
extern CNNGPublisher g_NNGPublisher;

@ -71,4 +71,7 @@ struct SJsonKeys {
struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; }
files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" };
struct DASHBOARD { const std::string enable, nngaddr, interval, debug; }
dashboard { "DashboardEnable", "DashboardNNGAddr", "DashboardInterval", "NNGDebug" };
};

@ -23,6 +23,12 @@
#include "M17Packet.h"
#include "Global.h"
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CM17Protocol::CM17Protocol() : CSEProtocol()
{
}
////////////////////////////////////////////////////////////////////////////////////////
// operation
@ -209,7 +215,9 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
CCallsign reflectorCall = rpt2;
reflectorCall.SetCSModule(Header->GetRpt2Module());
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, reflectorCall, EProtocol::m17);
g_Reflector.ReleaseUsers();
}
}
@ -411,3 +419,15 @@ void CM17Protocol::EncodeM17Packet(SM17Frame &frame, const CDvHeaderPacket &Head
frame.streamid = Header.GetStreamId(); // no host<--->network byte swapping since we never do any math on this value
// the CRC will be set in HandleQueue, after lich.dest is set
}
bool CM17Protocol::EncodeDvHeaderPacket(const CDvHeaderPacket &packet, CBuffer &buffer) const
{
packet.EncodeInterlinkPacket(buffer);
return true;
}
bool CM17Protocol::EncodeDvFramePacket(const CDvFramePacket &packet, CBuffer &buffer) const
{
packet.EncodeInterlinkPacket(buffer);
return true;
}

@ -33,6 +33,7 @@ CLookupDmr g_LDid;
CLookupNxdn g_LNid;
CLookupYsf g_LYtr;
CTCServer g_TCServer;
CNNGPublisher g_NNGPublisher;
////////////////////////////////////////////////////////////////////////////////////////
@ -49,20 +50,26 @@ int main(int argc, char *argv[])
std::cout << "IPv4 binding address is '" << g_Configure.GetString(g_Keys.ip.ipv4bind) << "'" << std::endl;
// remove pidfile
const std::string pidpath(g_Configure.GetString(g_Keys.files.pid));
std::string pidpath = g_Configure.GetString(g_Keys.files.pid);
const std::string callsign(g_Configure.GetString(g_Keys.names.callsign));
remove(pidpath.c_str());
// splash
std::cout << "Starting " << callsign << " " << g_Version << std::endl;
// and let it run
// start everything
if (g_Reflector.Start())
{
std::cout << "Error starting reflector" << std::endl;
return EXIT_FAILURE;
}
// dashboard nng publisher
if (g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
g_NNGPublisher.Start(g_Configure.GetString(g_Keys.dashboard.nngaddr));
}
std::cout << "Reflector " << callsign << " started and listening" << std::endl;
// write new pid file
@ -72,6 +79,7 @@ int main(int argc, char *argv[])
pause(); // wait for any signal
g_NNGPublisher.Stop();
g_Reflector.Stop();
std::cout << "Reflector stopped" << std::endl;

@ -32,7 +32,7 @@ else
CFLAGS = -W -Werror -std=c++17 -MMD -MD
endif
LDFLAGS=-pthread -lcurl
LDFLAGS=-pthread -lcurl -lnng
ifeq ($(DHT), true)
LDFLAGS += -lopendht

@ -0,0 +1,66 @@
#include "NNGPublisher.h"
#include "Global.h"
#include <iostream>
CNNGPublisher::CNNGPublisher()
: m_started(false)
{
m_sock.id = 0;
}
CNNGPublisher::~CNNGPublisher()
{
Stop();
}
bool CNNGPublisher::Start(const std::string &addr)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_started) return true;
int rv;
if ((rv = nng_pub0_open(&m_sock)) != 0) {
std::cerr << "NNG: Failed to open pub socket: " << nng_strerror(rv) << std::endl;
return false;
}
if ((rv = nng_listen(m_sock, addr.c_str(), nullptr, 0)) != 0) {
std::cerr << "NNG: Failed to listen on " << addr << ": " << nng_strerror(rv) << std::endl;
nng_close(m_sock);
return false;
}
m_started = true;
std::cout << "NNG: Publisher started at " << addr << std::endl;
return true;
}
void CNNGPublisher::Stop()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_started) return;
nng_close(m_sock);
m_started = false;
std::cout << "NNG: Publisher stopped" << std::endl;
}
void CNNGPublisher::Publish(const nlohmann::json &event)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_started) return;
if (m_sock.id == 0) {
std::cerr << "NNG debug: Cannot publish, socket not initialized." << std::endl;
return;
}
std::string msg = event.dump();
if (g_Configure.GetBoolean(g_Keys.dashboard.debug))
std::cout << "NNG debug: Attempting to publish message of size " << msg.size() << ": " << msg << std::endl;
int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK);
if (rv == 0) {
std::cout << "NNG: Published event: " << event["type"] << std::endl;
} else if (rv != NNG_EAGAIN) {
std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl;
}
}

@ -0,0 +1,24 @@
#pragma once
#include <string>
#include <mutex>
#include <nlohmann/json.hpp>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
class CNNGPublisher
{
public:
CNNGPublisher();
~CNNGPublisher();
bool Start(const std::string &addr);
void Stop();
void Publish(const nlohmann::json &event);
private:
nng_socket m_sock;
std::mutex m_mutex;
bool m_started;
};

@ -235,7 +235,7 @@ void CNXDNProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
// update last heard
if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) )
{
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::nxdn);
g_Reflector.ReleaseUsers();
}
}

@ -219,7 +219,7 @@ void CP25Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::p25);
g_Reflector.ReleaseUsers();
}
}

@ -276,6 +276,10 @@ void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
// notify
//OnStreamClose(stream->GetUserCallsign());
// dashboard event
GetUsers()->Closing(stream->GetUserCallsign(), GetStreamModule(stream), stream->GetOwnerClient()->GetProtocol());
ReleaseUsers();
std::cout << "Closing stream of module " << GetStreamModule(stream) << std::endl;
}
@ -340,9 +344,13 @@ void CReflector::MaintenanceThread()
if (g_Configure.Contains(g_Keys.files.json))
jsonpath.assign(g_Configure.GetString(g_Keys.files.json));
auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port);
if (xmlpath.empty() && jsonpath.empty())
if (xmlpath.empty() && jsonpath.empty() && !g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
return; // nothing to do
}
unsigned int nngInterval = g_Configure.GetUnsigned(g_Keys.dashboard.interval);
unsigned int nngCounter = 0;
while (keep_running)
{
@ -383,6 +391,20 @@ void CReflector::MaintenanceThread()
// and wait a bit and do something useful at the same time
for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++)
{
// NNG periodic state update
if (g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
if (++nngCounter >= (nngInterval * 10))
{
nngCounter = 0;
std::cout << "NNG debug: Periodic state broadcast..." << std::endl;
nlohmann::json state;
state["type"] = "state";
JsonReport(state);
g_NNGPublisher.Publish(state);
}
}
if (tcport && g_TCServer.AnyAreClosed())
{
if (g_TCServer.Accept())
@ -391,6 +413,7 @@ void CReflector::MaintenanceThread()
abort();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@ -408,6 +431,16 @@ std::shared_ptr<CPacketStream> CReflector::GetStream(char module)
return nullptr;
}
bool CReflector::IsAnyStreamOpen()
{
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
{
if ( it->second->IsOpen() )
return true;
}
return false;
}
bool CReflector::IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &DvHeader)
{
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
@ -456,6 +489,18 @@ void CReflector::JsonReport(nlohmann::json &report)
for (auto uid=users->begin(); uid!=users->end(); uid++)
(*uid).JsonReport(report);
ReleaseUsers();
report["ActiveTalkers"] = nlohmann::json::array();
for (auto const& [module, stream] : m_Stream)
{
if (stream->IsOpen())
{
nlohmann::json jactive;
jactive["Module"] = std::string(1, module);
jactive["Callsign"] = stream->GetUserCallsign().GetCS();
report["ActiveTalkers"].push_back(jactive);
}
}
}
void CReflector::WriteXmlFile(std::ofstream &xmlFile)

@ -92,6 +92,7 @@ protected:
// streams
std::shared_ptr<CPacketStream> GetStream(char);
bool IsAnyStreamOpen(void);
bool IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &);
char GetStreamModule(std::shared_ptr<CPacketStream>);

@ -411,7 +411,9 @@ void CURFProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
// release
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer);
CCallsign xlx = rpt2;
xlx.SetCSModule(Header->GetRpt2Module());
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, xlx, EProtocol::urf);
g_Reflector.ReleaseUsers();
}
}

@ -225,7 +225,7 @@ void CUSRPProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
g_Reflector.ReleaseClients();
// update last heard
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::usrp);
g_Reflector.ReleaseUsers();
}
}

@ -44,12 +44,12 @@ void CUsers::AddUser(const CUser &user)
////////////////////////////////////////////////////////////////////////////////////////
// operation
void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2)
void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, EProtocol protocol)
{
Hearing(my, rpt1, rpt2, g_Reflector.GetCallsign());
Hearing(my, rpt1, rpt2, g_Reflector.GetCallsign(), protocol);
}
void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, const CCallsign &xlx)
void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign &rpt2, const CCallsign &xlx, EProtocol protocol)
{
CUser heard(my, rpt1, rpt2, xlx);
@ -64,4 +64,26 @@ void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign
}
AddUser(heard);
// dashboard event
nlohmann::json event;
event["type"] = "hearing";
event["my"] = my.GetCS();
event["ur"] = rpt1.GetCS();
event["rpt1"] = rpt2.GetCS();
event["rpt2"] = xlx.GetCS();
event["module"] = std::string(1, xlx.GetCSModule());
event["protocol"] = g_GateKeeper.ProtocolName(protocol);
g_NNGPublisher.Publish(event);
}
void CUsers::Closing(const CCallsign &my, char module, EProtocol protocol)
{
// dashboard event
nlohmann::json event;
event["type"] = "closing";
event["my"] = my.GetCS();
event["module"] = std::string(1, module);
event["protocol"] = g_GateKeeper.ProtocolName(protocol);
g_NNGPublisher.Publish(event);
}

@ -22,6 +22,7 @@
#include <mutex>
#include "User.h"
#include "Defines.h"
class CUsers
{
@ -47,8 +48,9 @@ public:
std::list<CUser>::const_iterator cend() { return m_Users.cend(); }
// operation
void Hearing(const CCallsign &, const CCallsign &, const CCallsign &);
void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, const CCallsign &);
void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol = EProtocol::none);
void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol);
void Closing(const CCallsign &, char module, EProtocol protocol);
protected:
// data

@ -304,7 +304,7 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
// update last heard
if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) )
{
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2);
g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::ysf);
g_Reflector.ReleaseUsers();
}
}
@ -489,7 +489,7 @@ bool CYsfProtocol::IsValidDvHeaderPacket(const CIp &Ip, const CYSFFICH &Fich, co
sz[YSF_CALLSIGN_LENGTH] = 0;
CCallsign rpt1 = CCallsign((const char *)sz);
rpt1.SetCSModule(YSF_MODULE_ID);
CCallsign rpt2 = m_ReflectorCallsign;
CCallsign rpt2 = g_Reflector.GetCallsign();
// as YSF protocol does not provide a module-tranlatable
// destid, set module to none and rely on OnDvHeaderPacketIn()
// to later fill it with proper value
@ -542,7 +542,7 @@ bool CYsfProtocol::IsValidDvFramePacket(const CIp &Ip, const CYSFFICH &Fich, con
sz[YSF_CALLSIGN_LENGTH] = 0;
CCallsign rpt1 = CCallsign((const char *)sz);
rpt1.SetCSModule(YSF_MODULE_ID);
CCallsign rpt2 = m_ReflectorCallsign;
CCallsign rpt2 = g_Reflector.GetCallsign();
rpt2.SetCSModule(' ');
header = std::unique_ptr<CDvHeaderPacket>(new CDvHeaderPacket(csMY, CCallsign("CQCQCQ"), rpt1, rpt2, uiStreamId, Fich.getFN()));

Loading…
Cancel
Save

Powered by TurnKey Linux.