Merge remote-tracking branch 'origin/feature/nng' into dev

# Conflicts:
#	reflector/Configure.h
pull/23/head
Dave Behnke 1 month ago
commit 986f7c9641

@ -43,14 +43,11 @@ CClients::~CClients()
void CClients::AddClient(std::shared_ptr<CClient> client) void CClients::AddClient(std::shared_ptr<CClient> client)
{ {
// first check if client already exists // first check if client already exists
for ( auto it=begin(); it!=end(); it++ ) for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
{ {
if (*client == *(*it)) if (*client == *(*it))
// if found, just do nothing
// so *client keep pointing on a valid object
// on function return
{ {
// delete new one // if found, just do nothing
return; return;
} }
} }
@ -63,13 +60,21 @@ void CClients::AddClient(std::shared_ptr<CClient> client)
std::cout << " on module " << client->GetReflectorModule(); std::cout << " on module " << client->GetReflectorModule();
} }
std::cout << std::endl; std::cout << std::endl;
// dashboard event
nlohmann::json event;
event["type"] = "client_connect";
event["callsign"] = client->GetCallsign().GetCS();
event["ip"] = client->GetIp().GetAddress();
event["protocol"] = client->GetProtocolName();
event["module"] = std::string(1, client->GetReflectorModule());
g_NNGPublisher.Publish(event);
} }
void CClients::RemoveClient(std::shared_ptr<CClient> client) void CClients::RemoveClient(std::shared_ptr<CClient> client)
{ {
// look for the client // look for the client
bool found = false; for ( auto it=m_Clients.begin(); it!=m_Clients.end(); it++ )
for ( auto it=begin(); it!=end(); it++ )
{ {
// compare object pointers // compare object pointers
if ( *it == client ) if ( *it == client )
@ -84,6 +89,16 @@ void CClients::RemoveClient(std::shared_ptr<CClient> client)
std::cout << " on module " << (*it)->GetReflectorModule(); std::cout << " on module " << (*it)->GetReflectorModule();
} }
std::cout << std::endl; std::cout << std::endl;
// dashboard event
nlohmann::json event;
event["type"] = "client_disconnect";
event["callsign"] = (*it)->GetCallsign().GetCS();
event["ip"] = (*it)->GetIp().GetAddress();
event["protocol"] = (*it)->GetProtocolName();
event["module"] = std::string(1, (*it)->GetReflectorModule());
g_NNGPublisher.Publish(event);
m_Clients.erase(it); m_Clients.erase(it);
break; break;
} }

