From eb57f1b5d8154a4f2fdaead651cea74a18712663 Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Mon, 5 Feb 2024 10:46:47 -0500 Subject: [PATCH] implement functionality to block traffic to specific peers from specific external peers; use std::string over const char* wherever possible to limit pointer passing; implement support in FrameQueue and RawFrameQueue to bypass the queue and directly send messages to specified peers (useful in some situations for packets to be immediately dispatched vs queuing and batching); chunk RID list transmissions to aid in increasing performance (this will likely also need to be done with the TGID lists in the future as well); refactor transmitting ACL lists to peers into its own spun off thread so we don't block the main network processing loop (and infact, this requires documentation; since at a certain peer size DVM network partitioning should be considered to reduce the number of concurrent peers serviced by any given FNE and to better load balance connections across a network, instead of relying on a singular central FNE); implement /peer/count REST API to simply return the known connected count of peers; disable the forced ACL list update functionality (for now); --- configs/fne-config.example.yml | 3 + src/common/RingBuffer.h | 15 +- src/common/lookups/AffiliationLookup.cpp | 27 +-- src/common/lookups/AffiliationLookup.h | 4 +- src/common/network/FrameQueue.cpp | 104 +++++++-- src/common/network/FrameQueue.h | 11 +- src/common/network/RawFrameQueue.cpp | 31 ++- src/common/network/RawFrameQueue.h | 4 +- src/common/network/udp/Socket.cpp | 18 +- src/fne/HostFNE.cpp | 13 ++ src/fne/network/FNENetwork.cpp | 280 +++++++++++++---------- src/fne/network/FNENetwork.h | 55 +++-- src/fne/network/PeerNetwork.cpp | 20 +- src/fne/network/PeerNetwork.h | 10 + src/fne/network/RESTAPI.cpp | 40 +++- src/fne/network/RESTAPI.h | 2 + src/fne/network/RESTDefines.h | 1 + src/fne/network/fne/TagDMRData.cpp | 5 + src/fne/network/fne/TagNXDNData.cpp | 5 + src/fne/network/fne/TagP25Data.cpp | 5 + src/host/network/Network.cpp | 2 + src/host/p25/packet/ControlSignaling.cpp | 3 +- src/remote/RESTClientMain.cpp | 5 + 23 files changed, 465 insertions(+), 198 deletions(-) diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index b2c149e4..f24f5936 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -102,6 +102,9 @@ peers: # Network Peer ID peerId: 9000990 + # List of peer IDs to block sending traffic to from this peer. + blockTrafficTo: [] + # Flag indicating whether or not peer endpoint networking is encrypted. encrypted: false # AES-256 32-byte Preshared Key diff --git a/src/common/RingBuffer.h b/src/common/RingBuffer.h index 5f7ba59e..2dbf54da 100644 --- a/src/common/RingBuffer.h +++ b/src/common/RingBuffer.h @@ -32,7 +32,7 @@ public: /// Initializes a new instance of the RingBuffer class. /// Length of ring buffer. /// Name of buffer. - RingBuffer(uint32_t length, const char* name) : + RingBuffer(uint32_t length, const std::string name) : m_length(length), m_name(name), m_buffer(nullptr), @@ -40,7 +40,6 @@ public: m_oPtr(0U) { assert(length > 0U); - assert(name != nullptr); m_buffer = new T[length]; @@ -60,7 +59,7 @@ public: bool addData(const T* buffer, uint32_t length) { if (length > freeSpace()) { - LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name, length, freeSpace()); + LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name.c_str(), length, freeSpace()); clear(); return false; } @@ -74,7 +73,7 @@ public: m_iPtr = 0U; } #if DEBUG_RINGBUFFER - LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name, iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite)); + LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name.c_str(), iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite)); #endif return true; } @@ -86,7 +85,7 @@ public: bool get(T* buffer, uint32_t length) { if (dataSize() < length) { - LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name, dataSize(), length); + LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length); return false; } #if DEBUG_RINGBUFFER @@ -99,7 +98,7 @@ public: m_oPtr = 0U; } #if DEBUG_RINGBUFFER - LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name, m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead)); + LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name.c_str(), m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead)); #endif return true; } @@ -111,7 +110,7 @@ public: bool peek(T* buffer, uint32_t length) { if (dataSize() < length) { - LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name, dataSize(), length); + LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length); return false; } @@ -204,7 +203,7 @@ public: private: uint32_t m_length; - const char* m_name; + const std::string m_name; T* m_buffer; diff --git a/src/common/lookups/AffiliationLookup.cpp b/src/common/lookups/AffiliationLookup.cpp index 972c639d..d422b3e3 100644 --- a/src/common/lookups/AffiliationLookup.cpp +++ b/src/common/lookups/AffiliationLookup.cpp @@ -24,7 +24,7 @@ using namespace lookups; /// /// Name of lookup table. /// Flag indicating whether verbose logging is enabled. -AffiliationLookup::AffiliationLookup(const char* name, bool verbose) : +AffiliationLookup::AffiliationLookup(const std::string name, bool verbose) : m_rfChTable(), m_rfChDataTable(), m_rfGrantChCnt(0U), @@ -52,10 +52,7 @@ AffiliationLookup::AffiliationLookup(const char* name, bool verbose) : /// /// Finalizes a instance of the AffiliationLookup class. /// -AffiliationLookup::~AffiliationLookup() -{ - /* stub */ -} +AffiliationLookup::~AffiliationLookup() = default; /// /// Helper to group affiliate a source ID. @@ -71,7 +68,7 @@ void AffiliationLookup::unitReg(uint32_t srcId) if (m_verbose) { LogMessage(LOG_HOST, "%s, unit registration, srcId = %u", - m_name, srcId); + m_name.c_str(), srcId); } } @@ -89,7 +86,7 @@ bool AffiliationLookup::unitDereg(uint32_t srcId) if (m_verbose) { LogMessage(LOG_HOST, "%s, unit deregistration, srcId = %u", - m_name, srcId); + m_name.c_str(), srcId); } groupUnaff(srcId); @@ -126,7 +123,7 @@ bool AffiliationLookup::isUnitReg(uint32_t srcId) const void AffiliationLookup::clearUnitReg() { std::vector srcToRel = std::vector(); - LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name); + LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name.c_str()); m_unitRegTable.clear(); } @@ -143,7 +140,7 @@ void AffiliationLookup::groupAff(uint32_t srcId, uint32_t dstId) if (m_verbose) { LogMessage(LOG_HOST, "%s, group affiliation, srcId = %u, dstId = %u", - m_name, srcId, dstId); + m_name.c_str(), srcId, dstId); } } } @@ -160,7 +157,7 @@ bool AffiliationLookup::groupUnaff(uint32_t srcId) uint32_t tblDstId = m_grpAffTable.at(srcId); if (m_verbose) { LogMessage(LOG_HOST, "%s, group unaffiliation, srcId = %u, dstId = %u", - m_name, srcId, tblDstId); + m_name.c_str(), srcId, tblDstId); } } else { return false; @@ -230,14 +227,14 @@ std::vector AffiliationLookup::clearGroupAff(uint32_t dstId, bool rele } if (dstId == 0U && releaseAll) { - LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name); + LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name.c_str()); for (auto entry : m_grpAffTable) { uint32_t srcId = entry.first; srcToRel.push_back(srcId); } } else { - LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name, dstId); + LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name.c_str(), dstId); for (auto entry : m_grpAffTable) { uint32_t srcId = entry.first; uint32_t grpId = entry.second; @@ -289,7 +286,7 @@ bool AffiliationLookup::grantCh(uint32_t dstId, uint32_t srcId, uint32_t grantTi if (m_verbose) { LogMessage(LOG_HOST, "%s, granting channel, chNo = %u, dstId = %u, srcId = %u, group = %u", - m_name, chNo, dstId, srcId, grp); + m_name.c_str(), chNo, dstId, srcId, grp); } return true; @@ -324,7 +321,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll) // are we trying to release all grants? if (dstId == 0U && releaseAll) { - LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name); + LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name.c_str()); std::vector gntsToRel = std::vector(); for (auto entry : m_grantChTable) { @@ -345,7 +342,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll) if (m_verbose) { LogMessage(LOG_HOST, "%s, releasing channel grant, chNo = %u, dstId = %u", - m_name, chNo, dstId); + m_name.c_str(), chNo, dstId); } if (m_releaseGrant != nullptr) { diff --git a/src/common/lookups/AffiliationLookup.h b/src/common/lookups/AffiliationLookup.h index e37f0c3f..f52da52f 100644 --- a/src/common/lookups/AffiliationLookup.h +++ b/src/common/lookups/AffiliationLookup.h @@ -100,7 +100,7 @@ namespace lookups class HOST_SW_API AffiliationLookup { public: /// Initializes a new instance of the AffiliationLookup class. - AffiliationLookup(const char* name, bool verbose); + AffiliationLookup(const std::string name, bool verbose); /// Finalizes a instance of the AffiliationLookup class. virtual ~AffiliationLookup(); @@ -196,7 +196,7 @@ namespace lookups // chNo dstId slot std::function m_releaseGrant; - const char *m_name; + const std::string m_name; bool m_verbose; }; diff --git a/src/common/network/FrameQueue.cpp b/src/common/network/FrameQueue.cpp index 9fc78782..d68e46e7 100644 --- a/src/common/network/FrameQueue.cpp +++ b/src/common/network/FrameQueue.cpp @@ -131,7 +131,39 @@ UInt8Array FrameQueue::read(int& messageLength, sockaddr_storage& address, uint3 } /// -/// Cache "message" to frame queue. +/// Write message to the UDP socket. +/// +/// Message buffer to frame and queue. +/// Length of message. +/// Message stream ID. +/// Peer ID. +/// RTP SSRC ID. +/// Opcode. +/// RTP Sequence. +/// IP address to write data to. +/// +/// +bool FrameQueue::write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen) +{ + assert(message != nullptr); + assert(length > 0U); + + uint32_t bufferLen = 0U; + uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen); + + bool ret = true; + if (!m_socket->write(buffer, bufferLen, addr, addrLen)) { + LogError(LOG_NET, "Failed writing data to the network"); + ret = false; + } + + delete buffer; + return ret; +} + +/// +/// Cache message to frame queue. /// /// Message buffer to frame and queue. /// Length of message. @@ -149,7 +181,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ } /// -/// Cache "message" to frame queue. +/// Cache message to frame queue. /// /// Message buffer to frame and queue. /// Length of message. @@ -167,6 +199,48 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ assert(message != nullptr); assert(length > 0U); + uint32_t bufferLen = 0U; + uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen); + + udp::UDPDatagram *dgram = new udp::UDPDatagram; + dgram->buffer = buffer; + dgram->length = bufferLen; + dgram->address = addr; + dgram->addrLen = addrLen; + + m_buffers.push_back(dgram); +} + +/// +/// Helper method to clear any tracked stream timestamps. +/// +void FrameQueue::clearTimestamps() +{ + m_streamTimestamps.clear(); +} + +// --------------------------------------------------------------------------- +// Private Class Members +// --------------------------------------------------------------------------- + +/// +/// Generate RTP message for the frame queue. +/// +/// Message buffer to frame and queue. +/// Length of message. +/// Message stream ID. +/// Peer ID. +/// RTP SSRC ID. +/// Opcode. +/// RTP Sequence. +/// +/// +uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen) +{ + assert(message != nullptr); + assert(length > 0U); + uint32_t timestamp = INVALID_TS; if (streamId != 0U) { auto entry = m_streamTimestamps.find(streamId); @@ -177,7 +251,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ if (timestamp != INVALID_TS) { timestamp += (RTP_GENERIC_CLOCK_RATE / 133); if (m_debug) - LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq); + LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq); m_streamTimestamps[streamId] = timestamp; } } @@ -198,7 +272,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ if (streamId != 0U && timestamp == INVALID_TS && rtpSeq != RTP_END_OF_CALL_SEQ) { if (m_debug) - LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq); + LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq); m_streamTimestamps[streamId] = header.getTimestamp(); } @@ -206,7 +280,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ auto entry = m_streamTimestamps.find(streamId); if (entry != m_streamTimestamps.end()) { if (m_debug) - LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq); + LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq); m_streamTimestamps.erase(streamId); } } @@ -225,21 +299,11 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_ ::memcpy(buffer + RTP_HEADER_LENGTH_BYTES + RTP_EXTENSION_HEADER_LENGTH_BYTES + RTP_FNE_HEADER_LENGTH_BYTES, message, length); if (m_debug) - Utils::dump(1U, "FrameQueue::enqueueMessage() Buffered Message", buffer, bufferLen); - - udp::UDPDatagram *dgram = new udp::UDPDatagram; - dgram->buffer = buffer; - dgram->length = bufferLen; - dgram->address = addr; - dgram->addrLen = addrLen; + Utils::dump(1U, "FrameQueue::generateMessage() Buffered Message", buffer, bufferLen); - m_buffers.push_back(dgram); -} + if (outBufferLen != nullptr) { + *outBufferLen = bufferLen; + } -/// -/// Helper method to clear any tracked stream timestamps. -/// -void FrameQueue::clearTimestamps() -{ - m_streamTimestamps.clear(); + return buffer; } diff --git a/src/common/network/FrameQueue.h b/src/common/network/FrameQueue.h index ea7fa34c..074a5e58 100644 --- a/src/common/network/FrameQueue.h +++ b/src/common/network/FrameQueue.h @@ -46,11 +46,14 @@ namespace network /// Read message from the received UDP packet. UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen, frame::RTPHeader* rtpHeader = nullptr, frame::RTPFNEHeader* fneHeader = nullptr); + /// Write message to the UDP socket. + bool write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen); - /// Cache "message" to frame queue. + /// Cache message to frame queue. void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen); - /// Cache "message" to frame queue. + /// Cache message to frame queue. void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen); @@ -60,6 +63,10 @@ namespace network private: uint32_t m_peerId; std::unordered_map m_streamTimestamps; + + /// Generate RTP message for the frame queue. + uint8_t* generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId, + uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen); }; } // namespace network diff --git a/src/common/network/RawFrameQueue.cpp b/src/common/network/RawFrameQueue.cpp index 5757d25b..b31b2889 100644 --- a/src/common/network/RawFrameQueue.cpp +++ b/src/common/network/RawFrameQueue.cpp @@ -82,7 +82,36 @@ UInt8Array RawFrameQueue::read(int& messageLength, sockaddr_storage& address, ui } /// -/// Cache "message" to frame queue. +/// Write message to the UDP socket. +/// +/// Message buffer to frame and queue. +/// Length of message. +/// IP address to write data to. +/// +/// +bool RawFrameQueue::write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen) +{ + assert(message != nullptr); + assert(length > 0U); + + uint8_t* buffer = new uint8_t[length]; + ::memset(buffer, 0x00U, length); + ::memcpy(buffer, message, length); + + if (m_debug) + Utils::dump(1U, "RawFrameQueue::write() Message", buffer, length); + + bool ret = true; + if (!m_socket->write(buffer, length, addr, addrLen)) { + LogError(LOG_NET, "Failed writing data to the network"); + ret = false; + } + + return ret; +} + +/// +/// Cache message to frame queue. /// /// Message buffer to frame and queue. /// Length of message. diff --git a/src/common/network/RawFrameQueue.h b/src/common/network/RawFrameQueue.h index 093d74eb..ae2ab1d6 100644 --- a/src/common/network/RawFrameQueue.h +++ b/src/common/network/RawFrameQueue.h @@ -43,8 +43,10 @@ namespace network /// Read message from the received UDP packet. UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen); + /// Write message to the UDP socket. + bool write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen); - /// Cache "message" to frame queue. + /// Cache message to frame queue. void enqueueMessage(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen); /// Flush the message queue. diff --git a/src/common/network/udp/Socket.cpp b/src/common/network/udp/Socket.cpp index 1c802ffd..3f312c8b 100644 --- a/src/common/network/udp/Socket.cpp +++ b/src/common/network/udp/Socket.cpp @@ -232,10 +232,23 @@ ssize_t Socket::read(uint8_t* buffer, uint32_t length, sockaddr_storage& address uint16_t magic = __GET_UINT16B(buffer, 0U); if (magic == AES_WRAPPED_PCKT_MAGIC) { uint32_t cryptedLen = (len - 2U) * sizeof(uint8_t); - // Utils::dump(1U, "Socket::read() crypted", buffer + 2U, cryptedLen); + uint8_t* cryptoBuffer = buffer + 2U; + + // do we need to pad the original buffer to be block aligned? + if (cryptedLen % crypto::AES::BLOCK_BYTES_LEN != 0) { + uint32_t alignment = crypto::AES::BLOCK_BYTES_LEN - (cryptedLen % crypto::AES::BLOCK_BYTES_LEN); + cryptedLen += alignment; + + // reallocate buffer and copy + cryptoBuffer = new uint8_t[cryptedLen]; + ::memset(cryptoBuffer, 0x00U, cryptedLen); + ::memcpy(cryptoBuffer, buffer + 2U, len - 2U); + } + + // Utils::dump(1U, "Socket::read() crypted", cryptoBuffer, cryptedLen); // decrypt - uint8_t* decrypted = m_aes->decryptECB(buffer + 2U, cryptedLen, m_presharedKey); + uint8_t* decrypted = m_aes->decryptECB(cryptoBuffer, cryptedLen, m_presharedKey); // Utils::dump(1U, "Socket::read() decrypted", decrypted, cryptedLen); @@ -320,6 +333,7 @@ bool Socket::write(const uint8_t* buffer, uint32_t length, const sockaddr_storag ::memcpy(out.get() + 2U, crypted, cryptedLen); __SET_UINT16B(AES_WRAPPED_PCKT_MAGIC, out.get(), 0U); delete[] crypted; + length = cryptedLen + 2U; } else { if (lenWritten != nullptr) { *lenWritten = -1; diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index 54291bce..40b171b2 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -542,6 +542,19 @@ bool HostFNE::createPeerNetworks() network->setPresharedKey(presharedKey); } + /* + ** Block Traffic To Peers + */ + yaml::Node& blockTrafficTo = peerConf["blockTrafficTo"]; + if (blockTrafficTo.size() > 0U) { + for (size_t i = 0; i < blockTrafficTo.size(); i++) { + uint32_t peerId = (uint32_t)::strtoul(blockTrafficTo[i].as("0").c_str(), NULL, 10); + if (peerId != 0U) { + network->addBlockedTrafficPeer(peerId); + } + } + } + network->enable(enabled); if (enabled) { bool ret = network->open(); diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index aea03db3..1d617444 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -14,6 +14,7 @@ #include "common/edac/SHA256.h" #include "common/network/json/json.h" #include "common/Log.h" +#include "common/ThreadFunc.h" #include "common/Utils.h" #include "network/FNENetwork.h" #include "network/fne/TagDMRData.h" @@ -28,6 +29,13 @@ using namespace network::fne; #include #include +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const uint8_t MAX_PEER_LIST_BEFORE_FLUSH = 10U; +const uint32_t MAX_RID_LIST_CHUNK = 50U; + // --------------------------------------------------------------------------- // Public Class Members // --------------------------------------------------------------------------- @@ -74,8 +82,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port, m_peers(), m_peerAffiliations(), m_maintainenceTimer(1000U, pingTime), - m_updateLookupTimer(1000U, (updateLookupTime * 60U)), - m_forceListUpdate(false), + m_updateLookupTime(updateLookupTime * 60U), m_callInProgress(false), m_disallowP25AdjStsBcast(true), m_reportPeerPing(reportPeerPing), @@ -184,20 +191,6 @@ void FNENetwork::clock(uint32_t ms) m_maintainenceTimer.start(); } - m_updateLookupTimer.clock(ms); - if ((m_updateLookupTimer.isRunning() && m_updateLookupTimer.hasExpired()) || m_forceListUpdate) { - writeWhitelistRIDs(); - writeBlacklistRIDs(); - m_frameQueue->flushQueue(); - - writeTGIDs(); - writeDeactiveTGIDs(); - m_frameQueue->flushQueue(); - - m_updateLookupTimer.start(); - m_forceListUpdate = false; - } - sockaddr_storage address; uint32_t addrLen; frame::RTPHeader rtpHeader; @@ -456,20 +449,12 @@ void FNENetwork::clock(uint32_t ms) connection->connected(true); connection->pingsReceived(0U); connection->lastPing(now); + connection->lastACLUpdate(now); m_peers[peerId] = connection; writePeerACK(peerId); LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId); - // queue final update messages and flush - writeWhitelistRIDs(peerId, true); - writeBlacklistRIDs(peerId, true); - m_frameQueue->flushQueue(); - - writeTGIDs(peerId, true); - writeDeactiveTGIDs(peerId, true); - m_frameQueue->flushQueue(); - json::object peerConfig = connection->config(); if (peerConfig["software"].is()) { std::string software = peerConfig["software"].get(); @@ -477,9 +462,12 @@ void FNENetwork::clock(uint32_t ms) } // setup the affiliations list for this peer - char *peerName = new char[16]; - ::sprintf(peerName, "PEER %u", peerId); - m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName, m_verbose); + std::stringstream peerName; + peerName << "PEER " << peerId; + m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName.str(), m_verbose); + + // spin up a thread and send ACL list over to peer + peerACLUpdate(peerId); } } } @@ -526,17 +514,26 @@ void FNENetwork::clock(uint32_t ms) // validate peer (simple validation really) if (connection->connected() && connection->address() == ip) { uint32_t pingsRx = connection->pingsReceived(); + uint64_t lastPing = connection->lastPing(); pingsRx++; connection->pingsReceived(pingsRx); connection->lastPing(now); connection->pktLastSeq(connection->pktLastSeq() + 1); + // does this peer need an ACL update? + uint64_t dt = connection->lastACLUpdate() + m_updateLookupTime; + if (dt < now) { + LogInfoEx(LOG_NET, "PEER %u updating ACL list, dt = %u, now = %u", peerId, dt, now); + peerACLUpdate(peerId); + connection->lastACLUpdate(now); + } + m_peers[peerId] = connection; writePeerCommand(peerId, { NET_FUNC_PONG, NET_SUBFUNC_NOP }); if (m_reportPeerPing) { - LogInfoEx(LOG_NET, "PEER %u ping received and answered, pingsReceived = %u", peerId, connection->pingsReceived()); + LogInfoEx(LOG_NET, "PEER %u ping, pingsReceived = %u, lastPing = %u", peerId, connection->pingsReceived(), lastPing); } } else { @@ -789,7 +786,6 @@ void FNENetwork::close() m_socket->close(); m_maintainenceTimer.stop(); - m_updateLookupTimer.stop(); m_status = NET_STAT_INVALID; } @@ -857,12 +853,51 @@ void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connecti LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId); } +/// +/// Helper to send the ACL lists to the specified peer in a separate thread. +/// +/// +void FNENetwork::peerACLUpdate(uint32_t peerId) +{ + ACLUpdateRequest* req = new ACLUpdateRequest(); + req->network = this; + req->peerId = peerId; + + std::stringstream peerName; + peerName << "peer" << peerId << ":acl-update"; + + ::pthread_create(&req->thread, NULL, threadedACLUpdate, req); + if (pthread_kill(req->thread, 0) == 0) { + ::pthread_setname_np(req->thread, peerName.str().c_str()); + } +} + +/// +/// Helper to send the ACL lists to the specified peer in a separate thread. +/// +/// +void* FNENetwork::threadedACLUpdate(void* arg) +{ + ACLUpdateRequest* req = (ACLUpdateRequest*)arg; + if (req != nullptr) { + LogInfoEx(LOG_NET, "PEER %u sending ACL list updates", req->peerId); + + req->network->writeWhitelistRIDs(req->peerId); + req->network->writeBlacklistRIDs(req->peerId); + req->network->writeTGIDs(req->peerId); + req->network->writeDeactiveTGIDs(req->peerId); + + delete req; + } + + return nullptr; +} + /// /// Helper to send the list of whitelisted RIDs to the specified peer. /// /// -/// -void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly) +void FNENetwork::writeWhitelistRIDs(uint32_t peerId) { // send radio ID white/black lists std::vector ridWhitelist; @@ -879,37 +914,46 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly) return; } - // build dataset - uint8_t payload[4U + (ridWhitelist.size() * 4U)]; - ::memset(payload, 0x00U, 4U + (ridWhitelist.size() * 4U)); + // send a chunk of RIDs to the peer + FNEPeerConnection* connection = m_peers[peerId]; + if (connection != nullptr) { + uint32_t chunkCnt = (ridWhitelist.size() / MAX_RID_LIST_CHUNK) + 1U; + for (uint32_t i = 0U; i < chunkCnt; i++) { + size_t listSize = ridWhitelist.size(); + if (chunkCnt > 1U) { + listSize = MAX_RID_LIST_CHUNK; + } - __SET_UINT32(ridWhitelist.size(), payload, 0U); + if (i == chunkCnt - 1U) { + listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridWhitelist.size(); + } - // write whitelisted IDs to whitelist payload - uint32_t offs = 4U; - for (uint32_t id : ridWhitelist) { - if (m_debug) - LogDebug(LOG_NET, "PEER %u whitelisting RID %u", peerId, id); + if (listSize > ridWhitelist.size()) { + listSize = ridWhitelist.size(); + } - __SET_UINT32(id, payload, offs); - offs += 4U; - } + // build dataset + uint16_t bufSize = 4U + (listSize * 4U); + uint8_t payload[bufSize]; + ::memset(payload, 0x00U, bufSize); - writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID }, - payload, 4U + (ridWhitelist.size() * 4U), queueOnly, true); -} + __SET_UINT32(listSize, payload, 0U); -/// -/// Helper to send the list of whitelisted RIDs to connected peers. -/// -void FNENetwork::writeWhitelistRIDs() -{ - if (m_ridLookup->table().size() == 0U) { - return; - } + // write whitelisted IDs to whitelist payload + uint32_t offs = 4U; + for (uint32_t j = 0; j < listSize; j++) { + uint32_t id = ridWhitelist.at(j + (i * MAX_RID_LIST_CHUNK)); + + if (m_debug) + LogDebug(LOG_NET, "PEER %u whitelisting RID %u (%d / %d)", peerId, id, i, j); + + __SET_UINT32(id, payload, offs); + offs += 4U; + } - for (auto peer : m_peers) { - writeWhitelistRIDs(peer.first, true); + writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID }, + payload, bufSize, true); + } } } @@ -917,8 +961,7 @@ void FNENetwork::writeWhitelistRIDs() /// Helper to send the list of whitelisted RIDs to the specified peer. /// /// -/// -void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly) +void FNENetwork::writeBlacklistRIDs(uint32_t peerId) { // send radio ID blacklist std::vector ridBlacklist; @@ -935,37 +978,46 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly) return; } - // build dataset - uint8_t payload[4U + (ridBlacklist.size() * 4U)]; - ::memset(payload, 0x00U, 4U + (ridBlacklist.size() * 4U)); + // send a chunk of RIDs to the peer + FNEPeerConnection* connection = m_peers[peerId]; + if (connection != nullptr) { + uint32_t chunkCnt = (ridBlacklist.size() / MAX_RID_LIST_CHUNK) + 1U; + for (uint32_t i = 0U; i < chunkCnt; i++) { + size_t listSize = ridBlacklist.size(); + if (chunkCnt > 1U) { + listSize = MAX_RID_LIST_CHUNK; + } - __SET_UINT32(ridBlacklist.size(), payload, 0U); + if (i == chunkCnt - 1U) { + listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridBlacklist.size(); + } - // write blacklisted IDs to blacklist payload - uint32_t offs = 4U; - for (uint32_t id : ridBlacklist) { - if (m_debug) - LogDebug(LOG_NET, "PEER %u blacklisting RID %u", peerId, id); + if (listSize > ridBlacklist.size()) { + listSize = ridBlacklist.size(); + } - __SET_UINT32(id, payload, offs); - offs += 4U; - } + // build dataset + uint16_t bufSize = 4U + (listSize * 4U); + uint8_t payload[bufSize]; + ::memset(payload, 0x00U, bufSize); - writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID }, - payload, 4U + (ridBlacklist.size() * 4U), queueOnly, true); -} + __SET_UINT32(listSize, payload, 0U); -/// -/// Helper to send the list of whitelisted RIDs to connected peers. -/// -void FNENetwork::writeBlacklistRIDs() -{ - if (m_ridLookup->table().size() == 0U) { - return; - } + // write whitelisted IDs to whitelist payload + uint32_t offs = 4U; + for (uint32_t j = 0; j < listSize; j++) { + uint32_t id = ridBlacklist.at(j + (i * MAX_RID_LIST_CHUNK)); - for (auto peer : m_peers) { - writeBlacklistRIDs(peer.first, true); + if (m_debug) + LogDebug(LOG_NET, "PEER %u blacklisting RID %u (%d / %d)", peerId, id, i, j); + + __SET_UINT32(id, payload, offs); + offs += 4U; + } + + writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID }, + payload, bufSize, true); + } } } @@ -973,8 +1025,7 @@ void FNENetwork::writeBlacklistRIDs() /// Helper to send the list of active TGIDs to the specified peer. /// /// -/// -void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly) +void FNENetwork::writeTGIDs(uint32_t peerId) { if (!m_tidLookup->sendTalkgroups()) { return; @@ -1026,25 +1077,14 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly) } writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_ACTIVE_TGS }, - payload, 4U + (tgidList.size() * 5U), queueOnly, true); -} - -/// -/// Helper to send the list of active TGIDs to connected peers. -/// -void FNENetwork::writeTGIDs() -{ - for (auto peer : m_peers) { - writeTGIDs(peer.first, true); - } + payload, 4U + (tgidList.size() * 5U), true); } /// /// Helper to send the list of deactivated TGIDs to the specified peer. /// /// -/// -void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly) +void FNENetwork::writeDeactiveTGIDs(uint32_t peerId) { if (!m_tidLookup->sendTalkgroups()) { return; @@ -1096,17 +1136,7 @@ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly) } writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_DEACTIVE_TGS }, - payload, 4U + (tgidList.size() * 5U), queueOnly, true); -} - -/// -/// Helper to send the list of deactivated TGIDs to connected peers. -/// -void FNENetwork::writeDeactiveTGIDs() -{ - for (auto peer : m_peers) { - writeDeactiveTGIDs(peer.first, true); - } + payload, 4U + (tgidList.size() * 5U), true); } /// @@ -1119,11 +1149,12 @@ void FNENetwork::writeDeactiveTGIDs() /// /// /// -bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly) +bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, + uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool directWrite) const { auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { - FNEPeerConnection* connection = m_peers[peerId]; + FNEPeerConnection* connection = m_peers.at(peerId); if (connection != nullptr) { uint32_t peerStreamId = connection->currStreamId(); if (streamId == 0U) { @@ -1132,10 +1163,14 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const sockaddr_storage addr = connection->socketStorage(); uint32_t addrLen = connection->sockStorageLen(); - m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); - if (queueOnly) - return true; - return m_frameQueue->flushQueue(); + if (directWrite) + return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); + else { + m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen); + if (queueOnly) + return true; + return m_frameQueue->flushQueue(); + } } } @@ -1152,18 +1187,20 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const /// /// /// -bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq) +/// +bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, + uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const { auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); if (it != m_peers.end()) { - FNEPeerConnection* connection = m_peers[peerId]; + FNEPeerConnection* connection = m_peers.at(peerId); if (connection != nullptr) { if (incPktSeq) { connection->pktLastSeq(connection->pktLastSeq() + 1); } uint16_t pktSeq = connection->pktLastSeq(); - return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly); + return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly, directWrite); } } @@ -1177,10 +1214,9 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const /// Opcode. /// Buffer to write to the network. /// Length of buffer to write. -/// /// bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, - const uint8_t* data, uint32_t length, bool queueOnly, bool incPktSeq) + const uint8_t* data, uint32_t length, bool incPktSeq) const { assert(peerId > 0); @@ -1192,7 +1228,7 @@ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode } uint32_t len = length + 6U; - return writePeer(peerId, opcode, buffer, len, 0U, queueOnly, incPktSeq); + return writePeer(peerId, opcode, buffer, len, 0U, false, incPktSeq, true); } /// diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index 9280dd28..c66d536d 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -37,6 +37,12 @@ namespace network { namespace fne { class HOST_SW_API TagNXDNData; } } namespace network { + // --------------------------------------------------------------------------- + // Class Prototypes + // --------------------------------------------------------------------------- + + class HOST_SW_API FNENetwork; + // --------------------------------------------------------------------------- // Class Declaration // Represents an peer connection to the FNE. @@ -61,6 +67,7 @@ namespace network m_connectionState(NET_STAT_INVALID), m_pingsReceived(0U), m_lastPing(0U), + m_lastACLUpdate(0U), m_config(), m_pktLastSeq(0U), m_pktNextSeq(1U) @@ -83,6 +90,7 @@ namespace network m_connectionState(NET_STAT_INVALID), m_pingsReceived(0U), m_lastPing(0U), + m_lastACLUpdate(0U), m_config(), m_pktLastSeq(0U), m_pktNextSeq(1U) @@ -123,6 +131,9 @@ namespace network /// Last ping received. __PROPERTY_PLAIN(uint64_t, lastPing); + /// Last ACL update sent. + __PROPERTY_PLAIN(uint64_t, lastACLUpdate); + /// JSON objecting containing peer configuration information. __PROPERTY_PLAIN(json::object, config); @@ -132,6 +143,18 @@ namespace network __PROPERTY_PLAIN(uint16_t, pktNextSeq); }; + // --------------------------------------------------------------------------- + // Structure Declaration + // + // --------------------------------------------------------------------------- + + struct ACLUpdateRequest { + FNENetwork* network; + uint32_t peerId; + + pthread_t thread; + }; + // --------------------------------------------------------------------------- // Class Declaration // Implements the core FNE networking logic. @@ -207,9 +230,9 @@ namespace network std::unordered_map m_peerAffiliations; Timer m_maintainenceTimer; - Timer m_updateLookupTimer; - bool m_forceListUpdate; + uint32_t m_updateLookupTime; + bool m_callInProgress; bool m_disallowP25AdjStsBcast; @@ -225,34 +248,30 @@ namespace network /// Helper to complete setting up a repeater login request. void setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection); + /// Helper to send the ACL lists to the specified peer in a separate thread. + void peerACLUpdate(uint32_t peerId); + /// Entry point to send the ACL lists to the specified peer in a separate thread. + static void* threadedACLUpdate(void* arg); + /// Helper to send the list of whitelisted RIDs to the specified peer. - void writeWhitelistRIDs(uint32_t peerId, bool queueOnly = false); - /// Helper to send the list of whitelisted RIDs to connected peers. - void writeWhitelistRIDs(); + void writeWhitelistRIDs(uint32_t peerId); /// Helper to send the list of blacklisted RIDs to the specified peer. - void writeBlacklistRIDs(uint32_t peerId, bool queueOnly = false); - /// Helper to send the list of blacklisted RIDs to connected peers. - void writeBlacklistRIDs(); - + void writeBlacklistRIDs(uint32_t peerId); /// Helper to send the list of active TGIDs to the specified peer. - void writeTGIDs(uint32_t peerId, bool queueOnly = false); - /// Helper to send the list of active TGIDs to connected peers. - void writeTGIDs(); + void writeTGIDs(uint32_t peerId); /// Helper to send the list of deactivated TGIDs to the specified peer. - void writeDeactiveTGIDs(uint32_t peerId, bool queueOnly = false); - /// Helper to send the list of deactivated TGIDs to connected peers. - void writeDeactiveTGIDs(); + void writeDeactiveTGIDs(uint32_t peerId); /// Helper to send a data message to the specified peer. bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, - uint16_t pktSeq, uint32_t streamId, bool queueOnly = false); + uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool directWrite = false) const; /// Helper to send a data message to the specified peer. bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, - uint32_t streamId, bool queueOnly = false, bool incPktSeq = false); + uint32_t streamId, bool queueOnly = false, bool incPktSeq = false, bool directWrite = false) const; /// Helper to send a command message to the specified peer. bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data = nullptr, uint32_t length = 0U, - bool queueOnly = false, bool incPktSeq = false); + bool incPktSeq = false) const; /// Helper to send a ACK response to the specified peer. bool writePeerACK(uint32_t peerId, const uint8_t* data = nullptr, uint32_t length = 0U); diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index dbcb7126..8555f0a6 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -19,6 +19,7 @@ using namespace network; #include #include +#include // --------------------------------------------------------------------------- // Public Class Members @@ -44,13 +45,30 @@ using namespace network; /// Flag indicating that the system will accept radio ID and talkgroup ID lookups from the network. PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password, bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup) : - Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup) + Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup), + m_blockTrafficToTable() { assert(!address.empty()); assert(port > 0U); assert(!password.empty()); } +/// +/// Checks if the passed peer ID is blocked from sending to this peer. +/// +/// +bool PeerNetwork::checkBlockedPeer(uint32_t peerId) +{ + if (m_blockTrafficToTable.empty()) + return false; + + if (std::find(m_blockTrafficToTable.begin(), m_blockTrafficToTable.end(), peerId) != m_blockTrafficToTable.end()) { + return true; + } + + return false; +} + // --------------------------------------------------------------------------- // Protected Class Members // --------------------------------------------------------------------------- diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index 373be1d9..26f4c8e1 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -18,6 +18,7 @@ #include #include +#include namespace network { @@ -32,7 +33,16 @@ namespace network PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password, bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup); + /// Gets the blocked traffic peer ID table. + std::vector blockTrafficTo() const { return m_blockTrafficToTable; } + /// Adds an entry to the blocked traffic peer ID table. + void addBlockedTrafficPeer(uint32_t peerId) { m_blockTrafficToTable.push_back(peerId); } + /// Checks if the passed peer ID is blocked from sending to this peer. + bool checkBlockedPeer(uint32_t peerId); + protected: + std::vector m_blockTrafficToTable; + /// Writes configuration to the network. bool writeConfig() override; }; diff --git a/src/fne/network/RESTAPI.cpp b/src/fne/network/RESTAPI.cpp index d1390622..d2cdd9fd 100644 --- a/src/fne/network/RESTAPI.cpp +++ b/src/fne/network/RESTAPI.cpp @@ -473,6 +473,7 @@ void RESTAPI::initializeEndpoints() m_dispatcher.match(GET_STATUS).get(REST_API_BIND(RESTAPI::restAPI_GetStatus, this)); m_dispatcher.match(FNE_GET_PEER_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetPeerQuery, this)); + m_dispatcher.match(FNE_GET_PEER_COUNT).get(REST_API_BIND(RESTAPI::restAPI_GetPeerCount, this)); m_dispatcher.match(FNE_GET_RID_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetRIDQuery, this)); m_dispatcher.match(FNE_PUT_RID_ADD).put(REST_API_BIND(RESTAPI::restAPI_PutRIDAdd, this)); @@ -721,6 +722,30 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl reply.payload(response); } +/// +/// +/// +/// +/// +/// +void RESTAPI::restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const RequestMatch& match) +{ + if (!validateAuth(request, reply)) { + return; + } + + json::object response = json::object(); + setResponseDefaultStatus(response); + + json::array peers = json::array(); + if (m_network != nullptr) { + uint32_t count = m_network->m_peers.size(); + response["peerCount"].set(count); + } + + reply.payload(response); +} + /// /// /// @@ -799,10 +824,11 @@ void RESTAPI::restAPI_PutRIDAdd(const HTTPPayload& request, HTTPPayload& reply, // The addEntry function will automatically update an existing entry, so no need to check for an exisitng one here m_ridLookup->addEntry(rid, enabled, alias); - +/* if (m_network != nullptr) { m_network->m_forceListUpdate = true; } +*/ } /// @@ -838,10 +864,11 @@ void RESTAPI::restAPI_PutRIDDelete(const HTTPPayload& request, HTTPPayload& repl } m_ridLookup->eraseEntry(rid); - +/* if (m_network != nullptr) { m_network->m_forceListUpdate = true; } +*/ } /// @@ -934,10 +961,11 @@ void RESTAPI::restAPI_PutTGAdd(const HTTPPayload& request, HTTPPayload& reply, c ::LogInfoEx(LOG_REST, "Talkgroup NAME: %s SRC_TGID: %u SRC_TS: %u ACTIVE: %u PARROT: %u INCLUSIONS: %u EXCLUSIONS: %u REWRITES: %u", groupName.c_str(), tgId, tgSlot, active, parrot, incCount, excCount, rewrCount); m_tidLookup->addEntry(groupVoice); - +/* if (m_network != nullptr) { m_network->m_forceListUpdate = true; } +*/ } /// @@ -974,10 +1002,11 @@ void RESTAPI::restAPI_PutTGDelete(const HTTPPayload& request, HTTPPayload& reply } m_tidLookup->eraseEntry(groupVoice.source().tgId(), groupVoice.source().tgSlot()); - +/* if (m_network != nullptr) { m_network->m_forceListUpdate = true; } +*/ } /// @@ -1014,10 +1043,11 @@ void RESTAPI::restAPI_GetForceUpdate(const HTTPPayload& request, HTTPPayload& re json::object response = json::object(); setResponseDefaultStatus(response); +/* if (m_network != nullptr) { m_network->m_forceListUpdate = true; } - +*/ reply.payload(response); } diff --git a/src/fne/network/RESTAPI.h b/src/fne/network/RESTAPI.h index a17e2e02..5fd46995 100644 --- a/src/fne/network/RESTAPI.h +++ b/src/fne/network/RESTAPI.h @@ -97,6 +97,8 @@ private: /// void restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match); + /// + void restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match); /// void restAPI_GetRIDQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match); diff --git a/src/fne/network/RESTDefines.h b/src/fne/network/RESTDefines.h index 46e1bbe2..448a1a6d 100644 --- a/src/fne/network/RESTDefines.h +++ b/src/fne/network/RESTDefines.h @@ -21,6 +21,7 @@ // --------------------------------------------------------------------------- #define FNE_GET_PEER_QUERY "/peer/query" +#define FNE_GET_PEER_COUNT "/peer/count" #define FNE_GET_RID_QUERY "/rid/query" #define FNE_PUT_RID_ADD "/rid/add" diff --git a/src/fne/network/fne/TagDMRData.cpp b/src/fne/network/fne/TagDMRData.cpp index cd8d189a..3069205a 100644 --- a/src/fne/network/fne/TagDMRData.cpp +++ b/src/fne/network/fne/TagDMRData.cpp @@ -252,6 +252,11 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId continue; } + // check if the source peer is blocked from sending to this peer + if (peer.second->checkBlockedPeer(peerId)) { + continue; + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); diff --git a/src/fne/network/fne/TagNXDNData.cpp b/src/fne/network/fne/TagNXDNData.cpp index 4f4c8fd6..9ac0311e 100644 --- a/src/fne/network/fne/TagNXDNData.cpp +++ b/src/fne/network/fne/TagNXDNData.cpp @@ -222,6 +222,11 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI continue; } + // check if the source peer is blocked from sending to this peer + if (peer.second->checkBlockedPeer(peerId)) { + continue; + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); diff --git a/src/fne/network/fne/TagP25Data.cpp b/src/fne/network/fne/TagP25Data.cpp index fc3af658..dd5500fa 100644 --- a/src/fne/network/fne/TagP25Data.cpp +++ b/src/fne/network/fne/TagP25Data.cpp @@ -286,6 +286,11 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId continue; } + // check if the source peer is blocked from sending to this peer + if (peer.second->checkBlockedPeer(peerId)) { + continue; + } + uint8_t outboundPeerBuffer[len]; ::memset(outboundPeerBuffer, 0x00U, len); ::memcpy(outboundPeerBuffer, buffer, len); diff --git a/src/host/network/Network.cpp b/src/host/network/Network.cpp index cb120a76..d9b8f7fb 100644 --- a/src/host/network/Network.cpp +++ b/src/host/network/Network.cpp @@ -396,6 +396,7 @@ void Network::clock(uint32_t ms) m_ridLookup->toggleEntry(id, true); offs += 4U; } + LogMessage(LOG_NET, "Network Announced %u whitelisted RIDs", len); } } } @@ -413,6 +414,7 @@ void Network::clock(uint32_t ms) m_ridLookup->toggleEntry(id, false); offs += 4U; } + LogMessage(LOG_NET, "Network Announced %u blacklisted RIDs", len); } } } diff --git a/src/host/p25/packet/ControlSignaling.cpp b/src/host/p25/packet/ControlSignaling.cpp index 48e12e74..1227786b 100644 --- a/src/host/p25/packet/ControlSignaling.cpp +++ b/src/host/p25/packet/ControlSignaling.cpp @@ -751,8 +751,9 @@ bool ControlSignaling::processNetwork(uint8_t* data, uint32_t len, lc::LC& contr m_p25->m_affiliations.releaseGrant(dstId, false); } } + + return true; // don't allow this to write to the air } - break; case TSBK_OSP_DVM_GIT_HASH: // ignore return true; // don't allow this to write to the air diff --git a/src/remote/RESTClientMain.cpp b/src/remote/RESTClientMain.cpp index bc02c990..93521851 100644 --- a/src/remote/RESTClientMain.cpp +++ b/src/remote/RESTClientMain.cpp @@ -41,6 +41,7 @@ #define RCD_GET_VOICE_CH "voice-ch" #define RCD_FNE_GET_PEERLIST "fne-peerlist" +#define RCD_FNE_GET_PEERCOUNT "fne-peercount" #define RCD_FNE_GET_TGIDLIST "fne-tgidlist" #define RCD_FNE_GET_FORCEUPDATE "fne-force-update" #define RCD_FNE_GET_AFFLIST "fne-affs" @@ -182,6 +183,7 @@ void usage(const char* message, const char* arg) reply += " voice-ch Retrieves the list of configured voice channels\r\n"; reply += "\r\n"; reply += " fne-peerlist Retrieves the list of connected peers (Converged FNE only)\r\n"; + reply += " fne-peercount Retrieves the count of connected peers (Converged FNE only)\r\n"; reply += " fne-tgidlist Retrieves the list of configured TGIDs (Converged FNE only)\r\n"; reply += " fne-force-update Forces the FNE to send list update (Converged FNE only)\r\n"; reply += " fne-affs Retrieves the list of currently affiliated SUs (Converged FNE only)\r\n"; @@ -730,6 +732,9 @@ int main(int argc, char** argv) else if (rcom == RCD_FNE_GET_PEERLIST) { retCode = client->send(HTTP_GET, FNE_GET_PEER_QUERY, json::object(), response); } + else if (rcom == RCD_FNE_GET_PEERCOUNT) { + retCode = client->send(HTTP_GET, FNE_GET_PEER_COUNT, json::object(), response); + } else if (rcom == RCD_FNE_GET_TGIDLIST) { retCode = client->send(HTTP_GET, FNE_GET_TGID_QUERY, json::object(), response); }