BUGFIX: due to refactor in #88, it is possible for PDUs to arrive simultaneously in a very efficient manner, this confuses the P25 packet data handler logic causing very strange behavior (including a crash), this bugfix corrects that problem by: a) allowing data blocks for a given peer to arrive and be stored in any order (order is defined by the current block number), b) allow the data header to arrive at any time, c) once all blocks have been received and a valid data header is received, then the original PDU dispatch logic may execute;

pull/91/head
Bryan Biedenkapp 8 months ago
parent 1bf33a2055
commit de2c471cd7

@ -96,94 +96,92 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
LogWarning(LOG_NET, "P25, Data Call Collision, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u",
peerId, streamId, status->peerId, status->llId, status->streamId, external);
uint64_t duration = hrc::diff(pktTime, status->callStartTime);
if ((duration / 1000) > DATA_CALL_COLL_TIMEOUT) {
LogWarning(LOG_NET, "P25, force clearing stuck data call, timeout, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u",
peerId, streamId, status->peerId, status->llId, status->streamId, external);
LogWarning(LOG_NET, "P25, clearing previous data call, timeout, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u",
peerId, streamId, status->peerId, status->llId, status->streamId, external);
delete status;
m_status.erase(peerId);
}
delete status;
m_status.erase(peerId);
return false;
}
} else {
if (currentBlock == 0U) {
// this is a new call stream
RxStatus* status = new RxStatus();
// create a new status entry
m_status.lock(true);
RxStatus *status = new RxStatus();
status->callStartTime = pktTime;
status->streamId = streamId;
status->peerId = peerId;
m_status.unlock();
bool ret = status->header.decode(buffer);
if (!ret) {
LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate header data");
Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES);
return false;
}
m_status.insert(peerId, status);
}
} else {
// create a new status entry
m_status.lock(true);
RxStatus *status = new RxStatus();
status->callStartTime = pktTime;
status->streamId = streamId;
status->peerId = peerId;
m_status.unlock();
m_status.insert(peerId, status);
}
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u",
peerId, status->header.getAckNeeded(), status->header.getOutbound(), status->header.getFormat(), status->header.getSAP(), status->header.getFullMessage(),
status->header.getBlocksToFollow(), status->header.getPadLength(), status->header.getPacketLength(), status->header.getSynchronize(), status->header.getNs(), status->header.getFSN(),
status->header.getHeaderOffset(), status->header.getLLId());
RxStatus* status = m_status[peerId];
// make sure we don't get a PDU with more blocks then we support
if (status->header.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->header.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
return false;
}
// make sure we don't get a PDU with more blocks then we support
if (currentBlock >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", currentBlock, P25_MAX_PDU_BLOCKS);
status->llId = status->header.getLLId();
delete status;
m_status.erase(peerId);
return false;
}
m_status[peerId] = status;
m_readyForNextPkt[status->llId] = true;
// block 0 is always the PDU header block
if (currentBlock == 0U) {
bool ret = status->header.decode(buffer);
if (!ret) {
LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate header data");
Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES);
// is this a response header?
if (status->header.getFormat() == PDUFormatType::RSP) {
dispatch(peerId);
delete status;
m_status.erase(peerId);
return true;
}
delete status;
m_status.erase(peerId);
return true;
}
LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u",
peerId, status->header.getAckNeeded(), status->header.getOutbound(), status->header.getFormat(), status->header.getSAP(), status->header.getFullMessage(),
status->header.getBlocksToFollow(), status->header.getPadLength(), status->header.getPacketLength(), status->header.getSynchronize(), status->header.getNs(), status->header.getFSN(),
status->header.getHeaderOffset(), status->header.getLLId());
if (status->header.getSAP() != PDUSAP::EXT_ADDR &&
status->header.getFormat() != PDUFormatType::UNCONFIRMED) {
m_suSendSeq[status->llId] = 0U;
}
// make sure we don't get a PDU with more blocks then we support
if (status->header.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) {
LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->header.getBlocksToFollow(), P25_MAX_PDU_BLOCKS);
LogMessage(LOG_NET, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, external = %u", peerId, status->llId, streamId, external);
return true;
} else {
LogError(LOG_NET, P25_PDU_STR ", illegal starting data block, peerId = %u", peerId);
delete status;
m_status.erase(peerId);
return false;
}
}
RxStatus* status = m_status[peerId];
status->hasRxHeader = true;
status->llId = status->header.getLLId();
// is the source ID a blacklisted ID?
lookups::RadioId rid = m_network->m_ridLookup->find(status->header.getLLId());
if (!rid.radioDefault()) {
if (!rid.radioEnabled()) {
// report error event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_error_event")
.tag("peerId", std::to_string(peerId))
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(status->header.getLLId()))
.tag("dstId", std::to_string(status->header.getLLId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
}
m_readyForNextPkt[status->llId] = true;
// is this a response header?
if (status->header.getFormat() == PDUFormatType::RSP) {
dispatch(peerId);
delete status;
m_status.erase(peerId);
return false;
return true;
}
if (status->header.getSAP() != PDUSAP::EXT_ADDR &&
status->header.getFormat() != PDUFormatType::UNCONFIRMED) {
m_suSendSeq[status->llId] = 0U;
}
LogMessage(LOG_NET, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, external = %u", peerId, status->llId, streamId, external);
return true;
}
::memcpy(status->netPDU + status->dataOffset, data + 24U, blockLength);
@ -191,7 +189,30 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
status->netPDUCount++;
status->dataBlockCnt++;
if (status->dataBlockCnt >= status->header.getBlocksToFollow()) {
if (status->hasRxHeader && (status->dataBlockCnt >= status->header.getBlocksToFollow())) {
// is the source ID a blacklisted ID?
lookups::RadioId rid = m_network->m_ridLookup->find(status->header.getLLId());
if (!rid.radioDefault()) {
if (!rid.radioEnabled()) {
// report error event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_error_event")
.tag("peerId", std::to_string(peerId))
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(status->header.getLLId()))
.tag("dstId", std::to_string(status->header.getLLId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.requestAsync(m_network->m_influxServer);
}
delete status;
m_status.erase(peerId);
return false;
}
}
uint32_t blocksToFollow = status->header.getBlocksToFollow();
uint32_t offset = 0U;
uint32_t dataOffset = 0U;
@ -297,7 +318,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
}
// dispatch the PDU data
if (status->dataBlockCnt > 0U) {
if (status->dataBlockCnt > 0U && status->hasRxHeader) {
dispatch(peerId);
}
@ -482,6 +503,11 @@ void P25PacketData::dispatch(uint32_t peerId)
{
RxStatus* status = m_status[peerId];
if (status == nullptr) {
LogError(LOG_NET, P25_PDU_STR ", illegal PDU packet state, status shouldn't be null");
return;
}
bool crcValid = false;
if (status->header.getBlocksToFollow() > 0U) {
if (status->pduUserDataLength < 4U) {

@ -119,6 +119,7 @@ namespace network
p25::data::DataBlock* blockData;
p25::data::DataHeader header;
bool hasRxHeader;
bool extendedAddress;
uint32_t dataOffset;
uint8_t dataBlockCnt;
@ -137,6 +138,7 @@ namespace network
peerId(0U),
blockData(nullptr),
header(),
hasRxHeader(false),
extendedAddress(false),
dataOffset(0U),
dataBlockCnt(0U),
@ -158,9 +160,12 @@ namespace network
*/
~RxStatus()
{
delete[] blockData;
delete[] netPDU;
delete[] pduUserData;
if (blockData != nullptr)
delete[] blockData;
if (netPDU != nullptr)
delete[] netPDU;
if (pduUserData != nullptr)
delete[] pduUserData;
}
};
typedef std::pair<const uint32_t, RxStatus*> StatusMapPair;

Loading…
Cancel
Save

Powered by TurnKey Linux.