@ -38,6 +38,7 @@
#define JBRANDMEISTER "Brandmeister" #define JBRANDMEISTER "Brandmeister"
#define JCALLSIGN "Callsign" #define JCALLSIGN "Callsign"
#define JCOUNTRY "Country" #define JCOUNTRY "Country"
#define JDASHBOARD "Dashboard"
#define JDASHBOARDURL "DashboardUrl" #define JDASHBOARDURL "DashboardUrl"
#define JDCS "DCS" #define JDCS "DCS"
#define JDEFAULTID "DefaultId" #define JDEFAULTID "DefaultId"
@ -124,6 +125,9 @@ CConfigure::CConfigure()
{ {
IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended); IPv4RegEx = std::regex("^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\\.){3,3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9]){1,1}$", std::regex::extended);
IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended); IPv6RegEx = std::regex("^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}(:[0-9a-fA-F]{1,4}){1,1}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|([0-9a-fA-F]{1,4}:){1,1}(:[0-9a-fA-F]{1,4}){1,6}|:((:[0-9a-fA-F]{1,4}){1,7}|:))$", std::regex::extended);
data[g_Keys.dashboard.nngaddr] = "tcp://127.0.0.1:5555";
data[g_Keys.dashboard.enable] = false;
} }
bool CConfigure::ReadData(const std::string &path) bool CConfigure::ReadData(const std::string &path)
@ -185,6 +189,8 @@ bool CConfigure::ReadData(const std::string &path)
section = ESection::ip; section = ESection::ip;
else if (0 == hname.compare(JTRANSCODER)) else if (0 == hname.compare(JTRANSCODER))
section = ESection::tc; section = ESection::tc;
else if (0 == hname.compare(JDASHBOARD))
section = ESection::dashboard;
else if (0 == hname.compare(JMODULES)) else if (0 == hname.compare(JMODULES))
section = ESection::modules; section = ESection::modules;
else if (0 == hname.compare(JDPLUS)) else if (0 == hname.compare(JDPLUS))
@ -509,6 +515,14 @@ bool CConfigure::ReadData(const std::string &path)
else else
badParam(key); badParam(key);
break; break;
case ESection::dashboard:
if (0 == key.compare(JENABLE))
data[g_Keys.dashboard.enable] = IS_TRUE(value[0]);
else if (0 == key.compare("NNGAddr"))
data[g_Keys.dashboard.nngaddr] = value;
else
badParam(key);
break;
default: default:
std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl; std::cout << "WARNING: parameter '" << line << "' defined before any [section]" << std::endl;
} }
@ -812,6 +826,9 @@ bool CConfigure::ReadData(const std::string &path)
if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval)) if (isDefined(ErrorLevel::fatal, JFILES, JG3TERMINALPATH, g_Keys.files.terminal, rval))
checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]); checkFile(JFILES, JG3TERMINALPATH, data[g_Keys.files.terminal]);
} }
// Dashboard section
isDefined(ErrorLevel::mild, JDASHBOARD, JENABLE, g_Keys.dashboard.enable, rval);
isDefined(ErrorLevel::mild, JDASHBOARD, "NNGAddr", g_Keys.dashboard.nngaddr, rval);
return rval; return rval;
} }

@ -25,7 +25,7 @@
enum class ErrorLevel { fatal, mild }; enum class ErrorLevel { fatal, mild };
enum class ERefreshType { file, http, both }; enum class ERefreshType { file, http, both };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, imrs, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc }; enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, imrs, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc, dashboard };
#define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1') #define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1')

@ -23,6 +23,7 @@
#include "LookupYsf.h" #include "LookupYsf.h"
#include "TCSocket.h" #include "TCSocket.h"
#include "JsonKeys.h" #include "JsonKeys.h"
#include "NNGPublisher.h"
extern CReflector g_Reflector; extern CReflector g_Reflector;
extern CGateKeeper g_GateKeeper; extern CGateKeeper g_GateKeeper;
@ -33,3 +34,4 @@ extern CLookupNxdn g_LNid;
extern CLookupYsf g_LYtr; extern CLookupYsf g_LYtr;
extern SJsonKeys g_Keys; extern SJsonKeys g_Keys;
extern CTCServer g_TCServer; extern CTCServer g_TCServer;
extern CNNGPublisher g_NNGPublisher;

@ -74,4 +74,7 @@ struct SJsonKeys {
struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; } struct FILES { const std::string pid, xml, json, white, black, interlink, terminal; }
files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" }; files { "pidFilePath", "xmlFilePath", "jsonFilePath", "whitelistFilePath", "blacklistFilePath", "interlinkFilePath", "g3TerminalFilePath" };
struct DASHBOARD { const std::string enable, nngaddr; }
dashboard { "DashboardEnable", "DashboardNNGAddr" };
}; };

