implement proper packet reordering for P25 PDU data; BUGFIX: add boundary checking for P25 PDU disassembly;

r05a04_dev
Bryan Biedenkapp 1 month ago
parent 096e9fb0cc
commit cb0dcc7f03

@ -104,6 +104,14 @@ bool Assembler::disassemble(const uint8_t* pduBlock, uint32_t blockLength, bool
dataHeader.getHeaderOffset(), dataHeader.getLLId());
}
if (dataHeader.getPacketLength() > P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U) {
LogError(LOG_P25, P25_PDU_STR ", ISP, packet length %u exceeds maximum supported size %u",
dataHeader.getPacketLength(), P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES);
resetDisassemblyState();
return false;
}
// make sure we don't get a PDU with more blocks then we support
if (dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_P25, P25_PDU_STR ", ISP, too many PDU blocks to process, %u > %u", dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
@ -485,6 +493,21 @@ uint32_t Assembler::getUserData(uint8_t* buffer) const
assert(buffer != nullptr);
assert(m_pduUserData != nullptr);
if (m_pduUserDataLength == 0U) {
LogError(LOG_P25, P25_PDU_STR ", no user data available to retrieve! BUGBUG!");
return 0U;
}
if (m_pduUserDataLength > (P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U)) {
LogError(LOG_P25, P25_PDU_STR ", user data length %u exceeds maximum allowable size! BUGBUG!", m_pduUserDataLength);
return 0U;
}
if (m_pduUserData == nullptr) {
LogError(LOG_P25, P25_PDU_STR ", no user data available to retrieve! BUGBUG!");
return 0U;
}
if (m_complete) {
::memcpy(buffer, m_pduUserData, m_pduUserDataLength);
return m_pduUserDataLength;

@ -124,23 +124,24 @@ namespace network
*/
class RxStatus {
public:
system_clock::hrc::hrc_t callStartTime;
system_clock::hrc::hrc_t lastPacket;
uint32_t srcId;
uint32_t dstId;
uint8_t slotNo;
uint32_t streamId;
uint32_t peerId;
system_clock::hrc::hrc_t callStartTime; //!< Data call start time
system_clock::hrc::hrc_t lastPacket; //!< Last packet time
uint32_t srcId; //!< Source Radio ID
uint32_t dstId; //!< Destination Radio ID
uint8_t slotNo; //!< DMR Slot Number
uint32_t streamId; //!< Stream ID
uint32_t peerId; //!< Peer ID
dmr::data::DataHeader header;
bool hasRxHeader;
uint8_t dataBlockCnt;
uint8_t frames;
std::unordered_map<uint16_t, uint8_t*> receivedBlocks;
dmr::data::DataHeader header; //!< PDU Data Header
bool hasRxHeader; //!< Flag indicating whether or not a valid Rx header has been received
uint16_t dataBlockCnt; //!< Number of data blocks received
uint8_t frames; //!< Number of frames received
bool callBusy;
bool callBusy; //!< Flag indicating whether or not the call is busy
uint8_t* pduUserData;
uint32_t pduDataOffset;
uint8_t* pduUserData; //!< PDU user data buffer
uint32_t pduDataOffset; //!< Offset within the PDU data buffer
/**
* @brief Initializes a new instance of the RxStatus class
@ -151,6 +152,7 @@ namespace network
slotNo(0U),
streamId(0U),
peerId(0U),
receivedBlocks(),
header(),
hasRxHeader(false),
dataBlockCnt(0U),
@ -167,6 +169,16 @@ namespace network
*/
~RxStatus()
{
if (!receivedBlocks.empty()) {
for (auto& it : receivedBlocks) {
if (it.second != nullptr) {
delete[] it.second;
it.second = nullptr;
}
}
receivedBlocks.clear();
}
delete[] pduUserData;
}
};

@ -104,17 +104,15 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
{
hrc::hrc_t pktTime = hrc::now();
uint8_t totalBlocks = data[20U] + 1U;
uint32_t blockLength = GET_UINT24(data, 8U);
uint8_t currentBlock = data[21U];
if (totalBlocks == 0U)
return false;
if (blockLength == 0U)
return false;
uint8_t buffer[P25_PDU_FEC_LENGTH_BYTES];
::memset(buffer, 0x00U, P25_PDU_FEC_LENGTH_BYTES);
::memcpy(buffer, data + 24U, P25_PDU_FEC_LENGTH_BYTES);
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second->peerId == peerId; });
if (it == m_status.end()) {
// create a new status entry
@ -123,6 +121,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
status->callStartTime = pktTime;
status->streamId = streamId;
status->peerId = peerId;
status->totalBlocks = totalBlocks;
m_status.unlock();
m_status.insert(peerId, status);
@ -164,116 +163,140 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
m_status.unlock();
// make sure we don't get a PDU with more blocks then we support
if (currentBlock >= P25_MAX_PDU_BLOCKS) {
if (currentBlock >= P25_MAX_PDU_BLOCKS || status->totalBlocks > P25_MAX_PDU_BLOCKS) {
LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", currentBlock, P25_MAX_PDU_BLOCKS);
return false;
}
// block 0 is always the PDU header block
if (currentBlock == 0U) {
bool ret = status->assembler.disassemble(buffer, P25_PDU_FEC_LENGTH_BYTES, true);
if (!ret) {
status->streamId = 0U;
return false;
}
LogInfoEx(LOG_P25, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u",
peerId, status->assembler.dataHeader.getAckNeeded(), status->assembler.dataHeader.getOutbound(), status->assembler.dataHeader.getFormat(), status->assembler.dataHeader.getSAP(), status->assembler.dataHeader.getFullMessage(),
status->assembler.dataHeader.getBlocksToFollow(), status->assembler.dataHeader.getPadLength(), status->assembler.dataHeader.getPacketLength(), status->assembler.dataHeader.getSynchronize(), status->assembler.dataHeader.getNs(),
status->assembler.dataHeader.getFSN(), status->assembler.dataHeader.getHeaderOffset(), status->assembler.dataHeader.getLLId());
// make sure we don't get a PDU with more blocks then we support
if (status->assembler.dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->assembler.dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
status->streamId = 0U;
return false;
}
status->hasRxHeader = true;
status->llId = status->assembler.dataHeader.getLLId();
LogInfoEx(LOG_NET, P25_PDU_STR ", received block %u, peerId = %u, len = %u",
currentBlock, peerId, blockLength);
// store the received block
uint8_t* blockData = new uint8_t[blockLength];
::memcpy(blockData, data + 24U, blockLength);
status->receivedBlocks[currentBlock] = blockData;
status->dataBlockCnt++;
totalBlocks = status->totalBlocks;
if (status->dataBlockCnt == totalBlocks) {
for (uint16_t i = 0U; i < totalBlocks; i++) {
if (status->receivedBlocks.find(i) != status->receivedBlocks.end()) {
// block 0 is always the PDU header block
if (i == 0U) {
bool ret = status->assembler.disassemble(status->receivedBlocks[i], P25_PDU_FEC_LENGTH_BYTES, true);
if (!ret) {
status->streamId = 0U;
status->clearReceivedBlocks();
return false;
}
m_readyForNextPkt[status->llId] = true;
LogInfoEx(LOG_P25, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u",
peerId, status->assembler.dataHeader.getAckNeeded(), status->assembler.dataHeader.getOutbound(), status->assembler.dataHeader.getFormat(), status->assembler.dataHeader.getSAP(), status->assembler.dataHeader.getFullMessage(),
status->assembler.dataHeader.getBlocksToFollow(), status->assembler.dataHeader.getPadLength(), status->assembler.dataHeader.getPacketLength(), status->assembler.dataHeader.getSynchronize(), status->assembler.dataHeader.getNs(),
status->assembler.dataHeader.getFSN(), status->assembler.dataHeader.getHeaderOffset(), status->assembler.dataHeader.getLLId());
// make sure we don't get a PDU with more blocks then we support
if (status->assembler.dataHeader.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_P25, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->assembler.dataHeader.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
status->streamId = 0U;
status->clearReceivedBlocks();
return false;
}
// is this a response header?
if (status->assembler.dataHeader.getFormat() == PDUFormatType::RSP) {
dispatch(peerId);
status->streamId = 0U;
return true;
}
status->hasRxHeader = true;
status->llId = status->assembler.dataHeader.getLLId();
LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, fromUpstream = %u", peerId, status->llId, streamId, fromUpstream);
return true;
}
m_readyForNextPkt[status->llId] = true;
status->callBusy = true;
bool ret = status->assembler.disassemble(data + 24U, blockLength);
if (!ret) {
status->callBusy = false;
return false;
}
else {
if (status->hasRxHeader && status->assembler.getComplete()) {
// is the source ID a blacklisted ID?
lookups::RadioId rid = m_network->m_ridLookup->find(status->assembler.dataHeader.getLLId());
if (!rid.radioDefault()) {
if (!rid.radioEnabled()) {
// report error event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_error_event")
.tag("peerId", std::to_string(peerId))
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(status->assembler.dataHeader.getLLId()))
.tag("dstId", std::to_string(status->assembler.dataHeader.getLLId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
// is this a response header?
if (status->assembler.dataHeader.getFormat() == PDUFormatType::RSP) {
dispatch(peerId);
status->streamId = 0U;
status->clearReceivedBlocks();
return true;
}
m_status.erase(peerId);
delete status;
status = nullptr;
return false;
LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, fromUpstream = %u", peerId, status->llId, streamId, fromUpstream);
continue;
}
}
status->callBusy = true;
// process all blocks in the data stream
status->pduUserDataLength = status->assembler.getUserDataLength();
status->pduUserData = new uint8_t[P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U];
::memset(status->pduUserData, 0x00U, P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U);
status->assembler.getUserData(status->pduUserData);
// dispatch the PDU data
dispatch(peerId);
uint64_t duration = hrc::diff(pktTime, status->callStartTime);
uint32_t srcId = (status->assembler.getExtendedAddress()) ? status->assembler.dataHeader.getSrcLLId() : status->assembler.dataHeader.getLLId();
uint32_t dstId = status->assembler.dataHeader.getLLId();
LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call End, peer = %u, srcId = %u, dstId = %u, blocks = %u, duration = %u, streamId = %u, fromUpstream = %u",
peerId, srcId, dstId, status->assembler.dataHeader.getBlocksToFollow(), duration / 1000, streamId, fromUpstream);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "P25")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
status->callBusy = true;
bool ret = status->assembler.disassemble(status->receivedBlocks[i], blockLength);
if (!ret) {
status->callBusy = false;
status->clearReceivedBlocks();
return false;
}
else {
if (status->hasRxHeader && status->assembler.getComplete()) {
// is the source ID a blacklisted ID?
lookups::RadioId rid = m_network->m_ridLookup->find(status->assembler.dataHeader.getLLId());
if (!rid.radioDefault()) {
if (!rid.radioEnabled()) {
// report error event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_error_event")
.tag("peerId", std::to_string(peerId))
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(status->assembler.dataHeader.getLLId()))
.tag("dstId", std::to_string(status->assembler.dataHeader.getLLId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
}
m_status.erase(peerId);
delete status;
status = nullptr;
return false;
}
}
status->callBusy = true;
// process all blocks in the data stream
status->pduUserDataLength = status->assembler.getUserDataLength();
status->pduUserData = new uint8_t[P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U];
::memset(status->pduUserData, 0x00U, P25_MAX_PDU_BLOCKS * P25_PDU_CONFIRMED_LENGTH_BYTES + 2U);
// dispatch the PDU data
if (status->assembler.getUserData(status->pduUserData) > 0U) {
if (m_network->m_dumpPacketData) {
Utils::dump(1U, "P25, PDU Packet", status->pduUserData, status->pduUserDataLength);
}
dispatch(peerId);
}
uint64_t duration = hrc::diff(pktTime, status->callStartTime);
uint32_t srcId = (status->assembler.getExtendedAddress()) ? status->assembler.dataHeader.getSrcLLId() : status->assembler.dataHeader.getLLId();
uint32_t dstId = status->assembler.dataHeader.getLLId();
LogInfoEx((fromUpstream) ? LOG_PEER : LOG_MASTER, "P25, Data Call End, peer = %u, srcId = %u, dstId = %u, blocks = %u, duration = %u, streamId = %u, fromUpstream = %u",
peerId, srcId, dstId, status->assembler.dataHeader.getBlocksToFollow(), duration / 1000, streamId, fromUpstream);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "P25")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
}
m_status.erase(peerId);
delete status;
status = nullptr;
break;
} else {
status->callBusy = false;
}
}
}
m_status.erase(peerId);
delete status;
status = nullptr;
} else {
status->callBusy = false;
}
}

@ -153,19 +153,22 @@ namespace network
*/
class RxStatus {
public:
system_clock::hrc::hrc_t callStartTime;
system_clock::hrc::hrc_t lastPacket;
uint32_t llId;
uint32_t streamId;
uint32_t peerId;
system_clock::hrc::hrc_t callStartTime; //!< Data call start time
system_clock::hrc::hrc_t lastPacket; //!< Last packet time
uint32_t llId; //!< Logical Link ID
uint32_t streamId; //!< Stream ID
uint32_t peerId; //!< Peer ID
p25::data::Assembler assembler;
bool hasRxHeader;
std::unordered_map<uint16_t, uint8_t*> receivedBlocks;
p25::data::Assembler assembler; //!< PDU Assembler Instance
bool hasRxHeader; //!< Flag indicating whether or not a valid Rx header has been received
uint16_t dataBlockCnt; //!< Number of data blocks received
uint16_t totalBlocks; //!< Total number of blocks expected
bool callBusy;
bool callBusy; //!< Flag indicating whether or not the call is busy
uint8_t* pduUserData;
uint32_t pduUserDataLength;
uint8_t* pduUserData; //!< PDU user data buffer
uint32_t pduUserDataLength; //!< Length of PDU user data buffer
/**
* @brief Initializes a new instance of the RxStatus class
@ -174,8 +177,11 @@ namespace network
llId(0U),
streamId(0U),
peerId(0U),
receivedBlocks(),
assembler(),
hasRxHeader(false),
dataBlockCnt(0U),
totalBlocks(0U),
callBusy(false),
pduUserData(nullptr),
pduUserDataLength(0U)
@ -188,9 +194,29 @@ namespace network
*/
~RxStatus()
{
clearReceivedBlocks();
if (pduUserData != nullptr)
delete[] pduUserData;
}
/**
* @brief Clears all received blocks and frees associated memory.
*/
void clearReceivedBlocks()
{
totalBlocks = 0U;
dataBlockCnt = 0U;
if (!receivedBlocks.empty()) {
for (auto& it : receivedBlocks) {
if (it.second != nullptr) {
delete[] it.second;
it.second = nullptr;
}
}
receivedBlocks.clear();
}
}
};
typedef std::pair<const uint32_t, RxStatus*> StatusMapPair;
concurrent::unordered_map<uint32_t, RxStatus*> m_status;

Loading…
Cancel
Save

Powered by TurnKey Linux.