Merge Peer Link Enhancement Changes (#73)

* add support for Peer-Link CFNEs to pass activity and peer status messages to the prime/master CFNE; add support for Peer-Link CFNEs to transmit their currently active peer list to the prime/master CFNE; add support on the master/prime CFNE to support reporting Peer-Link peer IDs in the /peer/query REST API request; add support to pass affiliation updates from Peer-Link CFNEs to the prime/master CFNE (Note: this passing does not preserve the original peer ID the affiliation came from, from the prime/master perspective, currently, the affiliation will appear as if it came from the Peer-Link CFNE); correct a bug with buffer overflow for Peer-Link configuration transfers;

* hide debug messages;

* remove debug print;
pull/75/head
Bryan Biedenkapp 1 year ago committed by GitHub
parent 8c00c7b81c
commit f40e6c6af0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -335,8 +335,11 @@ uint32_t BaseNetwork::getDMRStreamId(uint32_t slotNo) const
/* Helper to send a data message to the master. */
bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length, uint16_t pktSeq, uint32_t streamId,
bool queueOnly, bool useAlternatePort)
bool queueOnly, bool useAlternatePort, uint32_t peerId)
{
if (peerId == 0U)
peerId = m_peerId;
if (useAlternatePort) {
sockaddr_storage addr;
uint32_t addrLen;
@ -346,14 +349,14 @@ bool BaseNetwork::writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data
if (udp::Socket::lookup(address, port, addr, addrLen) == 0) {
if (!queueOnly)
return m_frameQueue->write(data, length, streamId, m_peerId, m_peerId, opcode, pktSeq, addr, addrLen);
return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, addr, addrLen);
else
m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, addr, addrLen);
}
}
else {
if (!queueOnly)
return m_frameQueue->write(data, length, streamId, m_peerId, m_peerId, opcode, pktSeq, m_addr, m_addrLen);
return m_frameQueue->write(data, length, streamId, peerId, m_peerId, opcode, pktSeq, m_addr, m_addrLen);
else
m_frameQueue->enqueueMessage(data, length, streamId, m_peerId, opcode, pktSeq, m_addr, m_addrLen);
}

