From cb0dcc7f034711e4a31bb21e584fe6824310f1ed Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Mon, 5 Jan 2026 13:57:26 -0500 Subject: [PATCH] implement proper packet reordering for P25 PDU data; BUGFIX: add boundary checking for P25 PDU disassembly; --- src/common/p25/data/Assembler.cpp | 23 ++ .../callhandler/packetdata/DMRPacketData.h | 40 +-- .../callhandler/packetdata/P25PacketData.cpp | 227 ++++++++++-------- .../callhandler/packetdata/P25PacketData.h | 46 +++- 4 files changed, 210 insertions(+), 126 deletions(-) diff --git a/src/common/p25/data/Assembler.cpp b/src/common/p25/data/Assembler.cpp index 357cb22a..b4bc36f4 100644 --- a/src/common/p25/data/Assembler.cpp +++ b/src/common/p25/data/Assembler.cpp @@ -104,6 +104,14 @@ bool Assembler::disassemble(const uint8_t* pduBlock, uint32_t blockLength, bool dataHeader.getHeaderOffset(), dataHeader.getLLId()); } + if (dataHeader.getPacketLength() > P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U) { + LogError(LOG_P25, P25_PDU_STR ", ISP, packet length %u exceeds maximum supported size %u", + dataHeader.getPacketLength(), P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES); + + resetDisassemblyState(); + return false; + } + // make sure we don't get a PDU with more blocks then we support if (dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) { LogError(LOG_P25, P25_PDU_STR ", ISP, too many PDU blocks to process, %u > %u", dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS); @@ -485,6 +493,21 @@ uint32_t Assembler::getUserData(uint8_t* buffer) const assert(buffer != nullptr); assert(m_pduUserData != nullptr); + if (m_pduUserDataLength == 0U) { + LogError(LOG_P25, P25_PDU_STR ", no user data available to retrieve! BUGBUG!"); + return 0U; + } + + if (m_pduUserDataLength > (P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U)) { + LogError(LOG_P25, P25_PDU_STR ", user data length %u exceeds maximum allowable size! BUGBUG!", m_pduUserDataLength); + return 0U; + } + + if (m_pduUserData == nullptr) { + LogError(LOG_P25, P25_PDU_STR ", no user data available to retrieve! BUGBUG!"); + return 0U; + } + if (m_complete) { ::memcpy(buffer, m_pduUserData, m_pduUserDataLength); return m_pduUserDataLength; diff --git a/src/fne/network/callhandler/packetdata/DMRPacketData.h b/src/fne/network/callhandler/packetdata/DMRPacketData.h index 50524927..45cd1207 100644 --- a/src/fne/network/callhandler/packetdata/DMRPacketData.h +++ b/src/fne/network/callhandler/packetdata/DMRPacketData.h @@ -124,23 +124,24 @@ namespace network */ class RxStatus { public: - system_clock::hrc::hrc_t callStartTime; - system_clock::hrc::hrc_t lastPacket; - uint32_t srcId; - uint32_t dstId; - uint8_t slotNo; - uint32_t streamId; - uint32_t peerId; + system_clock::hrc::hrc_t callStartTime; //!< Data call start time + system_clock::hrc::hrc_t lastPacket; //!< Last packet time + uint32_t srcId; //!< Source Radio ID + uint32_t dstId; //!< Destination Radio ID + uint8_t slotNo; //!< DMR Slot Number + uint32_t streamId; //!< Stream ID + uint32_t peerId; //!< Peer ID - dmr::data::DataHeader header; - bool hasRxHeader; - uint8_t dataBlockCnt; - uint8_t frames; + std::unordered_map receivedBlocks; + dmr::data::DataHeader header; //!< PDU Data Header + bool hasRxHeader; //!< Flag indicating whether or not a valid Rx header has been received + uint16_t dataBlockCnt; //!< Number of data blocks received + uint8_t frames; //!< Number of frames received - bool callBusy; + bool callBusy; //!< Flag indicating whether or not the call is busy - uint8_t* pduUserData; - uint32_t pduDataOffset; + uint8_t* pduUserData; //!< PDU user data buffer + uint32_t pduDataOffset; //!< Offset within the PDU data buffer /** * @brief Initializes a new instance of the RxStatus class @@ -151,6 +152,7 @@ namespace network slotNo(0U), streamId(0U), peerId(0U), + receivedBlocks(), header(), hasRxHeader(false), dataBlockCnt(0U), @@ -167,6 +169,16 @@ namespace network */ ~RxStatus() { + if (!receivedBlocks.empty()) { + for (auto& it : receivedBlocks) { + if (it.second != nullptr) { + delete[] it.second; + it.second = nullptr; + } + } + receivedBlocks.clear(); + } + delete[] pduUserData; } }; diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.cpp b/src/fne/network/callhandler/packetdata/P25PacketData.cpp index 0ff81a05..52f4f00a 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.cpp +++ b/src/fne/network/callhandler/packetdata/P25PacketData.cpp @@ -104,17 +104,15 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee { hrc::hrc_t pktTime = hrc::now(); + uint8_t totalBlocks = data[20U] + 1U; uint32_t blockLength = GET_UINT24(data, 8U); - uint8_t currentBlock = data[21U]; + if (totalBlocks == 0U) + return false; if (blockLength == 0U) return false; - uint8_t buffer[P25_PDU_FEC_LENGTH_BYTES]; - ::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES); - ::memcpy(buffer, data + 24U, P25_PDU_FEC_LENGTH_BYTES); - auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second->peerId == peerId; }); if (it == m_status.end()) { // create a new status entry @@ -123,6 +121,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee status->callStartTime = pktTime; status->streamId = streamId; status->peerId = peerId; + status->totalBlocks = totalBlocks; m_status.unlock(); m_status.insert(peerId, status); @@ -164,116 +163,140 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee m_status.unlock(); // make sure we don't get a PDU with more blocks then we support - if (currentBlock >= P25_MAX_PDU_BLOCKS) { + if (currentBlock >= P25_MAX_PDU_BLOCKS || status->totalBlocks > P25_MAX_PDU_BLOCKS) { LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", currentBlock, P25_MAX_PDU_BLOCKS); return false; } - // block 0 is always the PDU header block - if (currentBlock == 0U) { - bool ret = status->assembler.disassemble(buffer, P25_PDU_FEC_LENGTH_BYTES, true); - if (!ret) { - status->streamId = 0U; - return false; - } - - LogInfoEx(LOG_P25, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u", - peerId, status->assembler.dataHeader.getAckNeeded(), status->assembler.dataHeader.getOutbound(), status->assembler.dataHeader.getFormat(), status->assembler.dataHeader.getSAP(), status->assembler.dataHeader.getFullMessage(), - status->assembler.dataHeader.getBlocksToFollow(), status->assembler.dataHeader.getPadLength(), status->assembler.dataHeader.getPacketLength(), status->assembler.dataHeader.getSynchronize(), status->assembler.dataHeader.getNs(), - status->assembler.dataHeader.getFSN(), status->assembler.dataHeader.getHeaderOffset(), status->assembler.dataHeader.getLLId()); - - // make sure we don't get a PDU with more blocks then we support - if (status->assembler.dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) { - LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->assembler.dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS); - status->streamId = 0U; - return false; - } - - status->hasRxHeader = true; - status->llId = status->assembler.dataHeader.getLLId(); + LogInfoEx(LOG_NET, P25_PDU_STR ", received block %u, peerId = %u, len = %u", + currentBlock, peerId, blockLength); + + // store the received block + uint8_t* blockData = new uint8_t[blockLength]; + ::memcpy(blockData, data + 24U, blockLength); + status->receivedBlocks[currentBlock] = blockData; + status->dataBlockCnt++; + + totalBlocks = status->totalBlocks; + if (status->dataBlockCnt == totalBlocks) { + for (uint16_t i = 0U; i < totalBlocks; i++) { + if (status->receivedBlocks.find(i) != status->receivedBlocks.end()) { + // block 0 is always the PDU header block + if (i == 0U) { + bool ret = status->assembler.disassemble(status->receivedBlocks[i], P25_PDU_FEC_LENGTH_BYTES, true); + if (!ret) { + status->streamId = 0U; + status->clearReceivedBlocks(); + return false; + } - m_readyForNextPkt[status->llId] = true; + LogInfoEx(LOG_P25, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u", + peerId, status->assembler.dataHeader.getAckNeeded(), status->assembler.dataHeader.getOutbound(), status->assembler.dataHeader.getFormat(), status->assembler.dataHeader.getSAP(), status->assembler.dataHeader.getFullMessage(), + status->assembler.dataHeader.getBlocksToFollow(), status->assembler.dataHeader.getPadLength(), status->assembler.dataHeader.getPacketLength(), status->assembler.dataHeader.getSynchronize(), status->assembler.dataHeader.getNs(), + status->assembler.dataHeader.getFSN(), status->assembler.dataHeader.getHeaderOffset(), status->assembler.dataHeader.getLLId()); + + // make sure we don't get a PDU with more blocks then we support + if (status->assembler.dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) { + LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->assembler.dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS); + status->streamId = 0U; + status->clearReceivedBlocks(); + return false; + } - // is this a response header? - if (status->assembler.dataHeader.getFormat() == PDUFormatType::RSP) { - dispatch(peerId); - status->streamId = 0U; - return true; - } + status->hasRxHeader = true; + status->llId = status->assembler.dataHeader.getLLId(); - LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, fromUpstream = %u", peerId, status->llId, streamId, fromUpstream); - return true; - } + m_readyForNextPkt[status->llId] = true; - status->callBusy = true; - bool ret = status->assembler.disassemble(data + 24U, blockLength); - if (!ret) { - status->callBusy = false; - return false; - } - else { - if (status->hasRxHeader && status->assembler.getComplete()) { - // is the source ID a blacklisted ID? - lookups::RadioId rid = m_network->m_ridLookup->find(status->assembler.dataHeader.getLLId()); - if (!rid.radioDefault()) { - if (!rid.radioEnabled()) { - // report error event to InfluxDB - if (m_network->m_enableInfluxDB) { - influxdb::QueryBuilder() - .meas("call_error_event") - .tag("peerId", std::to_string(peerId)) - .tag("streamId", std::to_string(streamId)) - .tag("srcId", std::to_string(status->assembler.dataHeader.getLLId())) - .tag("dstId", std::to_string(status->assembler.dataHeader.getLLId())) - .field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID) - .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) - .requestAsync(m_network->m_influxServer); + // is this a response header? + if (status->assembler.dataHeader.getFormat() == PDUFormatType::RSP) { + dispatch(peerId); + status->streamId = 0U; + status->clearReceivedBlocks(); + return true; } - m_status.erase(peerId); - delete status; - status = nullptr; - return false; + LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, fromUpstream = %u", peerId, status->llId, streamId, fromUpstream); + continue; } - } - status->callBusy = true; - - // process all blocks in the data stream - status->pduUserDataLength = status->assembler.getUserDataLength(); - status->pduUserData = new uint8_t[P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U]; - ::memset(status->pduUserData, 0x00U, P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U); - - status->assembler.getUserData(status->pduUserData); - - // dispatch the PDU data - dispatch(peerId); - - uint64_t duration = hrc::diff(pktTime, status->callStartTime); - uint32_t srcId = (status->assembler.getExtendedAddress()) ? status->assembler.dataHeader.getSrcLLId() : status->assembler.dataHeader.getLLId(); - uint32_t dstId = status->assembler.dataHeader.getLLId(); - LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call End, peer = %u, srcId = %u, dstId = %u, blocks = %u, duration = %u, streamId = %u, fromUpstream = %u", - peerId, srcId, dstId, status->assembler.dataHeader.getBlocksToFollow(), duration / 1000, streamId, fromUpstream); - - // report call event to InfluxDB - if (m_network->m_enableInfluxDB) { - influxdb::QueryBuilder() - .meas("call_event") - .tag("peerId", std::to_string(peerId)) - .tag("mode", "P25") - .tag("streamId", std::to_string(streamId)) - .tag("srcId", std::to_string(srcId)) - .tag("dstId", std::to_string(dstId)) - .field("duration", duration) - .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) - .requestAsync(m_network->m_influxServer); + status->callBusy = true; + bool ret = status->assembler.disassemble(status->receivedBlocks[i], blockLength); + if (!ret) { + status->callBusy = false; + status->clearReceivedBlocks(); + return false; + } + else { + if (status->hasRxHeader && status->assembler.getComplete()) { + // is the source ID a blacklisted ID? + lookups::RadioId rid = m_network->m_ridLookup->find(status->assembler.dataHeader.getLLId()); + if (!rid.radioDefault()) { + if (!rid.radioEnabled()) { + // report error event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_error_event") + .tag("peerId", std::to_string(peerId)) + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(status->assembler.dataHeader.getLLId())) + .tag("dstId", std::to_string(status->assembler.dataHeader.getLLId())) + .field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .requestAsync(m_network->m_influxServer); + } + + m_status.erase(peerId); + delete status; + status = nullptr; + return false; + } + } + + status->callBusy = true; + + // process all blocks in the data stream + status->pduUserDataLength = status->assembler.getUserDataLength(); + status->pduUserData = new uint8_t[P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U]; + ::memset(status->pduUserData, 0x00U, P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U); + + // dispatch the PDU data + if (status->assembler.getUserData(status->pduUserData) > 0U) { + if (m_network->m_dumpPacketData) { + Utils::dump(1U, "P25, PDU Packet", status->pduUserData, status->pduUserDataLength); + } + dispatch(peerId); + } + + uint64_t duration = hrc::diff(pktTime, status->callStartTime); + uint32_t srcId = (status->assembler.getExtendedAddress()) ? status->assembler.dataHeader.getSrcLLId() : status->assembler.dataHeader.getLLId(); + uint32_t dstId = status->assembler.dataHeader.getLLId(); + LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call End, peer = %u, srcId = %u, dstId = %u, blocks = %u, duration = %u, streamId = %u, fromUpstream = %u", + peerId, srcId, dstId, status->assembler.dataHeader.getBlocksToFollow(), duration / 1000, streamId, fromUpstream); + + // report call event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_event") + .tag("peerId", std::to_string(peerId)) + .tag("mode", "P25") + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(srcId)) + .tag("dstId", std::to_string(dstId)) + .field("duration", duration) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .requestAsync(m_network->m_influxServer); + } + + m_status.erase(peerId); + delete status; + status = nullptr; + break; + } else { + status->callBusy = false; + } + } } - - m_status.erase(peerId); - delete status; - status = nullptr; - } else { - status->callBusy = false; } } diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.h b/src/fne/network/callhandler/packetdata/P25PacketData.h index aa1514a4..27b3189a 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.h +++ b/src/fne/network/callhandler/packetdata/P25PacketData.h @@ -153,19 +153,22 @@ namespace network */ class RxStatus { public: - system_clock::hrc::hrc_t callStartTime; - system_clock::hrc::hrc_t lastPacket; - uint32_t llId; - uint32_t streamId; - uint32_t peerId; + system_clock::hrc::hrc_t callStartTime; //!< Data call start time + system_clock::hrc::hrc_t lastPacket; //!< Last packet time + uint32_t llId; //!< Logical Link ID + uint32_t streamId; //!< Stream ID + uint32_t peerId; //!< Peer ID - p25::data::Assembler assembler; - bool hasRxHeader; + std::unordered_map receivedBlocks; + p25::data::Assembler assembler; //!< PDU Assembler Instance + bool hasRxHeader; //!< Flag indicating whether or not a valid Rx header has been received + uint16_t dataBlockCnt; //!< Number of data blocks received + uint16_t totalBlocks; //!< Total number of blocks expected - bool callBusy; + bool callBusy; //!< Flag indicating whether or not the call is busy - uint8_t* pduUserData; - uint32_t pduUserDataLength; + uint8_t* pduUserData; //!< PDU user data buffer + uint32_t pduUserDataLength; //!< Length of PDU user data buffer /** * @brief Initializes a new instance of the RxStatus class @@ -174,8 +177,11 @@ namespace network llId(0U), streamId(0U), peerId(0U), + receivedBlocks(), assembler(), hasRxHeader(false), + dataBlockCnt(0U), + totalBlocks(0U), callBusy(false), pduUserData(nullptr), pduUserDataLength(0U) @@ -188,9 +194,29 @@ namespace network */ ~RxStatus() { + clearReceivedBlocks(); if (pduUserData != nullptr) delete[] pduUserData; } + + /** + * @brief Clears all received blocks and frees associated memory. + */ + void clearReceivedBlocks() + { + totalBlocks = 0U; + dataBlockCnt = 0U; + + if (!receivedBlocks.empty()) { + for (auto& it : receivedBlocks) { + if (it.second != nullptr) { + delete[] it.second; + it.second = nullptr; + } + } + receivedBlocks.clear(); + } + } }; typedef std::pair StatusMapPair; concurrent::unordered_map m_status;