diff --git a/src/common/network/RawFrameQueue.cpp b/src/common/network/RawFrameQueue.cpp
index b31b2889..2d28665a 100644
--- a/src/common/network/RawFrameQueue.cpp
+++ b/src/common/network/RawFrameQueue.cpp
@@ -32,6 +32,7 @@ using namespace network;
///
RawFrameQueue::RawFrameQueue(udp::Socket* socket, bool debug) :
m_socket(socket),
+ m_flushMutex(),
m_buffers(),
m_debug(debug)
{
@@ -145,25 +146,32 @@ void RawFrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, sock
///
bool RawFrameQueue::flushQueue()
{
- if (m_buffers.empty()) {
- return false;
- }
+ bool ret = true;
+ m_flushMutex.lock();
+ {
+ if (m_buffers.empty()) {
+ m_flushMutex.unlock();
+ return false;
+ }
- // bryanb: this is the same as above -- but for some assinine reason prevents
- // weirdness
- if (m_buffers.size() == 0U) {
- return false;
- }
+ // bryanb: this is the same as above -- but for some assinine reason prevents
+ // weirdness
+ if (m_buffers.size() == 0U) {
+ m_flushMutex.unlock();
+ return false;
+ }
- // LogDebug(LOG_NET, "m_buffers len = %u", m_buffers.size());
+ // LogDebug(LOG_NET, "m_buffers len = %u", m_buffers.size());
- bool ret = true;
- if (!m_socket->write(m_buffers)) {
- LogError(LOG_NET, "Failed writing data to the network");
- ret = false;
- }
+ ret = true;
+ if (!m_socket->write(m_buffers)) {
+ LogError(LOG_NET, "Failed writing data to the network");
+ ret = false;
+ }
- deleteBuffers();
+ deleteBuffers();
+ }
+ m_flushMutex.unlock();
return ret;
}
diff --git a/src/common/network/RawFrameQueue.h b/src/common/network/RawFrameQueue.h
index ae2ab1d6..58bdc524 100644
--- a/src/common/network/RawFrameQueue.h
+++ b/src/common/network/RawFrameQueue.h
@@ -17,6 +17,8 @@
#include "common/network/udp/Socket.h"
#include "common/Utils.h"
+#include
+
namespace network
{
// ---------------------------------------------------------------------------
@@ -57,6 +59,7 @@ namespace network
uint32_t m_addrLen;
udp::Socket* m_socket;
+ std::mutex m_flushMutex;
udp::BufferVector m_buffers;
bool m_debug;
diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp
index 8436b1b4..cf531371 100644
--- a/src/fne/network/FNENetwork.cpp
+++ b/src/fne/network/FNENetwork.cpp
@@ -80,6 +80,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_ridLookup(nullptr),
m_tidLookup(nullptr),
m_status(NET_STAT_INVALID),
+ m_peerMutex(),
m_peers(),
m_peerAffiliations(),
m_maintainenceTimer(1000U, pingTime),
@@ -158,8 +159,6 @@ void FNENetwork::processNetwork()
return;
}
- uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
-
sockaddr_storage address;
uint32_t addrLen;
frame::RTPHeader rtpHeader;
@@ -173,559 +172,608 @@ void FNENetwork::processNetwork()
Utils::dump(1U, "Network Message", buffer.get(), length);
uint32_t peerId = fneHeader.getPeerId();
- uint32_t streamId = fneHeader.getStreamId();
- // update current peer packet sequence and stream ID
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end()) && streamId != 0U) {
- FNEPeerConnection* connection = m_peers[peerId];
- uint16_t pktSeq = rtpHeader.getSequence();
+ NetPacketRequest* req = new NetPacketRequest();
+ req->network = this;
+ req->peerId = peerId;
- if (connection != nullptr) {
- if (pktSeq == RTP_END_OF_CALL_SEQ) {
- connection->pktLastSeq(pktSeq);
- connection->pktNextSeq(0U);
- } else {
- if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq())) {
- LogWarning(LOG_NET, "PEER %u stream %u out-of-sequence; %u != %u", peerId, streamId, pktSeq, connection->pktNextSeq());
- }
+ req->address = address;
+ req->addrLen = addrLen;
+ req->rtpHeader = rtpHeader;
+ req->fneHeader = fneHeader;
+
+ req->length = length;
+ req->buffer = new uint8_t[length];
+ ::memcpy(req->buffer, buffer.get(), length);
+
+ ::pthread_create(&req->thread, NULL, threadedNetworkRx, req);
+ }
+ else {
+ // if the DMR handler has parrot frames to playback, playback a frame
+ if (m_tagDMR->hasParrotFrames()) {
+ m_tagDMR->playbackParrot();
+ }
+
+ // if the P25 handler has parrot frames to playback, playback a frame
+ if (m_tagP25->hasParrotFrames()) {
+ m_tagP25->playbackParrot();
+ }
+
+ // if the NXDN handler has parrot frames to playback, playback a frame
+ if (m_tagNXDN->hasParrotFrames()) {
+ m_tagNXDN->playbackParrot();
+ }
+ }
+}
+
+///
+/// Process a data frames from the network.
+///
+void* FNENetwork::threadedNetworkRx(void* arg)
+{
+ NetPacketRequest* req = (NetPacketRequest*)arg;
+ if (req != nullptr) {
+ uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count();
+
+ FNENetwork* network = req->network;
+ if (req->length > 0) {
+ uint32_t peerId = req->fneHeader.getPeerId();
+ uint32_t streamId = req->fneHeader.getStreamId();
+
+ std::stringstream peerName;
+ peerName << peerId << ":rx-pckt";
+ if (pthread_kill(req->thread, 0) == 0) {
+ ::pthread_setname_np(req->thread, peerName.str().c_str());
+ }
+
+ // update current peer packet sequence and stream ID
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end()) && streamId != 0U) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ uint16_t pktSeq = req->rtpHeader.getSequence();
- connection->currStreamId(streamId);
- connection->pktLastSeq(pktSeq);
- connection->pktNextSeq(pktSeq + 1);
- if (connection->pktNextSeq() > UINT16_MAX) {
+ if (connection != nullptr) {
+ if (pktSeq == RTP_END_OF_CALL_SEQ) {
+ connection->pktLastSeq(pktSeq);
connection->pktNextSeq(0U);
+ } else {
+ if ((connection->currStreamId() == streamId) && (pktSeq != connection->pktNextSeq())) {
+ LogWarning(LOG_NET, "PEER %u stream %u out-of-sequence; %u != %u", peerId, streamId, pktSeq, connection->pktNextSeq());
+ }
+
+ connection->currStreamId(streamId);
+ connection->pktLastSeq(pktSeq);
+ connection->pktNextSeq(pktSeq + 1);
+ if (connection->pktNextSeq() > UINT16_MAX) {
+ connection->pktNextSeq(0U);
+ }
}
}
+
+ network->m_peers[peerId] = connection;
}
- m_peers[peerId] = connection;
- }
+ // if we don't have a stream ID and are receiving call data -- throw an error and discard
+ if (streamId == 0 && req->fneHeader.getFunction() == NET_FUNC_PROTOCOL)
+ {
+ LogError(LOG_NET, "PEER %u malformed packet (no stream ID for a call?)", peerId);
+
+ if (req->buffer != nullptr)
+ delete req->buffer;
+ delete req;
- // if we don't have a stream ID and are receiving call data -- throw an error and discard
- if (streamId == 0 && fneHeader.getFunction() == NET_FUNC_PROTOCOL)
- {
- LogError(LOG_NET, "PEER %u malformed packet (no stream ID for a call?)", peerId);
- return;
- }
+ return nullptr;
+ }
- // process incoming message frame opcodes
- switch (fneHeader.getFunction()) {
- case NET_FUNC_PROTOCOL:
- {
- if (fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_DMR) { // Encapsulated DMR data frame
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
+ // process incoming message frame opcodes
+ switch (req->fneHeader.getFunction()) {
+ case NET_FUNC_PROTOCOL:
+ {
+ if (req->fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_DMR) { // Encapsulated DMR data frame
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+ connection->lastPing(now);
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- if (m_dmrEnabled) {
- if (m_tagDMR != nullptr) {
- m_tagDMR->processFrame(buffer.get(), length, peerId, rtpHeader.getSequence(), streamId);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ if (network->m_dmrEnabled) {
+ if (network->m_tagDMR != nullptr) {
+ network->m_tagDMR->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
+ }
}
}
}
}
+ else {
+ network->writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
}
- else {
- writePeerNAK(peerId, TAG_DMR_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
- }
- }
- else if (fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_P25) { // Encapsulated P25 data frame
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
+ else if (req->fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_P25) { // Encapsulated P25 data frame
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+ connection->lastPing(now);
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- if (m_p25Enabled) {
- if (m_tagP25 != nullptr) {
- m_tagP25->processFrame(buffer.get(), length, peerId, rtpHeader.getSequence(), streamId);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ if (network->m_p25Enabled) {
+ if (network->m_tagP25 != nullptr) {
+ network->m_tagP25->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
+ }
}
}
}
}
+ else {
+ network->writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
}
- else {
- writePeerNAK(peerId, TAG_P25_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
- }
- }
- else if (fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_NXDN) { // Encapsulated NXDN data frame
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
+ else if (req->fneHeader.getSubFunction() == NET_PROTOCOL_SUBFUNC_NXDN) { // Encapsulated NXDN data frame
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+ connection->lastPing(now);
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- if (m_nxdnEnabled) {
- if (m_tagNXDN != nullptr) {
- m_tagNXDN->processFrame(buffer.get(), length, peerId, rtpHeader.getSequence(), streamId);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ if (network->m_nxdnEnabled) {
+ if (network->m_tagNXDN != nullptr) {
+ network->m_tagNXDN->processFrame(req->buffer, req->length, peerId, req->rtpHeader.getSequence(), streamId);
+ }
}
}
}
}
+ else {
+ network->writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
}
else {
- writePeerNAK(peerId, TAG_NXDN_DATA, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ Utils::dump("Unknown protocol opcode from peer", req->buffer, req->length);
}
}
- else {
- Utils::dump("Unknown protocol opcode from peer", buffer.get(), length);
- }
- }
- break;
+ break;
+
+ case NET_FUNC_RPTL: // Repeater Login
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) == network->m_peers.end())) {
+ if (network->m_peers.size() >= MAX_HARD_CONN_CAP) {
+ LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, currConnections = %u", peerId, network->m_peers.size());
+ network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, req->address, req->addrLen);
+ break;
+ }
- case NET_FUNC_RPTL: // Repeater Login
- {
- if (peerId > 0 && (m_peers.find(peerId) == m_peers.end())) {
- if (m_peers.size() >= MAX_HARD_CONN_CAP) {
- LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, currConnections = %u", peerId, m_peers.size());
- writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, address, addrLen);
- break;
- }
+ if (network->m_softConnLimit > 0U && network->m_peers.size() >= network->m_softConnLimit) {
+ LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, maxConnections = %u, currConnections = %u", peerId, network->m_softConnLimit, network->m_peers.size());
+ network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, req->address, req->addrLen);
+ break;
+ }
- if (m_softConnLimit > 0U && m_peers.size() >= m_softConnLimit) {
- LogError(LOG_NET, "PEER %u attempted to connect with no more connections available, maxConnections = %u, currConnections = %u", peerId, m_softConnLimit, m_peers.size());
- writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_FNE_MAX_CONN, address, addrLen);
- break;
+ FNEPeerConnection* connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
+ connection->lastPing(now);
+ connection->currStreamId(streamId);
+
+ network->setupRepeaterLogin(peerId, connection);
}
+ else {
+ // check if the peer is in our peer list -- if he is, and he isn't in a running state, reset
+ // the login sequence
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ if (connection->connectionState() == NET_STAT_RUNNING) {
+ LogMessage(LOG_NET, "PEER %u resetting peer connection, connectionState = %u", peerId, connection->connectionState());
+ delete connection;
- FNEPeerConnection* connection = new FNEPeerConnection(peerId, address, addrLen);
- connection->lastPing(now);
- connection->currStreamId(streamId);
+ connection = new FNEPeerConnection(peerId, req->address, req->addrLen);
+ connection->lastPing(now);
+ connection->currStreamId(streamId);
- setupRepeaterLogin(peerId, connection);
- }
- else {
- // check if the peer is in our peer list -- if he is, and he isn't in a running state, reset
- // the login sequence
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- if (connection->connectionState() == NET_STAT_RUNNING) {
- LogMessage(LOG_NET, "PEER %u resetting peer connection, connectionState = %u", peerId, connection->connectionState());
- delete connection;
+ lookups::AffiliationLookup* affLookup = network->m_peerAffiliations[peerId];
+ if (affLookup != nullptr)
+ delete affLookup;
+ network->m_peerAffiliations.erase(peerId);
- connection = new FNEPeerConnection(peerId, address, addrLen);
- connection->lastPing(now);
- connection->currStreamId(streamId);
+ network->setupRepeaterLogin(peerId, connection);
+ } else {
+ network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen);
- lookups::AffiliationLookup* affLookup = m_peerAffiliations[peerId];
- if (affLookup != nullptr)
- delete affLookup;
- m_peerAffiliations.erase(peerId);
+ LogWarning(LOG_NET, "PEER %u RPTL NAK, bad connection state, connectionState = %u", peerId, connection->connectionState());
- setupRepeaterLogin(peerId, connection);
+ delete connection;
+ network->erasePeer(peerId);
+ }
} else {
- writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, address, addrLen);
-
- LogWarning(LOG_NET, "PEER %u RPTL NAK, bad connection state, connectionState = %u", peerId, connection->connectionState());
+ network->writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen);
- delete connection;
- erasePeer(peerId);
+ network->erasePeer(peerId);
+ LogWarning(LOG_NET, "PEER %u RPTL NAK, having no connection", peerId);
}
- } else {
- writePeerNAK(peerId, TAG_REPEATER_LOGIN, NET_CONN_NAK_BAD_CONN_STATE, address, addrLen);
-
- erasePeer(peerId);
- LogWarning(LOG_NET, "PEER %u RPTL NAK, having no connection", peerId);
}
}
}
- }
- break;
- case NET_FUNC_RPTK: // Repeater Authentication
- {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- connection->lastPing(now);
+ break;
+ case NET_FUNC_RPTK: // Repeater Authentication
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ connection->lastPing(now);
- if (connection->connectionState() == NET_STAT_WAITING_AUTHORISATION) {
- // get the hash from the frame message
- uint8_t hash[length - 8U];
- ::memset(hash, 0x00U, length - 8U);
- ::memcpy(hash, buffer.get() + 8U, length - 8U);
-
- // generate our own hash
- uint8_t salt[4U];
- ::memset(salt, 0x00U, 4U);
- __SET_UINT32(connection->salt(), salt, 0U);
-
- size_t size = m_password.size();
- uint8_t* in = new uint8_t[size + sizeof(uint32_t)];
- ::memcpy(in, salt, sizeof(uint32_t));
- for (size_t i = 0U; i < size; i++)
- in[i + sizeof(uint32_t)] = m_password.at(i);
-
- uint8_t out[32U];
- edac::SHA256 sha256;
- sha256.buffer(in, (uint32_t)(size + sizeof(uint32_t)), out);
-
- delete[] in;
-
- // validate hash
- bool valid = false;
- if (length - 8U == 32U) {
- valid = true;
- for (uint8_t i = 0; i < 32U; i++) {
- if (hash[i] != out[i]) {
- valid = false;
- break;
+ if (connection->connectionState() == NET_STAT_WAITING_AUTHORISATION) {
+ // get the hash from the frame message
+ uint8_t hash[req->length - 8U];
+ ::memset(hash, 0x00U, req->length - 8U);
+ ::memcpy(hash, req->buffer + 8U, req->length - 8U);
+
+ // generate our own hash
+ uint8_t salt[4U];
+ ::memset(salt, 0x00U, 4U);
+ __SET_UINT32(connection->salt(), salt, 0U);
+
+ size_t size = network->m_password.size();
+ uint8_t* in = new uint8_t[size + sizeof(uint32_t)];
+ ::memcpy(in, salt, sizeof(uint32_t));
+ for (size_t i = 0U; i < size; i++)
+ in[i + sizeof(uint32_t)] = network->m_password.at(i);
+
+ uint8_t out[32U];
+ edac::SHA256 sha256;
+ sha256.buffer(in, (uint32_t)(size + sizeof(uint32_t)), out);
+
+ delete[] in;
+
+ // validate hash
+ bool valid = false;
+ if (req->length - 8U == 32U) {
+ valid = true;
+ for (uint8_t i = 0; i < 32U; i++) {
+ if (hash[i] != out[i]) {
+ valid = false;
+ break;
+ }
}
}
- }
- if (valid) {
- connection->connectionState(NET_STAT_WAITING_CONFIG);
- writePeerACK(peerId);
- LogInfoEx(LOG_NET, "PEER %u RPTK ACK, completed the login exchange", peerId);
+ if (valid) {
+ connection->connectionState(NET_STAT_WAITING_CONFIG);
+ network->writePeerACK(peerId);
+ LogInfoEx(LOG_NET, "PEER %u RPTK ACK, completed the login exchange", peerId);
+ }
+ else {
+ LogWarning(LOG_NET, "PEER %u RPTK NAK, failed the login exchange", peerId);
+ network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ network->erasePeer(peerId);
+ }
+
+ network->m_peers[peerId] = connection;
}
else {
- LogWarning(LOG_NET, "PEER %u RPTK NAK, failed the login exchange", peerId);
- writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_FNE_UNAUTHORIZED);
- erasePeer(peerId);
+ LogWarning(LOG_NET, "PEER %u RPTK NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->connectionState());
+ network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE);
+ network->erasePeer(peerId);
}
-
- m_peers[peerId] = connection;
- }
- else {
- LogWarning(LOG_NET, "PEER %u RPTK NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->connectionState());
- writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE);
- erasePeer(peerId);
}
}
+ else {
+ network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen);
+ LogWarning(LOG_NET, "PEER %u RPTK NAK, having no connection", peerId);
+ }
}
- else {
- writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_BAD_CONN_STATE, address, addrLen);
- LogWarning(LOG_NET, "PEER %u RPTK NAK, having no connection", peerId);
- }
- }
- break;
- case NET_FUNC_RPTC: // Repeater Configuration
- {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- connection->lastPing(now);
+ break;
+ case NET_FUNC_RPTC: // Repeater Configuration
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ connection->lastPing(now);
- if (connection->connectionState() == NET_STAT_WAITING_CONFIG) {
- uint8_t rawPayload[length - 8U];
- ::memset(rawPayload, 0x00U, length - 8U);
- ::memcpy(rawPayload, buffer.get() + 8U, length - 8U);
- std::string payload(rawPayload, rawPayload + (length - 8U));
-
- // parse JSON body
- json::value v;
- std::string err = json::parse(v, payload);
- if (!err.empty()) {
- LogWarning(LOG_NET, "PEER %u RPTC NAK, supplied invalid configuration data", peerId);
- writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA);
- erasePeer(peerId);
- }
- else {
- // ensure parsed JSON is an object
- if (!v.is()) {
+ if (connection->connectionState() == NET_STAT_WAITING_CONFIG) {
+ uint8_t rawPayload[req->length - 8U];
+ ::memset(rawPayload, 0x00U, req->length - 8U);
+ ::memcpy(rawPayload, req->buffer + 8U, req->length - 8U);
+ std::string payload(rawPayload, rawPayload + (req->length - 8U));
+
+ // parse JSON body
+ json::value v;
+ std::string err = json::parse(v, payload);
+ if (!err.empty()) {
LogWarning(LOG_NET, "PEER %u RPTC NAK, supplied invalid configuration data", peerId);
- writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA);
- erasePeer(peerId);
+ network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA);
+ network->erasePeer(peerId);
}
- else {
- connection->config(v.get());
- connection->connectionState(NET_STAT_RUNNING);
- connection->connected(true);
- connection->pingsReceived(0U);
- connection->lastPing(now);
- connection->lastACLUpdate(now);
- m_peers[peerId] = connection;
-
- writePeerACK(peerId);
- LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
-
- json::object peerConfig = connection->config();
- if (peerConfig["software"].is()) {
- std::string software = peerConfig["software"].get();
- LogInfoEx(LOG_NET, "PEER %u reports software %s", peerId, software.c_str());
+ else {
+ // ensure parsed JSON is an object
+ if (!v.is()) {
+ LogWarning(LOG_NET, "PEER %u RPTC NAK, supplied invalid configuration data", peerId);
+ network->writePeerNAK(peerId, TAG_REPEATER_AUTH, NET_CONN_NAK_INVALID_CONFIG_DATA);
+ network->erasePeer(peerId);
}
+ else {
+ connection->config(v.get());
+ connection->connectionState(NET_STAT_RUNNING);
+ connection->connected(true);
+ connection->pingsReceived(0U);
+ connection->lastPing(now);
+ connection->lastACLUpdate(now);
+ network->m_peers[peerId] = connection;
+
+ network->writePeerACK(peerId);
+ LogInfoEx(LOG_NET, "PEER %u RPTC ACK, completed the configuration exchange", peerId);
+
+ json::object peerConfig = connection->config();
+ if (peerConfig["software"].is()) {
+ std::string software = peerConfig["software"].get();
+ LogInfoEx(LOG_NET, "PEER %u reports software %s", peerId, software.c_str());
+ }
+
+ // setup the affiliations list for this peer
+ std::stringstream peerName;
+ peerName << "PEER " << peerId;
+ network->m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName.str(), network->m_verbose);
+
+ // spin up a thread and send ACL list over to peer
+ network->peerACLUpdate(peerId);
+ }
+ }
+ }
+ else {
+ LogWarning(LOG_NET, "PEER %u RPTC NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->connectionState());
+ network->writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE);
+ network->erasePeer(peerId);
+ }
+ }
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE, req->address, req->addrLen);
+ LogWarning(LOG_NET, "PEER %u RPTC NAK, having no connection", peerId);
+ }
+ }
+ break;
- // setup the affiliations list for this peer
- std::stringstream peerName;
- peerName << "PEER " << peerId;
- m_peerAffiliations[peerId] = new lookups::AffiliationLookup(peerName.str(), m_verbose);
+ case NET_FUNC_RPT_CLOSING: // Repeater Closing (Disconnect)
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
- // spin up a thread and send ACL list over to peer
- peerACLUpdate(peerId);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ LogInfoEx(LOG_NET, "PEER %u is closing down", peerId);
+ if (network->erasePeer(peerId)) {
+ network->erasePeerAffiliations(peerId);
+ delete connection;
}
}
}
- else {
- LogWarning(LOG_NET, "PEER %u RPTC NAK, login exchange while in an incorrect state, connectionState = %u", peerId, connection->connectionState());
- writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE);
- erasePeer(peerId);
- }
}
}
- else {
- writePeerNAK(peerId, TAG_REPEATER_CONFIG, NET_CONN_NAK_BAD_CONN_STATE, address, addrLen);
- LogWarning(LOG_NET, "PEER %u RPTC NAK, having no connection", peerId);
- }
- }
- break;
+ break;
+ case NET_FUNC_PING: // Repeater Ping
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
- case NET_FUNC_RPT_CLOSING: // Repeater Closing (Disconnect)
- {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
-
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- LogInfoEx(LOG_NET, "PEER %u is closing down", peerId);
- if (erasePeer(peerId)) {
- erasePeerAffiliations(peerId);
- delete connection;
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ uint32_t pingsRx = connection->pingsReceived();
+ uint64_t lastPing = connection->lastPing();
+ pingsRx++;
+
+ connection->pingsReceived(pingsRx);
+ connection->lastPing(now);
+ connection->pktLastSeq(connection->pktLastSeq() + 1);
+
+ // does this peer need an ACL update?
+ uint64_t dt = connection->lastACLUpdate() + network->m_updateLookupTime;
+ if (dt < now) {
+ LogInfoEx(LOG_NET, "PEER %u updating ACL list, dt = %u, now = %u", peerId, dt, now);
+ network->peerACLUpdate(peerId);
+ connection->lastACLUpdate(now);
+ }
+
+ network->m_peers[peerId] = connection;
+ network->writePeerCommand(peerId, { NET_FUNC_PONG, NET_SUBFUNC_NOP });
+
+ if (network->m_reportPeerPing) {
+ LogInfoEx(LOG_NET, "PEER %u ping, pingsReceived = %u, lastPing = %u", peerId, connection->pingsReceived(), lastPing);
+ }
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_REPEATER_PING);
}
}
}
}
- }
- break;
- case NET_FUNC_PING: // Repeater Ping
- {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
-
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- uint32_t pingsRx = connection->pingsReceived();
- uint64_t lastPing = connection->lastPing();
- pingsRx++;
-
- connection->pingsReceived(pingsRx);
- connection->lastPing(now);
- connection->pktLastSeq(connection->pktLastSeq() + 1);
-
- // does this peer need an ACL update?
- uint64_t dt = connection->lastACLUpdate() + m_updateLookupTime;
- if (dt < now) {
- LogInfoEx(LOG_NET, "PEER %u updating ACL list, dt = %u, now = %u", peerId, dt, now);
- peerACLUpdate(peerId);
- connection->lastACLUpdate(now);
- }
+ break;
- m_peers[peerId] = connection;
- writePeerCommand(peerId, { NET_FUNC_PONG, NET_SUBFUNC_NOP });
+ case NET_FUNC_GRANT_REQ: // Repeater Grant Request
+ {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
- if (m_reportPeerPing) {
- LogInfoEx(LOG_NET, "PEER %u ping, pingsReceived = %u, lastPing = %u", peerId, connection->pingsReceived(), lastPing);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ /* ignored */
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
- }
- else {
- writePeerNAK(peerId, TAG_REPEATER_PING);
}
}
}
- }
- break;
-
- case NET_FUNC_GRANT_REQ: // Repeater Grant Request
- {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
-
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- /* ignored */
+ break;
+
+ case NET_FUNC_TRANSFER:
+ {
+ if (req->fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer
+ if (network->m_allowActivityTransfer) {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ uint8_t rawPayload[req->length - 11U];
+ ::memset(rawPayload, 0x00U, req->length - 11U);
+ ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
+ std::string payload(rawPayload, rawPayload + (req->length - 11U));
+
+ ::ActivityLog("%u %s", peerId, payload.c_str());
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
+ }
+ }
}
- else {
- writePeerNAK(peerId, TAG_REPEATER_GRANT, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
+ else if (req->fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_DIAG) { // Peer Diagnostic Log Transfer
+ if (network->m_allowDiagnosticTransfer) {
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ uint8_t rawPayload[req->length - 11U];
+ ::memset(rawPayload, 0x00U, req->length - 11U);
+ ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U);
+ std::string payload(rawPayload, rawPayload + (req->length - 11U));
+
+ bool currState = g_disableTimeDisplay;
+ g_disableTimeDisplay = true;
+ ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str());
+ g_disableTimeDisplay = currState;
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
+ }
+ }
}
}
+ else {
+ Utils::dump("unknown transfer opcode from the peer", req->buffer, req->length);
+ }
}
- }
- break;
+ break;
- case NET_FUNC_TRANSFER:
- {
- if (fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_ACTIVITY) { // Peer Activity Log Transfer
- if (m_allowActivityTransfer) {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
+ case NET_FUNC_ANNOUNCE:
+ {
+ if (req->fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_GRP_AFFIL) { // Announce Group Affiliation
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
+ std::string ip = udp::Socket::address(req->address);
+ lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
- uint8_t rawPayload[length - 11U];
- ::memset(rawPayload, 0x00U, length - 11U);
- ::memcpy(rawPayload, buffer.get() + 11U, length - 11U);
- std::string payload(rawPayload, rawPayload + (length - 11U));
-
- ::ActivityLog("%u %s", peerId, payload.c_str());
+ uint32_t srcId = __GET_UINT16(req->buffer, 0U);
+ uint32_t dstId = __GET_UINT16(req->buffer, 3U);
+ aff->groupUnaff(srcId);
+ aff->groupAff(srcId, dstId);
}
else {
- writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
- }
- else if (fneHeader.getSubFunction() == NET_TRANSFER_SUBFUNC_DIAG) { // Peer Diagnostic Log Transfer
- if (m_allowDiagnosticTransfer) {
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
+ else if (req->fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_UNIT_REG) { // Announce Unit Registration
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
+ std::string ip = udp::Socket::address(req->address);
+ lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
// validate peer (simple validation really)
if (connection->connected() && connection->address() == ip) {
- uint8_t rawPayload[length - 11U];
- ::memset(rawPayload, 0x00U, length - 11U);
- ::memcpy(rawPayload, buffer.get() + 11U, length - 11U);
- std::string payload(rawPayload, rawPayload + (length - 11U));
-
- bool currState = g_disableTimeDisplay;
- g_disableTimeDisplay = true;
- ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str());
- g_disableTimeDisplay = currState;
+ uint32_t srcId = __GET_UINT16(req->buffer, 0U);
+ aff->unitReg(srcId);
}
else {
- writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
}
}
}
}
- }
- else {
- Utils::dump("unknown transfer opcode from the peer", buffer.get(), length);
- }
- }
- break;
-
- case NET_FUNC_ANNOUNCE:
- {
- if (fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_GRP_AFFIL) { // Announce Group Affiliation
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
- lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
+ else if (req->fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_UNIT_DEREG) { // Announce Unit Deregistration
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
+ lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- uint32_t srcId = __GET_UINT16(buffer.get(), 0U);
- uint32_t dstId = __GET_UINT16(buffer.get(), 3U);
- aff->groupUnaff(srcId);
- aff->groupAff(srcId, dstId);
- }
- else {
- writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ uint32_t srcId = __GET_UINT16(req->buffer, 0U);
+ aff->unitDereg(srcId);
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
}
}
}
- }
- else if (fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_UNIT_REG) { // Announce Unit Registration
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
- lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
+ else if (req->fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_AFFILS) { // Announce Update All Affiliations
+ if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) {
+ FNEPeerConnection* connection = network->m_peers[peerId];
+ if (connection != nullptr) {
+ std::string ip = udp::Socket::address(req->address);
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- uint32_t srcId = __GET_UINT16(buffer.get(), 0U);
- aff->unitReg(srcId);
- }
- else {
- writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ // validate peer (simple validation really)
+ if (connection->connected() && connection->address() == ip) {
+ lookups::AffiliationLookup* aff = network->m_peerAffiliations[peerId];
+ aff->clearGroupAff(0U, true);
+
+ // update TGID lists
+ uint32_t len = __GET_UINT32(req->buffer, 0U);
+ uint32_t offs = 4U;
+ for (uint32_t i = 0; i < len; i++) {
+ uint32_t srcId = __GET_UINT16(req->buffer, offs);
+ uint32_t dstId = __GET_UINT16(req->buffer, offs + 4U);
+
+ aff->groupAff(srcId, dstId);
+ offs += 8U;
+ }
+ LogMessage(LOG_NET, "PEER %u announced %u affiliations", peerId, len);
+ }
+ else {
+ network->writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
+ }
}
}
}
- }
- else if (fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_UNIT_DEREG) { // Announce Unit Deregistration
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
- lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
-
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- uint32_t srcId = __GET_UINT16(buffer.get(), 0U);
- aff->unitDereg(srcId);
- }
- else {
- writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
- }
- }
+ else {
+ Utils::dump("unknown announcement opcode from the peer", req->buffer, req->length);
}
}
- else if (fneHeader.getSubFunction() == NET_ANNC_SUBFUNC_AFFILS) { // Announce Update All Affiliations
- if (peerId > 0 && (m_peers.find(peerId) != m_peers.end())) {
- FNEPeerConnection* connection = m_peers[peerId];
- if (connection != nullptr) {
- std::string ip = udp::Socket::address(address);
-
- // validate peer (simple validation really)
- if (connection->connected() && connection->address() == ip) {
- lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
- aff->clearGroupAff(0U, true);
+ break;
- // update TGID lists
- uint32_t len = __GET_UINT32(buffer.get(), 0U);
- uint32_t offs = 4U;
- for (uint32_t i = 0; i < len; i++) {
- uint32_t srcId = __GET_UINT16(buffer.get(), offs);
- uint32_t dstId = __GET_UINT16(buffer.get(), offs + 4U);
-
- aff->groupAff(srcId, dstId);
- offs += 8U;
- }
- LogMessage(LOG_NET, "PEER %u announced %u affiliations", peerId, len);
- }
- else {
- writePeerNAK(peerId, TAG_ANNOUNCE, NET_CONN_NAK_FNE_UNAUTHORIZED);
- }
- }
- }
- }
- else {
- Utils::dump("unknown announcement opcode from the peer", buffer.get(), length);
- }
+ default:
+ Utils::dump("unknown opcode from the peer", req->buffer, req->length);
+ break;
}
- break;
-
- default:
- Utils::dump("unknown opcode from the peer", buffer.get(), length);
- break;
- }
- }
- else {
- // if the DMR handler has parrot frames to playback, playback a frame
- if (m_tagDMR->hasParrotFrames()) {
- m_tagDMR->playbackParrot();
- }
-
- // if the P25 handler has parrot frames to playback, playback a frame
- if (m_tagP25->hasParrotFrames()) {
- m_tagP25->playbackParrot();
}
- // if the NXDN handler has parrot frames to playback, playback a frame
- if (m_tagNXDN->hasParrotFrames()) {
- m_tagNXDN->playbackParrot();
- }
- }
+ if (req->buffer != nullptr)
+ delete req->buffer;
+ delete req;
+ }
- return;
+ return nullptr;
}
///
@@ -842,14 +890,19 @@ void FNENetwork::close()
///
bool FNENetwork::erasePeerAffiliations(uint32_t peerId)
{
- auto it = std::find_if(m_peerAffiliations.begin(), m_peerAffiliations.end(), [&](PeerAffiliationMapPair x) { return x.first == peerId; });
- if (it != m_peerAffiliations.end()) {
- lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
- m_peerAffiliations.erase(peerId);
- delete aff;
-
- return true;
+ m_peerMutex.lock();
+ {
+ auto it = std::find_if(m_peerAffiliations.begin(), m_peerAffiliations.end(), [&](PeerAffiliationMapPair x) { return x.first == peerId; });
+ if (it != m_peerAffiliations.end()) {
+ lookups::AffiliationLookup* aff = m_peerAffiliations[peerId];
+ m_peerAffiliations.erase(peerId);
+ delete aff;
+
+ m_peerMutex.unlock();
+ return true;
+ }
}
+ m_peerMutex.unlock();
return false;
}
@@ -861,11 +914,16 @@ bool FNENetwork::erasePeerAffiliations(uint32_t peerId)
///
bool FNENetwork::erasePeer(uint32_t peerId)
{
- auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
- if (it != m_peers.end()) {
- m_peers.erase(peerId);
- return true;
+ m_peerMutex.lock();
+ {
+ auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; });
+ if (it != m_peers.end()) {
+ m_peers.erase(peerId);
+ m_peerMutex.unlock();
+ return true;
+ }
}
+ m_peerMutex.unlock();
return false;
}
@@ -905,7 +963,7 @@ void FNENetwork::peerACLUpdate(uint32_t peerId)
req->peerId = peerId;
std::stringstream peerName;
- peerName << "peer" << peerId << ":acl-update";
+ peerName << peerId << ":acl-update";
::pthread_create(&req->thread, NULL, threadedACLUpdate, req);
if (pthread_kill(req->thread, 0) == 0) {
@@ -928,7 +986,9 @@ void* FNENetwork::threadedACLUpdate(void* arg)
req->network->writeTGIDs(req->peerId);
req->network->writeDeactiveTGIDs(req->peerId);
+ uint32_t thread = req->thread;
delete req;
+ ::pthread_exit(&thread);
}
return nullptr;
@@ -1339,7 +1399,6 @@ bool FNENetwork::writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REA
__SET_UINT16B((uint16_t)reason, buffer, 10U); // Reason
LogWarning(LOG_NET, "PEER %u NAK %s, reason = %u", peerId, tag, (uint16_t)reason);
- m_frameQueue->enqueueMessage(buffer, 12U, createStreamId(), peerId, m_peerId,
+ return m_frameQueue->write(buffer, 12U, createStreamId(), peerId, m_peerId,
{ NET_FUNC_NAK, NET_SUBFUNC_NOP }, 0U, addr, addrLen);
- return m_frameQueue->flushQueue();
}
diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h
index 68d92177..ebe2b81e 100644
--- a/src/fne/network/FNENetwork.h
+++ b/src/fne/network/FNENetwork.h
@@ -24,6 +24,7 @@
#include
#include
#include
+#include
// ---------------------------------------------------------------------------
// Class Prototypes
@@ -155,6 +156,25 @@ namespace network
pthread_t thread;
};
+ // ---------------------------------------------------------------------------
+ // Structure Declaration
+ // Represents the data required for a network packet handler thread.
+ // ---------------------------------------------------------------------------
+
+ struct NetPacketRequest {
+ FNENetwork* network;
+ uint32_t peerId;
+
+ sockaddr_storage address;
+ uint32_t addrLen;
+ frame::RTPHeader rtpHeader;
+ frame::RTPFNEHeader fneHeader;
+ int length = 0U;
+ uint8_t *buffer;
+
+ pthread_t thread;
+ };
+
// ---------------------------------------------------------------------------
// Class Declaration
// Implements the core FNE networking logic.
@@ -189,6 +209,8 @@ namespace network
/// Process a data frames from the network.
void processNetwork();
+ /// Entry point to process a given network packet.
+ static void* threadedNetworkRx(void* arg);
/// Updates the timer by the passed number of milliseconds.
void clock(uint32_t ms) override;
@@ -227,6 +249,7 @@ namespace network
NET_CONN_STATUS m_status;
+ std::mutex m_peerMutex;
typedef std::pair PeerMapPair;
std::unordered_map m_peers;
typedef std::pair PeerAffiliationMapPair;