implement functionality to block traffic to specific peers from specific external peers; use std::string over const char* wherever possible to limit pointer passing; implement support in FrameQueue and RawFrameQueue to bypass the queue and directly send messages to specified peers (useful in some situations for packets to be immediately dispatched vs queuing and batching); chunk RID list transmissions to aid in increasing performance (this will likely also need to be done with the TGID lists in the future as well); refactor transmitting ACL lists to peers into its own spun off thread so we don't block the main network processing loop (and infact, this requires documentation; since at a certain peer size DVM network partitioning should be considered to reduce the number of concurrent peers serviced by any given FNE and to better load balance connections across a network, instead of relying on a singular central FNE); implement /peer/count REST API to simply return the known connected count of peers; disable the forced ACL list update functionality (for now);

pull/49/head
Bryan Biedenkapp 2 years ago
parent 26940f0187
commit eb57f1b5d8

@ -102,6 +102,9 @@ peers:
# Network Peer ID
peerId: 9000990
# List of peer IDs to block sending traffic to from this peer.
blockTrafficTo: []
# Flag indicating whether or not peer endpoint networking is encrypted.
encrypted: false
# AES-256 32-byte Preshared Key

@ -32,7 +32,7 @@ public:
/// <summary>Initializes a new instance of the RingBuffer class.</summary>
/// <param name="length">Length of ring buffer.</param>
/// <param name="name">Name of buffer.</param>
RingBuffer(uint32_t length, const char* name) :
RingBuffer(uint32_t length, const std::string name) :
m_length(length),
m_name(name),
m_buffer(nullptr),
@ -40,7 +40,6 @@ public:
m_oPtr(0U)
{
assert(length > 0U);
assert(name != nullptr);
m_buffer = new T[length];
@ -60,7 +59,7 @@ public:
bool addData(const T* buffer, uint32_t length)
{
if (length > freeSpace()) {
LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name, length, freeSpace());
LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name.c_str(), length, freeSpace());
clear();
return false;
}
@ -74,7 +73,7 @@ public:
m_iPtr = 0U;
}
#if DEBUG_RINGBUFFER
LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name, iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite));
LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name.c_str(), iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite));
#endif
return true;
}
@ -86,7 +85,7 @@ public:
bool get(T* buffer, uint32_t length)
{
if (dataSize() < length) {
LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name, dataSize(), length);
LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length);
return false;
}
#if DEBUG_RINGBUFFER
@ -99,7 +98,7 @@ public:
m_oPtr = 0U;
}
#if DEBUG_RINGBUFFER
LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name, m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead));
LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name.c_str(), m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead));
#endif
return true;
}
@ -111,7 +110,7 @@ public:
bool peek(T* buffer, uint32_t length)
{
if (dataSize() < length) {
LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name, dataSize(), length);
LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length);
return false;
}
@ -204,7 +203,7 @@ public:
private:
uint32_t m_length;
const char* m_name;
const std::string m_name;
T* m_buffer;

