implement initial logic to provide PDU processing on the FNE;

pull/61/head
Bryan Biedenkapp 2 years ago
parent 96ea9269be
commit 88b619c71f

@ -633,9 +633,9 @@ namespace network
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | Protocol Tag (P25D) | * | Protocol Tag (P25D) |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |C| SAP | PDU Length (Bytes) | * |C| SAP | Reserved |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | Reserved | * | PDU Length (Bytes) | Reserved |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* | | MFId | * | | MFId |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

@ -16,6 +16,8 @@ file(GLOB dvmfne_SRC
"src/fne/network/callhandler/*.h" "src/fne/network/callhandler/*.h"
"src/fne/network/callhandler/*.cpp" "src/fne/network/callhandler/*.cpp"
"src/fne/network/callhandler/packetdata/*.h"
"src/fne/network/callhandler/packetdata/*.cpp"
"src/fne/network/influxdb/*.h" "src/fne/network/influxdb/*.h"
"src/fne/network/*.h" "src/fne/network/*.h"
"src/fne/network/*.cpp" "src/fne/network/*.cpp"

@ -37,6 +37,7 @@
namespace network { namespace callhandler { class HOST_SW_API TagDMRData; } } namespace network { namespace callhandler { class HOST_SW_API TagDMRData; } }
namespace network { namespace callhandler { class HOST_SW_API TagP25Data; } } namespace network { namespace callhandler { class HOST_SW_API TagP25Data; } }
namespace network { namespace callhandler { namespace packetdata { class HOST_SW_API P25PacketData; } } }
namespace network { namespace callhandler { class HOST_SW_API TagNXDNData; } } namespace network { namespace callhandler { class HOST_SW_API TagNXDNData; } }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -72,6 +73,7 @@ private:
friend class network::FNENetwork; friend class network::FNENetwork;
friend class network::callhandler::TagDMRData; friend class network::callhandler::TagDMRData;
friend class network::callhandler::TagP25Data; friend class network::callhandler::TagP25Data;
friend class network::callhandler::packetdata::P25PacketData;
friend class network::callhandler::TagNXDNData; friend class network::callhandler::TagNXDNData;
network::FNENetwork* m_network; network::FNENetwork* m_network;
network::DiagNetwork* m_diagNetwork; network::DiagNetwork* m_diagNetwork;

@ -89,6 +89,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_influxOrg("dvm"), m_influxOrg("dvm"),
m_influxBucket("dvm"), m_influxBucket("dvm"),
m_influxLogRawData(false), m_influxLogRawData(false),
m_dumpDataPacket(false),
m_reportPeerPing(reportPeerPing), m_reportPeerPing(reportPeerPing),
m_verbose(verbose) m_verbose(verbose)
{ {
@ -146,6 +147,8 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
m_filterHeaders = conf["filterHeaders"].as<bool>(true); m_filterHeaders = conf["filterHeaders"].as<bool>(true);
m_filterTerminators = conf["filterTerminators"].as<bool>(true); m_filterTerminators = conf["filterTerminators"].as<bool>(true);
m_dumpDataPacket = conf["dumpDataPacket"].as<bool>(false);
/* /*
** Drop Unit to Unit Peers ** Drop Unit to Unit Peers
*/ */
@ -166,6 +169,7 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
if (m_disallowAdjStsBcast) { if (m_disallowAdjStsBcast) {
LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!");
} }
LogInfo(" Dump Packet Data: %s", m_dumpDataPacket ? "yes" : "no");
LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no"); LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no");
LogInfo(" Allow conventional sites to override affiliation and receive all traffic: %s", m_allowConvSiteAffOverride ? "yes" : "no"); LogInfo(" Allow conventional sites to override affiliation and receive all traffic: %s", m_allowConvSiteAffOverride ? "yes" : "no");
LogInfo(" Restrict grant response by affiliation: %s", m_restrictGrantToAffOnly ? "yes" : "no"); LogInfo(" Restrict grant response by affiliation: %s", m_restrictGrantToAffOnly ? "yes" : "no");

@ -49,6 +49,7 @@ class HOST_SW_API HostFNE;
class HOST_SW_API RESTAPI; class HOST_SW_API RESTAPI;
namespace network { namespace callhandler { class HOST_SW_API TagDMRData; } } namespace network { namespace callhandler { class HOST_SW_API TagDMRData; } }
namespace network { namespace callhandler { class HOST_SW_API TagP25Data; } } namespace network { namespace callhandler { class HOST_SW_API TagP25Data; } }
namespace network { namespace callhandler { namespace packetdata { class HOST_SW_API P25PacketData; } } }
namespace network { namespace callhandler { class HOST_SW_API TagNXDNData; } } namespace network { namespace callhandler { class HOST_SW_API TagNXDNData; } }
namespace network namespace network
@ -394,6 +395,7 @@ namespace network
friend class callhandler::TagDMRData; friend class callhandler::TagDMRData;
callhandler::TagDMRData* m_tagDMR; callhandler::TagDMRData* m_tagDMR;
friend class callhandler::TagP25Data; friend class callhandler::TagP25Data;
friend class callhandler::packetdata::P25PacketData;
callhandler::TagP25Data* m_tagP25; callhandler::TagP25Data* m_tagP25;
friend class callhandler::TagNXDNData; friend class callhandler::TagNXDNData;
callhandler::TagNXDNData* m_tagNXDN; callhandler::TagNXDNData* m_tagNXDN;
@ -456,6 +458,7 @@ namespace network
bool m_influxLogRawData; bool m_influxLogRawData;
influxdb::ServerInfo m_influxServer; influxdb::ServerInfo m_influxServer;
bool m_dumpDataPacket;
bool m_reportPeerPing; bool m_reportPeerPing;
bool m_verbose; bool m_verbose;

@ -21,6 +21,7 @@
using namespace system_clock; using namespace system_clock;
using namespace network; using namespace network;
using namespace network::callhandler; using namespace network::callhandler;
using namespace network::callhandler::packetdata;
using namespace p25; using namespace p25;
using namespace p25::defines; using namespace p25::defines;
@ -45,14 +46,20 @@ TagP25Data::TagP25Data(FNENetwork* network, bool debug) :
m_parrotFramesReady(false), m_parrotFramesReady(false),
m_parrotFirstFrame(true), m_parrotFirstFrame(true),
m_status(), m_status(),
m_packetData(nullptr),
m_debug(debug) m_debug(debug)
{ {
assert(network != nullptr); assert(network != nullptr);
m_packetData = new P25PacketData(network, debug);
} }
/* Finalizes a instance of the TagP25Data class. */ /* Finalizes a instance of the TagP25Data class. */
TagP25Data::~TagP25Data() = default; TagP25Data::~TagP25Data()
{
delete m_packetData;
}
/* Process a data frame from the network. */ /* Process a data frame from the network. */
@ -77,6 +84,10 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
DUID::E duid = (DUID::E)data[22U]; DUID::E duid = (DUID::E)data[22U];
FrameType::E frameType = FrameType::DATA_UNIT; FrameType::E frameType = FrameType::DATA_UNIT;
if (duid == DUID::PDU) {
return m_packetData->processFrame(data, len, peerId, pktSeq, streamId, external);
}
// perform TGID route rewrites if configured // perform TGID route rewrites if configured
routeRewrite(buffer, peerId, duid, dstId, false); routeRewrite(buffer, peerId, duid, dstId, false);
dstId = __GET_UINT16(buffer, 8U); dstId = __GET_UINT16(buffer, 8U);
@ -394,7 +405,7 @@ bool TagP25Data::processGrantReq(uint32_t srcId, uint32_t dstId, bool unitToUnit
if (!tg.config().active()) { if (!tg.config().active()) {
return false; return false;
} }
// repeat traffic to the connected peers // repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) { if (m_network->m_peers.size() > 0U) {

@ -27,6 +27,7 @@
#include "common/p25/lc/TSBK.h" #include "common/p25/lc/TSBK.h"
#include "common/p25/lc/TDULC.h" #include "common/p25/lc/TDULC.h"
#include "network/FNENetwork.h" #include "network/FNENetwork.h"
#include "network/callhandler/packetdata/P25PacketData.h"
#include <deque> #include <deque>
@ -36,7 +37,6 @@ namespace network
{ {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Class Declaration // Class Declaration
//
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/** /**
@ -161,6 +161,8 @@ namespace network
typedef std::pair<const uint32_t, RxStatus> StatusMapPair; typedef std::pair<const uint32_t, RxStatus> StatusMapPair;
std::unordered_map<uint32_t, RxStatus> m_status; std::unordered_map<uint32_t, RxStatus> m_status;
packetdata::P25PacketData *m_packetData;
bool m_debug; bool m_debug;
/** /**

@ -0,0 +1,462 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
#include "common/p25/lc/tsbk/TSBKFactory.h"
#include "common/p25/Sync.h"
#include "common/edac/CRC.h"
#include "common/Clock.h"
#include "common/Log.h"
#include "common/Thread.h"
#include "common/Utils.h"
#include "network/FNENetwork.h"
#include "network/callhandler/packetdata/P25PacketData.h"
#include "HostFNE.h"
using namespace system_clock;
using namespace network;
using namespace network::callhandler::packetdata;
using namespace p25;
using namespace p25::defines;
#include <cassert>
#include <chrono>
// ---------------------------------------------------------------------------
// Public Class Members
// ---------------------------------------------------------------------------
/* Initializes a new instance of the P25PacketData class. */
P25PacketData::P25PacketData(FNENetwork* network, bool debug) :
m_network(network),
m_status(),
m_debug(debug)
{
assert(network != nullptr);
}
/* Finalizes a instance of the P25PacketData class. */
P25PacketData::~P25PacketData() = default;
/* Process a data frame from the network. */
bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId, uint16_t pktSeq, uint32_t streamId, bool external)
{
hrc::hrc_t pktTime = hrc::now();
uint32_t blockLength = __GET_UINT16(data, 8U);
uint8_t blocksToFollow = data[20U];
uint8_t currentBlock = data[21U];
if (blockLength == 0U)
return false;
uint8_t buffer[P25_PDU_FEC_LENGTH_BYTES];
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
::memcpy(buffer, data + 24U, P25_PDU_FEC_LENGTH_BYTES);
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second->peerId == peerId; });
if (it != m_status.end()) {
RxStatus* status = m_status[peerId];
if (streamId != status->streamId) {
LogWarning(LOG_NET, "P25, Data Call Collision, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u",
peerId, streamId, status->peerId, status->llId, status->streamId, external);
return false;
}
} else {
if (currentBlock == 0U) {
// this is a new call stream
RxStatus* status = new RxStatus();
status->callStartTime = pktTime;
status->streamId = streamId;
status->peerId = peerId;
bool ret = status->header.decode(buffer);
if (!ret) {
LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate header data");
Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES);
return false;
}
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u",
peerId, status->header.getAckNeeded(), status->header.getOutbound(), status->header.getFormat(), status->header.getSAP(), status->header.getFullMessage(),
status->header.getBlocksToFollow(), status->header.getPadLength(), status->header.getPacketLength(), status->header.getNs(), status->header.getFSN(),
status->header.getHeaderOffset(), status->header.getLLId());
// make sure we don't get a PDU with more blocks then we support
if (status->header.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->header.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
return false;
}
status->llId = status->header.getLLId();
m_status[peerId] = status;
LogMessage(LOG_NET, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, external = %u", peerId, status->llId, streamId, external);
return true;
} else {
LogError(LOG_NET, P25_PDU_STR ", illegal starting data block, peerId = %u", peerId);
return false;
}
}
RxStatus* status = m_status[peerId];
::memcpy(status->netPDU + status->dataOffset, data + 24U, blockLength);
status->dataOffset += blockLength;
status->netPDUCount++;
status->dataBlockCnt++;
if (status->dataBlockCnt >= status->header.getBlocksToFollow()) {
uint32_t blocksToFollow = status->header.getBlocksToFollow();
uint32_t offset = 0U;
uint8_t buffer[P25_PDU_FEC_LENGTH_BYTES];
// process second header if we're using enhanced addressing
if (status->header.getSAP() == PDUSAP::EXT_ADDR &&
status->header.getFormat() == PDUFormatType::UNCONFIRMED) {
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
::memcpy(buffer, status->netPDU, P25_PDU_FEC_LENGTH_BYTES);
bool ret = status->secondHeader.decode(buffer);
if (!ret) {
LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate second header data");
Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_HEADER_LENGTH_BYTES);
delete status;
m_status.erase(peerId);
return false;
}
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, fmt = $%02X, mfId = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, n = %u, seqNo = %u, lastFragment = %u, hdrOffset = %u, llId = %u",
peerId, status->secondHeader.getFormat(), status->secondHeader.getMFId(), status->secondHeader.getSAP(), status->secondHeader.getFullMessage(),
status->secondHeader.getBlocksToFollow(), status->secondHeader.getPadLength(), status->secondHeader.getNs(), status->secondHeader.getFSN(), status->secondHeader.getLastFragment(),
status->secondHeader.getHeaderOffset(), status->secondHeader.getLLId());
status->useSecondHeader = true;
offset += P25_PDU_FEC_LENGTH_BYTES;
blocksToFollow--;
}
status->dataBlockCnt = 0U;
// process all blocks in the data stream
uint32_t dataOffset = 0U;
status->pduUserData = new uint8_t[P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U];
::memset(status->pduUserData, 0x00U, P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U);
// if we are using a secondary header place it in the PDU user data buffer
if (status->useSecondHeader) {
status->secondHeader.getData(status->pduUserData + dataOffset);
dataOffset += P25_PDU_HEADER_LENGTH_BYTES;
status->pduUserDataLength += P25_PDU_HEADER_LENGTH_BYTES;
}
// decode data blocks
for (uint32_t i = 0U; i < blocksToFollow; i++) {
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
::memcpy(buffer, status->netPDU + offset, P25_PDU_FEC_LENGTH_BYTES);
bool ret = status->blockData[i].decode(buffer, (status->useSecondHeader) ? status->secondHeader : status->header);
if (ret) {
// if we are getting unconfirmed or confirmed blocks, and if we've reached the total number of blocks
// set this block as the last block for full packet CRC
if ((status->header.getFormat() == PDUFormatType::CONFIRMED) || (status->header.getFormat() == PDUFormatType::UNCONFIRMED)) {
if ((status->dataBlockCnt + 1U) == blocksToFollow) {
status->blockData[i].setLastBlock(true);
}
}
// are we processing extended address data from the first block?
if (status->header.getSAP() == PDUSAP::EXT_ADDR && status->header.getFormat() == PDUFormatType::CONFIRMED &&
status->blockData[i].getSerialNo() == 0U) {
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, block %u, fmt = $%02X, lastBlock = %u, sap = $%02X, llId = %u",
peerId, status->blockData[i].getSerialNo(), status->blockData[i].getFormat(), status->blockData[i].getLastBlock(), status->blockData[i].getSAP(), status->blockData[i].getLLId());
if (m_network->m_dumpDataPacket) {
uint8_t dataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(dataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
status->blockData[i].getData(dataBlock);
Utils::dump(2U, "Data Block", dataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
status->secondHeader.reset();
status->secondHeader.setAckNeeded(true);
status->secondHeader.setFormat(status->blockData[i].getFormat());
status->secondHeader.setLLId(status->blockData[i].getLLId());
status->secondHeader.setSAP(status->blockData[i].getSAP());
status->extendedAddress = true;
}
else {
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, block %u, fmt = $%02X, lastBlock = %u",
peerId, (status->header.getFormat() == PDUFormatType::CONFIRMED) ? status->blockData[i].getSerialNo() : status->dataBlockCnt, status->blockData[i].getFormat(),
status->blockData[i].getLastBlock());
if (m_network->m_dumpDataPacket) {
uint8_t dataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(dataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
status->blockData[i].getData(dataBlock);
Utils::dump(2U, "Data Block", dataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
}
status->blockData[i].getData(status->pduUserData + dataOffset);
dataOffset += (status->header.getFormat() == PDUFormatType::CONFIRMED) ? P25_PDU_CONFIRMED_DATA_LENGTH_BYTES : P25_PDU_UNCONFIRMED_LENGTH_BYTES;
status->pduUserDataLength = dataOffset;
status->dataBlockCnt++;
// is this the last block?
if (status->blockData[i].getLastBlock() && status->dataBlockCnt == blocksToFollow) {
bool crcRet = edac::CRC::checkCRC32(status->pduUserData, status->pduUserDataLength);
if (!crcRet) {
LogWarning(LOG_NET, P25_PDU_STR ", failed CRC-32 check, blocks %u, len %u", blocksToFollow, status->pduUserDataLength);
}
}
}
else {
if (status->blockData[i].getFormat() == PDUFormatType::CONFIRMED)
LogWarning(LOG_NET, P25_PDU_STR ", unfixable PDU data (3/4 rate or CRC), block %u", i);
else
LogWarning(LOG_NET, P25_PDU_STR ", unfixable PDU data (1/2 rate or CRC), block %u", i);
if (m_network->m_dumpDataPacket) {
Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES);
}
}
offset += P25_PDU_FEC_LENGTH_BYTES;
}
if (status->dataBlockCnt < blocksToFollow) {
LogWarning(LOG_NET, P25_PDU_STR ", incomplete PDU (%d / %d blocks), peerId = %u, llId = %u", status->dataBlockCnt, blocksToFollow, peerId, status->llId);
}
// dispatch the PDU data
if (status->dataBlockCnt > 0U) {
dispatch(peerId);
}
uint64_t duration = hrc::diff(pktTime, status->callStartTime);
uint32_t srcId = status->header.getLLId();
uint32_t dstId = (status->useSecondHeader || status->extendedAddress) ? status->secondHeader.getLLId() : status->header.getLLId();
LogMessage(LOG_NET, "P25, Data Call End, peer = %u, srcId = %u, dstId = %u, blocks = %u, duration = %u, streamId = %u, external = %u",
peerId, srcId, dstId, status->header.getBlocksToFollow(), duration / 1000, streamId, external);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "P25")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
}
delete status;
m_status.erase(peerId);
}
return true;
}
// ---------------------------------------------------------------------------
// Private Class Members
// ---------------------------------------------------------------------------
/* Helper to dispatch PDU user data. */
void P25PacketData::dispatch(uint32_t peerId)
{
RxStatus* status = m_status[peerId];
if (m_network->m_dumpDataPacket) {
Utils::dump(1U, "PDU Packet", status->pduUserData, status->pduUserDataLength);
}
uint32_t srcId = status->header.getLLId();
uint32_t dstId = (status->useSecondHeader || status->extendedAddress) ? status->secondHeader.getLLId() : status->header.getLLId();
// repeat traffic to the connected peers
if (m_network->m_peers.size() > 0U) {
uint32_t i = 0U;
for (auto peer : m_network->m_peers) {
if (peerId != peer.first) {
// every 2 peers flush the queue
if (i % 2U == 0U) {
m_network->m_frameQueue->flushQueue();
}
write_PDU_User(peer.first, nullptr, status->header, status->secondHeader, status->useSecondHeader, status->pduUserData, true);
if (m_network->m_debug) {
LogDebug(LOG_NET, "P25, srcPeer = %u, dstPeer = %u, duid = $%02X, srcId = %u, dstId = %u",
peerId, peer.first, DUID::PDU, srcId, dstId);
}
i++;
}
}
m_network->m_frameQueue->flushQueue();
}
// repeat traffic to external peers
if (m_network->m_host->m_peerNetworks.size() > 0U) {
for (auto peer : m_network->m_host->m_peerNetworks) {
uint32_t dstPeerId = peer.second->getPeerId();
// don't try to repeat traffic to the source peer...if this traffic
// is coming from a external peer
if (dstPeerId != peerId) {
// check if the source peer is blocked from sending to this peer
if (peer.second->checkBlockedPeer(peerId)) {
continue;
}
// skip peer if it isn't enabled
if (!peer.second->isEnabled()) {
continue;
}
write_PDU_User(dstPeerId, peer.second, status->header, status->secondHeader, status->useSecondHeader, status->pduUserData);
if (m_network->m_debug) {
LogDebug(LOG_NET, "P25, srcPeer = %u, dstPeer = %u, duid = $%02X, srcId = %u, dstId = %u",
peerId, dstPeerId, DUID::PDU, srcId, dstId);
}
}
}
}
}
/* Helper to write user data as a P25 PDU packet. */
void P25PacketData::write_PDU_User(uint32_t peerId, network::PeerNetwork* peerNet, data::DataHeader& dataHeader, data::DataHeader& secondHeader,
bool useSecondHeader, uint8_t* pduUserData, bool queueOnly)
{
assert(pduUserData != nullptr);
uint32_t streamId = m_network->createStreamId();
uint16_t pktSeq = 0U;
uint8_t buffer[P25_PDU_FEC_LENGTH_BYTES];
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
uint32_t blocksToFollow = dataHeader.getBlocksToFollow();
LogMessage(LOG_NET, P25_PDU_STR ", OSP, peerId = %u, ack = %u, outbound = %u, fmt = $%02X, mfId = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, n = %u, seqNo = %u, lastFragment = %u, hdrOffset = %u, llId = %u",
peerId, dataHeader.getAckNeeded(), dataHeader.getOutbound(), dataHeader.getFormat(), dataHeader.getMFId(), dataHeader.getSAP(), dataHeader.getFullMessage(),
dataHeader.getBlocksToFollow(), dataHeader.getPadLength(), dataHeader.getPacketLength(), dataHeader.getNs(), dataHeader.getFSN(), dataHeader.getLastFragment(),
dataHeader.getHeaderOffset(), dataHeader.getLLId());
// generate the PDU header and 1/2 rate Trellis
dataHeader.encode(buffer);
writeNetwork(peerId, peerNet, dataHeader, 0U, buffer, P25_PDU_FEC_LENGTH_BYTES, pktSeq, streamId, queueOnly);
++pktSeq;
uint32_t dataOffset = 0U;
// generate the second PDU header
if (useSecondHeader) {
secondHeader.encode(pduUserData, true);
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
secondHeader.encode(buffer);
writeNetwork(peerId, peerNet, dataHeader, 1U, buffer, P25_PDU_FEC_LENGTH_BYTES, pktSeq, streamId, queueOnly);
++pktSeq;
dataOffset += P25_PDU_HEADER_LENGTH_BYTES;
blocksToFollow--;
LogMessage(LOG_NET, P25_PDU_STR ", OSP, peerId = %u, fmt = $%02X, mfId = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, n = %u, seqNo = %u, lastFragment = %u, hdrOffset = %u, llId = %u",
peerId, secondHeader.getFormat(), secondHeader.getMFId(), secondHeader.getSAP(), secondHeader.getFullMessage(),
secondHeader.getBlocksToFollow(), secondHeader.getPadLength(), secondHeader.getNs(), secondHeader.getFSN(), secondHeader.getLastFragment(),
secondHeader.getHeaderOffset(), secondHeader.getLLId());
}
if (dataHeader.getFormat() != PDUFormatType::AMBT) {
edac::CRC::addCRC32(pduUserData, (dataHeader.getPacketLength() + dataHeader.getPadLength() + 4U));
}
uint32_t networkBlock = 1U;
if (useSecondHeader)
++networkBlock;
// generate the PDU data
for (uint32_t i = 0U; i < blocksToFollow; i++) {
data::DataBlock dataBlock = data::DataBlock();
dataBlock.setFormat((useSecondHeader) ? secondHeader : dataHeader);
dataBlock.setSerialNo(i);
dataBlock.setData(pduUserData + dataOffset);
// are we processing extended address data from the first block?
if (dataHeader.getSAP() == PDUSAP::EXT_ADDR && dataHeader.getFormat() == PDUFormatType::CONFIRMED &&
dataBlock.getSerialNo() == 0U) {
LogMessage(LOG_NET, P25_PDU_STR ", OSP, peerId = %u, block %u, fmt = $%02X, lastBlock = %u, sap = $%02X, llId = %u",
peerId, dataBlock.getSerialNo(), dataBlock.getFormat(), dataBlock.getLastBlock(), dataBlock.getSAP(), dataBlock.getLLId());
if (m_network->m_dumpDataPacket) {
uint8_t rawDataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(rawDataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
dataBlock.getData(rawDataBlock);
Utils::dump(2U, "Data Block", rawDataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
}
else {
LogMessage(LOG_NET, P25_PDU_STR ", OSP, peerId = %u, block %u, fmt = $%02X, lastBlock = %u",
peerId, (dataHeader.getFormat() == PDUFormatType::CONFIRMED) ? dataBlock.getSerialNo() : i, dataBlock.getFormat(),
dataBlock.getLastBlock());
if (m_network->m_dumpDataPacket) {
uint8_t rawDataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(rawDataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
dataBlock.getData(rawDataBlock);
Utils::dump(2U, "Data Block", rawDataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
}
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
dataBlock.encode(buffer);
writeNetwork(peerId, peerNet, dataHeader, networkBlock, buffer, P25_PDU_FEC_LENGTH_BYTES, (dataBlock.getLastBlock()) ? RTP_END_OF_CALL_SEQ : pktSeq, streamId);
++pktSeq;
networkBlock++;
dataOffset += (dataHeader.getFormat() == PDUFormatType::CONFIRMED) ? P25_PDU_CONFIRMED_DATA_LENGTH_BYTES : P25_PDU_UNCONFIRMED_LENGTH_BYTES;
}
}
/* Write data processed to the network. */
bool P25PacketData::writeNetwork(uint32_t peerId, network::PeerNetwork* peerNet, const p25::data::DataHeader& dataHeader, const uint8_t currentBlock,
const uint8_t *data, uint32_t len, uint16_t pktSeq, uint32_t streamId, bool queueOnly)
{
assert(data != nullptr);
uint32_t messageLength = 0U;
UInt8Array message = m_network->createP25_PDUMessage(messageLength, dataHeader, currentBlock, data, len);
if (message == nullptr) {
return false;
}
if (peerNet != nullptr) {
return peerNet->writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId);
} else {
return m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, pktSeq, streamId, false, true);
}
}

@ -0,0 +1,172 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
*
*/
/**
* @file P25PacketData.h
* @ingroup fne_callhandler
* @file P25PacketData.cpp
* @ingroup fne_callhandler
*/
#if !defined(__PACKETDATA__P25_PACKET_DATA_H__)
#define __PACKETDATA__P25_PACKET_DATA_H__
#include "fne/Defines.h"
#include "common/Clock.h"
#include "common/p25/P25Defines.h"
#include "common/p25/data/DataBlock.h"
#include "common/p25/data/DataHeader.h"
#include "network/FNENetwork.h"
#include "network/PeerNetwork.h"
#include <deque>
namespace network
{
namespace callhandler
{
namespace packetdata
{
// ---------------------------------------------------------------------------
// Class Declaration
// ---------------------------------------------------------------------------
/**
* @brief Implements the P25 packet data handler.
* @ingroup fne_callhandler
*/
class HOST_SW_API P25PacketData {
public:
/**
* @brief Initializes a new instance of the P25PacketData class.
* @param network Instance of the FNENetwork class.
* @param debug Flag indicating whether network debug is enabled.
*/
P25PacketData(FNENetwork* network, bool debug);
/**
* @brief Finalizes a instance of the P25PacketData class.
*/
~P25PacketData();
/**
* @brief Process a data frame from the network.
* @param data Network data buffer.
* @param len Length of data.
* @param peerId Peer ID.
* @param pktSeq RTP packet sequence.
* @param streamId Stream ID.
* @param external Flag indicating traffic is from an external peer.
* @returns bool True, if frame is processed, otherwise false.
*/
bool processFrame(const uint8_t* data, uint32_t len, uint32_t peerId, uint16_t pktSeq, uint32_t streamId, bool external = false);
private:
FNENetwork* m_network;
/**
* @brief Represents the receive status of a call.
*/
class RxStatus {
public:
system_clock::hrc::hrc_t callStartTime;
uint32_t llId;
uint32_t streamId;
uint32_t peerId;
p25::data::DataBlock* blockData;
p25::data::DataHeader header;
p25::data::DataHeader secondHeader;
bool useSecondHeader;
bool extendedAddress;
uint32_t dataOffset;
uint8_t dataBlockCnt;
uint8_t* netPDU;
uint32_t netPDUCount;
uint8_t* pduUserData;
uint32_t pduUserDataLength;
/**
* @brief Initializes a new instance of the RxStatus class
*/
RxStatus() :
llId(0U),
streamId(0U),
peerId(0U),
blockData(nullptr),
header(),
secondHeader(),
useSecondHeader(false),
extendedAddress(false),
dataOffset(0U),
dataBlockCnt(0U),
netPDU(nullptr),
netPDUCount(0U),
pduUserData(nullptr),
pduUserDataLength(0U)
{
blockData = new p25::data::DataBlock[P25DEF::P25_MAX_PDU_BLOCKS];
netPDU = new uint8_t[P25DEF::P25_PDU_FRAME_LENGTH_BYTES + 2U];
::memset(netPDU, 0x00U, P25DEF::P25_PDU_FRAME_LENGTH_BYTES + 2U);
pduUserData = new uint8_t[P25DEF::P25_MAX_PDU_BLOCKS * P25DEF::P25_PDU_CONFIRMED_LENGTH_BYTES + 2U];
::memset(pduUserData, 0x00U, P25DEF::P25_MAX_PDU_BLOCKS * P25DEF::P25_PDU_CONFIRMED_LENGTH_BYTES + 2U);
}
/**
* @brief Finalizes a instance of the RxStatus class
*/
~RxStatus()
{
delete[] blockData;
delete[] netPDU;
delete[] pduUserData;
}
};
typedef std::pair<const uint32_t, RxStatus*> StatusMapPair;
std::unordered_map<uint32_t, RxStatus*> m_status;
bool m_debug;
/**
* @brief Helper to dispatch PDU user data.
* @param peerId Peer ID.
*/
void dispatch(uint32_t peerId);
/**
* @brief Helper to write user data as a P25 PDU packet.
* @param peerId Peer ID.
* @param peerNet Instance of PeerNetwork to use to send traffic.
* @param dataHeader Instance of a PDU data header.
* @param secondHeader Instance of a PDU data header.
* @param useSecondHeader Flag indicating whether or not to use a second data header.
* @param pduUserData Buffer containing user data to transmit.
*/
void write_PDU_User(uint32_t peerId, network::PeerNetwork* peerNet, p25::data::DataHeader& dataHeader, p25::data::DataHeader& secondHeader,
bool useSecondHeader, uint8_t* pduUserData, bool queueOnly = false);
/**
* @brief Write data processed to the network.
* @param peerId Peer ID.
* @param peerNet Instance of PeerNetwork to use to send traffic.
* @param dataHeader Instance of a PDU data header.
* @param currentBlock Current Block ID.
* @param data Buffer containing block data.
* @param len Length of buffer.
* @param pktSeq RTP packet sequence.
* @param streamId Stream ID.
*/
bool writeNetwork(uint32_t peerId, network::PeerNetwork* peerNet, const p25::data::DataHeader& dataHeader, const uint8_t currentBlock,
const uint8_t* data, uint32_t len, uint16_t pktSeq, uint32_t streamId, bool queueOnly = false);
};
} // namespace packetdata
} // namespace callhandler
} // namespace network
#endif // __PACKETDATA__P25_PACKET_DATA_H__

@ -293,7 +293,11 @@ bool Data::process(uint8_t* data, uint32_t len)
if ((m_rfDataHeader.getFormat() != PDUFormatType::AMBT) && if ((m_rfDataHeader.getFormat() != PDUFormatType::AMBT) &&
(m_rfDataHeader.getFormat() != PDUFormatType::RSP) && (m_rfDataHeader.getFormat() != PDUFormatType::RSP) &&
(m_rfDataHeader.getSAP() != PDUSAP::CONV_DATA_REG)) { (m_rfDataHeader.getSAP() != PDUSAP::CONV_DATA_REG)) {
writeNetwork(m_rfDataBlockCnt, buffer, P25_PDU_FEC_LENGTH_BYTES, m_rfData[i].getLastBlock()); uint32_t networkBlock = m_rfDataBlockCnt + 1U;
if (m_rfUseSecondHeader)
++networkBlock;
writeNetwork(networkBlock, buffer, P25_PDU_FEC_LENGTH_BYTES, m_rfData[i].getLastBlock());
} }
m_rfDataBlockCnt++; m_rfDataBlockCnt++;
@ -645,6 +649,13 @@ bool Data::processNetwork(uint8_t* data, uint32_t len, uint32_t blockLength)
m_netData[i].getSerialNo() == 0U) { m_netData[i].getSerialNo() == 0U) {
LogMessage(LOG_NET, P25_PDU_STR ", block %u, fmt = $%02X, lastBlock = %u, sap = $%02X, llId = %u", LogMessage(LOG_NET, P25_PDU_STR ", block %u, fmt = $%02X, lastBlock = %u, sap = $%02X, llId = %u",
m_netData[i].getSerialNo(), m_netData[i].getFormat(), m_netData[i].getLastBlock(), m_netData[i].getSAP(), m_netData[i].getLLId()); m_netData[i].getSerialNo(), m_netData[i].getFormat(), m_netData[i].getLastBlock(), m_netData[i].getSAP(), m_netData[i].getLLId());
if (m_dumpPDUData) {
uint8_t dataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(dataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
m_netData[i].getData(dataBlock);
Utils::dump(2U, "Network Data Block", dataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
m_netSecondHeader.reset(); m_netSecondHeader.reset();
m_netSecondHeader.setAckNeeded(true); m_netSecondHeader.setAckNeeded(true);
m_netSecondHeader.setFormat(m_netData[i].getFormat()); m_netSecondHeader.setFormat(m_netData[i].getFormat());
@ -656,10 +667,17 @@ bool Data::processNetwork(uint8_t* data, uint32_t len, uint32_t blockLength)
LogMessage(LOG_NET, P25_PDU_STR ", block %u, fmt = $%02X, lastBlock = %u", LogMessage(LOG_NET, P25_PDU_STR ", block %u, fmt = $%02X, lastBlock = %u",
(m_netDataHeader.getFormat() == PDUFormatType::CONFIRMED) ? m_netData[i].getSerialNo() : m_netDataBlockCnt, m_netData[i].getFormat(), (m_netDataHeader.getFormat() == PDUFormatType::CONFIRMED) ? m_netData[i].getSerialNo() : m_netDataBlockCnt, m_netData[i].getFormat(),
m_netData[i].getLastBlock()); m_netData[i].getLastBlock());
if (m_dumpPDUData) {
uint8_t dataBlock[P25_PDU_CONFIRMED_DATA_LENGTH_BYTES];
::memset(dataBlock, 0xAAU, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
m_netData[i].getData(dataBlock);
Utils::dump(2U, "Network Data Block", dataBlock, P25_PDU_CONFIRMED_DATA_LENGTH_BYTES);
}
} }
m_netData[i].getData(m_pduUserData + dataOffset); m_netData[i].getData(m_pduUserData + dataOffset);
dataOffset += (m_rfDataHeader.getFormat() == PDUFormatType::CONFIRMED) ? P25_PDU_CONFIRMED_DATA_LENGTH_BYTES : P25_PDU_UNCONFIRMED_LENGTH_BYTES; dataOffset += (m_netDataHeader.getFormat() == PDUFormatType::CONFIRMED) ? P25_PDU_CONFIRMED_DATA_LENGTH_BYTES : P25_PDU_UNCONFIRMED_LENGTH_BYTES;
m_pduUserDataLength = dataOffset; m_pduUserDataLength = dataOffset;
m_netDataBlockCnt++; m_netDataBlockCnt++;
@ -699,14 +717,12 @@ bool Data::processNetwork(uint8_t* data, uint32_t len, uint32_t blockLength)
::ActivityLog("P25", false, "Net data transmission from %u to %u, %u blocks", srcId, dstId, m_netDataHeader.getBlocksToFollow()); ::ActivityLog("P25", false, "Net data transmission from %u to %u, %u blocks", srcId, dstId, m_netDataHeader.getBlocksToFollow());
if (m_repeatPDU) { if (m_verbose) {
if (m_verbose) { LogMessage(LOG_NET, P25_PDU_STR ", transmitting network PDU, llId = %u", (m_netUseSecondHeader || m_netExtendedAddress) ? m_netSecondHeader.getLLId() : m_netDataHeader.getLLId());
LogMessage(LOG_NET, P25_PDU_STR ", repeating PDU, llId = %u", (m_netUseSecondHeader || m_netExtendedAddress) ? m_netSecondHeader.getLLId() : m_netDataHeader.getLLId());
}
writeNet_PDU_Buffered(); // re-generate buffered PDU and send it on
} }
writeNet_PDU_Buffered(); // re-generate buffered PDU and send it on
::ActivityLog("P25", false, "end of Net data transmission"); ::ActivityLog("P25", false, "end of Net data transmission");
m_netDataHeader.reset(); m_netDataHeader.reset();
@ -775,7 +791,7 @@ void Data::writeRF_PDU_User(data::DataHeader& dataHeader, data::DataHeader& seco
// generate the second PDU header // generate the second PDU header
if (useSecondHeader) { if (useSecondHeader) {
secondHeader.encode(m_pduUserData, true); secondHeader.encode(pduUserData, true);
::memset(block, 0x00U, P25_PDU_FEC_LENGTH_BYTES); ::memset(block, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
secondHeader.encode(block); secondHeader.encode(block);

Loading…
Cancel
Save

Powered by TurnKey Linux.