@ -66,6 +66,7 @@
#define TAG_TRANSFER_STATUS "TRNSSTS"
#define TAG_ANNOUNCE "ANNC"
#define TAG_PEER_LINK "PRLNK"
namespace network
{
@ -311,10 +312,11 @@ namespace network
* @param streamId Stream ID.
* @param queueOnly Flag indicating this message should be queued instead of send immediately.
* @param useAlternatePort Flag indicating the message shuold be sent using the alternate port (mainly for activity and diagnostics).
* @param peerId If non-zero, overrides the peer ID sent in the packet to the master.
* @returns bool True, if message was sent, otherwise false.
*/
bool writeMaster(FrameQueue::OpcodePair opcode, const uint8_t* data, uint32_t length,
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool useAlternatePort = false);
uint16_t pktSeq, uint32_t streamId, bool queueOnly = false, bool useAlternatePort = false, uint32_t peerId = 0U);
// Digital Mobile Radio
/**

@ -105,7 +105,9 @@ namespace network
PL_TALKGROUP_LIST = 0x00U, //! FNE Peer-Link Talkgroup Transfer
PL_RID_LIST = 0x01U, //! FNE Peer-Link Radio ID Transfer
PL_PEER_LIST = 0x02U //! FNE Peer-Link Peer List Transfer
PL_PEER_LIST = 0x02U, //! FNE Peer-Link Peer List Transfer
PL_ACT_PEER_LIST = 0xA2U, //! FNE Peer-Link Active Peer List Transfer
};
};

@ -81,6 +81,7 @@ private:
yaml::Node m_conf;
friend class network::FNENetwork;
friend class network::DiagNetwork;
friend class network::callhandler::TagDMRData;
friend class network::callhandler::packetdata::DMRPacketData;
friend class network::callhandler::TagP25Data;

@ -8,6 +8,7 @@
*
*/
#include "fne/Defines.h"
#include "common/zlib/zlib.h"
#include "common/Log.h"
#include "common/Utils.h"
#include "network/DiagNetwork.h"
@ -200,10 +201,31 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
switch (req->fneHeader.getFunction()) {
case NET_FUNC::TRANSFER:
{
// resolve peer ID (used for Activity Log and Status Transfer)
bool validPeerId = false;
uint32_t pktPeerId = 0U;
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
validPeerId = true;
pktPeerId = peerId;
} else {
if (peerId > 0) {
// this could be a peer-link transfer -- in which case, we need to check the SSRC of the packet not the peer ID
if (network->m_peers.find(req->rtpHeader.getSSRC()) != network->m_peers.end()) {
FNEPeerConnection* connection = network->m_peers[req->rtpHeader.getSSRC()];
if (connection != nullptr) {
if (connection->isExternalPeer() && connection->isPeerLink()) {
validPeerId = true;
pktPeerId = req->rtpHeader.getSSRC();
}
}
}
}
}
if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer
if (network->m_allowActivityTransfer) {
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (pktPeerId > 0 && validPeerId) {
FNEPeerConnection* connection = network->m_peers[pktPeerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
@ -215,20 +237,20 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
std::string payload(rawPayload, rawPayload + (req->length - 11U));
::ActivityLog("%.9u (%8s) %s", peerId, connection->identity().c_str(), payload.c_str());
::ActivityLog("%.9u (%8s) %s", pktPeerId, connection->identity().c_str(), payload.c_str());
// report activity log to InfluxDB
if (network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("activity")
.tag("peerId", std::to_string(peerId))
.tag("peerId", std::to_string(pktPeerId))
.field("identity", connection->identity())
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
}
// repeat traffic to the connected peers
// repeat traffic to the connected SysView peers
if (network->m_peers.size() > 0U) {
for (auto peer : network->m_peers) {
if (peer.second != nullptr) {
@ -240,7 +262,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
sockaddr_storage addr = peer.second->socketStorage();
uint32_t addrLen = peer.second->sockStorageLen();
network->m_frameQueue->write(req->buffer, req->length, streamId, peerId, network->m_peerId,
network->m_frameQueue->write(req->buffer, req->length, streamId, pktPeerId, network->m_peerId,
{ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY }, RTP_END_OF_CALL_SEQ, addr, addrLen);
}
} else {
@ -248,9 +270,21 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
}
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_ACTIVITY },
req->buffer, req->length, RTP_END_OF_CALL_SEQ, streamId, false, true, pktPeerId);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(pktPeerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -295,14 +329,15 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
}
else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS) { // Peer Status Transfer
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (pktPeerId > 0 && validPeerId) {
FNEPeerConnection* connection = network->m_peers[pktPeerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
if (network->m_peers.size() > 0U) {
// attempt to repeat status traffic to SysView clients
for (auto peer : network->m_peers) {
if (peer.second != nullptr) {
if (peer.second->isSysView()) {
@ -315,19 +350,31 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
if (network->m_debug) {
LogDebug(LOG_NET, "SysView, srcPeer = %u, dstPeer = %u, peer status message, len = %u",
peerId, peer.first, req->length);
pktPeerId, peer.first, req->length);
}
network->m_frameQueue->write(req->buffer, req->length, streamId, peerId, network->m_peerId,
network->m_frameQueue->write(req->buffer, req->length, streamId, pktPeerId, network->m_peerId,
{ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen);
}
} else {
continue;
}
}
// attempt to repeat status traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_STATUS },
req->buffer, req->length, RTP_END_OF_CALL_SEQ, streamId, false, true, pktPeerId);
}
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
network->writePeerNAK(pktPeerId, TAG_TRANSFER_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
@ -339,6 +386,47 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
}
break;
case NET_FUNC::PEER_LINK:
if (req->fneHeader.getSubFunction() == NET_SUBFUNC::PL_ACT_PEER_LIST) { // Peer-Link Active Peer List
if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
std::string ip = udp::Socket::address(req->address);
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip && connection->isExternalPeer() &&
connection->isPeerLink()) {
UInt8Array __rawPayload = std::make_unique<uint8_t[]>(req->length - 8U);
uint8_t* rawPayload = __rawPayload.get();
::memset(rawPayload, 0x00U, req->length - 8U);
::memcpy(rawPayload, req->buffer + 8U, req->length - 8U);
std::string payload(rawPayload, rawPayload + (req->length - 8U));
// parse JSON body
json::value v;
std::string err = json::parse(v, payload);
if (!err.empty()) {
break;
}
else {
// ensure parsed JSON is an array
if (!v.is<json::array>()) {
break;
}
else {
json::array arr = v.get<json::array>();
network->m_peerLinkPeers[peerId] = arr;
}
}
}
else {
network->writePeerNAK(peerId, TAG_PEER_LINK, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
break;
default:
// diagostic network ignores unknowns for everything else...
break;

@ -72,6 +72,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_peerListLookup(nullptr),
m_status(NET_STAT_INVALID),
m_peers(),
m_peerLinkPeers(),
m_peerAffiliations(),
m_ccPeerMap(),
m_maintainenceTimer(1000U, pingTime),
@ -304,6 +305,31 @@ void FNENetwork::clock(uint32_t ms)
m_frameQueue->clearTimestamps();
}
// send active peer list to Peer-Link masters
if (m_host->m_peerNetworks.size() > 0) {
for (auto peer : m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
if (m_peers.size() > 0) {
json::array peers = json::array();
for (auto entry : m_peers) {
uint32_t peerId = entry.first;
network::FNEPeerConnection* peerConn = entry.second;
if (peerConn != nullptr) {
json::object peerObj = fneConnObject(peerId, peerConn);
uint32_t peerNetPeerId = peer.second->getPeerId();
peerObj["parentPeerId"].set<uint32_t>(peerNetPeerId);
peers.push_back(json::value(peerObj));
}
}
peer.second->writePeerLinkPeers(&peers);
}
}
}
}
}
m_maintainenceTimer.start();
}
@ -786,6 +812,21 @@ void* FNENetwork::threadedNetworkRx(void* arg)
connection->isExternalPeer(external);
if (external)
LogInfoEx(LOG_NET, "PEER %u reports external peer", peerId);
// check if the peer is participating in peer link
lookups::PeerId peerEntry = network->m_peerListLookup->find(req->peerId);
if (!peerEntry.peerDefault()) {
if (peerEntry.peerLink()) {
if (network->m_host->m_useAlternatePortForDiagnostics) {
connection->isPeerLink(true);
if (external)
LogInfoEx(LOG_NET, "PEER %u configured for Peer-Link", peerId);
} else {
LogError(LOG_NET, "PEER %u, Peer-Link operations *require* the alternate diagnostics port option to be enabled.", peerId);
LogError(LOG_NET, "PEER %u, will not receive Peer-Link ACL updates.", peerId);
}
}
}
}
// is the peer reporting it is a conventional peer?
@ -1080,6 +1121,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
uint32_t dstId = __GET_UINT16(req->buffer, 3U); // Destination Address
aff->groupUnaff(srcId);
aff->groupAff(srcId, dstId);
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_AFFIL },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -1102,6 +1155,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (connection->connected() && connection->address() == ip && aff != nullptr) {
uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address
aff->unitReg(srcId);
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_REG },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -1124,6 +1189,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (connection->connected() && connection->address() == ip && aff != nullptr) {
uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address
aff->unitDereg(srcId);
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_UNIT_DEREG },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -1146,6 +1223,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
if (connection->connected() && connection->address() == ip && aff != nullptr) {
uint32_t srcId = __GET_UINT16(req->buffer, 0U); // Source Address
aff->groupUnaff(srcId);
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_GRP_UNAFFIL },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -1181,6 +1270,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
offs += 8U;
}
LogMessage(LOG_NET, "PEER %u (%s) announced %u affiliations", peerId, connection->identity().c_str(), len);
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_AFFILS },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
}
else {
@ -1215,6 +1316,18 @@ void* FNENetwork::threadedNetworkRx(void* arg)
}
LogMessage(LOG_NET, "PEER %u (%s) announced %u VCs", peerId, connection->identity().c_str(), len);
network->m_ccPeerMap[peerId] = vcPeers;
// attempt to repeat traffic to Peer-Link masters
if (network->m_host->m_peerNetworks.size() > 0) {
for (auto peer : network->m_host->m_peerNetworks) {
if (peer.second != nullptr) {
if (peer.second->isEnabled() && peer.second->isPeerLink()) {
peer.second->writeMaster({ NET_FUNC::ANNOUNCE, NET_SUBFUNC::ANNC_SUBFUNC_SITE_VC },
req->buffer, req->length, req->rtpHeader.getSequence(), streamId, false, false);
}
}
}
}
}
else {
network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -1300,7 +1413,6 @@ bool FNENetwork::erasePeer(uint32_t peerId)
auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
if (it != m_peers.end()) {
m_peers.erase(peerId);
return true;
}
}
@ -1309,11 +1421,59 @@ bool FNENetwork::erasePeer(uint32_t peerId)
auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; });
if (it != m_ccPeerMap.end()) {
m_ccPeerMap.erase(peerId);
return true;
}
}
return false;
// erase any Peer-Link entries for this peer
{
auto it = std::find_if(m_peerLinkPeers.begin(), m_peerLinkPeers.end(), [&](auto x) { return x.first == peerId; });
if (it != m_peerLinkPeers.end()) {
m_peerLinkPeers.erase(peerId);
}
}
return true;
}
/* Helper to create a JSON representation of a FNE peer connection. */
json::object FNENetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *conn)
{
json::object peerObj = json::object();
peerObj["peerId"].set<uint32_t>(peerId);
std::string address = conn->address();
peerObj["address"].set<std::string>(address);
uint16_t port = conn->port();
peerObj["port"].set<uint16_t>(port);
bool connected = conn->connected();
peerObj["connected"].set<bool>(connected);
uint32_t connectionState = (uint32_t)conn->connectionState();
peerObj["connectionState"].set<uint32_t>(connectionState);
uint32_t pingsReceived = conn->pingsReceived();
peerObj["pingsReceived"].set<uint32_t>(pingsReceived);
uint64_t lastPing = conn->lastPing();
peerObj["lastPing"].set<uint64_t>(lastPing);
uint32_t ccPeerId = conn->ccPeerId();
peerObj["controlChannel"].set<uint32_t>(ccPeerId);
json::object peerConfig = conn->config();
if (peerConfig["rcon"].is<json::object>())
peerConfig.erase("rcon");
peerObj["config"].set<json::object>(peerConfig);
json::array voiceChannels = json::array();
auto it = std::find_if(m_ccPeerMap.begin(), m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; });
if (it != m_ccPeerMap.end()) {
std::vector<uint32_t> vcPeers = m_ccPeerMap[peerId];
for (uint32_t vcEntry : vcPeers) {
voiceChannels.push_back(json::value((double)vcEntry));
}
}
peerObj["voiceChannels"].set<json::array>(voiceChannels);
return peerObj;
}
/* Helper to reset a peer connection. */
@ -1419,20 +1579,11 @@ void* FNENetwork::threadedACLUpdate(void* arg)
std::string peerIdentity = network->resolvePeerIdentity(req->peerId);
// check if the peer is participating in peer link
bool peerLink = false;
lookups::PeerId peerEntry = network->m_peerListLookup->find(req->peerId);
if (!peerEntry.peerDefault()) {
if (peerEntry.peerLink()) {
peerLink = true;
}
}
FNEPeerConnection* connection = network->m_peers[req->peerId];
if (connection != nullptr) {
// if the connection is an external peer, and peer is participating in peer link,
// send the peer proper configuration data
if (connection->isExternalPeer() && peerLink) {
if (connection->isExternalPeer() && connection->isPeerLink()) {
LogInfoEx(LOG_NET, "PEER %u (%s) sending Peer-Link ACL list updates", req->peerId, peerIdentity.c_str());
network->writeWhitelistRIDs(req->peerId, true);

@ -117,6 +117,7 @@ namespace network
m_isExternalPeer(false),
m_isConventionalPeer(false),
m_isSysView(false),
m_isPeerLink(false),
m_config(),
m_pktLastSeq(RTP_END_OF_CALL_SEQ),
m_pktNextSeq(1U)
@ -146,6 +147,7 @@ namespace network
m_isExternalPeer(false),
m_isConventionalPeer(false),
m_isSysView(false),
m_isPeerLink(false),
m_config(),
m_pktLastSeq(RTP_END_OF_CALL_SEQ),
m_pktNextSeq(1U)
@ -234,6 +236,11 @@ namespace network
*/
__PROPERTY_PLAIN(bool, isSysView);
/**
* @brief Flag indicating this connection is from an external peer that is peer link enabled.
*/
__PROPERTY_PLAIN(bool, isPeerLink);
/**
* @brief JSON objecting containing peer configuration information.
*/
@ -382,6 +389,14 @@ namespace network
*/
void close() override;
/**
* @brief Helper to create a JSON representation of a FNE peer connection.
* @param peerId Peer ID.
* @param conn FNE Peer Connection.
* @return json::object
*/
json::object fneConnObject(uint32_t peerId, FNEPeerConnection *conn);
/**
* @brief Helper to reset a peer connection.
* @param peerId Peer ID to reset.
@ -426,6 +441,7 @@ namespace network
static std::mutex m_peerMutex;
typedef std::pair<const uint32_t, network::FNEPeerConnection*> PeerMapPair;
std::unordered_map<uint32_t, FNEPeerConnection*> m_peers;
std::unordered_map<uint32_t, json::array> m_peerLinkPeers;
typedef std::pair<const uint32_t, lookups::AffiliationLookup*> PeerAffiliationMapPair;
std::unordered_map<uint32_t, lookups::AffiliationLookup*> m_peerAffiliations;
std::unordered_map<uint32_t, std::vector<uint32_t>> m_ccPeerMap;

@ -32,6 +32,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc
Network(address, port, localPort, peerId, password, duplex, debug, dmr, p25, nxdn, slot1, slot2, allowActivityTransfer, allowDiagnosticTransfer, updateLookup, saveLookup),
m_blockTrafficToTable(),
m_pidLookup(nullptr),
m_peerLink(false),
m_tgidCompressedSize(0U),
m_tgidSize(0U),
m_tgidBuffer(nullptr),
@ -71,6 +72,32 @@ bool PeerNetwork::checkBlockedPeer(uint32_t peerId)
return false;
}
/* Writes a complete update of this CFNE's active peer list to the network. */
bool PeerNetwork::writePeerLinkPeers(json::array* peerList)
{
if (peerList == nullptr)
return false;
if (peerList->size() == 0)
return false;
if (peerList->size() > 0 && m_peerLink) {
json::value v = json::value(*peerList);
std::string json = std::string(v.serialize());
CharArray __buffer = std::make_unique<char[]>(json.length() + 9U);
char* buffer = __buffer.get();
::memcpy(buffer + 0U, TAG_PEER_LINK, 4U);
::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str());
return writeMaster({ NET_FUNC::PEER_LINK, NET_SUBFUNC::PL_ACT_PEER_LIST },
(uint8_t*)buffer, json.length() + 8U, RTP_END_OF_CALL_SEQ, createStreamId(), false, true);
}
return false;
}
// ---------------------------------------------------------------------------
// Protected Class Members
// ---------------------------------------------------------------------------
@ -95,7 +122,10 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
if (m_tgidBuffer != nullptr)
delete[] m_tgidBuffer;
m_tgidBuffer = new uint8_t[m_tgidSize];
if (m_tgidSize < PEER_LINK_BLOCK_SIZE)
m_tgidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U];
else
m_tgidBuffer = new uint8_t[m_tgidSize + 1U];
}
if (m_tgidBuffer != nullptr) {
@ -166,7 +196,7 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
// store to file
std::unique_ptr<char[]> __str = std::make_unique<char[]>(decompressedLen + 1U);
char* str = __str.get();
::memcpy(str, decompressed, decompressedLen + 1U);
::memcpy(str, decompressed, decompressedLen);
str[decompressedLen] = 0; // null termination
std::string filename = "/tmp/talkgroup_rules.yml";
@ -183,6 +213,9 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco
m_tidLookup->filename(filename);
m_tidLookup->reload();
// flag this peer as Peer-Link enabled
m_peerLink = true;
// cleanup temporary file
::remove(filename.c_str());
}
@ -214,7 +247,10 @@ tid_lookup_cleanup:
if (m_ridBuffer != nullptr)
delete[] m_ridBuffer;
m_ridBuffer = new uint8_t[m_ridSize];
if (m_ridSize < PEER_LINK_BLOCK_SIZE)
m_ridBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U];
else
m_ridBuffer = new uint8_t[m_ridSize + 1U];
}
if (m_ridBuffer != nullptr) {
@ -285,7 +321,7 @@ tid_lookup_cleanup:
// store to file
std::unique_ptr<char[]> __str = std::make_unique<char[]>(decompressedLen + 1U);
char* str = __str.get();
::memcpy(str, decompressed, decompressedLen + 1U);
::memcpy(str, decompressed, decompressedLen);
str[decompressedLen] = 0; // null termination
std::string filename = "/tmp/rid_acl.dat";
@ -302,6 +338,9 @@ tid_lookup_cleanup:
m_ridLookup->filename(filename);
m_ridLookup->reload();
// flag this peer as Peer-Link enabled
m_peerLink = true;
// cleanup temporary file
::remove(filename.c_str());
}
@ -333,7 +372,10 @@ rid_lookup_cleanup:
if (m_pidBuffer != nullptr)
delete[] m_pidBuffer;
m_pidBuffer = new uint8_t[m_pidSize];
if (m_pidSize < PEER_LINK_BLOCK_SIZE)
m_pidBuffer = new uint8_t[PEER_LINK_BLOCK_SIZE + 1U];
else
m_pidBuffer = new uint8_t[m_pidSize + 1U];
}
if (m_pidBuffer != nullptr) {
@ -404,7 +446,7 @@ rid_lookup_cleanup:
// store to file
std::unique_ptr<char[]> __str = std::make_unique<char[]>(decompressedLen + 1U);
char* str = __str.get();
::memcpy(str, decompressed, decompressedLen + 1U);
::memcpy(str, decompressed, decompressedLen);
str[decompressedLen] = 0; // null termination
std::string filename = "/tmp/peer_list.dat";
@ -421,6 +463,9 @@ rid_lookup_cleanup:
m_pidLookup->filename(filename);
m_pidLookup->reload();
// flag this peer as Peer-Link enabled
m_peerLink = true;
// cleanup temporary file
::remove(filename.c_str());
}