@ -24,7 +24,7 @@ using namespace lookups;
/// </summary>
/// <param name="name">Name of lookup table.</param>
/// <param name="verbose">Flag indicating whether verbose logging is enabled.</param>
AffiliationLookup::AffiliationLookup(const char* name, bool verbose) :
AffiliationLookup::AffiliationLookup(const std::string name, bool verbose) :
m_rfChTable(),
m_rfChDataTable(),
m_rfGrantChCnt(0U),
@ -52,10 +52,7 @@ AffiliationLookup::AffiliationLookup(const char* name, bool verbose) :
/// <summary>
/// Finalizes a instance of the AffiliationLookup class.
/// </summary>
AffiliationLookup::~AffiliationLookup()
{
/* stub */
}
AffiliationLookup::~AffiliationLookup() = default;
/// <summary>
/// Helper to group affiliate a source ID.
@ -71,7 +68,7 @@ void AffiliationLookup::unitReg(uint32_t srcId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, unit registration, srcId = %u",
m_name, srcId);
m_name.c_str(), srcId);
}
}
@ -89,7 +86,7 @@ bool AffiliationLookup::unitDereg(uint32_t srcId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, unit deregistration, srcId = %u",
m_name, srcId);
m_name.c_str(), srcId);
}
groupUnaff(srcId);
@ -126,7 +123,7 @@ bool AffiliationLookup::isUnitReg(uint32_t srcId) const
void AffiliationLookup::clearUnitReg()
{
std::vector<uint32_t> srcToRel = std::vector<uint32_t>();
LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name);
LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name.c_str());
m_unitRegTable.clear();
}
@ -143,7 +140,7 @@ void AffiliationLookup::groupAff(uint32_t srcId, uint32_t dstId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, group affiliation, srcId = %u, dstId = %u",
m_name, srcId, dstId);
m_name.c_str(), srcId, dstId);
}
}
}
@ -160,7 +157,7 @@ bool AffiliationLookup::groupUnaff(uint32_t srcId)
uint32_t tblDstId = m_grpAffTable.at(srcId);
if (m_verbose) {
LogMessage(LOG_HOST, "%s, group unaffiliation, srcId = %u, dstId = %u",
m_name, srcId, tblDstId);
m_name.c_str(), srcId, tblDstId);
}
} else {
return false;
@ -230,14 +227,14 @@ std::vector<uint32_t> AffiliationLookup::clearGroupAff(uint32_t dstId, bool rele
}
if (dstId == 0U && releaseAll) {
LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name);
LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name.c_str());
for (auto entry : m_grpAffTable) {
uint32_t srcId = entry.first;
srcToRel.push_back(srcId);
}
}
else {
LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name, dstId);
LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name.c_str(), dstId);
for (auto entry : m_grpAffTable) {
uint32_t srcId = entry.first;
uint32_t grpId = entry.second;
@ -289,7 +286,7 @@ bool AffiliationLookup::grantCh(uint32_t dstId, uint32_t srcId, uint32_t grantTi
if (m_verbose) {
LogMessage(LOG_HOST, "%s, granting channel, chNo = %u, dstId = %u, srcId = %u, group = %u",
m_name, chNo, dstId, srcId, grp);
m_name.c_str(), chNo, dstId, srcId, grp);
}
return true;
@ -324,7 +321,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll)
// are we trying to release all grants?
if (dstId == 0U && releaseAll) {
LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name);
LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name.c_str());
std::vector<uint32_t> gntsToRel = std::vector<uint32_t>();
for (auto entry : m_grantChTable) {
@ -345,7 +342,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, releasing channel grant, chNo = %u, dstId = %u",
m_name, chNo, dstId);
m_name.c_str(), chNo, dstId);
}
if (m_releaseGrant != nullptr) {

@ -100,7 +100,7 @@ namespace lookups
class HOST_SW_API AffiliationLookup {
public:
/// <summary>Initializes a new instance of the AffiliationLookup class.</summary>
AffiliationLookup(const char* name, bool verbose);
AffiliationLookup(const std::string name, bool verbose);
/// <summary>Finalizes a instance of the AffiliationLookup class.</summary>
virtual ~AffiliationLookup();
@ -196,7 +196,7 @@ namespace lookups
// chNo dstId slot
std::function<void(uint32_t, uint32_t, uint8_t)> m_releaseGrant;
const char *m_name;
const std::string m_name;
bool m_verbose;
};

@ -131,7 +131,39 @@ UInt8Array FrameQueue::read(int& messageLength, sockaddr_storage& address, uint3
}
/// <summary>
/// Cache "message" to frame queue.
/// Write message to the UDP socket.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>
/// <param name="streamId">Message stream ID.</param>
/// <param name="peerId">Peer ID.</param>
/// <param name="ssrc">RTP SSRC ID.</param>
/// <param name="opcode">Opcode.</param>
/// <param name="rtpSeq">RTP Sequence.</param>
/// <param name="addr">IP address to write data to.</param>
/// <param name="addrLen"></param>
/// <returns></returns>
bool FrameQueue::write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen)
{
assert(message != nullptr);
assert(length > 0U);
uint32_t bufferLen = 0U;
uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen);
bool ret = true;
if (!m_socket->write(buffer, bufferLen, addr, addrLen)) {
LogError(LOG_NET, "Failed writing data to the network");
ret = false;
}
delete buffer;
return ret;
}
/// <summary>
/// Cache message to frame queue.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>
@ -149,7 +181,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
}
/// <summary>
/// Cache "message" to frame queue.
/// Cache message to frame queue.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>
@ -167,6 +199,48 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
assert(message != nullptr);
assert(length > 0U);
uint32_t bufferLen = 0U;
uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen);
udp::UDPDatagram *dgram = new udp::UDPDatagram;
dgram->buffer = buffer;
dgram->length = bufferLen;
dgram->address = addr;
dgram->addrLen = addrLen;
m_buffers.push_back(dgram);
}
/// <summary>
/// Helper method to clear any tracked stream timestamps.
/// </summary>
void FrameQueue::clearTimestamps()
{
m_streamTimestamps.clear();
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/// <summary>
/// Generate RTP message for the frame queue.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>
/// <param name="streamId">Message stream ID.</param>
/// <param name="peerId">Peer ID.</param>
/// <param name="ssrc">RTP SSRC ID.</param>
/// <param name="opcode">Opcode.</param>
/// <param name="rtpSeq">RTP Sequence.</param>
/// <param name="outBufferLen"></param>
/// <returns></returns>
uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen)
{
assert(message != nullptr);
assert(length > 0U);
uint32_t timestamp = INVALID_TS;
if (streamId != 0U) {
auto entry = m_streamTimestamps.find(streamId);
@ -177,7 +251,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
if (timestamp != INVALID_TS) {
timestamp += (RTP_GENERIC_CLOCK_RATE / 133);
if (m_debug)
LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq);
LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq);
m_streamTimestamps[streamId] = timestamp;
}
}
@ -198,7 +272,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
if (streamId != 0U && timestamp == INVALID_TS && rtpSeq != RTP_END_OF_CALL_SEQ) {
if (m_debug)
LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq);
LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq);
m_streamTimestamps[streamId] = header.getTimestamp();
}
@ -206,7 +280,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
auto entry = m_streamTimestamps.find(streamId);
if (entry != m_streamTimestamps.end()) {
if (m_debug)
LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq);
LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq);
m_streamTimestamps.erase(streamId);
}
}
@ -225,21 +299,11 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
::memcpy(buffer + RTP_HEADER_LENGTH_BYTES + RTP_EXTENSION_HEADER_LENGTH_BYTES + RTP_FNE_HEADER_LENGTH_BYTES, message, length);
if (m_debug)
Utils::dump(1U, "FrameQueue::enqueueMessage() Buffered Message", buffer, bufferLen);
udp::UDPDatagram *dgram = new udp::UDPDatagram;
dgram->buffer = buffer;
dgram->length = bufferLen;
dgram->address = addr;
dgram->addrLen = addrLen;
Utils::dump(1U, "FrameQueue::generateMessage() Buffered Message", buffer, bufferLen);
m_buffers.push_back(dgram);
}
if (outBufferLen != nullptr) {
*outBufferLen = bufferLen;
}
/// <summary>
/// Helper method to clear any tracked stream timestamps.
/// </summary>
void FrameQueue::clearTimestamps()
{
m_streamTimestamps.clear();
return buffer;
}

