add Peer-Link support to SysView, so long as the FNE is appropriately configured, this will allow SysView to operate without TGID or RID configuration files;

pull/75/head
Bryan Biedenkapp 1 year ago
parent 38492cf709
commit 50bdb43db2

@ -231,7 +231,7 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
}
}
tid_lookup_cleanup:
tid_lookup_cleanup:
m_tgidSize = 0U;
m_tgidCompressedSize = 0U;
if (m_tgidBuffer != nullptr)
@ -363,7 +363,7 @@ tid_lookup_cleanup:
}
}
rid_lookup_cleanup:
rid_lookup_cleanup:
m_ridSize = 0U;
m_ridCompressedSize = 0U;
if (m_ridBuffer != nullptr)
@ -495,7 +495,7 @@ rid_lookup_cleanup:
}
}
pid_lookup_cleanup:
pid_lookup_cleanup:
m_pidSize = 0U;
m_pidCompressedSize = 0U;
if (m_pidBuffer != nullptr)

@ -9,15 +9,19 @@
*/
#if !defined(NO_WEBSOCKETS)
#include "Defines.h"
#include "common/lookups/TalkgroupRulesLookup.h"
#include "common/Log.h"
#include "common/StopWatch.h"
#include "common/Thread.h"
#include "common/Utils.h"
#include "fne/network/RESTDefines.h"
#include "remote/RESTClient.h"
#include "network/PeerNetwork.h"
#include "HostWS.h"
#include "SysViewMain.h"
using namespace lookups;
#include <unistd.h>
#include <pwd.h>
@ -27,6 +31,110 @@
#define IDLE_WARMUP_MS 5U
// ---------------------------------------------------------------------------
// Global Functions
// ---------------------------------------------------------------------------
/**
* @brief Helper to convert a TalkgroupRuleGroupVoice to JSON.
* @param groupVoice Instance of TalkgroupRuleGroupVoice to convert to JSON.
* @returns json::object JSON object.
*/
json::object tgToJson(const TalkgroupRuleGroupVoice& groupVoice)
{
json::object tg = json::object();
std::string tgName = groupVoice.name();
tg["name"].set<std::string>(tgName);
std::string tgAlias = groupVoice.nameAlias();
tg["alias"].set<std::string>(tgAlias);
bool invalid = groupVoice.isInvalid();
tg["invalid"].set<bool>(invalid);
// source stanza
{
json::object source = json::object();
uint32_t tgId = groupVoice.source().tgId();
source["tgid"].set<uint32_t>(tgId);
uint8_t tgSlot = groupVoice.source().tgSlot();
source["slot"].set<uint8_t>(tgSlot);
tg["source"].set<json::object>(source);
}
// config stanza
{
json::object config = json::object();
bool active = groupVoice.config().active();
config["active"].set<bool>(active);
bool affiliated = groupVoice.config().affiliated();
config["affiliated"].set<bool>(affiliated);
bool parrot = groupVoice.config().parrot();
config["parrot"].set<bool>(parrot);
json::array inclusions = json::array();
std::vector<uint32_t> inclusion = groupVoice.config().inclusion();
if (inclusion.size() > 0) {
for (auto inclEntry : inclusion) {
uint32_t peerId = inclEntry;
inclusions.push_back(json::value((double)peerId));
}
}
config["inclusion"].set<json::array>(inclusions);
json::array exclusions = json::array();
std::vector<uint32_t> exclusion = groupVoice.config().exclusion();
if (exclusion.size() > 0) {
for (auto exclEntry : exclusion) {
uint32_t peerId = exclEntry;
exclusions.push_back(json::value((double)peerId));
}
}
config["exclusion"].set<json::array>(exclusions);
json::array rewrites = json::array();
std::vector<lookups::TalkgroupRuleRewrite> rewrite = groupVoice.config().rewrite();
if (rewrite.size() > 0) {
for (auto rewrEntry : rewrite) {
json::object rewrite = json::object();
uint32_t peerId = rewrEntry.peerId();
rewrite["peerid"].set<uint32_t>(peerId);
uint32_t tgId = rewrEntry.tgId();
rewrite["tgid"].set<uint32_t>(tgId);
uint8_t tgSlot = rewrEntry.tgSlot();
rewrite["slot"].set<uint8_t>(tgSlot);
rewrites.push_back(json::value(rewrite));
}
}
config["rewrite"].set<json::array>(rewrites);
json::array always = json::array();
std::vector<uint32_t> alwaysSend = groupVoice.config().alwaysSend();
if (alwaysSend.size() > 0) {
for (auto alwaysEntry : alwaysSend) {
uint32_t peerId = alwaysEntry;
always.push_back(json::value((double)peerId));
}
}
config["always"].set<json::array>(always);
json::array preferreds = json::array();
std::vector<uint32_t> preferred = groupVoice.config().preferred();
if (preferred.size() > 0) {
for (auto prefEntry : preferred) {
uint32_t peerId = prefEntry;
preferreds.push_back(json::value((double)peerId));
}
}
config["preferred"].set<json::array>(preferreds);
tg["config"].set<json::object>(config);
}
return tg;
}
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
@ -161,6 +269,11 @@ int HostWS::run()
Timer peerStatusUpdate(1000U, 0U, 175U);
peerStatusUpdate.start();
Timer tgDataUpdate(1000U, 30U);
tgDataUpdate.start();
Timer ridDataUpdate(1000U, 30U);
ridDataUpdate.start();
setNetDataEventCallback([=](json::object obj) { netDataEvent(obj); });
// main execution loop
@ -257,6 +370,58 @@ int HostWS::run()
}
}
}
// send full talkgroup list data
tgDataUpdate.clock(ms);
if (tgDataUpdate.isRunning() && tgDataUpdate.hasExpired()) {
tgDataUpdate.start();
json::object wsObj = json::object();
std::string type = "tg_data";
json::array tgs = json::array();
if (g_tidLookup != nullptr) {
if (g_tidLookup->groupVoice().size() > 0) {
for (auto entry : g_tidLookup->groupVoice()) {
json::object tg = tgToJson(entry);
tgs.push_back(json::value(tg));
}
}
}
wsObj["payload"].set<json::array>(tgs);
send(wsObj);
}
// send full radio ID list data
ridDataUpdate.clock(ms);
if (ridDataUpdate.isRunning() && ridDataUpdate.hasExpired()) {
ridDataUpdate.start();
json::object wsObj = json::object();
std::string type = "rid_data";
json::array rids = json::array();
if (g_ridLookup != nullptr) {
if (g_ridLookup->table().size() > 0) {
for (auto entry : g_ridLookup->table()) {
json::object ridObj = json::object();
uint32_t rid = entry.first;
ridObj["id"].set<uint32_t>(rid);
bool enabled = entry.second.radioEnabled();
ridObj["enabled"].set<bool>(enabled);
std::string alias = entry.second.radioAlias();
ridObj["alias"].set<std::string>(alias);
rids.push_back(json::value(ridObj));
}
}
}
wsObj["payload"].set<json::array>(rids);
send(wsObj);
}
} else {
// clear ostream
logOutput.str("");

@ -11,6 +11,7 @@
#include "common/network/json/json.h"
#include "common/p25/dfsi/DFSIDefines.h"
#include "common/p25/dfsi/LC.h"
#include "common/zlib/zlib.h"
#include "common/Utils.h"
#include "network/PeerNetwork.h"
#include "SysViewMain.h"
@ -18,6 +19,8 @@
using namespace network;
#include <cassert>
#include <fstream>
#include <streambuf>
// ---------------------------------------------------------------------------
// Static Class Members
@ -34,7 +37,14 @@ std::mutex PeerNetwork::m_peerStatusMutex;
PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup, bool saveLookup) :
Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup, saveLookup),
peerStatus()
peerStatus(),
m_peerLink(false),
m_tgidCompressedSize(0U),
m_tgidSize(0U),
m_tgidBuffer(nullptr),
m_ridCompressedSize(0U),
m_ridSize(0U),
m_ridBuffer(nullptr)
{
assert(!address.empty());
assert(port > 0U);
@ -112,6 +122,279 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
}
break;
case NET_FUNC::PEER_LINK:
{
switch (opcode.second) {
case NET_SUBFUNC::PL_TALKGROUP_LIST:
{
uint8_t curBlock = data[8U];
uint8_t blockCnt = data[9U];
// if this is the first block store sizes and initialize temp buffer
if (curBlock == 0U) {
m_tgidSize = __GET_UINT32(data, 0U);
m_tgidCompressedSize = __GET_UINT32(data, 4U);
if (m_tgidBuffer != nullptr)
delete[] m_tgidBuffer;
if (m_tgidSize < PEER_LINK_BLOCK_SIZE)
m_tgidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U];
else
m_tgidBuffer = new uint8_t[m_tgidSize + 1U];
}
if (m_tgidBuffer != nullptr) {
if (curBlock < blockCnt) {
uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE;
::memcpy(m_tgidBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE);
} else {
uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE;
::memcpy(m_tgidBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Compressed Payload", m_tgidBuffer, m_tgidCompressedSize);
// handle last block
// compression structures
z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
// set input data
strm.avail_in = m_tgidCompressedSize;
strm.next_in = m_tgidBuffer;
// initialize decompression
int ret = inflateInit(&strm);
if (ret != Z_OK) {
LogError(LOG_NET, "PEER %u error initializing ZLIB", peerId);
m_tgidSize = 0U;
m_tgidCompressedSize = 0U;
if (m_tgidBuffer != nullptr)
delete[] m_tgidBuffer;
m_tgidBuffer = nullptr;
break;
}
// decompress data
std::vector<uint8_t> decompressedData;
uint8_t outbuffer[1024];
do {
strm.avail_out = sizeof(outbuffer);
strm.next_out = outbuffer;
ret = inflate(&strm, Z_NO_FLUSH);
if (ret == Z_STREAM_ERROR) {
LogError(LOG_NET, "PEER %u error decompressing TGID list", peerId);
inflateEnd(&strm);
goto tid_lookup_cleanup; // yes - I hate myself; but this is quick
}
decompressedData.insert(decompressedData.end(), outbuffer, outbuffer + sizeof(outbuffer) - strm.avail_out);
} while (ret != Z_STREAM_END);
// cleanup
inflateEnd(&strm);
// scope is intentional
{
uint32_t decompressedLen = strm.total_out;
uint8_t* decompressed = decompressedData.data();
// Utils::dump(1U, "Raw TGID Data", decompressed, decompressedLen);
// check that we got the appropriate data
if (decompressedLen == m_tgidSize) {
// store to file
std::unique_ptr<char[]> __str = std::make_unique<char[]>(decompressedLen + 1U);
char* str = __str.get();
::memcpy(str, decompressed, decompressedLen);
str[decompressedLen] = 0; // null termination
// randomize filename
std::ostringstream s;
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint32_t> dist(0x00U, 0xFFFFFFFFU);
s << "/tmp/talkgroup_rules.yml." << dist(mt);
std::string filename = s.str();
std::ofstream file(filename, std::ofstream::out);
if (file.fail()) {
LogError(LOG_NET, "Cannot open the talkgroup ID lookup file - %s", filename.c_str());
goto tid_lookup_cleanup; // yes - I hate myself; but this is quick
}
file << str;
file.close();
m_tidLookup->stop(true);
m_tidLookup->filename(filename);
m_tidLookup->reload();
// flag this peer as Peer-Link enabled
m_peerLink = true;
// cleanup temporary file
::remove(filename.c_str());
}
else {
LogError(LOG_NET, "PEER %u error decompressed TGID list, was not of expected size! %u != %u", peerId, decompressedLen, m_tgidSize);
}
}
tid_lookup_cleanup:
m_tgidSize = 0U;
m_tgidCompressedSize = 0U;
if (m_tgidBuffer != nullptr)
delete[] m_tgidBuffer;
m_tgidBuffer = nullptr;
}
}
}
break;
case NET_SUBFUNC::PL_RID_LIST:
{
uint8_t curBlock = data[8U];
uint8_t blockCnt = data[9U];
// if this is the first block store sizes and initialize temp buffer
if (curBlock == 0U) {
m_ridSize = __GET_UINT32(data, 0U);
m_ridCompressedSize = __GET_UINT32(data, 4U);
if (m_ridBuffer != nullptr)
delete[] m_ridBuffer;
if (m_ridSize < PEER_LINK_BLOCK_SIZE)
m_ridBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U];
else
m_ridBuffer = new uint8_t[m_ridSize + 1U];
}
if (m_ridBuffer != nullptr) {
if (curBlock < blockCnt) {
uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE;
::memcpy(m_ridBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE);
} else {
uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE;
::memcpy(m_ridBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE);
// Utils::dump(1U, "Compressed Payload", m_ridBuffer, m_ridCompressedSize);
// handle last block
// compression structures
z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
// set input data
strm.avail_in = m_ridCompressedSize;
strm.next_in = m_ridBuffer;
// initialize decompression
int ret = inflateInit(&strm);
if (ret != Z_OK) {
LogError(LOG_NET, "PEER %u error initializing ZLIB", peerId);
m_ridSize = 0U;
m_ridCompressedSize = 0U;
if (m_ridBuffer != nullptr)
delete[] m_ridBuffer;
m_ridBuffer = nullptr;
break;
}
// decompress data
std::vector<uint8_t> decompressedData;
uint8_t outbuffer[1024];
do {
strm.avail_out = sizeof(outbuffer);
strm.next_out = outbuffer;
ret = inflate(&strm, Z_NO_FLUSH);
if (ret == Z_STREAM_ERROR) {
LogError(LOG_NET, "PEER %u error decompressing RID list", peerId);
inflateEnd(&strm);
goto rid_lookup_cleanup; // yes - I hate myself; but this is quick
}
decompressedData.insert(decompressedData.end(), outbuffer, outbuffer + sizeof(outbuffer) - strm.avail_out);
} while (ret != Z_STREAM_END);
// cleanup
inflateEnd(&strm);
// scope is intentional
{
uint32_t decompressedLen = strm.total_out;
uint8_t* decompressed = decompressedData.data();
// Utils::dump(1U, "Raw RID Data", decompressed, decompressedLen);
// check that we got the appropriate data
if (decompressedLen == m_ridSize) {
// store to file
std::unique_ptr<char[]> __str = std::make_unique<char[]>(decompressedLen + 1U);
char* str = __str.get();
::memcpy(str, decompressed, decompressedLen);
str[decompressedLen] = 0; // null termination
// randomize filename
std::ostringstream s;
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint32_t> dist(0x00U, 0xFFFFFFFFU);
s << "/tmp/rid_acl.dat." << dist(mt);
std::string filename = s.str();
std::ofstream file(filename, std::ofstream::out);
if (file.fail()) {
LogError(LOG_NET, "Cannot open the radio ID lookup file - %s", filename.c_str());
goto rid_lookup_cleanup; // yes - I hate myself; but this is quick
}
file << str;
file.close();
m_ridLookup->stop(true);
m_ridLookup->filename(filename);
m_ridLookup->reload();
// flag this peer as Peer-Link enabled
m_peerLink = true;
// cleanup temporary file
::remove(filename.c_str());
}
else {
LogError(LOG_NET, "PEER %u error decompressed RID list, was not of expected size! %u != %u", peerId, decompressedLen, m_ridSize);
}
}
rid_lookup_cleanup:
m_ridSize = 0U;
m_ridCompressedSize = 0U;
if (m_ridBuffer != nullptr)
delete[] m_ridBuffer;
m_ridBuffer = nullptr;
}
}
}
break;
default:
break;
}
}
break;
default:
Utils::dump("unknown opcode from the master", data, length);
break;
@ -160,6 +443,8 @@ bool PeerNetwork::writeConfig()
rcon["port"].set<uint16_t>(m_restApiPort); // REST API Port
config["rcon"].set<json::object>(rcon);
bool external = true;
config["externalPeer"].set<bool>(external); // External Peer Marker
bool convPeer = true;
config["conventionalPeer"].set<bool>(convPeer); // Conventional Peer Marker
bool sysView = true;

@ -57,6 +57,11 @@ namespace network
PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup, bool saveLookup);
/**
* @brief Flag indicating whether or not SysView has received Peer-Link data transfers.
*/
bool hasPeerLink() const { return m_peerLink; }
/**
* @brief Helper to lock the peer status mutex.
*/
@ -91,6 +96,17 @@ namespace network
private:
static std::mutex m_peerStatusMutex;
bool m_peerLink;
uint32_t m_tgidCompressedSize;
uint32_t m_tgidSize;
uint8_t* m_tgidBuffer;
uint32_t m_ridCompressedSize;
uint32_t m_ridSize;
uint8_t* m_ridBuffer;
};
} // namespace network

Loading…
Cancel
Save

Powered by TurnKey Linux.