diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.cpp b/src/fne/network/callhandler/packetdata/P25PacketData.cpp index 3e8d3c4d..9619f889 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.cpp +++ b/src/fne/network/callhandler/packetdata/P25PacketData.cpp @@ -96,94 +96,92 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee LogWarning(LOG_NET, "P25, Data Call Collision, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u", peerId, streamId, status->peerId, status->llId, status->streamId, external); - uint64_t duration = hrc::diff(pktTime, status->callStartTime); - - if ((duration / 1000) > DATA_CALL_COLL_TIMEOUT) { - LogWarning(LOG_NET, "P25, force clearing stuck data call, timeout, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u", - peerId, streamId, status->peerId, status->llId, status->streamId, external); + LogWarning(LOG_NET, "P25, clearing previous data call, timeout, peer = %u, streamId = %u, rxPeer = %u, rxLlId = %u, rxStreamId = %u, external = %u", + peerId, streamId, status->peerId, status->llId, status->streamId, external); - delete status; - m_status.erase(peerId); - } + delete status; + m_status.erase(peerId); - return false; - } - } else { - if (currentBlock == 0U) { - // this is a new call stream - RxStatus* status = new RxStatus(); + // create a new status entry + m_status.lock(true); + RxStatus *status = new RxStatus(); status->callStartTime = pktTime; status->streamId = streamId; status->peerId = peerId; + m_status.unlock(); - bool ret = status->header.decode(buffer); - if (!ret) { - LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate header data"); - Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES); - return false; - } + m_status.insert(peerId, status); + } + } else { + // create a new status entry + m_status.lock(true); + RxStatus *status = new RxStatus(); + status->callStartTime = pktTime; + status->streamId = streamId; + status->peerId = peerId; + m_status.unlock(); + + m_status.insert(peerId, status); + } - LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u", - peerId, status->header.getAckNeeded(), status->header.getOutbound(), status->header.getFormat(), status->header.getSAP(), status->header.getFullMessage(), - status->header.getBlocksToFollow(), status->header.getPadLength(), status->header.getPacketLength(), status->header.getSynchronize(), status->header.getNs(), status->header.getFSN(), - status->header.getHeaderOffset(), status->header.getLLId()); + RxStatus* status = m_status[peerId]; - // make sure we don't get a PDU with more blocks then we support - if (status->header.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) { - LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->header.getBlocksToFollow(), P25_MAX_PDU_BLOCKS); - return false; - } + // make sure we don't get a PDU with more blocks then we support + if (currentBlock >= P25_MAX_PDU_BLOCKS) { + LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", currentBlock, P25_MAX_PDU_BLOCKS); - status->llId = status->header.getLLId(); + delete status; + m_status.erase(peerId); + return false; + } - m_status[peerId] = status; - m_readyForNextPkt[status->llId] = true; + // block 0 is always the PDU header block + if (currentBlock == 0U) { + bool ret = status->header.decode(buffer); + if (!ret) { + LogWarning(LOG_NET, P25_PDU_STR ", unfixable RF 1/2 rate header data"); + Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES); - // is this a response header? - if (status->header.getFormat() == PDUFormatType::RSP) { - dispatch(peerId); + delete status; + m_status.erase(peerId); + return true; + } - delete status; - m_status.erase(peerId); - return true; - } + LogMessage(LOG_NET, P25_PDU_STR ", peerId = %u, ack = %u, outbound = %u, fmt = $%02X, sap = $%02X, fullMessage = %u, blocksToFollow = %u, padLength = %u, packetLength = %u, S = %u, n = %u, seqNo = %u, hdrOffset = %u, llId = %u", + peerId, status->header.getAckNeeded(), status->header.getOutbound(), status->header.getFormat(), status->header.getSAP(), status->header.getFullMessage(), + status->header.getBlocksToFollow(), status->header.getPadLength(), status->header.getPacketLength(), status->header.getSynchronize(), status->header.getNs(), status->header.getFSN(), + status->header.getHeaderOffset(), status->header.getLLId()); - if (status->header.getSAP() != PDUSAP::EXT_ADDR && - status->header.getFormat() != PDUFormatType::UNCONFIRMED) { - m_suSendSeq[status->llId] = 0U; - } + // make sure we don't get a PDU with more blocks then we support + if (status->header.getBlocksToFollow() >= P25_MAX_PDU_BLOCKS) { + LogError(LOG_NET, P25_PDU_STR ", too many PDU blocks to process, %u > %u", status->header.getBlocksToFollow(), P25_MAX_PDU_BLOCKS); - LogMessage(LOG_NET, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, external = %u", peerId, status->llId, streamId, external); - return true; - } else { - LogError(LOG_NET, P25_PDU_STR ", illegal starting data block, peerId = %u", peerId); + delete status; + m_status.erase(peerId); return false; } - } - RxStatus* status = m_status[peerId]; + status->hasRxHeader = true; + status->llId = status->header.getLLId(); - // is the source ID a blacklisted ID? - lookups::RadioId rid = m_network->m_ridLookup->find(status->header.getLLId()); - if (!rid.radioDefault()) { - if (!rid.radioEnabled()) { - // report error event to InfluxDB - if (m_network->m_enableInfluxDB) { - influxdb::QueryBuilder() - .meas("call_error_event") - .tag("peerId", std::to_string(peerId)) - .tag("streamId", std::to_string(streamId)) - .tag("srcId", std::to_string(status->header.getLLId())) - .tag("dstId", std::to_string(status->header.getLLId())) - .field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID) - .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) - .requestAsync(m_network->m_influxServer); - } + m_readyForNextPkt[status->llId] = true; + + // is this a response header? + if (status->header.getFormat() == PDUFormatType::RSP) { + dispatch(peerId); delete status; m_status.erase(peerId); - return false; + return true; + } + + if (status->header.getSAP() != PDUSAP::EXT_ADDR && + status->header.getFormat() != PDUFormatType::UNCONFIRMED) { + m_suSendSeq[status->llId] = 0U; } + + LogMessage(LOG_NET, "P25, Data Call Start, peer = %u, llId = %u, streamId = %u, external = %u", peerId, status->llId, streamId, external); + return true; } ::memcpy(status->netPDU + status->dataOffset, data + 24U, blockLength); @@ -191,7 +189,30 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee status->netPDUCount++; status->dataBlockCnt++; - if (status->dataBlockCnt >= status->header.getBlocksToFollow()) { + if (status->hasRxHeader && (status->dataBlockCnt >= status->header.getBlocksToFollow())) { + // is the source ID a blacklisted ID? + lookups::RadioId rid = m_network->m_ridLookup->find(status->header.getLLId()); + if (!rid.radioDefault()) { + if (!rid.radioEnabled()) { + // report error event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_error_event") + .tag("peerId", std::to_string(peerId)) + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(status->header.getLLId())) + .tag("dstId", std::to_string(status->header.getLLId())) + .field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .requestAsync(m_network->m_influxServer); + } + + delete status; + m_status.erase(peerId); + return false; + } + } + uint32_t blocksToFollow = status->header.getBlocksToFollow(); uint32_t offset = 0U; uint32_t dataOffset = 0U; @@ -297,7 +318,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee } // dispatch the PDU data - if (status->dataBlockCnt > 0U) { + if (status->dataBlockCnt > 0U && status->hasRxHeader) { dispatch(peerId); } @@ -482,6 +503,11 @@ void P25PacketData::dispatch(uint32_t peerId) { RxStatus* status = m_status[peerId]; + if (status == nullptr) { + LogError(LOG_NET, P25_PDU_STR ", illegal PDU packet state, status shouldn't be null"); + return; + } + bool crcValid = false; if (status->header.getBlocksToFollow() > 0U) { if (status->pduUserDataLength < 4U) { diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.h b/src/fne/network/callhandler/packetdata/P25PacketData.h index a1d3848f..f33d8097 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.h +++ b/src/fne/network/callhandler/packetdata/P25PacketData.h @@ -119,6 +119,7 @@ namespace network p25::data::DataBlock* blockData; p25::data::DataHeader header; + bool hasRxHeader; bool extendedAddress; uint32_t dataOffset; uint8_t dataBlockCnt; @@ -137,6 +138,7 @@ namespace network peerId(0U), blockData(nullptr), header(), + hasRxHeader(false), extendedAddress(false), dataOffset(0U), dataBlockCnt(0U), @@ -158,9 +160,12 @@ namespace network */ ~RxStatus() { - delete[] blockData; - delete[] netPDU; - delete[] pduUserData; + if (blockData != nullptr) + delete[] blockData; + if (netPDU != nullptr) + delete[] netPDU; + if (pduUserData != nullptr) + delete[] pduUserData; } }; typedef std::pair StatusMapPair;