@ -46,11 +46,14 @@ namespace network
/// <summary>Read message from the received UDP packet.</summary>
UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen,
frame::RTPHeader* rtpHeader = nullptr, frame::RTPFNEHeader* fneHeader = nullptr);
/// <summary>Write message to the UDP socket.</summary>
bool write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
/// <summary>Cache "message" to frame queue.</summary>
/// <summary>Cache message to frame queue.</summary>
void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
/// <summary>Cache "message" to frame queue.</summary>
/// <summary>Cache message to frame queue.</summary>
void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
@ -60,6 +63,10 @@ namespace network
private:
uint32_t m_peerId;
std::unordered_map<uint32_t, uint32_t> m_streamTimestamps;
/// <summary>Generate RTP message for the frame queue.</summary>
uint8_t* generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen);
};
} // namespace network

@ -82,7 +82,36 @@ UInt8Array RawFrameQueue::read(int& messageLength, sockaddr_storage& address, ui
}
/// <summary>
/// Cache "message" to frame queue.
/// Write message to the UDP socket.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>
/// <param name="addr">IP address to write data to.</param>
/// <param name="addrLen"></param>
/// <returns></returns>
bool RawFrameQueue::write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen)
{
assert(message != nullptr);
assert(length > 0U);
uint8_t* buffer = new uint8_t[length];
::memset(buffer, 0x00U, length);
::memcpy(buffer, message, length);
if (m_debug)
Utils::dump(1U, "RawFrameQueue::write() Message", buffer, length);
bool ret = true;
if (!m_socket->write(buffer, length, addr, addrLen)) {
LogError(LOG_NET, "Failed writing data to the network");
ret = false;
}
return ret;
}
/// <summary>
/// Cache message to frame queue.
/// </summary>
/// <param name="message">Message buffer to frame and queue.</param>
/// <param name="length">Length of message.</param>

@ -43,8 +43,10 @@ namespace network
/// <summary>Read message from the received UDP packet.</summary>
UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen);
/// <summary>Write message to the UDP socket.</summary>
bool write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen);
/// <summary>Cache "message" to frame queue.</summary>
/// <summary>Cache message to frame queue.</summary>
void enqueueMessage(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen);
/// <summary>Flush the message queue.</summary>

