// SPDX-License-Identifier: GPL-2.0-only /* * Digital Voice Modem - Converged FNE Software * GPLv2 Open Source. Use is subject to license terms. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * Copyright (C) 2023-2025 Bryan Biedenkapp, N2PLL * */ #include "fne/Defines.h" #include "common/edac/SHA256.h" #include "common/network/json/json.h" #include "common/zlib/zlib.h" #include "common/Log.h" #include "common/Utils.h" #include "network/FNENetwork.h" #include "network/callhandler/TagDMRData.h" #include "network/callhandler/TagP25Data.h" #include "network/callhandler/TagNXDNData.h" #include "fne/ActivityLog.h" #include "HostFNE.h" using namespace network; using namespace network::callhandler; #include #include #include #include #include // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- const uint32_t MAX_HARD_CONN_CAP = 250U; const uint8_t MAX_PEER_LIST_BEFORE_FLUSH = 10U; const uint32_t MAX_RID_LIST_CHUNK = 50U; // --------------------------------------------------------------------------- // Static Class Members // --------------------------------------------------------------------------- std::mutex FNENetwork::m_peerMutex; // --------------------------------------------------------------------------- // Public Class Members // --------------------------------------------------------------------------- /* Initializes a new instance of the FNENetwork class. */ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port, uint32_t peerId, const std::string& password, bool debug, bool verbose, bool reportPeerPing, bool dmr, bool p25, bool nxdn, uint32_t parrotDelay, bool parrotGrantDemand, bool allowActivityTransfer, bool allowDiagnosticTransfer, uint32_t pingTime, uint32_t updateLookupTime) : BaseNetwork(peerId, true, debug, true, true, allowActivityTransfer, allowDiagnosticTransfer), m_tagDMR(nullptr), m_tagP25(nullptr), m_tagNXDN(nullptr), m_host(host), m_address(address), m_port(port), m_password(password), m_dmrEnabled(dmr), m_p25Enabled(p25), m_nxdnEnabled(nxdn), m_parrotDelay(parrotDelay), m_parrotDelayTimer(1000U, 0U, parrotDelay), m_parrotGrantDemand(parrotGrantDemand), m_parrotOnlyOriginating(false), m_ridLookup(nullptr), m_tidLookup(nullptr), m_peerListLookup(nullptr), m_status(NET_STAT_INVALID), m_peers(), m_peerLinkPeers(), m_peerAffiliations(), m_ccPeerMap(), m_maintainenceTimer(1000U, pingTime), m_updateLookupTime(updateLookupTime * 60U), m_softConnLimit(0U), m_callInProgress(false), m_disallowAdjStsBcast(false), m_disallowExtAdjStsBcast(true), m_allowConvSiteAffOverride(false), m_disallowCallTerm(false), m_restrictGrantToAffOnly(false), m_filterHeaders(true), m_filterTerminators(true), m_disallowU2U(false), m_dropU2UPeerTable(), m_enableInfluxDB(false), m_influxServerAddress("127.0.0.1"), m_influxServerPort(8086U), m_influxServerToken(), m_influxOrg("dvm"), m_influxBucket("dvm"), m_influxLogRawData(false), m_disablePacketData(false), m_dumpPacketData(false), m_verbosePacketData(false), m_reportPeerPing(reportPeerPing), m_verbose(verbose) { assert(host != nullptr); assert(!address.empty()); assert(port > 0U); assert(!password.empty()); m_tagDMR = new TagDMRData(this, debug); m_tagP25 = new TagP25Data(this, debug); m_tagNXDN = new TagNXDNData(this, debug); } /* Finalizes a instance of the FNENetwork class. */ FNENetwork::~FNENetwork() { delete m_tagDMR; delete m_tagP25; delete m_tagNXDN; } /* Helper to set configuration options. */ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions) { m_disallowAdjStsBcast = conf["disallowAdjStsBcast"].as(false); m_disallowExtAdjStsBcast = conf["disallowExtAdjStsBcast"].as(true); m_allowConvSiteAffOverride = conf["allowConvSiteAffOverride"].as(true); m_disallowCallTerm = conf["disallowCallTerm"].as(false); m_softConnLimit = conf["connectionLimit"].as(MAX_HARD_CONN_CAP); if (m_softConnLimit > MAX_HARD_CONN_CAP) { m_softConnLimit = MAX_HARD_CONN_CAP; } // always force disable ADJ_STS_BCAST to external peers if the all option // is enabled if (m_disallowAdjStsBcast) { m_disallowExtAdjStsBcast = true; } m_enableInfluxDB = conf["enableInflux"].as(false); m_influxServerAddress = conf["influxServerAddress"].as("127.0.0.1"); m_influxServerPort = conf["influxServerPort"].as(8086U); m_influxServerToken = conf["influxServerToken"].as(); m_influxOrg = conf["influxOrg"].as("dvm"); m_influxBucket = conf["influxBucket"].as("dvm"); m_influxLogRawData = conf["influxLogRawData"].as(false); if (m_enableInfluxDB) { m_influxServer = influxdb::ServerInfo(m_influxServerAddress, m_influxServerPort, m_influxOrg, m_influxServerToken, m_influxBucket); } m_parrotOnlyOriginating = conf["parrotOnlyToOrginiatingPeer"].as(false); m_restrictGrantToAffOnly = conf["restrictGrantToAffiliatedOnly"].as(false); m_filterHeaders = conf["filterHeaders"].as(true); m_filterTerminators = conf["filterTerminators"].as(true); m_disablePacketData = conf["disablePacketData"].as(false); m_dumpPacketData = conf["dumpPacketData"].as(false); m_verbosePacketData = conf["verbosePacketData"].as(false); /* ** Drop Unit to Unit Peers */ m_disallowU2U = conf["disallowAllUnitToUnit"].as(false); yaml::Node& dropUnitToUnit = conf["dropUnitToUnit"]; if (dropUnitToUnit.size() > 0U) { for (size_t i = 0; i < dropUnitToUnit.size(); i++) { uint32_t peerId = (uint32_t)::strtoul(dropUnitToUnit[i].as("0").c_str(), NULL, 10); if (peerId != 0U) { m_dropU2UPeerTable.push_back(peerId); } } } if (printOptions) { LogInfo(" Maximum Permitted Connections: %u", m_softConnLimit); LogInfo(" Disable adjacent site broadcasts to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); if (m_disallowAdjStsBcast) { LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); } LogInfo(" Disable Packet Data: %s", m_disablePacketData ? "yes" : "no"); LogInfo(" Dump Packet Data: %s", m_dumpPacketData ? "yes" : "no"); LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no"); LogInfo(" Disable P25 TDULC call termination broadcasts to any peers: %s", m_disallowCallTerm ? "yes" : "no"); LogInfo(" Allow conventional sites to override affiliation and receive all traffic: %s", m_allowConvSiteAffOverride ? "yes" : "no"); LogInfo(" Restrict grant response by affiliation: %s", m_restrictGrantToAffOnly ? "yes" : "no"); LogInfo(" Traffic Headers Filtered by Destination ID: %s", m_filterHeaders ? "yes" : "no"); LogInfo(" Traffic Terminators Filtered by Destination ID: %s", m_filterTerminators ? "yes" : "no"); LogInfo(" Disallow Unit-to-Unit: %s", m_disallowU2U ? "yes" : "no"); LogInfo(" InfluxDB Reporting Enabled: %s", m_enableInfluxDB ? "yes" : "no"); if (m_enableInfluxDB) { LogInfo(" InfluxDB Address: %s", m_influxServerAddress.c_str()); LogInfo(" InfluxDB Port: %u", m_influxServerPort); LogInfo(" InfluxDB Organization: %s", m_influxOrg.c_str()); LogInfo(" InfluxDB Bucket: %s", m_influxBucket.c_str()); LogInfo(" InfluxDB Log Raw TSBK/CSBK/RCCH: %s", m_influxLogRawData ? "yes" : "no"); } LogInfo(" Parrot Repeat to Only Originating Peer: %s", m_parrotOnlyOriginating ? "yes" : "no"); } } /* Sets the instances of the Radio ID, Talkgroup Rules, and Peer List lookup tables. */ void FNENetwork::setLookups(lookups::RadioIdLookup* ridLookup, lookups::TalkgroupRulesLookup* tidLookup, lookups::PeerListLookup* peerListLookup) { m_ridLookup = ridLookup; m_tidLookup = tidLookup; m_peerListLookup = peerListLookup; } /* Sets endpoint preshared encryption key. */ void FNENetwork::setPresharedKey(const uint8_t* presharedKey) { m_socket->setPresharedKey(presharedKey); } /* Process a data frames from the network. */ void FNENetwork::processNetwork() { if (m_status != NET_STAT_MST_RUNNING) { return; } sockaddr_storage address; uint32_t addrLen; frame::RTPHeader rtpHeader; frame::RTPFNEHeader fneHeader; int length = 0U; // read message UInt8Array buffer = m_frameQueue->read(length, address, addrLen, &rtpHeader, &fneHeader); if (length > 0) { if (m_debug) Utils::dump(1U, "Network Message", buffer.get(), length); uint32_t peerId = fneHeader.getPeerId(); NetPacketRequest* req = new NetPacketRequest(); req->peerId = peerId; req->address = address; req->addrLen = addrLen; req->rtpHeader = rtpHeader; req->fneHeader = fneHeader; req->length = length; req->buffer = new uint8_t[length]; ::memcpy(req->buffer, buffer.get(), length); if (!Thread::runAsThread(this, threadedNetworkRx, req)) { delete[] req->buffer; delete req; return; } } } /* Updates the timer by the passed number of milliseconds. */ void FNENetwork::clock(uint32_t ms) { if (m_status != NET_STAT_MST_RUNNING) { return; } uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); if (m_forceListUpdate) { for (auto peer : m_peers) { peerACLUpdate(peer.first); } m_forceListUpdate = false; } m_maintainenceTimer.clock(ms); if (m_maintainenceTimer.isRunning() && m_maintainenceTimer.hasExpired()) { // check to see if any peers have been quiet (no ping) longer than allowed std::vector peersToRemove = std::vector(); for (auto peer : m_peers) { uint32_t id = peer.first; FNEPeerConnection* connection = peer.second; if (connection != nullptr) { if (connection->connected()) { uint64_t dt = connection->lastPing() + ((m_host->m_pingTime * 1000) * m_host->m_maxMissedPings); if (dt < now) { LogInfoEx(LOG_NET, "PEER %u (%s) timed out, dt = %u, now = %u", id, connection->identity().c_str(), dt, now); peersToRemove.push_back(id); } } } } // remove any peers for (uint32_t peerId : peersToRemove) { FNEPeerConnection* connection = m_peers[peerId]; m_peers.erase(peerId); if (connection != nullptr) { delete connection; } erasePeerAffiliations(peerId); } // roll the RTP timestamp if no call is in progress if (!m_callInProgress) { frame::RTPHeader::resetStartTime(); m_frameQueue->clearTimestamps(); } // send active peer list to Peer-Link masters if (m_host->m_peerNetworks.size() > 0) { for (auto peer : m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { if (m_peers.size() > 0) { json::array peers = json::array(); for (auto entry : m_peers) { uint32_t peerId = entry.first; network::FNEPeerConnection* peerConn = entry.second; if (peerConn != nullptr) { json::object peerObj = fneConnObject(peerId, peerConn); uint32_t peerNetPeerId = peer.second->getPeerId(); peerObj["parentPeerId"].set(peerNetPeerId); peers.push_back(json::value(peerObj)); } } peer.second->writePeerLinkPeers(&peers); } } } } } // send ACL updates forcibly to any Peer-Link peers for (auto peer : m_peers) { uint32_t id = peer.first; FNEPeerConnection* connection = peer.second; if (connection != nullptr) { if (connection->connected() && connection->isPeerLink()) { // does this peer need an ACL update? uint64_t dt = connection->lastACLUpdate() + (m_updateLookupTime * 1000); if (dt < now) { LogInfoEx(LOG_NET, "PEER %u (%s) updating ACL list, dt = %u, now = %u", id, connection->identity().c_str(), dt, now); peerACLUpdate(id); connection->lastACLUpdate(now); } } } } m_maintainenceTimer.start(); } m_parrotDelayTimer.clock(ms); if (m_parrotDelayTimer.isRunning() && m_parrotDelayTimer.hasExpired()) { // if the DMR handler has parrot frames to playback, playback a frame if (m_tagDMR->hasParrotFrames()) { m_tagDMR->playbackParrot(); } // if the P25 handler has parrot frames to playback, playback a frame if (m_tagP25->hasParrotFrames()) { m_tagP25->playbackParrot(); } // if the NXDN handler has parrot frames to playback, playback a frame if (m_tagNXDN->hasParrotFrames()) { m_tagNXDN->playbackParrot(); } } if (!m_tagDMR->hasParrotFrames() && !m_tagP25->hasParrotFrames() && !m_tagNXDN->hasParrotFrames() && m_parrotDelayTimer.isRunning() && m_parrotDelayTimer.hasExpired()) { m_parrotDelayTimer.stop(); } } /* Opens connection to the network. */ bool FNENetwork::open() { if (m_debug) LogMessage(LOG_NET, "Opening Network"); m_status = NET_STAT_MST_RUNNING; m_maintainenceTimer.start(); m_socket = new udp::Socket(m_address, m_port); // reinitialize the frame queue if (m_frameQueue != nullptr) { delete m_frameQueue; m_frameQueue = new FrameQueue(m_socket, m_peerId, m_debug); } bool ret = m_socket->open(); if (!ret) { m_status = NET_STAT_INVALID; } return ret; } /* Closes connection to the network. */ void FNENetwork::close() { if (m_debug) LogMessage(LOG_NET, "Closing Network"); if (m_status == NET_STAT_MST_RUNNING) { uint8_t buffer[1U]; ::memset(buffer, 0x00U, 1U); uint32_t streamId = createStreamId(); for (auto peer : m_peers) { writePeer(peer.first, { NET_FUNC::MST_CLOSING, NET_SUBFUNC::NOP }, buffer, 1U, RTP_END_OF_CALL_SEQ, streamId, false); } } m_socket->close(); m_maintainenceTimer.stop(); m_status = NET_STAT_INVALID; } // --------------------------------------------------------------------------- // Private Class Members // --------------------------------------------------------------------------- /* Process a data frames from the network. */ void* FNENetwork::threadedNetworkRx(void* arg) { NetPacketRequest* req = (NetPacketRequest*)arg; if (req != nullptr) { #if defined(_WIN32) ::CloseHandle(req->thread); #else ::pthread_detach(req->thread); #endif // defined(_WIN32) uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); FNENetwork* network = static_cast(req->obj); if (network == nullptr) { delete req; return nullptr; } if (req->length > 0) { uint32_t peerId = req->fneHeader.getPeerId(); uint32_t streamId = req->fneHeader.getStreamId(); std::stringstream peerName; peerName << peerId << ":rx-pckt"; #ifdef _GNU_SOURCE ::pthread_setname_np(req->thread, peerName.str().c_str()); #endif // _GNU_SOURCE // update current peer packet sequence and stream ID if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end()) && streamId != 0U) { FNEPeerConnection* connection = network->m_peers[peerId]; uint16_t pktSeq = req->rtpHeader.getSequence(); if (connection != nullptr) { if (pktSeq == RTP_END_OF_CALL_SEQ) { // only reset packet sequences if we're a PROTOCOL or RPTC function if ((req->fneHeader.getFunction() == NET_FUNC::PROTOCOL) || (req->fneHeader.getFunction() == NET_FUNC::RPTC)) { connection->eraseStreamPktSeq(streamId); // attempt to erase packet sequence for the stream } } else { if (connection->hasStreamPktSeq(streamId)) { uint16_t currPkt = connection->getStreamPktSeq(streamId); if ((pktSeq != currPkt) && (pktSeq != (RTP_END_OF_CALL_SEQ - 1U)) && pktSeq != 0U) { LogWarning(LOG_NET, "PEER %u (%s) stream %u out-of-sequence; %u != %u", peerId, connection->identity().c_str(), streamId, pktSeq, currPkt); } } connection->incStreamPktSeq(streamId, pktSeq + 1U); } } network->m_peers[peerId] = connection; } // if we don't have a stream ID and are receiving call data -- throw an error and discard if (streamId == 0U && req->fneHeader.getFunction() == NET_FUNC::PROTOCOL) { std::string peerIdentity = network->resolvePeerIdentity(peerId); LogError(LOG_NET, "PEER %u (%s) malformed packet (no stream ID for a call?)", peerId, peerIdentity.c_str()); if (req->buffer != nullptr) delete req->buffer; delete req; return nullptr; } // process incoming message frame opcodes switch (req->fneHeader.getFunction()) { case NET_FUNC::PROTOCOL: { if (req->fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR) { // Encapsulated DMR data frame if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); connection->lastPing(now); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { if (network->m_dmrEnabled) { if (network->m_tagDMR != nullptr) { network->m_tagDMR->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } } else { network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED, req->address, req->addrLen); } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_P25) { // Encapsulated P25 data frame if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); connection->lastPing(now); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { if (network->m_p25Enabled) { if (network->m_tagP25 != nullptr) { network->m_tagP25->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } } else { network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED, req->address, req->addrLen); } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::PROTOCOL_SUBFUNC_NXDN) { // Encapsulated NXDN data frame if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); connection->lastPing(now); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { if (network->m_nxdnEnabled) { if (network->m_tagNXDN != nullptr) { network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId); } } else { network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } } } else { network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED, req->address, req->addrLen); } } else { Utils::dump("Unknown protocol opcode from peer", req->buffer, req->length); } } break; case NET_FUNC::RPTL: // Repeater Login { if (peerId > 0 && (network->m_peers.find(peerId) == network->m_peers.end())) { if (network->m_peers.size() >= MAX_HARD_CONN_CAP) { LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, currConnections = %u", peerId, network->m_peers.size()); network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, req->address, req->addrLen); break; } if (network->m_softConnLimit > 0U && network->m_peers.size() >= network->m_softConnLimit) { LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, maxConnections = %u, currConnections = %u", peerId, network->m_softConnLimit, network->m_peers.size()); network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, req->address, req->addrLen); break; } FNEPeerConnection* connection = new FNEPeerConnection(peerId, req->address, req->addrLen); connection->lastPing(now); network->setupRepeaterLogin(peerId, streamId, connection); // check if the peer is in the peer ACL list if (network->m_peerListLookup->getACL()) { if (network->m_peerListLookup->isPeerListEmpty()) { LogWarning(LOG_NET, "Peer List ACL enabled, but we have an empty peer list? Passing all peers."); } if (!network->m_peerListLookup->isPeerAllowed(peerId) && !network->m_peerListLookup->isPeerListEmpty()) { if (network->m_peerListLookup->getMode() == lookups::PeerListLookup::BLACKLIST) { LogWarning(LOG_NET, "PEER %u RPTL, blacklisted from access", peerId); } else { LogWarning(LOG_NET, "PEER %u RPTL, failed whitelist check", peerId); } network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_PEER_ACL, req->address, req->addrLen); delete connection; network->erasePeer(peerId); } } } else { // check if the peer is in our peer list -- if he is, and he isn't in a running state, reset // the login sequence if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { if (connection->connectionState() == NET_STAT_RUNNING) { LogMessage(LOG_NET, "PEER %u (%s) resetting peer connection, connectionState = %u", peerId, connection->identity().c_str(), connection->connectionState()); delete connection; connection = new FNEPeerConnection(peerId, req->address, req->addrLen); connection->lastPing(now); network->erasePeerAffiliations(peerId); network->setupRepeaterLogin(peerId, streamId, connection); // check if the peer is in the peer ACL list if (network->m_peerListLookup->getACL()) { if (network->m_peerListLookup->isPeerListEmpty()) { LogWarning(LOG_NET, "Peer List ACL enabled, but we have an empty peer list? Passing all peers."); } if (!network->m_peerListLookup->isPeerAllowed(peerId) && !network->m_peerListLookup->isPeerListEmpty()) { if (network->m_peerListLookup->getMode() == lookups::PeerListLookup::BLACKLIST) { LogWarning(LOG_NET, "PEER %u RPTL, blacklisted from access", peerId); } else { LogWarning(LOG_NET, "PEER %u RPTL, failed whitelist check", peerId); } network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_PEER_ACL, req->address, req->addrLen); delete connection; network->erasePeer(peerId); } } } else { network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); LogWarning(LOG_NET, "PEER %u (%s) RPTL NAK, bad connection state, connectionState = %u", peerId, connection->identity().c_str(), connection->connectionState()); delete connection; network->erasePeer(peerId); } } else { network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); network->erasePeer(peerId); LogWarning(LOG_NET, "PEER %u RPTL NAK, having no connection", peerId); } } } } break; case NET_FUNC::RPTK: // Repeater Authentication { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { connection->lastPing(now); if (connection->connectionState() == NET_STAT_WAITING_AUTHORISATION) { // get the hash from the frame message UInt8Array __hash = std::make_unique(req->length - 8U); uint8_t* hash = __hash.get(); ::memset(hash, 0x00U, req->length - 8U); ::memcpy(hash, req->buffer + 8U, req->length - 8U); // generate our own hash uint8_t salt[4U]; ::memset(salt, 0x00U, 4U); __SET_UINT32(connection->salt(), salt, 0U); std::string passwordForPeer = network->m_password; // check if the peer is in the peer ACL list bool validAcl = true; if (network->m_peerListLookup->getACL()) { if (!network->m_peerListLookup->isPeerAllowed(peerId) && !network->m_peerListLookup->isPeerListEmpty()) { if (network->m_peerListLookup->getMode() == lookups::PeerListLookup::BLACKLIST) { LogWarning(LOG_NET, "PEER %u RPTK, blacklisted from access", peerId); } else { LogWarning(LOG_NET, "PEER %u RPTK, failed whitelist check", peerId); } validAcl = false; } else { lookups::PeerId peerEntry = network->m_peerListLookup->find(peerId); if (peerEntry.peerDefault()) { validAcl = false; // default peer IDs are a no-no as they have no data thus fail ACL check } else { passwordForPeer = peerEntry.peerPassword(); if (passwordForPeer.length() == 0) { passwordForPeer = network->m_password; } } } if (network->m_peerListLookup->isPeerListEmpty()) { LogWarning(LOG_NET, "Peer List ACL enabled, but we have an empty peer list? Passing all peers."); validAcl = true; } } if (validAcl) { size_t size = passwordForPeer.size(); uint8_t* in = new uint8_t[size + sizeof(uint32_t)]; ::memcpy(in, salt, sizeof(uint32_t)); for (size_t i = 0U; i < size; i++) in[i + sizeof(uint32_t)] = passwordForPeer.at(i); uint8_t out[32U]; edac::SHA256 sha256; sha256.buffer(in, (uint32_t)(size + sizeof(uint32_t)), out); delete[] in; // validate hash bool validHash = false; if (req->length - 8U == 32U) { validHash = true; for (uint8_t i = 0; i < 32U; i++) { if (hash[i] != out[i]) { validHash = false; break; } } } if (validHash) { connection->connectionState(NET_STAT_WAITING_CONFIG); network->writePeerACK(peerId, streamId); LogInfoEx(LOG_NET, "PEER %u RPTK ACK, completed the login exchange", peerId); network->m_peers[peerId] = connection; } else { LogWarning(LOG_NET, "PEER %u RPTK NAK, failed the login exchange", peerId); network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_FNE_UNAUTHORIZED, req->address, req->addrLen); network->erasePeer(peerId); } } else { network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_PEER_ACL, req->address, req->addrLen); network->erasePeer(peerId); } } else { LogWarning(LOG_NET, "PEER %u RPTK NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->connectionState()); network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); delete connection; network->erasePeer(peerId); } } } else { network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); network->erasePeer(peerId); LogWarning(LOG_NET, "PEER %u RPTK NAK, having no connection", peerId); } } break; case NET_FUNC::RPTC: // Repeater Configuration { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { connection->lastPing(now); if (connection->connectionState() == NET_STAT_WAITING_CONFIG) { UInt8Array __rawPayload = std::make_unique(req->length - 8U); uint8_t* rawPayload = __rawPayload.get(); ::memset(rawPayload, 0x00U, req->length - 8U); ::memcpy(rawPayload, req->buffer + 8U, req->length - 8U); std::string payload(rawPayload, rawPayload + (req->length - 8U)); // parse JSON body json::value v; std::string err = json::parse(v, payload); if (!err.empty()) { LogWarning(LOG_NET, "PEER %u RPTC NAK, supplied invalid configuration data", peerId); network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA, req->address, req->addrLen); network->erasePeer(peerId); } else { // ensure parsed JSON is an object if (!v.is()) { LogWarning(LOG_NET, "PEER %u RPTC NAK, supplied invalid configuration data", peerId); network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA, req->address, req->addrLen); network->erasePeer(peerId); } else { connection->config(v.get()); connection->connectionState(NET_STAT_RUNNING); connection->connected(true); connection->pingsReceived(0U); connection->lastPing(now); connection->lastACLUpdate(now); network->m_peers[peerId] = connection; // attach extra notification data to the RPTC ACK to notify the peer of // the use of the alternate diagnostic port uint8_t buffer[1U]; buffer[0U] = 0x00U; if (network->m_host->m_useAlternatePortForDiagnostics) { buffer[0U] = 0x80U; } network->writePeerACK(peerId, streamId, buffer, 1U); LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId); json::object peerConfig = connection->config(); if (peerConfig["identity"].is()) { std::string identity = peerConfig["identity"].get(); connection->identity(identity); LogInfoEx(LOG_NET, "PEER %u reports identity [%8s]", peerId, identity.c_str()); } // is the peer reporting it is an external peer? if (peerConfig["externalPeer"].is()) { bool external = peerConfig["externalPeer"].get(); connection->isExternalPeer(external); if (external) LogInfoEx(LOG_NET, "PEER %u reports external peer", peerId); // check if the peer is participating in peer link lookups::PeerId peerEntry = network->m_peerListLookup->find(req->peerId); if (!peerEntry.peerDefault()) { if (peerEntry.peerLink()) { if (network->m_host->m_useAlternatePortForDiagnostics) { connection->isPeerLink(true); if (external) LogInfoEx(LOG_NET, "PEER %u configured for Peer-Link", peerId); } else { LogError(LOG_NET, "PEER %u, Peer-Link operations *require* the alternate diagnostics port option to be enabled.", peerId); LogError(LOG_NET, "PEER %u, will not receive Peer-Link ACL updates.", peerId); } } } } // is the peer reporting it is a conventional peer? if (peerConfig["conventionalPeer"].is()) { if (network->m_allowConvSiteAffOverride) { bool convPeer = peerConfig["conventionalPeer"].get(); connection->isConventionalPeer(convPeer); if (convPeer) LogInfoEx(LOG_NET, "PEER %u reports conventional peer", peerId); } } // is the peer reporting it is a SysView peer? if (peerConfig["sysView"].is()) { bool sysView = peerConfig["sysView"].get(); connection->isSysView(sysView); if (sysView) LogInfoEx(LOG_NET, "PEER %u reports SysView peer", peerId); } if (peerConfig["software"].is()) { std::string software = peerConfig["software"].get(); LogInfoEx(LOG_NET, "PEER %u reports software %s", peerId, software.c_str()); } // setup the affiliations list for this peer std::stringstream peerName; peerName << "PEER " << peerId; network->createPeerAffiliations(peerId, peerName.str()); // spin up a thread and send ACL list over to peer network->peerACLUpdate(peerId); } } } else { LogWarning(LOG_NET, "PEER %u (%s) RPTC NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->identity().c_str(), connection->connectionState()); network->writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); network->erasePeer(peerId); } } } else { network->writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen); LogWarning(LOG_NET, "PEER %u RPTC NAK, having no connection", peerId); } } break; case NET_FUNC::RPT_CLOSING: // Repeater Closing (Disconnect) { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { LogInfoEx(LOG_NET, "PEER %u (%s) is closing down", peerId, connection->identity().c_str()); if (network->erasePeer(peerId)) { network->erasePeerAffiliations(peerId); delete connection; } } } } } break; case NET_FUNC::PING: // Repeater Ping { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { uint32_t pingsRx = connection->pingsReceived(); uint64_t lastPing = connection->lastPing(); pingsRx++; connection->pingsReceived(pingsRx); connection->lastPing(now); // does this peer need an ACL update? uint64_t dt = connection->lastACLUpdate() + (network->m_updateLookupTime * 1000); if (dt < now) { LogInfoEx(LOG_NET, "PEER %u (%s) updating ACL list, dt = %u, now = %u", peerId, connection->identity().c_str(), dt, now); dt = connection->lastACLUpdate() + ((network->m_updateLookupTime * 1000) * 2); if (connection->streamCount() <= 1 || (dt < now)) { network->peerACLUpdate(peerId); connection->lastACLUpdate(now); } } uint8_t payload[8U]; ::memset(payload, 0x00U, 8U); // split ulong64_t (8 byte) value into bytes payload[0U] = (uint8_t)((now >> 56) & 0xFFU); payload[1U] = (uint8_t)((now >> 48) & 0xFFU); payload[2U] = (uint8_t)((now >> 40) & 0xFFU); payload[3U] = (uint8_t)((now >> 32) & 0xFFU); payload[4U] = (uint8_t)((now >> 24) & 0xFFU); payload[5U] = (uint8_t)((now >> 16) & 0xFFU); payload[6U] = (uint8_t)((now >> 8) & 0xFFU); payload[7U] = (uint8_t)((now >> 0) & 0xFFU); network->m_peers[peerId] = connection; network->writePeerCommand(peerId, { NET_FUNC::PONG, NET_SUBFUNC::NOP }, payload, 8U, streamId, false); if (network->m_reportPeerPing) { LogInfoEx(LOG_NET, "PEER %u (%s) ping, pingsReceived = %u, lastPing = %u, now = %u", peerId, connection->identity().c_str(), connection->pingsReceived(), lastPing, now); } } else { network->writePeerNAK(peerId, streamId, TAG_REPEATER_PING); } } } } break; case NET_FUNC::GRANT_REQ: // Repeater Grant Request { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { uint32_t srcId = __GET_UINT16(req->buffer, 11U); // Source Address uint32_t dstId = __GET_UINT16(req->buffer, 15U); // Destination Address uint8_t slot = req->buffer[19U]; bool unitToUnit = (req->buffer[19U] & 0x80U) == 0x80U; DVM_STATE state = (DVM_STATE)req->buffer[20U]; // DVM Mode State switch (state) { case STATE_DMR: if (network->m_dmrEnabled) { if (network->m_tagDMR != nullptr) { network->m_tagDMR->processGrantReq(srcId, dstId, slot, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { network->writePeerNAK(peerId, streamId, TAG_DMR_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; case STATE_P25: if (network->m_p25Enabled) { if (network->m_tagP25 != nullptr) { network->m_tagP25->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { network->writePeerNAK(peerId, streamId, TAG_P25_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; case STATE_NXDN: if (network->m_nxdnEnabled) { if (network->m_tagNXDN != nullptr) { network->m_tagNXDN->processGrantReq(srcId, dstId, unitToUnit, peerId, req->rtpHeader.getSequence(), streamId); } else { network->writePeerNAK(peerId, streamId, TAG_NXDN_DATA, NET_CONN_NAK_MODE_NOT_ENABLED); } } break; default: network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown state for grant request from the peer", req->buffer, req->length); break; } } else { network->writePeerNAK(peerId, streamId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } break; case NET_FUNC::TRANSFER: { // are activity/diagnostic transfers occurring from the alternate port? if (network->m_host->m_useAlternatePortForDiagnostics) { break; // for performance and other reasons -- simply ignore the entire NET_FUNC::TRANSFER at this point // since they should be coming from the alternate port anyway } if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer if (network->m_allowActivityTransfer) { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { UInt8Array __rawPayload = std::make_unique(req->length - 11U); uint8_t* rawPayload = __rawPayload.get(); ::memset(rawPayload, 0x00U, req->length - 11U); ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); std::string payload(rawPayload, rawPayload + (req->length - 11U)); ::ActivityLog("%.9u (%8s) %s", peerId, connection->identity().c_str(), payload.c_str()); // report activity log to InfluxDB if (network->m_enableInfluxDB) { influxdb::QueryBuilder() .meas("activity") .tag("peerId", std::to_string(peerId)) .field("identity", connection->identity()) .field("msg", payload) .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) .request(network->m_influxServer); } } else { network->writePeerNAK(peerId, streamId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_DIAG) { // Peer Diagnostic Log Transfer if (network->m_allowDiagnosticTransfer) { if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { UInt8Array __rawPayload = std::make_unique(req->length - 11U); uint8_t* rawPayload = __rawPayload.get(); ::memset(rawPayload, 0x00U, req->length - 11U); ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); std::string payload(rawPayload, rawPayload + (req->length - 11U)); bool currState = g_disableTimeDisplay; g_disableTimeDisplay = true; ::Log(9999U, nullptr, "%.9u (%8s) %s", peerId, connection->identity().c_str(), payload.c_str()); g_disableTimeDisplay = currState; // report diagnostic log to InfluxDB if (network->m_enableInfluxDB) { influxdb::QueryBuilder() .meas("diag") .tag("peerId", std::to_string(peerId)) .field("identity", connection->identity()) .field("msg", payload) .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) .request(network->m_influxServer); } } else { network->writePeerNAK(peerId, streamId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS) { // Peer Status Transfer // main traffic port status transfers aren't supported for performance reasons } else { network->writePeerNAK(peerId, streamId, TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length); } } break; case NET_FUNC::ANNOUNCE: { if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_GRP_AFFIL) { // Announce Group Affiliation if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address uint32_t dstId = __GET_UINT16(req->buffer, 3U); // Destination Address aff->groupUnaff(srcId); aff->groupAff(srcId, dstId); // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_AFFIL }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_UNIT_REG) { // Announce Unit Registration if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->unitReg(srcId); // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_REG }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_UNIT_DEREG) { // Announce Unit Deregistration if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->unitDereg(srcId); // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_DEREG }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_GRP_UNAFFIL) { // Announce Group Affiliation Removal if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } // validate peer (simple validation really) if (connection->connected() && connection->address() == ip && aff != nullptr) { uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address aff->groupUnaff(srcId); // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_UNAFFIL }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_AFFILS) { // Announce Update All Affiliations if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId]; if (aff == nullptr) { LogError(LOG_NET, "PEER %u (%s) has uninitialized affiliations lookup?", peerId, connection->identity().c_str()); network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_INVALID); } if (aff != nullptr) { aff->clearGroupAff(0U, true); // update TGID lists uint32_t len = __GET_UINT32(req->buffer, 0U); uint32_t offs = 4U; for (uint32_t i = 0; i < len; i++) { uint32_t srcId = __GET_UINT16(req->buffer, offs); uint32_t dstId = __GET_UINT16(req->buffer, offs + 4U); aff->groupAff(srcId, dstId); offs += 8U; } LogMessage(LOG_NET, "PEER %u (%s) announced %u affiliations", peerId, connection->identity().c_str(), len); // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_AFFILS }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::ANNC_SUBFUNC_SITE_VC) { // Announce Site VCs if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { FNEPeerConnection* connection = network->m_peers[peerId]; if (connection != nullptr) { std::string ip = udp::Socket::address(req->address); // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { std::vector vcPeers; // update peer association uint32_t len = __GET_UINT32(req->buffer, 0U); uint32_t offs = 4U; for (uint32_t i = 0; i < len; i++) { uint32_t vcPeerId = __GET_UINT32(req->buffer, offs); if (vcPeerId > 0 && (network->m_peers.find(vcPeerId) != network->m_peers.end())) { FNEPeerConnection* vcConnection = network->m_peers[vcPeerId]; if (vcConnection != nullptr) { vcConnection->ccPeerId(peerId); vcPeers.push_back(vcPeerId); } } offs += 4U; } LogMessage(LOG_NET, "PEER %u (%s) announced %u VCs", peerId, connection->identity().c_str(), len); network->m_ccPeerMap[peerId] = vcPeers; // attempt to repeat traffic to Peer-Link masters if (network->m_host->m_peerNetworks.size() > 0) { for (auto peer : network->m_host->m_peerNetworks) { if (peer.second != nullptr) { if (peer.second->isEnabled() && peer.second->isPeerLink()) { peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_SITE_VC }, req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false); } } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED); } } } } else { network->writePeerNAK(peerId, streamId, TAG_ANNOUNCE, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("unknown announcement opcode from the peer", req->buffer, req->length); } } break; default: Utils::dump("unknown opcode from the peer", req->buffer, req->length); break; } } if (req->buffer != nullptr) delete[] req->buffer; delete req; } return nullptr; } /* Checks if the passed peer ID is blocked from unit-to-unit traffic. */ bool FNENetwork::checkU2UDroppedPeer(uint32_t peerId) { if (m_dropU2UPeerTable.empty()) return false; if (std::find(m_dropU2UPeerTable.begin(), m_dropU2UPeerTable.end(), peerId) != m_dropU2UPeerTable.end()) { return true; } return false; } /* Erases a stream ID from the given peer ID connection. */ void FNENetwork::eraseStreamPktSeq(uint32_t peerId, uint32_t streamId) { if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) { FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { std::lock_guard lock(m_peerMutex); connection->eraseStreamPktSeq(streamId); } } } /* Helper to create a peer on the peers affiliations list. */ void FNENetwork::createPeerAffiliations(uint32_t peerId, std::string peerName) { erasePeerAffiliations(peerId); std::lock_guard lock(m_peerMutex); lookups::ChannelLookup* chLookup = new lookups::ChannelLookup(); m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName, chLookup, m_verbose); m_peerAffiliations[peerId]->setDisableUnitRegTimeout(true); // FNE doesn't allow unit registration timeouts (notification must come from the peers) } /* Helper to erase the peer from the peers affiliations list. */ bool FNENetwork::erasePeerAffiliations(uint32_t peerId) { std::lock_guard lock(m_peerMutex); auto it = std::find_if(m_peerAffiliations.begin(), m_peerAffiliations.end(), [&](PeerAffiliationMapPair x) { return x.first == peerId; }); if (it != m_peerAffiliations.end()) { lookups::AffiliationLookup* aff = m_peerAffiliations[peerId]; if (aff != nullptr) { lookups::ChannelLookup* rfCh = aff->rfCh(); if (rfCh != nullptr) delete rfCh; delete aff; } m_peerAffiliations.erase(peerId); return true; } return false; } /* Helper to erase the peer from the peers list. */ bool FNENetwork::erasePeer(uint32_t peerId) { std::lock_guard lock(m_peerMutex); { auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { m_peers.erase(peerId); } } // erase any CC maps for this peer { auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; }); if (it != m_ccPeerMap.end()) { m_ccPeerMap.erase(peerId); } } // erase any Peer-Link entries for this peer { auto it = std::find_if(m_peerLinkPeers.begin(), m_peerLinkPeers.end(), [&](auto x) { return x.first == peerId; }); if (it != m_peerLinkPeers.end()) { m_peerLinkPeers.erase(peerId); } } return true; } /* Helper to create a JSON representation of a FNE peer connection. */ json::object FNENetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *conn) { json::object peerObj = json::object(); peerObj["peerId"].set(peerId); std::string address = conn->address(); peerObj["address"].set(address); uint16_t port = conn->port(); peerObj["port"].set(port); bool connected = conn->connected(); peerObj["connected"].set(connected); uint32_t connectionState = (uint32_t)conn->connectionState(); peerObj["connectionState"].set(connectionState); uint32_t pingsReceived = conn->pingsReceived(); peerObj["pingsReceived"].set(pingsReceived); uint64_t lastPing = conn->lastPing(); peerObj["lastPing"].set(lastPing); uint32_t ccPeerId = conn->ccPeerId(); peerObj["controlChannel"].set(ccPeerId); json::object peerConfig = conn->config(); if (peerConfig["rcon"].is()) peerConfig.erase("rcon"); peerObj["config"].set(peerConfig); json::array voiceChannels = json::array(); auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; }); if (it != m_ccPeerMap.end()) { std::vector vcPeers = m_ccPeerMap[peerId]; for (uint32_t vcEntry : vcPeers) { voiceChannels.push_back(json::value((double)vcEntry)); } } peerObj["voiceChannels"].set(voiceChannels); return peerObj; } /* Helper to reset a peer connection. */ bool FNENetwork::resetPeer(uint32_t peerId) { if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) { FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { sockaddr_storage addr = connection->socketStorage(); uint32_t addrLen = connection->sockStorageLen(); LogInfoEx(LOG_NET, "PEER %u (%s) resetting peer connection", peerId, connection->identity().c_str()); writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_PEER_RESET, addr, addrLen); delete connection; erasePeer(peerId); return true; } } LogWarning(LOG_NET, "PEER %u reset failed; peer not found.", peerId); return false; } /* Helper to resolve the peer ID to its identity string. */ std::string FNENetwork::resolvePeerIdentity(uint32_t peerId) { std::lock_guard lock(m_peerMutex); auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { if (it->second != nullptr) { FNEPeerConnection* peer = it->second; return peer->identity(); } } return std::string(); } /* Helper to complete setting up a repeater login request. */ void FNENetwork::setupRepeaterLogin(uint32_t peerId, uint32_t streamId, FNEPeerConnection* connection) { std::uniform_int_distribution dist(DVM_RAND_MIN, DVM_RAND_MAX); connection->salt(dist(m_random)); LogInfoEx(LOG_NET, "PEER %u started login from, %s:%u", peerId, connection->address().c_str(), connection->port()); connection->connectionState(NET_STAT_WAITING_AUTHORISATION); m_peers[peerId] = connection; // transmit salt to peer uint8_t salt[4U]; ::memset(salt, 0x00U, 4U); __SET_UINT32(connection->salt(), salt, 0U); writePeerACK(peerId, streamId, salt, 4U); LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId); } /* Helper to send the ACL lists to the specified peer in a separate thread. */ void FNENetwork::peerACLUpdate(uint32_t peerId) { ACLUpdateRequest* req = new ACLUpdateRequest(); req->peerId = peerId; std::stringstream peerName; peerName << peerId << ":acl-update"; if (!Thread::runAsThread(this, threadedACLUpdate, req)) { delete req; return; } // pthread magic to rename the thread properly #ifdef _GNU_SOURCE ::pthread_setname_np(req->thread, peerName.str().c_str()); #endif // _GNU_SOURCE } /* Helper to send the ACL lists to the specified peer in a separate thread. */ void* FNENetwork::threadedACLUpdate(void* arg) { ACLUpdateRequest* req = (ACLUpdateRequest*)arg; if (req != nullptr) { #if defined(_WIN32) ::CloseHandle(req->thread); #else ::pthread_detach(req->thread); #endif // defined(_WIN32) FNENetwork* network = static_cast(req->obj); if (network == nullptr) { delete req; return nullptr; } std::string peerIdentity = network->resolvePeerIdentity(req->peerId); FNEPeerConnection* connection = network->m_peers[req->peerId]; if (connection != nullptr) { uint32_t aclStreamId = network->createStreamId(); // if the connection is an external peer, and peer is participating in peer link, // send the peer proper configuration data if (connection->isExternalPeer() && connection->isPeerLink()) { LogInfoEx(LOG_NET, "PEER %u (%s) sending Peer-Link ACL list updates", req->peerId, peerIdentity.c_str()); network->writeWhitelistRIDs(req->peerId, aclStreamId, true); network->writeTGIDs(req->peerId, aclStreamId, true); network->writePeerList(req->peerId, aclStreamId); } else { LogInfoEx(LOG_NET, "PEER %u (%s) sending ACL list updates", req->peerId, peerIdentity.c_str()); network->writeWhitelistRIDs(req->peerId, aclStreamId, false); network->writeBlacklistRIDs(req->peerId, aclStreamId); network->writeTGIDs(req->peerId, aclStreamId, false); network->writeDeactiveTGIDs(req->peerId, aclStreamId); } } delete req; } return nullptr; } /* Helper to send the list of whitelisted RIDs to the specified peer. */ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); // sending PEER_LINK style RID list to external peers if (isExternalPeer) { FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { std::string filename = m_ridLookup->filename(); if (filename.empty()) { return; } // read entire file into string buffer std::stringstream b; std::ifstream stream(filename); if (stream.is_open()) { while (stream.peek() != EOF) { b << (char)stream.get(); } stream.close(); } // convert to a byte array uint32_t len = b.str().size(); UInt8Array __buffer = std::make_unique(len); uint8_t* buffer = __buffer.get(); ::memset(buffer, 0x00U, len); ::memcpy(buffer, b.str().data(), len); // compression structures z_stream strm; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; // initialize compression if (deflateInit(&strm, Z_DEFAULT_COMPRESSION) != Z_OK) { LogError(LOG_NET, "PEER %u (%s) error initializing ZLIB", peerId, connection->identity().c_str()); return; } // set input data strm.avail_in = len; strm.next_in = buffer; // compress data std::vector compressedData; int ret; do { // resize the output buffer as needed compressedData.resize(compressedData.size() + 16384); strm.avail_out = 16384; strm.next_out = compressedData.data() + compressedData.size() - 16384; ret = deflate(&strm, Z_FINISH); if (ret == Z_STREAM_ERROR) { LogError(LOG_NET, "PEER %u (%s) error compressing TGID list", peerId, connection->identity().c_str()); deflateEnd(&strm); return; } } while (ret != Z_STREAM_END); // resize the output buffer to the actual compressed data size compressedData.resize(strm.total_out); // cleanup deflateEnd(&strm); uint32_t compressedLen = strm.total_out; uint8_t* compressed = compressedData.data(); // Utils::dump(1U, "Compressed Payload", compressed, compressedLen); // transmit TGIDs uint8_t blockCnt = (compressedLen / PEER_LINK_BLOCK_SIZE) + (compressedLen % PEER_LINK_BLOCK_SIZE ? 1U : 0U); uint32_t offs = 0U; for (uint8_t i = 0U; i < blockCnt; i++) { // build dataset uint16_t bufSize = 10U + (PEER_LINK_BLOCK_SIZE); UInt8Array __payload = std::make_unique(bufSize); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, bufSize); if (i == 0U) { __SET_UINT32(len, payload, 0U); __SET_UINT32(compressedLen, payload, 4U); } payload[8U] = i; payload[9U] = blockCnt - 1U; uint32_t blockSize = PEER_LINK_BLOCK_SIZE; if (offs + PEER_LINK_BLOCK_SIZE > compressedLen) blockSize = PEER_LINK_BLOCK_SIZE - ((offs + PEER_LINK_BLOCK_SIZE) - compressedLen); ::memcpy(payload + 10U, compressed + offs, blockSize); if (m_debug) Utils::dump(1U, "Peer-Link RID Block Payload", payload, bufSize); offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_RID_LIST }, payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); } return; } // send radio ID white/black lists std::vector ridWhitelist; auto ridLookups = m_ridLookup->table(); for (auto entry : ridLookups) { uint32_t id = entry.first; if (entry.second.radioEnabled()) { ridWhitelist.push_back(id); } } if (ridWhitelist.size() == 0U) { return; } // send a chunk of RIDs to the peer FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { uint32_t chunkCnt = (ridWhitelist.size() / MAX_RID_LIST_CHUNK) + 1U; for (uint32_t i = 0U; i < chunkCnt; i++) { size_t listSize = ridWhitelist.size(); if (chunkCnt > 1U) { listSize = MAX_RID_LIST_CHUNK; if (i == chunkCnt - 1U) { // this is a disgusting dirty hack... listSize = ::abs((long)((i * MAX_RID_LIST_CHUNK) - ridWhitelist.size())); } } if (listSize > ridWhitelist.size()) { listSize = ridWhitelist.size(); } // Ignore lists of size 0 (happens on even multiples of 50, TODO: there's probably a better fix for this) if (listSize == 0) { continue; } // build dataset uint16_t bufSize = 4U + (listSize * 4U); UInt8Array __payload = std::make_unique(bufSize); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, bufSize); __SET_UINT32(listSize, payload, 0U); // write whitelisted IDs to whitelist payload uint32_t offs = 4U; for (uint32_t j = 0; j < listSize; j++) { uint32_t id = ridWhitelist.at(j + (i * MAX_RID_LIST_CHUNK)); if (m_debug) LogDebug(LOG_NET, "PEER %u (%s) whitelisting RID %u (%d / %d)", peerId, connection->identity().c_str(), id, i, j); __SET_UINT32(id, payload, offs); offs += 4U; } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_WL_RID }, payload, bufSize, streamId, true); } connection->lastPing(now); } } /* Helper to send the list of whitelisted RIDs to the specified peer. */ void FNENetwork::writeBlacklistRIDs(uint32_t peerId, uint32_t streamId) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); // send radio ID blacklist std::vector ridBlacklist; auto ridLookups = m_ridLookup->table(); for (auto entry : ridLookups) { uint32_t id = entry.first; if (!entry.second.radioEnabled()) { ridBlacklist.push_back(id); } } if (ridBlacklist.size() == 0U) { return; } // send a chunk of RIDs to the peer FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { uint32_t chunkCnt = (ridBlacklist.size() / MAX_RID_LIST_CHUNK) + 1U; for (uint32_t i = 0U; i < chunkCnt; i++) { size_t listSize = ridBlacklist.size(); if (chunkCnt > 1U) { listSize = MAX_RID_LIST_CHUNK; if (i == chunkCnt - 1U) { // this is a disgusting dirty hack... listSize = ::abs((long)((i * MAX_RID_LIST_CHUNK) - ridBlacklist.size())); } } if (listSize > ridBlacklist.size()) { listSize = ridBlacklist.size(); } // Ignore lists of size 0 (happens on even multiples of 50, TODO: there's probably a better fix for this) if (listSize == 0) { continue; } // build dataset uint16_t bufSize = 4U + (listSize * 4U); UInt8Array __payload = std::make_unique(bufSize); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, bufSize); __SET_UINT32(listSize, payload, 0U); // write blacklisted IDs to blacklist payload uint32_t offs = 4U; for (uint32_t j = 0; j < listSize; j++) { uint32_t id = ridBlacklist.at(j + (i * MAX_RID_LIST_CHUNK)); if (m_debug) LogDebug(LOG_NET, "PEER %u (%s) blacklisting RID %u (%d / %d)", peerId, connection->identity().c_str(), id, i, j); __SET_UINT32(id, payload, offs); offs += 4U; } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_BL_RID }, payload, bufSize, streamId, true); } connection->lastPing(now); } } /* Helper to send the list of active TGIDs to the specified peer. */ void FNENetwork::writeTGIDs(uint32_t peerId, uint32_t streamId, bool isExternalPeer) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); if (!m_tidLookup->sendTalkgroups()) { return; } // sending PEER_LINK style TGID list to external peers if (isExternalPeer) { FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { std::string filename = m_tidLookup->filename(); if (filename.empty()) { return; } // read entire file into string buffer std::stringstream b; std::ifstream stream(filename); if (stream.is_open()) { while (stream.peek() != EOF) { b << (char)stream.get(); } stream.close(); } // convert to a byte array uint32_t len = b.str().size(); UInt8Array __buffer = std::make_unique(len); uint8_t* buffer = __buffer.get(); ::memset(buffer, 0x00U, len); ::memcpy(buffer, b.str().data(), len); // compression structures z_stream strm; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; // initialize compression if (deflateInit(&strm, Z_DEFAULT_COMPRESSION) != Z_OK) { LogError(LOG_NET, "PEER %u (%s) error initializing ZLIB", peerId, connection->identity().c_str()); return; } // set input data strm.avail_in = len; strm.next_in = buffer; // compress data std::vector compressedData; int ret; do { // resize the output buffer as needed compressedData.resize(compressedData.size() + 16384); strm.avail_out = 16384; strm.next_out = compressedData.data() + compressedData.size() - 16384; ret = deflate(&strm, Z_FINISH); if (ret == Z_STREAM_ERROR) { LogError(LOG_NET, "PEER %u (%s) error compressing TGID list", peerId, connection->identity().c_str()); deflateEnd(&strm); return; } } while (ret != Z_STREAM_END); // resize the output buffer to the actual compressed data size compressedData.resize(strm.total_out); // cleanup deflateEnd(&strm); uint32_t compressedLen = strm.total_out; uint8_t* compressed = compressedData.data(); // Utils::dump(1U, "Compressed Payload", compressed, compressedLen); // transmit TGIDs uint8_t blockCnt = (compressedLen / PEER_LINK_BLOCK_SIZE) + (compressedLen % PEER_LINK_BLOCK_SIZE ? 1U : 0U); uint32_t offs = 0U; for (uint8_t i = 0U; i < blockCnt; i++) { // build dataset uint16_t bufSize = 10U + (PEER_LINK_BLOCK_SIZE); UInt8Array __payload = std::make_unique(bufSize); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, bufSize); if (i == 0U) { __SET_UINT32(len, payload, 0U); __SET_UINT32(compressedLen, payload, 4U); } payload[8U] = i; payload[9U] = blockCnt - 1U; uint32_t blockSize = PEER_LINK_BLOCK_SIZE; if (offs + PEER_LINK_BLOCK_SIZE > compressedLen) blockSize = PEER_LINK_BLOCK_SIZE - ((offs + PEER_LINK_BLOCK_SIZE) - compressedLen); ::memcpy(payload + 10U, compressed + offs, blockSize); if (m_debug) Utils::dump(1U, "Peer-Link TGID Block Payload", payload, bufSize); offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_TALKGROUP_LIST }, payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); } return; } std::vector> tgidList; auto groupVoice = m_tidLookup->groupVoice(); for (auto entry : groupVoice) { std::vector inclusion = entry.config().inclusion(); std::vector exclusion = entry.config().exclusion(); std::vector preferred = entry.config().preferred(); // peer inclusion lists take priority over exclusion lists if (inclusion.size() > 0) { auto it = std::find(inclusion.begin(), inclusion.end(), peerId); if (it == inclusion.end()) { // LogDebug(LOG_NET, "PEER %u TGID %u TS %u -- not included peer", peerId, entry.source().tgId(), entry.source().tgSlot()); continue; } } else { if (exclusion.size() > 0) { auto it = std::find(exclusion.begin(), exclusion.end(), peerId); if (it != exclusion.end()) { // LogDebug(LOG_NET, "PEER %u TGID %u TS %u -- excluded peer", peerId, entry.source().tgId(), entry.source().tgSlot()); continue; } } } // determine if the peer is non-preferred bool nonPreferred = false; if (preferred.size() > 0) { auto it = std::find(preferred.begin(), preferred.end(), peerId); if (it == preferred.end()) { nonPreferred = true; } } if (entry.config().active()) { uint8_t slotNo = entry.source().tgSlot(); // set the $80 bit of the slot number to flag non-preferred if (nonPreferred) { slotNo |= 0x80U; } // set the $40 bit of the slot number to identify if this TG is by affiliation or not if (entry.config().affiliated()) { slotNo |= 0x40U; } tgidList.push_back({ entry.source().tgId(), slotNo }); } } // build dataset UInt8Array __payload = std::make_unique(4U + (tgidList.size() * 5U)); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, 4U + (tgidList.size() * 5U)); __SET_UINT32(tgidList.size(), payload, 0U); // write talkgroup IDs to active TGID payload uint32_t offs = 4U; for (std::pair tg : tgidList) { if (m_debug) { std::string peerIdentity = resolvePeerIdentity(peerId); LogDebug(LOG_NET, "PEER %u (%s) activating TGID %u TS %u", peerId, peerIdentity.c_str(), tg.first, tg.second); } __SET_UINT32(tg.first, payload, offs); payload[offs + 4U] = tg.second; offs += 5U; } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_ACTIVE_TGS }, payload, 4U + (tgidList.size() * 5U), streamId, true); } /* Helper to send the list of deactivated TGIDs to the specified peer. */ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, uint32_t streamId) { if (!m_tidLookup->sendTalkgroups()) { return; } std::vector> tgidList; auto groupVoice = m_tidLookup->groupVoice(); for (auto entry : groupVoice) { std::vector inclusion = entry.config().inclusion(); std::vector exclusion = entry.config().exclusion(); // peer inclusion lists take priority over exclusion lists if (inclusion.size() > 0) { auto it = std::find(inclusion.begin(), inclusion.end(), peerId); if (it == inclusion.end()) { // LogDebug(LOG_NET, "PEER %u TGID %u TS %u -- not included peer", peerId, entry.source().tgId(), entry.source().tgSlot()); continue; } } else { if (exclusion.size() > 0) { auto it = std::find(exclusion.begin(), exclusion.end(), peerId); if (it != exclusion.end()) { // LogDebug(LOG_NET, "PEER %u TGID %u TS %u -- excluded peer", peerId, entry.source().tgId(), entry.source().tgSlot()); continue; } } } if (!entry.config().active()) { tgidList.push_back({ entry.source().tgId(), entry.source().tgSlot() }); } } // build dataset UInt8Array __payload = std::make_unique(4U + (tgidList.size() * 5U)); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, 4U + (tgidList.size() * 5U)); __SET_UINT32(tgidList.size(), payload, 0U); // write talkgroup IDs to deactive TGID payload uint32_t offs = 4U; for (std::pair tg : tgidList) { if (m_debug) { std::string peerIdentity = resolvePeerIdentity(peerId); LogDebug(LOG_NET, "PEER %u (%s) deactivating TGID %u TS %u", peerId, peerIdentity.c_str(), tg.first, tg.second); } __SET_UINT32(tg.first, payload, offs); payload[offs + 4U] = tg.second; offs += 5U; } writePeerCommand(peerId, { NET_FUNC::MASTER, NET_SUBFUNC::MASTER_SUBFUNC_DEACTIVE_TGS }, payload, 4U + (tgidList.size() * 5U), streamId, true); } /* Helper to send the list of peers to the specified peer. */ void FNENetwork::writePeerList(uint32_t peerId, uint32_t streamId) { uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); // sending PEER_LINK style RID list to external peers FNEPeerConnection* connection = m_peers[peerId]; if (connection != nullptr) { std::string filename = m_peerListLookup->filename(); if (filename.empty()) { return; } // read entire file into string buffer std::stringstream b; std::ifstream stream(filename); if (stream.is_open()) { while (stream.peek() != EOF) { b << (char)stream.get(); } stream.close(); } // convert to a byte array uint32_t len = b.str().size(); UInt8Array __buffer = std::make_unique(len); uint8_t* buffer = __buffer.get(); ::memset(buffer, 0x00U, len); ::memcpy(buffer, b.str().data(), len); // compression structures z_stream strm; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; // initialize compression if (deflateInit(&strm, Z_DEFAULT_COMPRESSION) != Z_OK) { LogError(LOG_NET, "PEER %u (%s) error initializing ZLIB", peerId, connection->identity().c_str()); return; } // set input data strm.avail_in = len; strm.next_in = buffer; // compress data std::vector compressedData; int ret; do { // resize the output buffer as needed compressedData.resize(compressedData.size() + 16384); strm.avail_out = 16384; strm.next_out = compressedData.data() + compressedData.size() - 16384; ret = deflate(&strm, Z_FINISH); if (ret == Z_STREAM_ERROR) { LogError(LOG_NET, "PEER %u (%s) error compressing TGID list", peerId, connection->identity().c_str()); deflateEnd(&strm); return; } } while (ret != Z_STREAM_END); // resize the output buffer to the actual compressed data size compressedData.resize(strm.total_out); // cleanup deflateEnd(&strm); uint32_t compressedLen = strm.total_out; uint8_t* compressed = compressedData.data(); // Utils::dump(1U, "Compressed Payload", compressed, compressedLen); // transmit TGIDs uint8_t blockCnt = (compressedLen / PEER_LINK_BLOCK_SIZE) + (compressedLen % PEER_LINK_BLOCK_SIZE ? 1U : 0U); uint32_t offs = 0U; for (uint8_t i = 0U; i < blockCnt; i++) { // build dataset uint16_t bufSize = 10U + (PEER_LINK_BLOCK_SIZE); UInt8Array __payload = std::make_unique(bufSize); uint8_t* payload = __payload.get(); ::memset(payload, 0x00U, bufSize); if (i == 0U) { __SET_UINT32(len, payload, 0U); __SET_UINT32(compressedLen, payload, 4U); } payload[8U] = i; payload[9U] = blockCnt - 1U; uint32_t blockSize = PEER_LINK_BLOCK_SIZE; if (offs + PEER_LINK_BLOCK_SIZE > compressedLen) blockSize = PEER_LINK_BLOCK_SIZE - ((offs + PEER_LINK_BLOCK_SIZE) - compressedLen); ::memcpy(payload + 10U, compressed + offs, blockSize); if (m_debug) Utils::dump(1U, "Peer-Link Peer List Block Payload", payload, bufSize); offs += PEER_LINK_BLOCK_SIZE; writePeer(peerId, { NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_PEER_LIST }, payload, bufSize, 0U, streamId, false, true, true); } connection->lastPing(now); } return; } /* Helper to send a data message to the specified peer with a explicit packet sequence. */ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const { if (streamId == 0U) { LogError(LOG_NET, "BUGBUG: PEER %u, trying to send data with a streamId of 0?", peerId); } auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { FNEPeerConnection* connection = m_peers.at(peerId); if (connection != nullptr) { sockaddr_storage addr = connection->socketStorage(); uint32_t addrLen = connection->sockStorageLen(); if (incPktSeq) { pktSeq = connection->incStreamPktSeq(streamId, pktSeq); } if (directWrite) return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); else { m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); if (queueOnly) return true; return m_frameQueue->flushQueue(); } } } return false; } /* Helper to send a command message to the specified peer. */ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint32_t streamId, bool incPktSeq) const { assert(peerId > 0); uint8_t buffer[DATA_PACKET_LENGTH]; ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); if (data != nullptr && length > 0U) { ::memcpy(buffer + 6U, data, length); } uint32_t len = length + 6U; return writePeer(peerId, opcode, buffer, len, RTP_END_OF_CALL_SEQ, streamId, false, incPktSeq, true); } /* Helper to send a ACK response to the specified peer. */ bool FNENetwork::writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* data, uint32_t length) { uint8_t buffer[DATA_PACKET_LENGTH]; ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); __SET_UINT32(peerId, buffer, 0U); // Peer ID if (data != nullptr && length > 0U) { ::memcpy(buffer + 6U, data, length); } return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, streamId, false); } /* Helper to log a warning specifying which NAK reason is being sent a peer. */ void FNENetwork::logPeerNAKReason(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason) { switch (reason) { case NET_CONN_NAK_MODE_NOT_ENABLED: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; digital mode not enabled on FNE", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_ILLEGAL_PACKET: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; illegal/unknown packet", peerId ,tag, (uint16_t)reason); break; case NET_CONN_NAK_FNE_UNAUTHORIZED: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; unauthorized", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_BAD_CONN_STATE: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; bad connection state", peerId ,tag, (uint16_t)reason); break; case NET_CONN_NAK_INVALID_CONFIG_DATA: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; invalid configuration data", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_FNE_MAX_CONN: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; FNE has reached maximum permitted connections", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_PEER_RESET: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; FNE demanded connection reset", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_PEER_ACL: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; ACL rejection", peerId, tag, (uint16_t)reason); break; case NET_CONN_NAK_GENERAL_FAILURE: default: LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u; general failure", peerId, tag, (uint16_t)reason); break; } } /* Helper to send a NAK response to the specified peer. */ bool FNENetwork::writePeerNAK(uint32_t peerId, uint32_t streamId, const char* tag, NET_CONN_NAK_REASON reason) { assert(peerId > 0); assert(tag != nullptr); uint8_t buffer[DATA_PACKET_LENGTH]; ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); __SET_UINT32(peerId, buffer, 6U); // Peer ID __SET_UINT16B((uint16_t)reason, buffer, 10U); // Reason logPeerNAKReason(peerId, tag, reason); return writePeer(peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, buffer, 10U, RTP_END_OF_CALL_SEQ, streamId, false); } /* Helper to send a NAK response to the specified peer. */ bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen) { assert(peerId > 0); assert(tag != nullptr); uint8_t buffer[DATA_PACKET_LENGTH]; ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); __SET_UINT32(peerId, buffer, 6U); // Peer ID __SET_UINT16B((uint16_t)reason, buffer, 10U); // Reason logPeerNAKReason(peerId, tag, reason); LogWarning(LOG_NET, "PEER %u NAK %s -> %s:%u", peerId, tag, udp::Socket::address(addr).c_str(), udp::Socket::port(addr)); return m_frameQueue->write(buffer, 12U, createStreamId(), peerId, m_peerId, { NET_FUNC::NAK, NET_SUBFUNC::NOP }, 0U, addr, addrLen); }