diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index e8af9f02..b66446ff 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -231,7 +231,7 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco } } -tid_lookup_cleanup: + tid_lookup_cleanup: m_tgidSize = 0U; m_tgidCompressedSize = 0U; if (m_tgidBuffer != nullptr) @@ -363,7 +363,7 @@ tid_lookup_cleanup: } } -rid_lookup_cleanup: + rid_lookup_cleanup: m_ridSize = 0U; m_ridCompressedSize = 0U; if (m_ridBuffer != nullptr) @@ -495,7 +495,7 @@ rid_lookup_cleanup: } } -pid_lookup_cleanup: + pid_lookup_cleanup: m_pidSize = 0U; m_pidCompressedSize = 0U; if (m_pidBuffer != nullptr) diff --git a/src/sysview/HostWS.cpp b/src/sysview/HostWS.cpp index 8295dca3..fe60e38e 100644 --- a/src/sysview/HostWS.cpp +++ b/src/sysview/HostWS.cpp @@ -9,15 +9,19 @@ */ #if !defined(NO_WEBSOCKETS) #include "Defines.h" +#include "common/lookups/TalkgroupRulesLookup.h" #include "common/Log.h" #include "common/StopWatch.h" #include "common/Thread.h" #include "common/Utils.h" #include "fne/network/RESTDefines.h" #include "remote/RESTClient.h" +#include "network/PeerNetwork.h" #include "HostWS.h" #include "SysViewMain.h" +using namespace lookups; + #include #include @@ -27,6 +31,110 @@ #define IDLE_WARMUP_MS 5U + +// --------------------------------------------------------------------------- +// Global Functions +// --------------------------------------------------------------------------- + +/** + * @brief Helper to convert a TalkgroupRuleGroupVoice to JSON. + * @param groupVoice Instance of TalkgroupRuleGroupVoice to convert to JSON. + * @returns json::object JSON object. + */ +json::object tgToJson(const TalkgroupRuleGroupVoice& groupVoice) +{ + json::object tg = json::object(); + + std::string tgName = groupVoice.name(); + tg["name"].set(tgName); + std::string tgAlias = groupVoice.nameAlias(); + tg["alias"].set(tgAlias); + bool invalid = groupVoice.isInvalid(); + tg["invalid"].set(invalid); + + // source stanza + { + json::object source = json::object(); + uint32_t tgId = groupVoice.source().tgId(); + source["tgid"].set(tgId); + uint8_t tgSlot = groupVoice.source().tgSlot(); + source["slot"].set(tgSlot); + tg["source"].set(source); + } + + // config stanza + { + json::object config = json::object(); + bool active = groupVoice.config().active(); + config["active"].set(active); + bool affiliated = groupVoice.config().affiliated(); + config["affiliated"].set(affiliated); + bool parrot = groupVoice.config().parrot(); + config["parrot"].set(parrot); + + json::array inclusions = json::array(); + std::vector inclusion = groupVoice.config().inclusion(); + if (inclusion.size() > 0) { + for (auto inclEntry : inclusion) { + uint32_t peerId = inclEntry; + inclusions.push_back(json::value((double)peerId)); + } + } + config["inclusion"].set(inclusions); + + json::array exclusions = json::array(); + std::vector exclusion = groupVoice.config().exclusion(); + if (exclusion.size() > 0) { + for (auto exclEntry : exclusion) { + uint32_t peerId = exclEntry; + exclusions.push_back(json::value((double)peerId)); + } + } + config["exclusion"].set(exclusions); + + json::array rewrites = json::array(); + std::vector rewrite = groupVoice.config().rewrite(); + if (rewrite.size() > 0) { + for (auto rewrEntry : rewrite) { + json::object rewrite = json::object(); + uint32_t peerId = rewrEntry.peerId(); + rewrite["peerid"].set(peerId); + uint32_t tgId = rewrEntry.tgId(); + rewrite["tgid"].set(tgId); + uint8_t tgSlot = rewrEntry.tgSlot(); + rewrite["slot"].set(tgSlot); + + rewrites.push_back(json::value(rewrite)); + } + } + config["rewrite"].set(rewrites); + + json::array always = json::array(); + std::vector alwaysSend = groupVoice.config().alwaysSend(); + if (alwaysSend.size() > 0) { + for (auto alwaysEntry : alwaysSend) { + uint32_t peerId = alwaysEntry; + always.push_back(json::value((double)peerId)); + } + } + config["always"].set(always); + + json::array preferreds = json::array(); + std::vector preferred = groupVoice.config().preferred(); + if (preferred.size() > 0) { + for (auto prefEntry : preferred) { + uint32_t peerId = prefEntry; + preferreds.push_back(json::value((double)peerId)); + } + } + config["preferred"].set(preferreds); + + tg["config"].set(config); + } + + return tg; +} + // --------------------------------------------------------------------------- // Public Class Members // --------------------------------------------------------------------------- @@ -161,6 +269,11 @@ int HostWS::run() Timer peerStatusUpdate(1000U, 0U, 175U); peerStatusUpdate.start(); + Timer tgDataUpdate(1000U, 30U); + tgDataUpdate.start(); + Timer ridDataUpdate(1000U, 30U); + ridDataUpdate.start(); + setNetDataEventCallback([=](json::object obj) { netDataEvent(obj); }); // main execution loop @@ -257,6 +370,58 @@ int HostWS::run() } } } + + // send full talkgroup list data + tgDataUpdate.clock(ms); + if (tgDataUpdate.isRunning() && tgDataUpdate.hasExpired()) { + tgDataUpdate.start(); + + json::object wsObj = json::object(); + std::string type = "tg_data"; + + json::array tgs = json::array(); + if (g_tidLookup != nullptr) { + if (g_tidLookup->groupVoice().size() > 0) { + for (auto entry : g_tidLookup->groupVoice()) { + json::object tg = tgToJson(entry); + tgs.push_back(json::value(tg)); + } + } + } + + wsObj["payload"].set(tgs); + send(wsObj); + } + + // send full radio ID list data + ridDataUpdate.clock(ms); + if (ridDataUpdate.isRunning() && ridDataUpdate.hasExpired()) { + ridDataUpdate.start(); + + json::object wsObj = json::object(); + std::string type = "rid_data"; + + json::array rids = json::array(); + if (g_ridLookup != nullptr) { + if (g_ridLookup->table().size() > 0) { + for (auto entry : g_ridLookup->table()) { + json::object ridObj = json::object(); + + uint32_t rid = entry.first; + ridObj["id"].set(rid); + bool enabled = entry.second.radioEnabled(); + ridObj["enabled"].set(enabled); + std::string alias = entry.second.radioAlias(); + ridObj["alias"].set(alias); + + rids.push_back(json::value(ridObj)); + } + } + } + + wsObj["payload"].set(rids); + send(wsObj); + } } else { // clear ostream logOutput.str(""); diff --git a/src/sysview/network/PeerNetwork.cpp b/src/sysview/network/PeerNetwork.cpp index 80c0152d..f3128011 100644 --- a/src/sysview/network/PeerNetwork.cpp +++ b/src/sysview/network/PeerNetwork.cpp @@ -11,6 +11,7 @@ #include "common/network/json/json.h" #include "common/p25/dfsi/DFSIDefines.h" #include "common/p25/dfsi/LC.h" +#include "common/zlib/zlib.h" #include "common/Utils.h" #include "network/PeerNetwork.h" #include "SysViewMain.h" @@ -18,6 +19,8 @@ using namespace network; #include +#include +#include // --------------------------------------------------------------------------- // Static Class Members @@ -34,7 +37,14 @@ std::mutex PeerNetwork::m_peerStatusMutex; PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password, bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup, bool saveLookup) : Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup, saveLookup), - peerStatus() + peerStatus(), + m_peerLink(false), + m_tgidCompressedSize(0U), + m_tgidSize(0U), + m_tgidBuffer(nullptr), + m_ridCompressedSize(0U), + m_ridSize(0U), + m_ridBuffer(nullptr) { assert(!address.empty()); assert(port > 0U); @@ -112,6 +122,279 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco } break; + case NET_FUNC::PEER_LINK: + { + switch (opcode.second) { + case NET_SUBFUNC::PL_TALKGROUP_LIST: + { + uint8_t curBlock = data[8U]; + uint8_t blockCnt = data[9U]; + + // if this is the first block store sizes and initialize temp buffer + if (curBlock == 0U) { + m_tgidSize = __GET_UINT32(data, 0U); + m_tgidCompressedSize = __GET_UINT32(data, 4U); + + if (m_tgidBuffer != nullptr) + delete[] m_tgidBuffer; + if (m_tgidSize < PEER_LINK_BLOCK_SIZE) + m_tgidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U]; + else + m_tgidBuffer = new uint8_t[m_tgidSize + 1U]; + } + + if (m_tgidBuffer != nullptr) { + if (curBlock < blockCnt) { + uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE; + ::memcpy(m_tgidBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE); + // Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE); + } else { + uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE; + ::memcpy(m_tgidBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE); + + // Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE); + // Utils::dump(1U, "Compressed Payload", m_tgidBuffer, m_tgidCompressedSize); + + // handle last block + // compression structures + z_stream strm; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + // set input data + strm.avail_in = m_tgidCompressedSize; + strm.next_in = m_tgidBuffer; + + // initialize decompression + int ret = inflateInit(&strm); + if (ret != Z_OK) { + LogError(LOG_NET, "PEER %u error initializing ZLIB", peerId); + + m_tgidSize = 0U; + m_tgidCompressedSize = 0U; + if (m_tgidBuffer != nullptr) + delete[] m_tgidBuffer; + m_tgidBuffer = nullptr; + break; + } + + // decompress data + std::vector decompressedData; + uint8_t outbuffer[1024]; + do { + strm.avail_out = sizeof(outbuffer); + strm.next_out = outbuffer; + + ret = inflate(&strm, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR) { + LogError(LOG_NET, "PEER %u error decompressing TGID list", peerId); + inflateEnd(&strm); + goto tid_lookup_cleanup; // yes - I hate myself; but this is quick + } + + decompressedData.insert(decompressedData.end(), outbuffer, outbuffer + sizeof(outbuffer) - strm.avail_out); + } while (ret != Z_STREAM_END); + + // cleanup + inflateEnd(&strm); + + // scope is intentional + { + uint32_t decompressedLen = strm.total_out; + uint8_t* decompressed = decompressedData.data(); + + // Utils::dump(1U, "Raw TGID Data", decompressed, decompressedLen); + + // check that we got the appropriate data + if (decompressedLen == m_tgidSize) { + // store to file + std::unique_ptr __str = std::make_unique(decompressedLen + 1U); + char* str = __str.get(); + ::memcpy(str, decompressed, decompressedLen); + str[decompressedLen] = 0; // null termination + + // randomize filename + std::ostringstream s; + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0x00U, 0xFFFFFFFFU); + s << "/tmp/talkgroup_rules.yml." << dist(mt); + + std::string filename = s.str(); + std::ofstream file(filename, std::ofstream::out); + if (file.fail()) { + LogError(LOG_NET, "Cannot open the talkgroup ID lookup file - %s", filename.c_str()); + goto tid_lookup_cleanup; // yes - I hate myself; but this is quick + } + + file << str; + file.close(); + + m_tidLookup->stop(true); + m_tidLookup->filename(filename); + m_tidLookup->reload(); + + // flag this peer as Peer-Link enabled + m_peerLink = true; + + // cleanup temporary file + ::remove(filename.c_str()); + } + else { + LogError(LOG_NET, "PEER %u error decompressed TGID list, was not of expected size! %u != %u", peerId, decompressedLen, m_tgidSize); + } + } + + tid_lookup_cleanup: + m_tgidSize = 0U; + m_tgidCompressedSize = 0U; + if (m_tgidBuffer != nullptr) + delete[] m_tgidBuffer; + m_tgidBuffer = nullptr; + } + } + } + break; + + case NET_SUBFUNC::PL_RID_LIST: + { + uint8_t curBlock = data[8U]; + uint8_t blockCnt = data[9U]; + + // if this is the first block store sizes and initialize temp buffer + if (curBlock == 0U) { + m_ridSize = __GET_UINT32(data, 0U); + m_ridCompressedSize = __GET_UINT32(data, 4U); + + if (m_ridBuffer != nullptr) + delete[] m_ridBuffer; + if (m_ridSize < PEER_LINK_BLOCK_SIZE) + m_ridBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U]; + else + m_ridBuffer = new uint8_t[m_ridSize + 1U]; + } + + if (m_ridBuffer != nullptr) { + if (curBlock < blockCnt) { + uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE; + ::memcpy(m_ridBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE); + // Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE); + } else { + uint32_t offs = curBlock * PEER_LINK_BLOCK_SIZE; + ::memcpy(m_ridBuffer + offs, data + 10U, PEER_LINK_BLOCK_SIZE); + + // Utils::dump(1U, "Block Payload", data, 10U + PEER_LINK_BLOCK_SIZE); + // Utils::dump(1U, "Compressed Payload", m_ridBuffer, m_ridCompressedSize); + + // handle last block + // compression structures + z_stream strm; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + // set input data + strm.avail_in = m_ridCompressedSize; + strm.next_in = m_ridBuffer; + + // initialize decompression + int ret = inflateInit(&strm); + if (ret != Z_OK) { + LogError(LOG_NET, "PEER %u error initializing ZLIB", peerId); + + m_ridSize = 0U; + m_ridCompressedSize = 0U; + if (m_ridBuffer != nullptr) + delete[] m_ridBuffer; + m_ridBuffer = nullptr; + break; + } + + // decompress data + std::vector decompressedData; + uint8_t outbuffer[1024]; + do { + strm.avail_out = sizeof(outbuffer); + strm.next_out = outbuffer; + + ret = inflate(&strm, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR) { + LogError(LOG_NET, "PEER %u error decompressing RID list", peerId); + inflateEnd(&strm); + goto rid_lookup_cleanup; // yes - I hate myself; but this is quick + } + + decompressedData.insert(decompressedData.end(), outbuffer, outbuffer + sizeof(outbuffer) - strm.avail_out); + } while (ret != Z_STREAM_END); + + // cleanup + inflateEnd(&strm); + + // scope is intentional + { + uint32_t decompressedLen = strm.total_out; + uint8_t* decompressed = decompressedData.data(); + + // Utils::dump(1U, "Raw RID Data", decompressed, decompressedLen); + + // check that we got the appropriate data + if (decompressedLen == m_ridSize) { + // store to file + std::unique_ptr __str = std::make_unique(decompressedLen + 1U); + char* str = __str.get(); + ::memcpy(str, decompressed, decompressedLen); + str[decompressedLen] = 0; // null termination + + // randomize filename + std::ostringstream s; + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0x00U, 0xFFFFFFFFU); + s << "/tmp/rid_acl.dat." << dist(mt); + + std::string filename = s.str(); + std::ofstream file(filename, std::ofstream::out); + if (file.fail()) { + LogError(LOG_NET, "Cannot open the radio ID lookup file - %s", filename.c_str()); + goto rid_lookup_cleanup; // yes - I hate myself; but this is quick + } + + file << str; + file.close(); + + m_ridLookup->stop(true); + m_ridLookup->filename(filename); + m_ridLookup->reload(); + + // flag this peer as Peer-Link enabled + m_peerLink = true; + + // cleanup temporary file + ::remove(filename.c_str()); + } + else { + LogError(LOG_NET, "PEER %u error decompressed RID list, was not of expected size! %u != %u", peerId, decompressedLen, m_ridSize); + } + } + + rid_lookup_cleanup: + m_ridSize = 0U; + m_ridCompressedSize = 0U; + if (m_ridBuffer != nullptr) + delete[] m_ridBuffer; + m_ridBuffer = nullptr; + } + } + } + break; + + default: + break; + } + } + break; + default: Utils::dump("unknown opcode from the master", data, length); break; @@ -160,6 +443,8 @@ bool PeerNetwork::writeConfig() rcon["port"].set(m_restApiPort); // REST API Port config["rcon"].set(rcon); + bool external = true; + config["externalPeer"].set(external); // External Peer Marker bool convPeer = true; config["conventionalPeer"].set(convPeer); // Conventional Peer Marker bool sysView = true; diff --git a/src/sysview/network/PeerNetwork.h b/src/sysview/network/PeerNetwork.h index 1321fcce..c02efdce 100644 --- a/src/sysview/network/PeerNetwork.h +++ b/src/sysview/network/PeerNetwork.h @@ -57,6 +57,11 @@ namespace network PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password, bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup, bool saveLookup); + /** + * @brief Flag indicating whether or not SysView has received Peer-Link data transfers. + */ + bool hasPeerLink() const { return m_peerLink; } + /** * @brief Helper to lock the peer status mutex. */ @@ -91,6 +96,17 @@ namespace network private: static std::mutex m_peerStatusMutex; + bool m_peerLink; + + uint32_t m_tgidCompressedSize; + uint32_t m_tgidSize; + + uint8_t* m_tgidBuffer; + + uint32_t m_ridCompressedSize; + uint32_t m_ridSize; + + uint8_t* m_ridBuffer; }; } // namespace network