@ -232,10 +232,23 @@ ssize_t Socket::read(uint8_t* buffer, uint32_t length, sockaddr_storage& address
uint16_t magic = __GET_UINT16B(buffer, 0U);
if (magic == AES_WRAPPED_PCKT_MAGIC) {
uint32_t cryptedLen = (len - 2U) * sizeof(uint8_t);
// Utils::dump(1U, "Socket::read() crypted", buffer + 2U, cryptedLen);
uint8_t* cryptoBuffer = buffer + 2U;
// do we need to pad the original buffer to be block aligned?
if (cryptedLen % crypto::AES::BLOCK_BYTES_LEN != 0) {
uint32_t alignment = crypto::AES::BLOCK_BYTES_LEN - (cryptedLen % crypto::AES::BLOCK_BYTES_LEN);
cryptedLen += alignment;
// reallocate buffer and copy
cryptoBuffer = new uint8_t[cryptedLen];
::memset(cryptoBuffer, 0x00U, cryptedLen);
::memcpy(cryptoBuffer, buffer + 2U, len - 2U);
}
// Utils::dump(1U, "Socket::read() crypted", cryptoBuffer, cryptedLen);
// decrypt
uint8_t* decrypted = m_aes->decryptECB(buffer + 2U, cryptedLen, m_presharedKey);
uint8_t* decrypted = m_aes->decryptECB(cryptoBuffer, cryptedLen, m_presharedKey);
// Utils::dump(1U, "Socket::read() decrypted", decrypted, cryptedLen);
@ -320,6 +333,7 @@ bool Socket::write(const uint8_t* buffer, uint32_t length, const sockaddr_storag
::memcpy(out.get() + 2U, crypted, cryptedLen);
__SET_UINT16B(AES_WRAPPED_PCKT_MAGIC, out.get(), 0U);
delete[] crypted;
length = cryptedLen + 2U;
} else {
if (lenWritten != nullptr) {
*lenWritten = -1;

@ -542,6 +542,19 @@ bool HostFNE::createPeerNetworks()
network->setPresharedKey(presharedKey);
}
/*
** Block Traffic To Peers
*/
yaml::Node& blockTrafficTo = peerConf["blockTrafficTo"];
if (blockTrafficTo.size() > 0U) {
for (size_t i = 0; i < blockTrafficTo.size(); i++) {
uint32_t peerId = (uint32_t)::strtoul(blockTrafficTo[i].as<std::string>("0").c_str(), NULL, 10);
if (peerId != 0U) {
network->addBlockedTrafficPeer(peerId);
}
}
}
network->enable(enabled);
if (enabled) {
bool ret = network->open();

@ -14,6 +14,7 @@
#include "common/edac/SHA256.h"
#include "common/network/json/json.h"
#include "common/Log.h"
#include "common/ThreadFunc.h"
#include "common/Utils.h"
#include "network/FNENetwork.h"
#include "network/fne/TagDMRData.h"
@ -28,6 +29,13 @@ using namespace network::fne;
#include <cassert>
#include <chrono>
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
const uint8_t MAX_PEER_LIST_BEFORE_FLUSH = 10U;
const uint32_t MAX_RID_LIST_CHUNK = 50U;
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
@ -74,8 +82,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_peers(),
m_peerAffiliations(),
m_maintainenceTimer(1000U, pingTime),
m_updateLookupTimer(1000U, (updateLookupTime * 60U)),
m_forceListUpdate(false),
m_updateLookupTime(updateLookupTime * 60U),
m_callInProgress(false),
m_disallowP25AdjStsBcast(true),
m_reportPeerPing(reportPeerPing),
@ -184,20 +191,6 @@ void FNENetwork::clock(uint32_t ms)
m_maintainenceTimer.start();
}
m_updateLookupTimer.clock(ms);
if ((m_updateLookupTimer.isRunning() && m_updateLookupTimer.hasExpired()) || m_forceListUpdate) {
writeWhitelistRIDs();
writeBlacklistRIDs();
m_frameQueue->flushQueue();
writeTGIDs();
writeDeactiveTGIDs();
m_frameQueue->flushQueue();
m_updateLookupTimer.start();
m_forceListUpdate = false;
}
sockaddr_storage address;
uint32_t addrLen;
frame::RTPHeader rtpHeader;
@ -456,20 +449,12 @@ void FNENetwork::clock(uint32_t ms)
connection->connected(true);
connection->pingsReceived(0U);
connection->lastPing(now);
connection->lastACLUpdate(now);
m_peers[peerId] = connection;
writePeerACK(peerId);
LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
// queue final update messages and flush
writeWhitelistRIDs(peerId, true);
writeBlacklistRIDs(peerId, true);
m_frameQueue->flushQueue();
writeTGIDs(peerId, true);
writeDeactiveTGIDs(peerId, true);
m_frameQueue->flushQueue();
json::object peerConfig = connection->config();
if (peerConfig["software"].is<std::string>()) {
std::string software = peerConfig["software"].get<std::string>();
@ -477,9 +462,12 @@ void FNENetwork::clock(uint32_t ms)
}
// setup the affiliations list for this peer
char *peerName = new char[16];
::sprintf(peerName, "PEER %u", peerId);
m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName, m_verbose);
std::stringstream peerName;
peerName << "PEER " << peerId;
m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName.str(), m_verbose);
// spin up a thread and send ACL list over to peer
peerACLUpdate(peerId);
}
}
}
@ -526,17 +514,26 @@ void FNENetwork::clock(uint32_t ms)
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
uint32_t pingsRx = connection->pingsReceived();
uint64_t lastPing = connection->lastPing();
pingsRx++;
connection->pingsReceived(pingsRx);
connection->lastPing(now);
connection->pktLastSeq(connection->pktLastSeq() + 1);
// does this peer need an ACL update?
uint64_t dt = connection->lastACLUpdate() + m_updateLookupTime;
if (dt < now) {
LogInfoEx(LOG_NET, "PEER %u updating ACL list, dt = %u, now = %u", peerId, dt, now);
peerACLUpdate(peerId);
connection->lastACLUpdate(now);
}
m_peers[peerId] = connection;
writePeerCommand(peerId, { NET_FUNC_PONG, NET_SUBFUNC_NOP });
if (m_reportPeerPing) {
LogInfoEx(LOG_NET, "PEER %u ping received and answered, pingsReceived = %u", peerId, connection->pingsReceived());
LogInfoEx(LOG_NET, "PEER %u ping, pingsReceived = %u, lastPing = %u", peerId, connection->pingsReceived(), lastPing);
}
}
else {
@ -789,7 +786,6 @@ void FNENetwork::close()
m_socket->close();
m_maintainenceTimer.stop();
m_updateLookupTimer.stop();
m_status = NET_STAT_INVALID;
}
@ -857,12 +853,51 @@ void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connecti
LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId);
}
/// <summary>
/// Helper to send the ACL lists to the specified peer in a separate thread.
/// </summary>
/// <param name="peerId"></param>
void FNENetwork::peerACLUpdate(uint32_t peerId)
{
ACLUpdateRequest* req = new ACLUpdateRequest();
req->network = this;
req->peerId = peerId;
std::stringstream peerName;
peerName << "peer" << peerId << ":acl-update";
::pthread_create(&req->thread, NULL, threadedACLUpdate, req);
if (pthread_kill(req->thread, 0) == 0) {
::pthread_setname_np(req->thread, peerName.str().c_str());
}
}
/// <summary>
/// Helper to send the ACL lists to the specified peer in a separate thread.
/// </summary>
/// <param name="arg"></param>
void* FNENetwork::threadedACLUpdate(void* arg)
{
ACLUpdateRequest* req = (ACLUpdateRequest*)arg;
if (req != nullptr) {
LogInfoEx(LOG_NET, "PEER %u sending ACL list updates", req->peerId);
req->network->writeWhitelistRIDs(req->peerId);
req->network->writeBlacklistRIDs(req->peerId);
req->network->writeTGIDs(req->peerId);
req->network->writeDeactiveTGIDs(req->peerId);
delete req;
}
return nullptr;
}
/// <summary>
/// Helper to send the list of whitelisted RIDs to the specified peer.
/// </summary>
/// <param name="peerId"></param>
/// <param name="queueOnly"></param>
void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly)
void FNENetwork::writeWhitelistRIDs(uint32_t peerId)
{
// send radio ID white/black lists
std::vector<uint32_t> ridWhitelist;
@ -879,37 +914,46 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly)
return;
}
// build dataset
uint8_t payload[4U + (ridWhitelist.size() * 4U)];
::memset(payload, 0x00U, 4U + (ridWhitelist.size() * 4U));
// send a chunk of RIDs to the peer
FNEPeerConnection* connection = m_peers[peerId];
if (connection != nullptr) {
uint32_t chunkCnt = (ridWhitelist.size() / MAX_RID_LIST_CHUNK) + 1U;
for (uint32_t i = 0U; i < chunkCnt; i++) {
size_t listSize = ridWhitelist.size();
if (chunkCnt > 1U) {
listSize = MAX_RID_LIST_CHUNK;
}
__SET_UINT32(ridWhitelist.size(), payload, 0U);
if (i == chunkCnt - 1U) {
listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridWhitelist.size();
}
// write whitelisted IDs to whitelist payload
uint32_t offs = 4U;
for (uint32_t id : ridWhitelist) {
if (m_debug)
LogDebug(LOG_NET, "PEER %u whitelisting RID %u", peerId, id);
if (listSize > ridWhitelist.size()) {
listSize = ridWhitelist.size();
}
__SET_UINT32(id, payload, offs);
offs += 4U;
}
// build dataset
uint16_t bufSize = 4U + (listSize * 4U);
uint8_t payload[bufSize];
::memset(payload, 0x00U, bufSize);
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID },
payload, 4U + (ridWhitelist.size() * 4U), queueOnly, true);
}
__SET_UINT32(listSize, payload, 0U);
/// <summary>
/// Helper to send the list of whitelisted RIDs to connected peers.
/// </summary>
void FNENetwork::writeWhitelistRIDs()
{
if (m_ridLookup->table().size() == 0U) {
return;
}
// write whitelisted IDs to whitelist payload
uint32_t offs = 4U;
for (uint32_t j = 0; j < listSize; j++) {
uint32_t id = ridWhitelist.at(j + (i * MAX_RID_LIST_CHUNK));
if (m_debug)
LogDebug(LOG_NET, "PEER %u whitelisting RID %u (%d / %d)", peerId, id, i, j);
__SET_UINT32(id, payload, offs);
offs += 4U;
}
for (auto peer : m_peers) {
writeWhitelistRIDs(peer.first, true);
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID },
payload, bufSize, true);
}
}
}
@ -917,8 +961,7 @@ void FNENetwork::writeWhitelistRIDs()
/// Helper to send the list of whitelisted RIDs to the specified peer.
/// </summary>
/// <param name="peerId"></param>
/// <param name="queueOnly"></param>
void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly)
void FNENetwork::writeBlacklistRIDs(uint32_t peerId)
{
// send radio ID blacklist
std::vector<uint32_t> ridBlacklist;
@ -935,37 +978,46 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly)
return;
}
// build dataset
uint8_t payload[4U + (ridBlacklist.size() * 4U)];
::memset(payload, 0x00U, 4U + (ridBlacklist.size() * 4U));
// send a chunk of RIDs to the peer
FNEPeerConnection* connection = m_peers[peerId];
if (connection != nullptr) {
uint32_t chunkCnt = (ridBlacklist.size() / MAX_RID_LIST_CHUNK) + 1U;
for (uint32_t i = 0U; i < chunkCnt; i++) {
size_t listSize = ridBlacklist.size();
if (chunkCnt > 1U) {
listSize = MAX_RID_LIST_CHUNK;
}
__SET_UINT32(ridBlacklist.size(), payload, 0U);
if (i == chunkCnt - 1U) {
listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridBlacklist.size();
}
// write blacklisted IDs to blacklist payload
uint32_t offs = 4U;
for (uint32_t id : ridBlacklist) {
if (m_debug)
LogDebug(LOG_NET, "PEER %u blacklisting RID %u", peerId, id);
if (listSize > ridBlacklist.size()) {
listSize = ridBlacklist.size();
}
__SET_UINT32(id, payload, offs);
offs += 4U;
}
// build dataset
uint16_t bufSize = 4U + (listSize * 4U);
uint8_t payload[bufSize];
::memset(payload, 0x00U, bufSize);
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID },
payload, 4U + (ridBlacklist.size() * 4U), queueOnly, true);
}
__SET_UINT32(listSize, payload, 0U);
/// <summary>
/// Helper to send the list of whitelisted RIDs to connected peers.
/// </summary>
void FNENetwork::writeBlacklistRIDs()
{
if (m_ridLookup->table().size() == 0U) {
return;
}
// write whitelisted IDs to whitelist payload
uint32_t offs = 4U;
for (uint32_t j = 0; j < listSize; j++) {
uint32_t id = ridBlacklist.at(j + (i * MAX_RID_LIST_CHUNK));
for (auto peer : m_peers) {
writeBlacklistRIDs(peer.first, true);
if (m_debug)
LogDebug(LOG_NET, "PEER %u blacklisting RID %u (%d / %d)", peerId, id, i, j);
__SET_UINT32(id, payload, offs);
offs += 4U;
}
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID },
payload, bufSize, true);
}
}
}
@ -973,8 +1025,7 @@ void FNENetwork::writeBlacklistRIDs()
/// Helper to send the list of active TGIDs to the specified peer.
/// </summary>
/// <param name="peerId"></param>
/// <param name="queueOnly"></param>
void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly)
void FNENetwork::writeTGIDs(uint32_t peerId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@ -1026,25 +1077,14 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly)
}
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_ACTIVE_TGS },
payload, 4U + (tgidList.size() * 5U), queueOnly, true);
}
/// <summary>
/// Helper to send the list of active TGIDs to connected peers.
/// </summary>
void FNENetwork::writeTGIDs()
{
for (auto peer : m_peers) {
writeTGIDs(peer.first, true);
}
payload, 4U + (tgidList.size() * 5U), true);
}
/// <summary>
/// Helper to send the list of deactivated TGIDs to the specified peer.
/// </summary>
/// <param name="peerId"></param>
/// <param name="queueOnly"></param>
void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly)
void FNENetwork::writeDeactiveTGIDs(uint32_t peerId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@ -1096,17 +1136,7 @@ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly)
}
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_DEACTIVE_TGS },
payload, 4U + (tgidList.size() * 5U), queueOnly, true);
}
/// <summary>
/// Helper to send the list of deactivated TGIDs to connected peers.
/// </summary>
void FNENetwork::writeDeactiveTGIDs()
{
for (auto peer : m_peers) {
writeDeactiveTGIDs(peer.first, true);
}
payload, 4U + (tgidList.size() * 5U), true);
}
/// <summary>
@ -1119,11 +1149,12 @@ void FNENetwork::writeDeactiveTGIDs()
/// <param name="pktSeq"></param>
/// <param name="streamId"></param>
/// <param name="queueOnly"></param>
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly)
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool directWrite) const
{
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
FNEPeerConnection* connection = m_peers[peerId];
FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
uint32_t peerStreamId = connection->currStreamId();
if (streamId == 0U) {
@ -1132,10 +1163,14 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
sockaddr_storage addr = connection->socketStorage();
uint32_t addrLen = connection->sockStorageLen();
m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
if (queueOnly)
return true;
return m_frameQueue->flushQueue();
if (directWrite)
return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
else {
m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
if (queueOnly)
return true;
return m_frameQueue->flushQueue();
}
}
}
@ -1152,18 +1187,20 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
/// <param name="streamId"></param>
/// <param name="queueOnly"></param>
/// <param name="incPktSeq"></param>
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq)
/// <param name="directWrite"></param>
bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const
{
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
FNEPeerConnection* connection = m_peers[peerId];
FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
if (incPktSeq) {
connection->pktLastSeq(connection->pktLastSeq() + 1);
}
uint16_t pktSeq = connection->pktLastSeq();
return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly);
return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly, directWrite);
}
}
@ -1177,10 +1214,9 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
/// <param name="opcode">Opcode.</param>
/// <param name="data">Buffer to write to the network.</param>
/// <param name="length">Length of buffer to write.</param>
/// <param name="queueOnly"></param>
/// <param name="incPktSeq"></param>
bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode,
const uint8_t* data, uint32_t length, bool queueOnly, bool incPktSeq)
const uint8_t* data, uint32_t length, bool incPktSeq) const
{
assert(peerId > 0);
@ -1192,7 +1228,7 @@ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode
}
uint32_t len = length + 6U;
return writePeer(peerId, opcode, buffer, len, 0U, queueOnly, incPktSeq);
return writePeer(peerId, opcode, buffer, len, 0U, false, incPktSeq, true);
}
/// <summary>

