diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml
index b2c149e4..f24f5936 100644
--- a/configs/fne-config.example.yml
+++ b/configs/fne-config.example.yml
@@ -102,6 +102,9 @@ peers:
# Network Peer ID
peerId: 9000990
+ # List of peer IDs to block sending traffic to from this peer.
+ blockTrafficTo: []
+
# Flag indicating whether or not peer endpoint networking is encrypted.
encrypted: false
# AES-256 32-byte Preshared Key
diff --git a/src/common/RingBuffer.h b/src/common/RingBuffer.h
index 5f7ba59e..2dbf54da 100644
--- a/src/common/RingBuffer.h
+++ b/src/common/RingBuffer.h
@@ -32,7 +32,7 @@ public:
/// Initializes a new instance of the RingBuffer class.
/// Length of ring buffer.
/// Name of buffer.
- RingBuffer(uint32_t length, const char* name) :
+ RingBuffer(uint32_t length, const std::string name) :
m_length(length),
m_name(name),
m_buffer(nullptr),
@@ -40,7 +40,6 @@ public:
m_oPtr(0U)
{
assert(length > 0U);
- assert(name != nullptr);
m_buffer = new T[length];
@@ -60,7 +59,7 @@ public:
bool addData(const T* buffer, uint32_t length)
{
if (length > freeSpace()) {
- LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name, length, freeSpace());
+ LogError(LOG_HOST, "**** Overflow in %s ring buffer, %u > %u, clearing the buffer", m_name.c_str(), length, freeSpace());
clear();
return false;
}
@@ -74,7 +73,7 @@ public:
m_iPtr = 0U;
}
#if DEBUG_RINGBUFFER
- LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name, iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite));
+ LogDebug(LOG_HOST, "RingBuffer::addData(%s): iPtr_Before = %u, iPtr_After = %u, oPtr = %u, len = %u, len_Written = %u", m_name.c_str(), iPtr_BeforeWrite, m_iPtr, m_oPtr, m_length, (m_iPtr - iPtr_BeforeWrite));
#endif
return true;
}
@@ -86,7 +85,7 @@ public:
bool get(T* buffer, uint32_t length)
{
if (dataSize() < length) {
- LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name, dataSize(), length);
+ LogError(LOG_HOST, "**** Underflow get in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length);
return false;
}
#if DEBUG_RINGBUFFER
@@ -99,7 +98,7 @@ public:
m_oPtr = 0U;
}
#if DEBUG_RINGBUFFER
- LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name, m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead));
+ LogDebug(LOG_HOST, "RingBuffer::getData(%s): iPtr = %u, oPtr_Before = %u, oPtr_After = %u, len = %u, len_Read = %u", m_name.c_str(), m_iPtr, oPtr_BeforeRead, m_oPtr, m_length, (m_oPtr - oPtr_BeforeRead));
#endif
return true;
}
@@ -111,7 +110,7 @@ public:
bool peek(T* buffer, uint32_t length)
{
if (dataSize() < length) {
- LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name, dataSize(), length);
+ LogError(LOG_HOST, "**** Underflow peek in %s ring buffer, %u < %u", m_name.c_str(), dataSize(), length);
return false;
}
@@ -204,7 +203,7 @@ public:
private:
uint32_t m_length;
- const char* m_name;
+ const std::string m_name;
T* m_buffer;
diff --git a/src/common/lookups/AffiliationLookup.cpp b/src/common/lookups/AffiliationLookup.cpp
index 972c639d..d422b3e3 100644
--- a/src/common/lookups/AffiliationLookup.cpp
+++ b/src/common/lookups/AffiliationLookup.cpp
@@ -24,7 +24,7 @@ using namespace lookups;
///
/// Name of lookup table.
/// Flag indicating whether verbose logging is enabled.
-AffiliationLookup::AffiliationLookup(const char* name, bool verbose) :
+AffiliationLookup::AffiliationLookup(const std::string name, bool verbose) :
m_rfChTable(),
m_rfChDataTable(),
m_rfGrantChCnt(0U),
@@ -52,10 +52,7 @@ AffiliationLookup::AffiliationLookup(const char* name, bool verbose) :
///
/// Finalizes a instance of the AffiliationLookup class.
///
-AffiliationLookup::~AffiliationLookup()
-{
- /* stub */
-}
+AffiliationLookup::~AffiliationLookup() = default;
///
/// Helper to group affiliate a source ID.
@@ -71,7 +68,7 @@ void AffiliationLookup::unitReg(uint32_t srcId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, unit registration, srcId = %u",
- m_name, srcId);
+ m_name.c_str(), srcId);
}
}
@@ -89,7 +86,7 @@ bool AffiliationLookup::unitDereg(uint32_t srcId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, unit deregistration, srcId = %u",
- m_name, srcId);
+ m_name.c_str(), srcId);
}
groupUnaff(srcId);
@@ -126,7 +123,7 @@ bool AffiliationLookup::isUnitReg(uint32_t srcId) const
void AffiliationLookup::clearUnitReg()
{
std::vector srcToRel = std::vector();
- LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name);
+ LogWarning(LOG_HOST, "%s, releasing all unit registrations", m_name.c_str());
m_unitRegTable.clear();
}
@@ -143,7 +140,7 @@ void AffiliationLookup::groupAff(uint32_t srcId, uint32_t dstId)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, group affiliation, srcId = %u, dstId = %u",
- m_name, srcId, dstId);
+ m_name.c_str(), srcId, dstId);
}
}
}
@@ -160,7 +157,7 @@ bool AffiliationLookup::groupUnaff(uint32_t srcId)
uint32_t tblDstId = m_grpAffTable.at(srcId);
if (m_verbose) {
LogMessage(LOG_HOST, "%s, group unaffiliation, srcId = %u, dstId = %u",
- m_name, srcId, tblDstId);
+ m_name.c_str(), srcId, tblDstId);
}
} else {
return false;
@@ -230,14 +227,14 @@ std::vector AffiliationLookup::clearGroupAff(uint32_t dstId, bool rele
}
if (dstId == 0U && releaseAll) {
- LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name);
+ LogWarning(LOG_HOST, "%s, releasing all group affiliations", m_name.c_str());
for (auto entry : m_grpAffTable) {
uint32_t srcId = entry.first;
srcToRel.push_back(srcId);
}
}
else {
- LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name, dstId);
+ LogWarning(LOG_HOST, "%s, releasing group affiliations, dstId = %u", m_name.c_str(), dstId);
for (auto entry : m_grpAffTable) {
uint32_t srcId = entry.first;
uint32_t grpId = entry.second;
@@ -289,7 +286,7 @@ bool AffiliationLookup::grantCh(uint32_t dstId, uint32_t srcId, uint32_t grantTi
if (m_verbose) {
LogMessage(LOG_HOST, "%s, granting channel, chNo = %u, dstId = %u, srcId = %u, group = %u",
- m_name, chNo, dstId, srcId, grp);
+ m_name.c_str(), chNo, dstId, srcId, grp);
}
return true;
@@ -324,7 +321,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll)
// are we trying to release all grants?
if (dstId == 0U && releaseAll) {
- LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name);
+ LogWarning(LOG_HOST, "%s, force releasing all channel grants", m_name.c_str());
std::vector gntsToRel = std::vector();
for (auto entry : m_grantChTable) {
@@ -345,7 +342,7 @@ bool AffiliationLookup::releaseGrant(uint32_t dstId, bool releaseAll)
if (m_verbose) {
LogMessage(LOG_HOST, "%s, releasing channel grant, chNo = %u, dstId = %u",
- m_name, chNo, dstId);
+ m_name.c_str(), chNo, dstId);
}
if (m_releaseGrant != nullptr) {
diff --git a/src/common/lookups/AffiliationLookup.h b/src/common/lookups/AffiliationLookup.h
index e37f0c3f..f52da52f 100644
--- a/src/common/lookups/AffiliationLookup.h
+++ b/src/common/lookups/AffiliationLookup.h
@@ -100,7 +100,7 @@ namespace lookups
class HOST_SW_API AffiliationLookup {
public:
/// Initializes a new instance of the AffiliationLookup class.
- AffiliationLookup(const char* name, bool verbose);
+ AffiliationLookup(const std::string name, bool verbose);
/// Finalizes a instance of the AffiliationLookup class.
virtual ~AffiliationLookup();
@@ -196,7 +196,7 @@ namespace lookups
// chNo dstId slot
std::function m_releaseGrant;
- const char *m_name;
+ const std::string m_name;
bool m_verbose;
};
diff --git a/src/common/network/FrameQueue.cpp b/src/common/network/FrameQueue.cpp
index 9fc78782..d68e46e7 100644
--- a/src/common/network/FrameQueue.cpp
+++ b/src/common/network/FrameQueue.cpp
@@ -131,7 +131,39 @@ UInt8Array FrameQueue::read(int& messageLength, sockaddr_storage& address, uint3
}
///
-/// Cache "message" to frame queue.
+/// Write message to the UDP socket.
+///
+/// Message buffer to frame and queue.
+/// Length of message.
+/// Message stream ID.
+/// Peer ID.
+/// RTP SSRC ID.
+/// Opcode.
+/// RTP Sequence.
+/// IP address to write data to.
+///
+///
+bool FrameQueue::write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
+ uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen)
+{
+ assert(message != nullptr);
+ assert(length > 0U);
+
+ uint32_t bufferLen = 0U;
+ uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen);
+
+ bool ret = true;
+ if (!m_socket->write(buffer, bufferLen, addr, addrLen)) {
+ LogError(LOG_NET, "Failed writing data to the network");
+ ret = false;
+ }
+
+ delete buffer;
+ return ret;
+}
+
+///
+/// Cache message to frame queue.
///
/// Message buffer to frame and queue.
/// Length of message.
@@ -149,7 +181,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
}
///
-/// Cache "message" to frame queue.
+/// Cache message to frame queue.
///
/// Message buffer to frame and queue.
/// Length of message.
@@ -167,6 +199,48 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
assert(message != nullptr);
assert(length > 0U);
+ uint32_t bufferLen = 0U;
+ uint8_t* buffer = generateMessage(message, length, streamId, peerId, ssrc, opcode, rtpSeq, &bufferLen);
+
+ udp::UDPDatagram *dgram = new udp::UDPDatagram;
+ dgram->buffer = buffer;
+ dgram->length = bufferLen;
+ dgram->address = addr;
+ dgram->addrLen = addrLen;
+
+ m_buffers.push_back(dgram);
+}
+
+///
+/// Helper method to clear any tracked stream timestamps.
+///
+void FrameQueue::clearTimestamps()
+{
+ m_streamTimestamps.clear();
+}
+
+// ---------------------------------------------------------------------------
+// Private Class Members
+// ---------------------------------------------------------------------------
+
+///
+/// Generate RTP message for the frame queue.
+///
+/// Message buffer to frame and queue.
+/// Length of message.
+/// Message stream ID.
+/// Peer ID.
+/// RTP SSRC ID.
+/// Opcode.
+/// RTP Sequence.
+///
+///
+uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
+ uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen)
+{
+ assert(message != nullptr);
+ assert(length > 0U);
+
uint32_t timestamp = INVALID_TS;
if (streamId != 0U) {
auto entry = m_streamTimestamps.find(streamId);
@@ -177,7 +251,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
if (timestamp != INVALID_TS) {
timestamp += (RTP_GENERIC_CLOCK_RATE / 133);
if (m_debug)
- LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq);
+ LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, m_streamTimestamps[streamId], timestamp, rtpSeq);
m_streamTimestamps[streamId] = timestamp;
}
}
@@ -198,7 +272,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
if (streamId != 0U && timestamp == INVALID_TS && rtpSeq != RTP_END_OF_CALL_SEQ) {
if (m_debug)
- LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq);
+ LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, initial TS = %u, rtpSeq = %u", streamId, header.getTimestamp(), rtpSeq);
m_streamTimestamps[streamId] = header.getTimestamp();
}
@@ -206,7 +280,7 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
auto entry = m_streamTimestamps.find(streamId);
if (entry != m_streamTimestamps.end()) {
if (m_debug)
- LogDebug(LOG_NET, "FrameQueue::enqueueMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq);
+ LogDebug(LOG_NET, "FrameQueue::generateMessage() RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq);
m_streamTimestamps.erase(streamId);
}
}
@@ -225,21 +299,11 @@ void FrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, uint32_
::memcpy(buffer + RTP_HEADER_LENGTH_BYTES + RTP_EXTENSION_HEADER_LENGTH_BYTES + RTP_FNE_HEADER_LENGTH_BYTES, message, length);
if (m_debug)
- Utils::dump(1U, "FrameQueue::enqueueMessage() Buffered Message", buffer, bufferLen);
-
- udp::UDPDatagram *dgram = new udp::UDPDatagram;
- dgram->buffer = buffer;
- dgram->length = bufferLen;
- dgram->address = addr;
- dgram->addrLen = addrLen;
+ Utils::dump(1U, "FrameQueue::generateMessage() Buffered Message", buffer, bufferLen);
- m_buffers.push_back(dgram);
-}
+ if (outBufferLen != nullptr) {
+ *outBufferLen = bufferLen;
+ }
-///
-/// Helper method to clear any tracked stream timestamps.
-///
-void FrameQueue::clearTimestamps()
-{
- m_streamTimestamps.clear();
+ return buffer;
}
diff --git a/src/common/network/FrameQueue.h b/src/common/network/FrameQueue.h
index ea7fa34c..074a5e58 100644
--- a/src/common/network/FrameQueue.h
+++ b/src/common/network/FrameQueue.h
@@ -46,11 +46,14 @@ namespace network
/// Read message from the received UDP packet.
UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen,
frame::RTPHeader* rtpHeader = nullptr, frame::RTPFNEHeader* fneHeader = nullptr);
+ /// Write message to the UDP socket.
+ bool write(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
+ uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
- /// Cache "message" to frame queue.
+ /// Cache message to frame queue.
void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
- /// Cache "message" to frame queue.
+ /// Cache message to frame queue.
void enqueueMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, sockaddr_storage& addr, uint32_t addrLen);
@@ -60,6 +63,10 @@ namespace network
private:
uint32_t m_peerId;
std::unordered_map m_streamTimestamps;
+
+ /// Generate RTP message for the frame queue.
+ uint8_t* generateMessage(const uint8_t* message, uint32_t length, uint32_t streamId, uint32_t peerId,
+ uint32_t ssrc, OpcodePair opcode, uint16_t rtpSeq, uint32_t* outBufferLen);
};
} // namespace network
diff --git a/src/common/network/RawFrameQueue.cpp b/src/common/network/RawFrameQueue.cpp
index 5757d25b..b31b2889 100644
--- a/src/common/network/RawFrameQueue.cpp
+++ b/src/common/network/RawFrameQueue.cpp
@@ -82,7 +82,36 @@ UInt8Array RawFrameQueue::read(int& messageLength, sockaddr_storage& address, ui
}
///
-/// Cache "message" to frame queue.
+/// Write message to the UDP socket.
+///
+/// Message buffer to frame and queue.
+/// Length of message.
+/// IP address to write data to.
+///
+///
+bool RawFrameQueue::write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen)
+{
+ assert(message != nullptr);
+ assert(length > 0U);
+
+ uint8_t* buffer = new uint8_t[length];
+ ::memset(buffer, 0x00U, length);
+ ::memcpy(buffer, message, length);
+
+ if (m_debug)
+ Utils::dump(1U, "RawFrameQueue::write() Message", buffer, length);
+
+ bool ret = true;
+ if (!m_socket->write(buffer, length, addr, addrLen)) {
+ LogError(LOG_NET, "Failed writing data to the network");
+ ret = false;
+ }
+
+ return ret;
+}
+
+///
+/// Cache message to frame queue.
///
/// Message buffer to frame and queue.
/// Length of message.
diff --git a/src/common/network/RawFrameQueue.h b/src/common/network/RawFrameQueue.h
index 093d74eb..ae2ab1d6 100644
--- a/src/common/network/RawFrameQueue.h
+++ b/src/common/network/RawFrameQueue.h
@@ -43,8 +43,10 @@ namespace network
/// Read message from the received UDP packet.
UInt8Array read(int& messageLength, sockaddr_storage& address, uint32_t& addrLen);
+ /// Write message to the UDP socket.
+ bool write(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen);
- /// Cache "message" to frame queue.
+ /// Cache message to frame queue.
void enqueueMessage(const uint8_t* message, uint32_t length, sockaddr_storage& addr, uint32_t addrLen);
/// Flush the message queue.
diff --git a/src/common/network/udp/Socket.cpp b/src/common/network/udp/Socket.cpp
index 1c802ffd..3f312c8b 100644
--- a/src/common/network/udp/Socket.cpp
+++ b/src/common/network/udp/Socket.cpp
@@ -232,10 +232,23 @@ ssize_t Socket::read(uint8_t* buffer, uint32_t length, sockaddr_storage& address
uint16_t magic = __GET_UINT16B(buffer, 0U);
if (magic == AES_WRAPPED_PCKT_MAGIC) {
uint32_t cryptedLen = (len - 2U) * sizeof(uint8_t);
- // Utils::dump(1U, "Socket::read() crypted", buffer + 2U, cryptedLen);
+ uint8_t* cryptoBuffer = buffer + 2U;
+
+ // do we need to pad the original buffer to be block aligned?
+ if (cryptedLen % crypto::AES::BLOCK_BYTES_LEN != 0) {
+ uint32_t alignment = crypto::AES::BLOCK_BYTES_LEN - (cryptedLen % crypto::AES::BLOCK_BYTES_LEN);
+ cryptedLen += alignment;
+
+ // reallocate buffer and copy
+ cryptoBuffer = new uint8_t[cryptedLen];
+ ::memset(cryptoBuffer, 0x00U, cryptedLen);
+ ::memcpy(cryptoBuffer, buffer + 2U, len - 2U);
+ }
+
+ // Utils::dump(1U, "Socket::read() crypted", cryptoBuffer, cryptedLen);
// decrypt
- uint8_t* decrypted = m_aes->decryptECB(buffer + 2U, cryptedLen, m_presharedKey);
+ uint8_t* decrypted = m_aes->decryptECB(cryptoBuffer, cryptedLen, m_presharedKey);
// Utils::dump(1U, "Socket::read() decrypted", decrypted, cryptedLen);
@@ -320,6 +333,7 @@ bool Socket::write(const uint8_t* buffer, uint32_t length, const sockaddr_storag
::memcpy(out.get() + 2U, crypted, cryptedLen);
__SET_UINT16B(AES_WRAPPED_PCKT_MAGIC, out.get(), 0U);
delete[] crypted;
+ length = cryptedLen + 2U;
} else {
if (lenWritten != nullptr) {
*lenWritten = -1;
diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp
index 54291bce..40b171b2 100644
--- a/src/fne/HostFNE.cpp
+++ b/src/fne/HostFNE.cpp
@@ -542,6 +542,19 @@ bool HostFNE::createPeerNetworks()
network->setPresharedKey(presharedKey);
}
+ /*
+ ** Block Traffic To Peers
+ */
+ yaml::Node& blockTrafficTo = peerConf["blockTrafficTo"];
+ if (blockTrafficTo.size() > 0U) {
+ for (size_t i = 0; i < blockTrafficTo.size(); i++) {
+ uint32_t peerId = (uint32_t)::strtoul(blockTrafficTo[i].as("0").c_str(), NULL, 10);
+ if (peerId != 0U) {
+ network->addBlockedTrafficPeer(peerId);
+ }
+ }
+ }
+
network->enable(enabled);
if (enabled) {
bool ret = network->open();
diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp
index aea03db3..1d617444 100644
--- a/src/fne/network/FNENetwork.cpp
+++ b/src/fne/network/FNENetwork.cpp
@@ -14,6 +14,7 @@
#include "common/edac/SHA256.h"
#include "common/network/json/json.h"
#include "common/Log.h"
+#include "common/ThreadFunc.h"
#include "common/Utils.h"
#include "network/FNENetwork.h"
#include "network/fne/TagDMRData.h"
@@ -28,6 +29,13 @@ using namespace network::fne;
#include
#include
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+const uint8_t MAX_PEER_LIST_BEFORE_FLUSH = 10U;
+const uint32_t MAX_RID_LIST_CHUNK = 50U;
+
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
@@ -74,8 +82,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_peers(),
m_peerAffiliations(),
m_maintainenceTimer(1000U, pingTime),
- m_updateLookupTimer(1000U, (updateLookupTime * 60U)),
- m_forceListUpdate(false),
+ m_updateLookupTime(updateLookupTime * 60U),
m_callInProgress(false),
m_disallowP25AdjStsBcast(true),
m_reportPeerPing(reportPeerPing),
@@ -184,20 +191,6 @@ void FNENetwork::clock(uint32_t ms)
m_maintainenceTimer.start();
}
- m_updateLookupTimer.clock(ms);
- if ((m_updateLookupTimer.isRunning() && m_updateLookupTimer.hasExpired()) || m_forceListUpdate) {
- writeWhitelistRIDs();
- writeBlacklistRIDs();
- m_frameQueue->flushQueue();
-
- writeTGIDs();
- writeDeactiveTGIDs();
- m_frameQueue->flushQueue();
-
- m_updateLookupTimer.start();
- m_forceListUpdate = false;
- }
-
sockaddr_storage address;
uint32_t addrLen;
frame::RTPHeader rtpHeader;
@@ -456,20 +449,12 @@ void FNENetwork::clock(uint32_t ms)
connection->connected(true);
connection->pingsReceived(0U);
connection->lastPing(now);
+ connection->lastACLUpdate(now);
m_peers[peerId] = connection;
writePeerACK(peerId);
LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
- // queue final update messages and flush
- writeWhitelistRIDs(peerId, true);
- writeBlacklistRIDs(peerId, true);
- m_frameQueue->flushQueue();
-
- writeTGIDs(peerId, true);
- writeDeactiveTGIDs(peerId, true);
- m_frameQueue->flushQueue();
-
json::object peerConfig = connection->config();
if (peerConfig["software"].is()) {
std::string software = peerConfig["software"].get();
@@ -477,9 +462,12 @@ void FNENetwork::clock(uint32_t ms)
}
// setup the affiliations list for this peer
- char *peerName = new char[16];
- ::sprintf(peerName, "PEER %u", peerId);
- m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName, m_verbose);
+ std::stringstream peerName;
+ peerName << "PEER " << peerId;
+ m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName.str(), m_verbose);
+
+ // spin up a thread and send ACL list over to peer
+ peerACLUpdate(peerId);
}
}
}
@@ -526,17 +514,26 @@ void FNENetwork::clock(uint32_t ms)
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
uint32_t pingsRx = connection->pingsReceived();
+ uint64_t lastPing = connection->lastPing();
pingsRx++;
connection->pingsReceived(pingsRx);
connection->lastPing(now);
connection->pktLastSeq(connection->pktLastSeq() + 1);
+ // does this peer need an ACL update?
+ uint64_t dt = connection->lastACLUpdate() + m_updateLookupTime;
+ if (dt < now) {
+ LogInfoEx(LOG_NET, "PEER %u updating ACL list, dt = %u, now = %u", peerId, dt, now);
+ peerACLUpdate(peerId);
+ connection->lastACLUpdate(now);
+ }
+
m_peers[peerId] = connection;
writePeerCommand(peerId, { NET_FUNC_PONG, NET_SUBFUNC_NOP });
if (m_reportPeerPing) {
- LogInfoEx(LOG_NET, "PEER %u ping received and answered, pingsReceived = %u", peerId, connection->pingsReceived());
+ LogInfoEx(LOG_NET, "PEER %u ping, pingsReceived = %u, lastPing = %u", peerId, connection->pingsReceived(), lastPing);
}
}
else {
@@ -789,7 +786,6 @@ void FNENetwork::close()
m_socket->close();
m_maintainenceTimer.stop();
- m_updateLookupTimer.stop();
m_status = NET_STAT_INVALID;
}
@@ -857,12 +853,51 @@ void FNENetwork::setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connecti
LogInfoEx(LOG_NET, "PEER %u RPTL ACK, challenge response sent for login", peerId);
}
+///
+/// Helper to send the ACL lists to the specified peer in a separate thread.
+///
+///
+void FNENetwork::peerACLUpdate(uint32_t peerId)
+{
+ ACLUpdateRequest* req = new ACLUpdateRequest();
+ req->network = this;
+ req->peerId = peerId;
+
+ std::stringstream peerName;
+ peerName << "peer" << peerId << ":acl-update";
+
+ ::pthread_create(&req->thread, NULL, threadedACLUpdate, req);
+ if (pthread_kill(req->thread, 0) == 0) {
+ ::pthread_setname_np(req->thread, peerName.str().c_str());
+ }
+}
+
+///
+/// Helper to send the ACL lists to the specified peer in a separate thread.
+///
+///
+void* FNENetwork::threadedACLUpdate(void* arg)
+{
+ ACLUpdateRequest* req = (ACLUpdateRequest*)arg;
+ if (req != nullptr) {
+ LogInfoEx(LOG_NET, "PEER %u sending ACL list updates", req->peerId);
+
+ req->network->writeWhitelistRIDs(req->peerId);
+ req->network->writeBlacklistRIDs(req->peerId);
+ req->network->writeTGIDs(req->peerId);
+ req->network->writeDeactiveTGIDs(req->peerId);
+
+ delete req;
+ }
+
+ return nullptr;
+}
+
///
/// Helper to send the list of whitelisted RIDs to the specified peer.
///
///
-///
-void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly)
+void FNENetwork::writeWhitelistRIDs(uint32_t peerId)
{
// send radio ID white/black lists
std::vector ridWhitelist;
@@ -879,37 +914,46 @@ void FNENetwork::writeWhitelistRIDs(uint32_t peerId, bool queueOnly)
return;
}
- // build dataset
- uint8_t payload[4U + (ridWhitelist.size() * 4U)];
- ::memset(payload, 0x00U, 4U + (ridWhitelist.size() * 4U));
+ // send a chunk of RIDs to the peer
+ FNEPeerConnection* connection = m_peers[peerId];
+ if (connection != nullptr) {
+ uint32_t chunkCnt = (ridWhitelist.size() / MAX_RID_LIST_CHUNK) + 1U;
+ for (uint32_t i = 0U; i < chunkCnt; i++) {
+ size_t listSize = ridWhitelist.size();
+ if (chunkCnt > 1U) {
+ listSize = MAX_RID_LIST_CHUNK;
+ }
- __SET_UINT32(ridWhitelist.size(), payload, 0U);
+ if (i == chunkCnt - 1U) {
+ listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridWhitelist.size();
+ }
- // write whitelisted IDs to whitelist payload
- uint32_t offs = 4U;
- for (uint32_t id : ridWhitelist) {
- if (m_debug)
- LogDebug(LOG_NET, "PEER %u whitelisting RID %u", peerId, id);
+ if (listSize > ridWhitelist.size()) {
+ listSize = ridWhitelist.size();
+ }
- __SET_UINT32(id, payload, offs);
- offs += 4U;
- }
+ // build dataset
+ uint16_t bufSize = 4U + (listSize * 4U);
+ uint8_t payload[bufSize];
+ ::memset(payload, 0x00U, bufSize);
- writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID },
- payload, 4U + (ridWhitelist.size() * 4U), queueOnly, true);
-}
+ __SET_UINT32(listSize, payload, 0U);
-///
-/// Helper to send the list of whitelisted RIDs to connected peers.
-///
-void FNENetwork::writeWhitelistRIDs()
-{
- if (m_ridLookup->table().size() == 0U) {
- return;
- }
+ // write whitelisted IDs to whitelist payload
+ uint32_t offs = 4U;
+ for (uint32_t j = 0; j < listSize; j++) {
+ uint32_t id = ridWhitelist.at(j + (i * MAX_RID_LIST_CHUNK));
+
+ if (m_debug)
+ LogDebug(LOG_NET, "PEER %u whitelisting RID %u (%d / %d)", peerId, id, i, j);
+
+ __SET_UINT32(id, payload, offs);
+ offs += 4U;
+ }
- for (auto peer : m_peers) {
- writeWhitelistRIDs(peer.first, true);
+ writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_WL_RID },
+ payload, bufSize, true);
+ }
}
}
@@ -917,8 +961,7 @@ void FNENetwork::writeWhitelistRIDs()
/// Helper to send the list of whitelisted RIDs to the specified peer.
///
///
-///
-void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly)
+void FNENetwork::writeBlacklistRIDs(uint32_t peerId)
{
// send radio ID blacklist
std::vector ridBlacklist;
@@ -935,37 +978,46 @@ void FNENetwork::writeBlacklistRIDs(uint32_t peerId, bool queueOnly)
return;
}
- // build dataset
- uint8_t payload[4U + (ridBlacklist.size() * 4U)];
- ::memset(payload, 0x00U, 4U + (ridBlacklist.size() * 4U));
+ // send a chunk of RIDs to the peer
+ FNEPeerConnection* connection = m_peers[peerId];
+ if (connection != nullptr) {
+ uint32_t chunkCnt = (ridBlacklist.size() / MAX_RID_LIST_CHUNK) + 1U;
+ for (uint32_t i = 0U; i < chunkCnt; i++) {
+ size_t listSize = ridBlacklist.size();
+ if (chunkCnt > 1U) {
+ listSize = MAX_RID_LIST_CHUNK;
+ }
- __SET_UINT32(ridBlacklist.size(), payload, 0U);
+ if (i == chunkCnt - 1U) {
+ listSize = (chunkCnt * MAX_RID_LIST_CHUNK) - ridBlacklist.size();
+ }
- // write blacklisted IDs to blacklist payload
- uint32_t offs = 4U;
- for (uint32_t id : ridBlacklist) {
- if (m_debug)
- LogDebug(LOG_NET, "PEER %u blacklisting RID %u", peerId, id);
+ if (listSize > ridBlacklist.size()) {
+ listSize = ridBlacklist.size();
+ }
- __SET_UINT32(id, payload, offs);
- offs += 4U;
- }
+ // build dataset
+ uint16_t bufSize = 4U + (listSize * 4U);
+ uint8_t payload[bufSize];
+ ::memset(payload, 0x00U, bufSize);
- writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID },
- payload, 4U + (ridBlacklist.size() * 4U), queueOnly, true);
-}
+ __SET_UINT32(listSize, payload, 0U);
-///
-/// Helper to send the list of whitelisted RIDs to connected peers.
-///
-void FNENetwork::writeBlacklistRIDs()
-{
- if (m_ridLookup->table().size() == 0U) {
- return;
- }
+ // write whitelisted IDs to whitelist payload
+ uint32_t offs = 4U;
+ for (uint32_t j = 0; j < listSize; j++) {
+ uint32_t id = ridBlacklist.at(j + (i * MAX_RID_LIST_CHUNK));
- for (auto peer : m_peers) {
- writeBlacklistRIDs(peer.first, true);
+ if (m_debug)
+ LogDebug(LOG_NET, "PEER %u blacklisting RID %u (%d / %d)", peerId, id, i, j);
+
+ __SET_UINT32(id, payload, offs);
+ offs += 4U;
+ }
+
+ writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_BL_RID },
+ payload, bufSize, true);
+ }
}
}
@@ -973,8 +1025,7 @@ void FNENetwork::writeBlacklistRIDs()
/// Helper to send the list of active TGIDs to the specified peer.
///
///
-///
-void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly)
+void FNENetwork::writeTGIDs(uint32_t peerId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@@ -1026,25 +1077,14 @@ void FNENetwork::writeTGIDs(uint32_t peerId, bool queueOnly)
}
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_ACTIVE_TGS },
- payload, 4U + (tgidList.size() * 5U), queueOnly, true);
-}
-
-///
-/// Helper to send the list of active TGIDs to connected peers.
-///
-void FNENetwork::writeTGIDs()
-{
- for (auto peer : m_peers) {
- writeTGIDs(peer.first, true);
- }
+ payload, 4U + (tgidList.size() * 5U), true);
}
///
/// Helper to send the list of deactivated TGIDs to the specified peer.
///
///
-///
-void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly)
+void FNENetwork::writeDeactiveTGIDs(uint32_t peerId)
{
if (!m_tidLookup->sendTalkgroups()) {
return;
@@ -1096,17 +1136,7 @@ void FNENetwork::writeDeactiveTGIDs(uint32_t peerId, bool queueOnly)
}
writePeerCommand(peerId, { NET_FUNC_MASTER, NET_MASTER_SUBFUNC_DEACTIVE_TGS },
- payload, 4U + (tgidList.size() * 5U), queueOnly, true);
-}
-
-///
-/// Helper to send the list of deactivated TGIDs to connected peers.
-///
-void FNENetwork::writeDeactiveTGIDs()
-{
- for (auto peer : m_peers) {
- writeDeactiveTGIDs(peer.first, true);
- }
+ payload, 4U + (tgidList.size() * 5U), true);
}
///
@@ -1119,11 +1149,12 @@ void FNENetwork::writeDeactiveTGIDs()
///
///
///
-bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly)
+bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
+ uint32_t length, uint16_t pktSeq, uint32_t streamId, bool queueOnly, bool directWrite) const
{
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
- FNEPeerConnection* connection = m_peers[peerId];
+ FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
uint32_t peerStreamId = connection->currStreamId();
if (streamId == 0U) {
@@ -1132,10 +1163,14 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
sockaddr_storage addr = connection->socketStorage();
uint32_t addrLen = connection->sockStorageLen();
- m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
- if (queueOnly)
- return true;
- return m_frameQueue->flushQueue();
+ if (directWrite)
+ return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
+ else {
+ m_frameQueue->enqueueMessage(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
+ if (queueOnly)
+ return true;
+ return m_frameQueue->flushQueue();
+ }
}
}
@@ -1152,18 +1187,20 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
///
///
///
-bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq)
+///
+bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data,
+ uint32_t length, uint32_t streamId, bool queueOnly, bool incPktSeq, bool directWrite) const
{
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
- FNEPeerConnection* connection = m_peers[peerId];
+ FNEPeerConnection* connection = m_peers.at(peerId);
if (connection != nullptr) {
if (incPktSeq) {
connection->pktLastSeq(connection->pktLastSeq() + 1);
}
uint16_t pktSeq = connection->pktLastSeq();
- return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly);
+ return writePeer(peerId, opcode, data, length, pktSeq, streamId, queueOnly, directWrite);
}
}
@@ -1177,10 +1214,9 @@ bool FNENetwork::writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const
/// Opcode.
/// Buffer to write to the network.
/// Length of buffer to write.
-///
///
bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode,
- const uint8_t* data, uint32_t length, bool queueOnly, bool incPktSeq)
+ const uint8_t* data, uint32_t length, bool incPktSeq) const
{
assert(peerId > 0);
@@ -1192,7 +1228,7 @@ bool FNENetwork::writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode
}
uint32_t len = length + 6U;
- return writePeer(peerId, opcode, buffer, len, 0U, queueOnly, incPktSeq);
+ return writePeer(peerId, opcode, buffer, len, 0U, false, incPktSeq, true);
}
///
diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h
index 9280dd28..c66d536d 100644
--- a/src/fne/network/FNENetwork.h
+++ b/src/fne/network/FNENetwork.h
@@ -37,6 +37,12 @@ namespace network { namespace fne { class HOST_SW_API TagNXDNData; } }
namespace network
{
+ // ---------------------------------------------------------------------------
+ // Class Prototypes
+ // ---------------------------------------------------------------------------
+
+ class HOST_SW_API FNENetwork;
+
// ---------------------------------------------------------------------------
// Class Declaration
// Represents an peer connection to the FNE.
@@ -61,6 +67,7 @@ namespace network
m_connectionState(NET_STAT_INVALID),
m_pingsReceived(0U),
m_lastPing(0U),
+ m_lastACLUpdate(0U),
m_config(),
m_pktLastSeq(0U),
m_pktNextSeq(1U)
@@ -83,6 +90,7 @@ namespace network
m_connectionState(NET_STAT_INVALID),
m_pingsReceived(0U),
m_lastPing(0U),
+ m_lastACLUpdate(0U),
m_config(),
m_pktLastSeq(0U),
m_pktNextSeq(1U)
@@ -123,6 +131,9 @@ namespace network
/// Last ping received.
__PROPERTY_PLAIN(uint64_t, lastPing);
+ /// Last ACL update sent.
+ __PROPERTY_PLAIN(uint64_t, lastACLUpdate);
+
/// JSON objecting containing peer configuration information.
__PROPERTY_PLAIN(json::object, config);
@@ -132,6 +143,18 @@ namespace network
__PROPERTY_PLAIN(uint16_t, pktNextSeq);
};
+ // ---------------------------------------------------------------------------
+ // Structure Declaration
+ //
+ // ---------------------------------------------------------------------------
+
+ struct ACLUpdateRequest {
+ FNENetwork* network;
+ uint32_t peerId;
+
+ pthread_t thread;
+ };
+
// ---------------------------------------------------------------------------
// Class Declaration
// Implements the core FNE networking logic.
@@ -207,9 +230,9 @@ namespace network
std::unordered_map m_peerAffiliations;
Timer m_maintainenceTimer;
- Timer m_updateLookupTimer;
- bool m_forceListUpdate;
+ uint32_t m_updateLookupTime;
+
bool m_callInProgress;
bool m_disallowP25AdjStsBcast;
@@ -225,34 +248,30 @@ namespace network
/// Helper to complete setting up a repeater login request.
void setupRepeaterLogin(uint32_t peerId, FNEPeerConnection* connection);
+ /// Helper to send the ACL lists to the specified peer in a separate thread.
+ void peerACLUpdate(uint32_t peerId);
+ /// Entry point to send the ACL lists to the specified peer in a separate thread.
+ static void* threadedACLUpdate(void* arg);
+
/// Helper to send the list of whitelisted RIDs to the specified peer.
- void writeWhitelistRIDs(uint32_t peerId, bool queueOnly = false);
- /// Helper to send the list of whitelisted RIDs to connected peers.
- void writeWhitelistRIDs();
+ void writeWhitelistRIDs(uint32_t peerId);
/// Helper to send the list of blacklisted RIDs to the specified peer.
- void writeBlacklistRIDs(uint32_t peerId, bool queueOnly = false);
- /// Helper to send the list of blacklisted RIDs to connected peers.
- void writeBlacklistRIDs();
-
+ void writeBlacklistRIDs(uint32_t peerId);
/// Helper to send the list of active TGIDs to the specified peer.
- void writeTGIDs(uint32_t peerId, bool queueOnly = false);
- /// Helper to send the list of active TGIDs to connected peers.
- void writeTGIDs();
+ void writeTGIDs(uint32_t peerId);
/// Helper to send the list of deactivated TGIDs to the specified peer.
- void writeDeactiveTGIDs(uint32_t peerId, bool queueOnly = false);
- /// Helper to send the list of deactivated TGIDs to connected peers.
- void writeDeactiveTGIDs();
+ void writeDeactiveTGIDs(uint32_t peerId);
/// Helper to send a data message to the specified peer.
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
- uint16_t pktSeq, uint32_t streamId, bool queueOnly = false);
+ uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool directWrite = false) const;
/// Helper to send a data message to the specified peer.
bool writePeer(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
- uint32_t streamId, bool queueOnly = false, bool incPktSeq = false);
+ uint32_t streamId, bool queueOnly = false, bool incPktSeq = false, bool directWrite = false) const;
/// Helper to send a command message to the specified peer.
bool writePeerCommand(uint32_t peerId, FrameQueue::OpcodePair opcode, const uint8_t* data = nullptr, uint32_t length = 0U,
- bool queueOnly = false, bool incPktSeq = false);
+ bool incPktSeq = false) const;
/// Helper to send a ACK response to the specified peer.
bool writePeerACK(uint32_t peerId, const uint8_t* data = nullptr, uint32_t length = 0U);
diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp
index dbcb7126..8555f0a6 100644
--- a/src/fne/network/PeerNetwork.cpp
+++ b/src/fne/network/PeerNetwork.cpp
@@ -19,6 +19,7 @@ using namespace network;
#include
#include
+#include
// ---------------------------------------------------------------------------
// Public Class Members
@@ -44,13 +45,30 @@ using namespace network;
/// Flag indicating that the system will accept radio ID and talkgroup ID lookups from the network.
PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup) :
- Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup)
+ Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup),
+ m_blockTrafficToTable()
{
assert(!address.empty());
assert(port > 0U);
assert(!password.empty());
}
+///
+/// Checks if the passed peer ID is blocked from sending to this peer.
+///
+///
+bool PeerNetwork::checkBlockedPeer(uint32_t peerId)
+{
+ if (m_blockTrafficToTable.empty())
+ return false;
+
+ if (std::find(m_blockTrafficToTable.begin(), m_blockTrafficToTable.end(), peerId) != m_blockTrafficToTable.end()) {
+ return true;
+ }
+
+ return false;
+}
+
// ---------------------------------------------------------------------------
// Protected Class Members
// ---------------------------------------------------------------------------
diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h
index 373be1d9..26f4c8e1 100644
--- a/src/fne/network/PeerNetwork.h
+++ b/src/fne/network/PeerNetwork.h
@@ -18,6 +18,7 @@
#include
#include
+#include
namespace network
{
@@ -32,7 +33,16 @@ namespace network
PeerNetwork(const std::string& address, uint16_t port, uint16_t localPort, uint32_t peerId, const std::string& password,
bool duplex, bool debug, bool dmr, bool p25, bool nxdn, bool slot1, bool slot2, bool allowActivityTransfer, bool allowDiagnosticTransfer, bool updateLookup);
+ /// Gets the blocked traffic peer ID table.
+ std::vector blockTrafficTo() const { return m_blockTrafficToTable; }
+ /// Adds an entry to the blocked traffic peer ID table.
+ void addBlockedTrafficPeer(uint32_t peerId) { m_blockTrafficToTable.push_back(peerId); }
+ /// Checks if the passed peer ID is blocked from sending to this peer.
+ bool checkBlockedPeer(uint32_t peerId);
+
protected:
+ std::vector m_blockTrafficToTable;
+
/// Writes configuration to the network.
bool writeConfig() override;
};
diff --git a/src/fne/network/RESTAPI.cpp b/src/fne/network/RESTAPI.cpp
index d1390622..d2cdd9fd 100644
--- a/src/fne/network/RESTAPI.cpp
+++ b/src/fne/network/RESTAPI.cpp
@@ -473,6 +473,7 @@ void RESTAPI::initializeEndpoints()
m_dispatcher.match(GET_STATUS).get(REST_API_BIND(RESTAPI::restAPI_GetStatus, this));
m_dispatcher.match(FNE_GET_PEER_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetPeerQuery, this));
+ m_dispatcher.match(FNE_GET_PEER_COUNT).get(REST_API_BIND(RESTAPI::restAPI_GetPeerCount, this));
m_dispatcher.match(FNE_GET_RID_QUERY).get(REST_API_BIND(RESTAPI::restAPI_GetRIDQuery, this));
m_dispatcher.match(FNE_PUT_RID_ADD).put(REST_API_BIND(RESTAPI::restAPI_PutRIDAdd, this));
@@ -721,6 +722,30 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl
reply.payload(response);
}
+///
+///
+///
+///
+///
+///
+void RESTAPI::restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const RequestMatch& match)
+{
+ if (!validateAuth(request, reply)) {
+ return;
+ }
+
+ json::object response = json::object();
+ setResponseDefaultStatus(response);
+
+ json::array peers = json::array();
+ if (m_network != nullptr) {
+ uint32_t count = m_network->m_peers.size();
+ response["peerCount"].set(count);
+ }
+
+ reply.payload(response);
+}
+
///
///
///
@@ -799,10 +824,11 @@ void RESTAPI::restAPI_PutRIDAdd(const HTTPPayload& request, HTTPPayload& reply,
// The addEntry function will automatically update an existing entry, so no need to check for an exisitng one here
m_ridLookup->addEntry(rid, enabled, alias);
-
+/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
+*/
}
///
@@ -838,10 +864,11 @@ void RESTAPI::restAPI_PutRIDDelete(const HTTPPayload& request, HTTPPayload& repl
}
m_ridLookup->eraseEntry(rid);
-
+/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
+*/
}
///
@@ -934,10 +961,11 @@ void RESTAPI::restAPI_PutTGAdd(const HTTPPayload& request, HTTPPayload& reply, c
::LogInfoEx(LOG_REST, "Talkgroup NAME: %s SRC_TGID: %u SRC_TS: %u ACTIVE: %u PARROT: %u INCLUSIONS: %u EXCLUSIONS: %u REWRITES: %u", groupName.c_str(), tgId, tgSlot, active, parrot, incCount, excCount, rewrCount);
m_tidLookup->addEntry(groupVoice);
-
+/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
+*/
}
///
@@ -974,10 +1002,11 @@ void RESTAPI::restAPI_PutTGDelete(const HTTPPayload& request, HTTPPayload& reply
}
m_tidLookup->eraseEntry(groupVoice.source().tgId(), groupVoice.source().tgSlot());
-
+/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
+*/
}
///
@@ -1014,10 +1043,11 @@ void RESTAPI::restAPI_GetForceUpdate(const HTTPPayload& request, HTTPPayload& re
json::object response = json::object();
setResponseDefaultStatus(response);
+/*
if (m_network != nullptr) {
m_network->m_forceListUpdate = true;
}
-
+*/
reply.payload(response);
}
diff --git a/src/fne/network/RESTAPI.h b/src/fne/network/RESTAPI.h
index a17e2e02..5fd46995 100644
--- a/src/fne/network/RESTAPI.h
+++ b/src/fne/network/RESTAPI.h
@@ -97,6 +97,8 @@ private:
///
void restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);
+ ///
+ void restAPI_GetPeerCount(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);
///
void restAPI_GetRIDQuery(const HTTPPayload& request, HTTPPayload& reply, const network::rest::RequestMatch& match);
diff --git a/src/fne/network/RESTDefines.h b/src/fne/network/RESTDefines.h
index 46e1bbe2..448a1a6d 100644
--- a/src/fne/network/RESTDefines.h
+++ b/src/fne/network/RESTDefines.h
@@ -21,6 +21,7 @@
// ---------------------------------------------------------------------------
#define FNE_GET_PEER_QUERY "/peer/query"
+#define FNE_GET_PEER_COUNT "/peer/count"
#define FNE_GET_RID_QUERY "/rid/query"
#define FNE_PUT_RID_ADD "/rid/add"
diff --git a/src/fne/network/fne/TagDMRData.cpp b/src/fne/network/fne/TagDMRData.cpp
index cd8d189a..3069205a 100644
--- a/src/fne/network/fne/TagDMRData.cpp
+++ b/src/fne/network/fne/TagDMRData.cpp
@@ -252,6 +252,11 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
+ // check if the source peer is blocked from sending to this peer
+ if (peer.second->checkBlockedPeer(peerId)) {
+ continue;
+ }
+
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
diff --git a/src/fne/network/fne/TagNXDNData.cpp b/src/fne/network/fne/TagNXDNData.cpp
index 4f4c8fd6..9ac0311e 100644
--- a/src/fne/network/fne/TagNXDNData.cpp
+++ b/src/fne/network/fne/TagNXDNData.cpp
@@ -222,6 +222,11 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
continue;
}
+ // check if the source peer is blocked from sending to this peer
+ if (peer.second->checkBlockedPeer(peerId)) {
+ continue;
+ }
+
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
diff --git a/src/fne/network/fne/TagP25Data.cpp b/src/fne/network/fne/TagP25Data.cpp
index fc3af658..dd5500fa 100644
--- a/src/fne/network/fne/TagP25Data.cpp
+++ b/src/fne/network/fne/TagP25Data.cpp
@@ -286,6 +286,11 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
continue;
}
+ // check if the source peer is blocked from sending to this peer
+ if (peer.second->checkBlockedPeer(peerId)) {
+ continue;
+ }
+
uint8_t outboundPeerBuffer[len];
::memset(outboundPeerBuffer, 0x00U, len);
::memcpy(outboundPeerBuffer, buffer, len);
diff --git a/src/host/network/Network.cpp b/src/host/network/Network.cpp
index cb120a76..d9b8f7fb 100644
--- a/src/host/network/Network.cpp
+++ b/src/host/network/Network.cpp
@@ -396,6 +396,7 @@ void Network::clock(uint32_t ms)
m_ridLookup->toggleEntry(id, true);
offs += 4U;
}
+ LogMessage(LOG_NET, "Network Announced %u whitelisted RIDs", len);
}
}
}
@@ -413,6 +414,7 @@ void Network::clock(uint32_t ms)
m_ridLookup->toggleEntry(id, false);
offs += 4U;
}
+ LogMessage(LOG_NET, "Network Announced %u blacklisted RIDs", len);
}
}
}
diff --git a/src/host/p25/packet/ControlSignaling.cpp b/src/host/p25/packet/ControlSignaling.cpp
index 48e12e74..1227786b 100644
--- a/src/host/p25/packet/ControlSignaling.cpp
+++ b/src/host/p25/packet/ControlSignaling.cpp
@@ -751,8 +751,9 @@ bool ControlSignaling::processNetwork(uint8_t* data, uint32_t len, lc::LC& contr
m_p25->m_affiliations.releaseGrant(dstId, false);
}
}
+
+ return true; // don't allow this to write to the air
}
- break;
case TSBK_OSP_DVM_GIT_HASH:
// ignore
return true; // don't allow this to write to the air
diff --git a/src/remote/RESTClientMain.cpp b/src/remote/RESTClientMain.cpp
index bc02c990..93521851 100644
--- a/src/remote/RESTClientMain.cpp
+++ b/src/remote/RESTClientMain.cpp
@@ -41,6 +41,7 @@
#define RCD_GET_VOICE_CH "voice-ch"
#define RCD_FNE_GET_PEERLIST "fne-peerlist"
+#define RCD_FNE_GET_PEERCOUNT "fne-peercount"
#define RCD_FNE_GET_TGIDLIST "fne-tgidlist"
#define RCD_FNE_GET_FORCEUPDATE "fne-force-update"
#define RCD_FNE_GET_AFFLIST "fne-affs"
@@ -182,6 +183,7 @@ void usage(const char* message, const char* arg)
reply += " voice-ch Retrieves the list of configured voice channels\r\n";
reply += "\r\n";
reply += " fne-peerlist Retrieves the list of connected peers (Converged FNE only)\r\n";
+ reply += " fne-peercount Retrieves the count of connected peers (Converged FNE only)\r\n";
reply += " fne-tgidlist Retrieves the list of configured TGIDs (Converged FNE only)\r\n";
reply += " fne-force-update Forces the FNE to send list update (Converged FNE only)\r\n";
reply += " fne-affs Retrieves the list of currently affiliated SUs (Converged FNE only)\r\n";
@@ -730,6 +732,9 @@ int main(int argc, char** argv)
else if (rcom == RCD_FNE_GET_PEERLIST) {
retCode = client->send(HTTP_GET, FNE_GET_PEER_QUERY, json::object(), response);
}
+ else if (rcom == RCD_FNE_GET_PEERCOUNT) {
+ retCode = client->send(HTTP_GET, FNE_GET_PEER_COUNT, json::object(), response);
+ }
else if (rcom == RCD_FNE_GET_TGIDLIST) {
retCode = client->send(HTTP_GET, FNE_GET_TGID_QUERY, json::object(), response);
}