@ -79,6 +79,19 @@ namespace network
*/
bool checkBlockedPeer(uint32_t peerId);
/**
* @brief Writes a complete update of this CFNE's active peer list to the network.
* @param peerList List of active peers.
* @returns bool True, if list was sent, otherwise false.
*/
bool writePeerLinkPeers(json::array* peerList);
/**
* @brief Returns flag indicating whether or not this peer connection is Peer-Link enabled.
* @returns bool True, if Peer-Link enabled, otherwise false.
*/
bool isPeerLink() const { return m_peerLink; }
protected:
std::vector<uint32_t> m_blockTrafficToTable;
@ -101,6 +114,7 @@ namespace network
private:
lookups::PeerListLookup* m_pidLookup;
bool m_peerLink;
uint32_t m_tgidCompressedSize;
uint32_t m_tgidSize;

@ -821,39 +821,7 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl
LogDebug(LOG_REST, "Preparing Peer %u (%s) for REST API query", peerId, peer->address().c_str());
}
json::object peerObj = json::object();
peerObj["peerId"].set<uint32_t>(peerId);
std::string address = peer->address();
peerObj["address"].set<std::string>(address);
uint16_t port = peer->port();
peerObj["port"].set<uint16_t>(port);
bool connected = peer->connected();
peerObj["connected"].set<bool>(connected);
uint32_t connectionState = (uint32_t)peer->connectionState();
peerObj["connectionState"].set<uint32_t>(connectionState);
uint32_t pingsReceived = peer->pingsReceived();
peerObj["pingsReceived"].set<uint32_t>(pingsReceived);
uint64_t lastPing = peer->lastPing();
peerObj["lastPing"].set<uint64_t>(lastPing);
uint32_t ccPeerId = peer->ccPeerId();
peerObj["controlChannel"].set<uint32_t>(ccPeerId);
json::object peerConfig = peer->config();
if (peerConfig["rcon"].is<json::object>())
peerConfig.erase("rcon");
peerObj["config"].set<json::object>(peerConfig);
json::array voiceChannels = json::array();
auto it = std::find_if(m_network->m_ccPeerMap.begin(), m_network->m_ccPeerMap.end(), [&](auto x) { return x.first == peerId; });
if (it != m_network->m_ccPeerMap.end()) {
std::vector<uint32_t> vcPeers = m_network->m_ccPeerMap[peerId];
for (uint32_t vcEntry : vcPeers) {
voiceChannels.push_back(json::value((double)vcEntry));
}
}
peerObj["voiceChannels"].set<json::array>(voiceChannels);
json::object peerObj = m_network->fneConnObject(peerId, peer);
peers.push_back(json::value(peerObj));
}
}
@ -861,6 +829,20 @@ void RESTAPI::restAPI_GetPeerQuery(const HTTPPayload& request, HTTPPayload& repl
else {
LogDebug(LOG_REST, "No peers connected to this FNE");
}
// report any Peer-Link reported peers
if (m_network->m_peerLinkPeers.size() > 0) {
for (auto entry : m_network->m_peerLinkPeers) {
json::array peerObjs = entry.second;
if (entry.second.size() > 0) {
for (auto linkEntry : entry.second) {
if (linkEntry.is<json::object>()) {
peers.push_back(json::value(linkEntry));
}
}
}
}
}
}
else {
LogDebug(LOG_REST, "Network not set up, no peers to return");

@ -454,17 +454,21 @@ public:
const auto& rootWidget = getRootWidget();
std::map<uint32_t, json::object> peerStatus(getNetwork()->peerStatus.begin(), getNetwork()->peerStatus.end());
for (auto entry : peerStatus) {
uint32_t peerId = entry.first;
json::object peerObj = entry.second;
if (peerObj["peerId"].is<uint32_t>())
peerId = peerObj["peerId"].get<uint32_t>();
auto it = std::find_if(m_nodes.begin(), m_nodes.end(), [&](NodeStatusWidget* wdgt) {
if (wdgt->peerId == entry.first && wdgt->uniqueId == (int32_t)entry.first)
if (wdgt->peerId == peerId && wdgt->uniqueId == (int32_t)peerId)
return true;
return false;
});
if (it == m_nodes.end()) {
json::object peerObj = entry.second;
addNode(entry.first, peerObj);
addNode(peerId, peerObj);
uint32_t peerId = entry.first;
uint8_t channelId = peerObj["channelId"].get<uint8_t>();
uint32_t channelNo = peerObj["channelNo"].get<uint32_t>();
@ -538,9 +542,8 @@ public:
} else {
NodeStatusWidget* wdgt = *it;
json::object peerObj = entry.second;
updateNode(wdgt, entry.first, peerObj);
updateNode(wdgt, peerId, peerObj);
uint32_t peerId = entry.first;
uint8_t channelId = peerObj["channelId"].get<uint8_t>();
uint32_t channelNo = peerObj["channelNo"].get<uint32_t>();

@ -138,6 +138,10 @@ public:
uint32_t pingsReceived = (uint32_t)peerObj["pingsReceived"].getDefault<uint32_t>(0U);
uint64_t lastPing = (uint64_t)peerObj["lastPing"].getDefault<uint32_t>(0U);
uint32_t parentPeerId = 0U;
if (peerObj["parentPeerId"].is<uint32_t>())
parentPeerId = (uint32_t)peerObj["parentPeerId"].getDefault<uint32_t>(0U);
uint32_t ccPeerId = (uint32_t)peerObj["controlChannel"].getDefault<uint32_t>(0U);
json::array voiceChannels = peerObj["voiceChannels"].get<json::array>();
@ -183,16 +187,20 @@ public:
std::ostringstream peerOss;
peerOss << std::setw(9) << std::setfill('0') << peerId;
// pad peer IDs properly
std::ostringstream parentPeerOss;
parentPeerOss << std::setw(9) << std::setfill('0') << parentPeerId;
// pad peer IDs properly
std::ostringstream ccPeerOss;
ccPeerOss << std::setw(9) << std::setfill('0') << ccPeerId;
// build list view entry
const std::array<std::string, 14U> columns = {
const std::array<std::string, 15U> columns = {
peerOss.str(),
identity, software,
peerAddress, std::to_string(port),
ccPeerOss.str(),
parentPeerOss.str(), ccPeerOss.str(),
std::to_string(voiceChannelPeers.size()),
(connected) ? "X" : "",
strConnState,
@ -256,6 +264,7 @@ private:
m_listView.addColumn("Software", 15);
m_listView.addColumn("IP Address", 15);
m_listView.addColumn("Port", 8);
m_listView.addColumn("Link Peer ID", 10);
m_listView.addColumn("CC Peer ID", 10);
m_listView.addColumn("VC Count", 8);
m_listView.addColumn("Connected", 5);
@ -270,6 +279,7 @@ private:
m_listView.setColumnAlignment(1, finalcut::Align::Right);
m_listView.setColumnAlignment(4, finalcut::Align::Right);
m_listView.setColumnAlignment(6, finalcut::Align::Center);
m_listView.setColumnAlignment(7, finalcut::Align::Center);
// set type of sorting
m_listView.setColumnSortType(1, finalcut::SortType::Name);

Loading…
Cancel
Save

Powered by TurnKey Linux.