@ -37,6 +37,12 @@ namespace network { namespace fne { class HOST_SW_API TagNXDNData; } }
namespace network
{
// ---------------------------------------------------------------------------
// Class Prototypes
// ---------------------------------------------------------------------------
class HOST_SW_API FNENetwork;
// ---------------------------------------------------------------------------
// Class Declaration
// Represents an peer connection to the FNE.
@ -61,6 +67,7 @@ namespace network
m_connectionState(NET_STAT_INVALID),
m_pingsReceived(0U),
m_lastPing(0U),
m_lastACLUpdate(0U),
m_config(),
m_pktLastSeq(0U),
m_pktNextSeq(1U)
@ -83,6 +90,7 @@ namespace network
m_connectionState(NET_STAT_INVALID),
m_pingsReceived(0U),
m_lastPing(0U),
m_lastACLUpdate(0U),
m_config(),
m_pktLastSeq(0U),
m_pktNextSeq(1U)
@ -123,6 +131,9 @@ namespace network
/// <summary>Last ping received.</summary>
__PROPERTY_PLAIN(uint64_t, lastPing);
/// <summary>Last ACL update sent.</summary>
__PROPERTY_PLAIN(uint64_t, lastACLUpdate);
/// <summary>JSON objecting containing peer configuration information.</summary>
__PROPERTY_PLAIN(json::object, config);
@ -132,6 +143,18 @@ namespace network
__PROPERTY_PLAIN(uint16_t, pktNextSeq);
};
// ---------------------------------------------------------------------------
// Structure Declaration
//
// ---------------------------------------------------------------------------
struct ACLUpdateRequest {
FNENetwork* network;
uint32_t peerId;
pthread_t thread;
};
// ---------------------------------------------------------------------------
// Class Declaration
// Implements the core FNE networking logic.
@ -207,9 +230,9 @@ namespace network
std::unordered_map<uint32_t, lookups::AffiliationLookup*> m_peerAffiliations;
Timer m_maintainenceTimer;
Timer m_updateLookupTimer;
bool m_forceListUpdate;
uint32_t m_updateLookupTime;
bool m_callInProgress;
bool m_disallowP25AdjStsBcast;
@ -225,34 +248,30 @@ namespace network
/// <summary>Helper to complete setting up a repeater login request.</summary>
void setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection);
/// <summary>Helper to send the ACL lists to the specified peer in a separate thread.</summary>
void peerACLUpdate(uint32_t peerId);
/// <summary>Entry point to send the ACL lists to the specified peer in a separate thread.</summary>
static void* threadedACLUpdate(void* arg);
/// <summary>Helper to send the list of whitelisted RIDs to the specified peer.</summary>
void writeWhitelistRIDs(uint32_t peerId, bool queueOnly = false);
/// <summary>Helper to send the list of whitelisted RIDs to connected peers.</summary>
void writeWhitelistRIDs();
void writeWhitelistRIDs(uint32_t peerId);
/// <summary>Helper to send the list of blacklisted RIDs to the specified peer.</summary>
void writeBlacklistRIDs(uint32_t peerId, bool queueOnly = false);
/// <summary>Helper to send the list of blacklisted RIDs to connected peers.</summary>
void writeBlacklistRIDs();
void writeBlacklistRIDs(uint32_t peerId);
/// <summary>Helper to send the list of active TGIDs to the specified peer.</summary>
void writeTGIDs(uint32_t peerId, bool queueOnly = false);
/// <summary>Helper to send the list of active TGIDs to connected peers.</summary>
void writeTGIDs();
void writeTGIDs(uint32_t peerId);
/// <summary>Helper to send the list of deactivated TGIDs to the specified peer.</summary>
void writeDeactiveTGIDs(uint32_t peerId, bool queueOnly = false);
/// <summary>Helper to send the list of deactivated TGIDs to connected peers.</summary>
void writeDeactiveTGIDs();
void writeDeactiveTGIDs(uint32_t peerId);
/// <summary>Helper to send a data message to the specified peer.</summary>
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false);
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool directWrite = false) const;
/// <summary>Helper to send a data message to the specified peer.</summary>
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint32_t streamId, bool queueOnly = false, bool incPktSeq = false);
uint32_t streamId, bool queueOnly = false, bool incPktSeq = false, bool directWrite = false) const;
/// <summary>Helper to send a command message to the specified peer.</summary>
bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data = nullptr, uint32_t length = 0U,
bool queueOnly = false, bool incPktSeq = false);
bool incPktSeq = false) const;
/// <summary>Helper to send a ACK response to the specified peer.</summary>
bool writePeerACK(uint32_t peerId, const uint8_t* data = nullptr, uint32_t length = 0U);