@ -33,6 +33,7 @@ CLookupDmr g_LDid;
CLookupNxdn g_LNid; CLookupNxdn g_LNid;
CLookupYsf g_LYtr; CLookupYsf g_LYtr;
CTCServer g_TCServer; CTCServer g_TCServer;
CNNGPublisher g_NNGPublisher;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -49,20 +50,26 @@ int main(int argc, char *argv[])
std::cout << "IPv4 binding address is '" << g_Configure.GetString(g_Keys.ip.ipv4bind) << "'" << std::endl; std::cout << "IPv4 binding address is '" << g_Configure.GetString(g_Keys.ip.ipv4bind) << "'" << std::endl;
// remove pidfile // remove pidfile
const std::string pidpath(g_Configure.GetString(g_Keys.files.pid)); std::string pidpath = g_Configure.GetString(g_Keys.files.pid);
const std::string callsign(g_Configure.GetString(g_Keys.names.callsign)); const std::string callsign(g_Configure.GetString(g_Keys.names.callsign));
remove(pidpath.c_str()); remove(pidpath.c_str());
// splash // splash
std::cout << "Starting " << callsign << " " << g_Version << std::endl; std::cout << "Starting " << callsign << " " << g_Version << std::endl;
// and let it run // start everything
if (g_Reflector.Start()) if (g_Reflector.Start())
{ {
std::cout << "Error starting reflector" << std::endl; std::cout << "Error starting reflector" << std::endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
// dashboard nng publisher
if (g_Configure.GetBoolean(g_Keys.dashboard.enable))
{
g_NNGPublisher.Start(g_Configure.GetString(g_Keys.dashboard.nngaddr));
}
std::cout << "Reflector " << callsign << " started and listening" << std::endl; std::cout << "Reflector " << callsign << " started and listening" << std::endl;
// write new pid file // write new pid file
@ -72,6 +79,7 @@ int main(int argc, char *argv[])
pause(); // wait for any signal pause(); // wait for any signal
g_NNGPublisher.Stop();
g_Reflector.Stop(); g_Reflector.Stop();
std::cout << "Reflector stopped" << std::endl; std::cout << "Reflector stopped" << std::endl;

@ -32,7 +32,7 @@ else
CFLAGS = -W -Werror -std=c++17 -MMD -MD CFLAGS = -W -Werror -std=c++17 -MMD -MD
endif endif
LDFLAGS=-pthread -lcurl LDFLAGS=-pthread -lcurl -lnng
ifeq ($(DHT), true) ifeq ($(DHT), true)
LDFLAGS += -lopendht LDFLAGS += -lopendht

@ -0,0 +1,57 @@
#include "NNGPublisher.h"
#include <iostream>
CNNGPublisher::CNNGPublisher()
: m_started(false)
{
m_sock.id = 0;
}
CNNGPublisher::~CNNGPublisher()
{
Stop();
}
bool CNNGPublisher::Start(const std::string &addr)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_started) return true;
int rv;
if ((rv = nng_pub0_open(&m_sock)) != 0) {
std::cerr << "NNG: Failed to open pub socket: " << nng_strerror(rv) << std::endl;
return false;
}
if ((rv = nng_listen(m_sock, addr.c_str(), nullptr, 0)) != 0) {
std::cerr << "NNG: Failed to listen on " << addr << ": " << nng_strerror(rv) << std::endl;
nng_close(m_sock);
return false;
}
m_started = true;
std::cout << "NNG: Publisher started at " << addr << std::endl;
return true;
}
void CNNGPublisher::Stop()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_started) return;
nng_close(m_sock);
m_started = false;
std::cout << "NNG: Publisher stopped" << std::endl;
}
void CNNGPublisher::Publish(const nlohmann::json &event)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_started) return;
std::string msg = event.dump();
int rv = nng_send(m_sock, (void *)msg.c_str(), msg.size(), NNG_FLAG_NONBLOCK);
if (rv != 0 && rv != NNG_EAGAIN) {
std::cerr << "NNG: Send error: " << nng_strerror(rv) << std::endl;
}
}

@ -0,0 +1,24 @@
#pragma once
#include <string>
#include <mutex>
#include <nlohmann/json.hpp>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
class CNNGPublisher
{
public:
CNNGPublisher();
~CNNGPublisher();
bool Start(const std::string &addr);
void Stop();
void Publish(const nlohmann::json &event);
private:
nng_socket m_sock;
std::mutex m_mutex;
bool m_started;
};

@ -64,4 +64,14 @@ void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign
} }
AddUser(heard); AddUser(heard);
// dashboard event
nlohmann::json event;
event["type"] = "hearing";
event["my"] = my.GetCS();
event["ur"] = rpt1.GetCS();
event["rpt1"] = rpt2.GetCS();
event["rpt2"] = xlx.GetCS();
event["module"] = std::string(1, xlx.GetCSModule());
g_NNGPublisher.Publish(event);
} }

Loading…
Cancel
Save

Powered by TurnKey Linux.