@ -19,6 +19,7 @@ using namespace network;
#include <cstdio>
#include <cassert>
#include <algorithm>
// ---------------------------------------------------------------------------
// Public Class Members
@ -44,13 +45,30 @@ using namespace network;
/// <param name="updateLookup">Flag indicating that the system will accept radio ID and talkgroup ID lookups from the network.</param>
PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup) :
Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup)
Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup),
m_blockTrafficToTable()
{
assert(!address.empty());
assert(port > 0U);
assert(!password.empty());
}
/// <summary>
/// Checks if the passed peer ID is blocked from sending to this peer.
/// </summary>
/// <param name="peerId"></param>
bool PeerNetwork::checkBlockedPeer(uint32_t peerId)
{
if (m_blockTrafficToTable.empty())
return false;
if (std::find(m_blockTrafficToTable.begin(), m_blockTrafficToTable.end(), peerId) != m_blockTrafficToTable.end()) {
return true;
}
return false;
}
// ---------------------------------------------------------------------------
// Protected Class Members
// ---------------------------------------------------------------------------

@ -18,6 +18,7 @@
#include <string>
#include <cstdint>
#include <vector>
namespace network
{
@ -32,7 +33,16 @@ namespace network
PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup);
/// <summary>Gets the blocked traffic peer ID table.</summary>
std::vector<uint32_t> blockTrafficTo() const { return m_blockTrafficToTable; }
/// <summary>Adds an entry to the blocked traffic peer ID table.</summary>
void addBlockedTrafficPeer(uint32_t peerId) { m_blockTrafficToTable.push_back(peerId); }
/// <summary>Checks if the passed peer ID is blocked from sending to this peer.</summary>
bool checkBlockedPeer(uint32_t peerId);
protected:
std::vector<uint32_t> m_blockTrafficToTable;
/// <summary>Writes configuration to the network.</summary>
bool writeConfig() override;
};

@ -473,6 +473,7 @@ void RESTAPI::initializeEndpoints()
m_dispatcher.match(GET_STATUS).get(REST_API_BIND(RESTAPI::restAPI_GetStatus, this));
m_dispatcher.match(FNE_GET_PEER_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetPeerQuery, this));
m_dispatcher.match(FNE_GET_PEER_COUNT).get(REST_API_BIND(RESTAPI::restAPI_GetPeerCount, this));
m_dispatcher.match(FNE_GET_RID_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetRIDQuery, this));
m_dispatcher.match(FNE_PUT_RID_ADD).put(REST_API_BIND(RESTAPI::restAPI_PutRIDAdd, this));
@ -721,6 +722,30 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl
reply.payload(response);
}
/// <summary>
///
/// </summary>
/// <param name="request"></param>
/// <param name="reply"></param>
/// <param name="match"></param>
void RESTAPI::restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const RequestMatch& match)
{
if (!validateAuth(request, reply)) {
return;
}
json::object response = json::object();
setResponseDefaultStatus(response);
json::array peers = json::array();
if (m_network != nullptr) {
uint32_t count = m_network->m_peers.size();
response["peerCount"].set<uint32_t>(count);
}
reply.payload(response);
}
/// <summary>
///
/// </summary>
@ -799,10 +824,11 @@ void RESTAPI::restAPI_PutRIDAdd(const HTTPPayload& request, HTTPPayload& reply,
// The addEntry function will automatically update an existing entry, so no need to check for an exisitng one here
m_ridLookup->addEntry(rid, enabled, alias);
/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
*/
}
/// <summary>
@ -838,10 +864,11 @@ void RESTAPI::restAPI_PutRIDDelete(const HTTPPayload& request, HTTPPayload& repl
}
m_ridLookup->eraseEntry(rid);
/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
*/
}
/// <summary>
@ -934,10 +961,11 @@ void RESTAPI::restAPI_PutTGAdd(const HTTPPayload& request, HTTPPayload& reply, c
::LogInfoEx(LOG_REST, "Talkgroup NAME: %s SRC_TGID: %u SRC_TS: %u ACTIVE: %u PARROT: %u INCLUSIONS: %u EXCLUSIONS: %u REWRITES: %u", groupName.c_str(), tgId, tgSlot, active, parrot, incCount, excCount, rewrCount);
m_tidLookup->addEntry(groupVoice);
/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
*/
}
/// <summary>
@ -974,10 +1002,11 @@ void RESTAPI::restAPI_PutTGDelete(const HTTPPayload& request, HTTPPayload& reply
}
m_tidLookup->eraseEntry(groupVoice.source().tgId(), groupVoice.source().tgSlot());
/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
*/
}
/// <summary>
@ -1014,10 +1043,11 @@ void RESTAPI::restAPI_GetForceUpdate(const HTTPPayload& request, HTTPPayload& re
json::object response = json::object();
setResponseDefaultStatus(response);
/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
*/
reply.payload(response);
}

@ -97,6 +97,8 @@ private:
/// <summary></summary>
void restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);
/// <summary></summary>
void restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);
/// <summary></summary>
void restAPI_GetRIDQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);

@ -21,6 +21,7 @@
// ---------------------------------------------------------------------------
#define FNE_GET_PEER_QUERY "/peer/query"
#define FNE_GET_PEER_COUNT "/peer/count"
#define FNE_GET_RID_QUERY "/rid/query"
#define FNE_PUT_RID_ADD "/rid/add"

@ -252,6 +252,11 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
// check if the source peer is blocked from sending to this peer
if (peer.second->checkBlockedPeer(peerId)) {
continue;
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);

@ -222,6 +222,11 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
continue;
}
// check if the source peer is blocked from sending to this peer
if (peer.second->checkBlockedPeer(peerId)) {
continue;
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);

@ -286,6 +286,11 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
// check if the source peer is blocked from sending to this peer
if (peer.second->checkBlockedPeer(peerId)) {
continue;
}
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);

@ -396,6 +396,7 @@ void Network::clock(uint32_t ms)
m_ridLookup->toggleEntry(id, true);
offs += 4U;
}
LogMessage(LOG_NET, "Network Announced %u whitelisted RIDs", len);
}
}
}
@ -413,6 +414,7 @@ void Network::clock(uint32_t ms)
m_ridLookup->toggleEntry(id, false);
offs += 4U;
}
LogMessage(LOG_NET, "Network Announced %u blacklisted RIDs", len);
}
}
}

@ -751,8 +751,9 @@ bool ControlSignaling::processNetwork(uint8_t* data, uint32_t len, lc::LC& contr
m_p25->m_affiliations.releaseGrant(dstId, false);
}
}
return true; // don't allow this to write to the air
}
break;
case TSBK_OSP_DVM_GIT_HASH:
// ignore
return true; // don't allow this to write to the air

@ -41,6 +41,7 @@
#define RCD_GET_VOICE_CH "voice-ch"
#define RCD_FNE_GET_PEERLIST "fne-peerlist"
#define RCD_FNE_GET_PEERCOUNT "fne-peercount"
#define RCD_FNE_GET_TGIDLIST "fne-tgidlist"
#define RCD_FNE_GET_FORCEUPDATE "fne-force-update"
#define RCD_FNE_GET_AFFLIST "fne-affs"
@ -182,6 +183,7 @@ void usage(const char* message, const char* arg)
reply += " voice-ch Retrieves the list of configured voice channels\r\n";
reply += "\r\n";
reply += " fne-peerlist Retrieves the list of connected peers (Converged FNE only)\r\n";
reply += " fne-peercount Retrieves the count of connected peers (Converged FNE only)\r\n";
reply += " fne-tgidlist Retrieves the list of configured TGIDs (Converged FNE only)\r\n";
reply += " fne-force-update Forces the FNE to send list update (Converged FNE only)\r\n";
reply += " fne-affs Retrieves the list of currently affiliated SUs (Converged FNE only)\r\n";
@ -730,6 +732,9 @@ int main(int argc, char** argv)
else if (rcom == RCD_FNE_GET_PEERLIST) {
retCode = client->send(HTTP_GET, FNE_GET_PEER_QUERY, json::object(), response);
}
else if (rcom == RCD_FNE_GET_PEERCOUNT) {
retCode = client->send(HTTP_GET, FNE_GET_PEER_COUNT, json::object(), response);
}
else if (rcom == RCD_FNE_GET_TGIDLIST) {
retCode = client->send(HTTP_GET, FNE_GET_TGID_QUERY, json::object(), response);
}

Loading…
Cancel
Save

Powered by TurnKey Linux.