fix a thread concurrency issue when modifying the m_status unordered maps, instead of utilizing erase and checking if entries don't exist add a activeCall flag and check that, this resolves a possible dual access issue with find_if and erase that could result in an FNE crash;

82-dvmbridge---implement-notch-filter-for-2175hz-trc-guard-tone
Bryan Biedenkapp 1 year ago
parent 3b395a99cf
commit 7979b6ff40

@ -141,19 +141,33 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
}
RxStatus status;
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return (x.second.dstId == dstId && x.second.slotNo == slotNo); });
if (it == m_status.end()) {
LogError(LOG_NET, "DMR, tried to end call for non-existent call in progress?, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u",
peerId, srcId, dstId, streamId, external);
}
else {
status = it->second;
{
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId && x.second.slotNo == slotNo) {
return true;
}
return false;
});
if (it == m_status.end()) {
LogError(LOG_NET, "DMR, tried to end call for non-existent call in progress?, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u",
peerId, srcId, dstId, streamId, external);
}
else {
status = it->second;
}
}
uint64_t duration = hrc::diff(pktTime, status.callStartTime);
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return (x.second.dstId == dstId && x.second.slotNo == slotNo); }) != m_status.end()) {
m_status.erase(dstId);
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId && x.second.slotNo == slotNo) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
m_status[dstId].reset();
// is this a parrot talkgroup? if so, clear any remaining frames from the buffer
lookups::TalkgroupRuleGroupVoice tg = m_network->m_tidLookup->find(dstId);
@ -194,7 +208,13 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
return false;
}
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return (x.second.dstId == dstId && x.second.slotNo == slotNo); });
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId && x.second.slotNo == slotNo) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
RxStatus status = it->second;
if (streamId != status.streamId) {
@ -202,7 +222,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
uint64_t lastPktDuration = hrc::diff(hrc::now(), status.lastPacket);
if ((lastPktDuration / 1000) > CALL_COLL_TIMEOUT) {
LogWarning(LOG_NET, "DMR, Call Collision, lasted more then %us with no further updates, forcibly ending call");
m_status.erase(dstId);
m_status[dstId].reset();
m_network->m_callInProgress = false;
}
@ -228,14 +248,14 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
}
// this is a new call stream
RxStatus status = RxStatus();
status.callStartTime = pktTime;
status.srcId = srcId;
status.dstId = dstId;
status.slotNo = slotNo;
status.streamId = streamId;
status.peerId = peerId;
m_status[dstId] = status; // this *could* be an issue if a dstId appears on both slots somehow...
// bryanb: this could be problematic and is naive, if a dstId appears on both slots (which shouldn't happen)
m_status[dstId].callStartTime = pktTime;
m_status[dstId].srcId = srcId;
m_status[dstId].dstId = dstId;
m_status[dstId].slotNo = slotNo;
m_status[dstId].streamId = streamId;
m_status[dstId].peerId = peerId;
m_status[dstId].activeCall = true;
LogMessage(LOG_NET, "DMR, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
@ -367,7 +387,14 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
bool TagDMRData::processGrantReq(uint32_t srcId, uint32_t dstId, uint8_t slot, bool unitToUnit, uint32_t peerId, uint16_t pktSeq, uint32_t streamId)
{
// if we have an Rx status for the destination deny the grant
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; }) != m_status.end()) {
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId/* && x.second.slotNo == slot*/) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
return false;
}

@ -120,13 +120,31 @@ namespace network
uint8_t* buffer;
uint32_t bufferLen;
/**
* @brief DMR slot number.
*/
uint8_t slotNo;
/**
* @brief RTP Packet Sequence.
*/
uint16_t pktSeq;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
};
std::deque<ParrotFrame> m_parrotFrames;
@ -139,11 +157,43 @@ namespace network
public:
system_clock::hrc::hrc_t callStartTime;
system_clock::hrc::hrc_t lastPacket;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
/**
* @brief DMR slot number.
*/
uint8_t slotNo;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Flag indicating this call is active with traffic currently in progress.
*/
bool activeCall;
/**
* @brief Helper to reset call status.
*/
void reset()
{
srcId = 0U;
dstId = 0U;
slotNo = 0U;
streamId = 0U;
peerId = 0U;
activeCall = false;
}
};
typedef std::pair<const uint32_t, RxStatus> StatusMapPair;
std::unordered_map<uint32_t, RxStatus> m_status;

@ -113,8 +113,15 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
RxStatus status = m_status[dstId];
uint64_t duration = hrc::diff(pktTime, status.callStartTime);
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; }) != m_status.end()) {
m_status.erase(dstId);
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
m_status[dstId].reset();
// is this a parrot talkgroup? if so, clear any remaining frames from the buffer
lookups::TalkgroupRuleGroupVoice tg = m_network->m_tidLookup->find(dstId);
@ -154,7 +161,13 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
return false;
}
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; });
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
RxStatus status = m_status[dstId];
if (streamId != status.streamId) {
@ -162,7 +175,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
uint64_t lastPktDuration = hrc::diff(hrc::now(), status.lastPacket);
if ((lastPktDuration / 1000) > CALL_COLL_TIMEOUT) {
LogWarning(LOG_NET, "NXDN, Call Collision, lasted more then %us with no further updates, forcibly ending call");
m_status.erase(dstId);
m_status[dstId].reset();
m_network->m_callInProgress = false;
}
@ -188,13 +201,12 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
}
// this is a new call stream
RxStatus status = RxStatus();
status.callStartTime = pktTime;
status.srcId = srcId;
status.dstId = dstId;
status.streamId = streamId;
status.peerId = peerId;
m_status[dstId] = status;
m_status[dstId].callStartTime = pktTime;
m_status[dstId].srcId = srcId;
m_status[dstId].dstId = dstId;
m_status[dstId].streamId = streamId;
m_status[dstId].peerId = peerId;
m_status[dstId].activeCall = true;
LogMessage(LOG_NET, "NXDN, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
@ -320,7 +332,14 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
bool TagNXDNData::processGrantReq(uint32_t srcId, uint32_t dstId, bool unitToUnit, uint32_t peerId, uint16_t pktSeq, uint32_t streamId)
{
// if we have an Rx status for the destination deny the grant
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; }) != m_status.end()) {
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
return false;
}

@ -94,11 +94,26 @@ namespace network
uint8_t* buffer;
uint32_t bufferLen;
/**
* @brief RTP Packet Sequence.
*/
uint16_t pktSeq;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
};
std::deque<ParrotFrame> m_parrotFrames;
@ -111,10 +126,38 @@ namespace network
public:
system_clock::hrc::hrc_t callStartTime;
system_clock::hrc::hrc_t lastPacket;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Flag indicating this call is active with traffic currently in progress.
*/
bool activeCall;
/**
* @brief Helper to reset call status.
*/
void reset()
{
srcId = 0U;
dstId = 0U;
streamId = 0U;
peerId = 0U;
activeCall = false;
}
};
typedef std::pair<const uint32_t, RxStatus> StatusMapPair;
std::unordered_map<uint32_t, RxStatus> m_status;

@ -178,14 +178,21 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
}
}
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; }) != m_status.end()) {
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
if (grantDemand) {
LogWarning(LOG_NET, "P25, Call Collision, peer = %u, srcId = %u, dstId = %u, streamId = %u, rxPeer = %u, rxSrcId = %u, rxDstId = %u, rxStreamId = %u, external = %u",
peerId, srcId, dstId, streamId, status.peerId, status.srcId, status.dstId, status.streamId, external);
return false;
}
else {
m_status.erase(dstId);
m_status[dstId].reset();
// is this a parrot talkgroup? if so, clear any remaining frames from the buffer
lookups::TalkgroupRuleGroupVoice tg = m_network->m_tidLookup->find(dstId);
@ -227,7 +234,13 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
return false;
}
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; });
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
RxStatus status = m_status[dstId];
if (streamId != status.streamId && ((duid != DUID::TDU) && (duid != DUID::TDULC))) {
@ -235,7 +248,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
uint64_t lastPktDuration = hrc::diff(hrc::now(), status.lastPacket);
if ((lastPktDuration / 1000) > CALL_COLL_TIMEOUT) {
LogWarning(LOG_NET, "P25, Call Collision, lasted more then %us with no further updates, forcibly ending call");
m_status.erase(dstId);
m_status[dstId].reset();
m_network->m_callInProgress = false;
}
@ -261,13 +274,12 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
}
// this is a new call stream
RxStatus status = RxStatus();
status.callStartTime = pktTime;
status.srcId = srcId;
status.dstId = dstId;
status.streamId = streamId;
status.peerId = peerId;
m_status[dstId] = status;
m_status[dstId].callStartTime = pktTime;
m_status[dstId].srcId = srcId;
m_status[dstId].dstId = dstId;
m_status[dstId].streamId = streamId;
m_status[dstId].peerId = peerId;
m_status[dstId].activeCall = true;
LogMessage(LOG_NET, "P25, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
@ -406,7 +418,14 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
bool TagP25Data::processGrantReq(uint32_t srcId, uint32_t dstId, bool unitToUnit, uint32_t peerId, uint16_t pktSeq, uint32_t streamId)
{
// if we have an Rx status for the destination deny the grant
if (std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) { return x.second.dstId == dstId; }) != m_status.end()) {
auto it = std::find_if(m_status.begin(), m_status.end(), [&](StatusMapPair x) {
if (x.second.dstId == dstId) {
if (x.second.activeCall)
return true;
}
return false;
});
if (it != m_status.end()) {
return false;
}

@ -142,11 +142,26 @@ namespace network
uint8_t* buffer;
uint32_t bufferLen;
/**
* @brief RTP Packet Sequence.
*/
uint16_t pktSeq;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
};
std::deque<ParrotFrame> m_parrotFrames;
@ -160,16 +175,44 @@ namespace network
public:
system_clock::hrc::hrc_t callStartTime;
system_clock::hrc::hrc_t lastPacket;
/**
* @brief Source ID.
*/
uint32_t srcId;
/**
* @brief Destination ID.
*/
uint32_t dstId;
/**
* @brief Call Stream ID.
*/
uint32_t streamId;
/**
* @brief Peer ID.
*/
uint32_t peerId;
/**
* @brief Flag indicating this call is active with traffic currently in progress.
*/
bool activeCall;
/**
* @brief Helper to reset call status.
*/
void reset()
{
srcId = 0U;
dstId = 0U;
streamId = 0U;
peerId = 0U;
activeCall = false;
}
};
typedef std::pair<const uint32_t, RxStatus> StatusMapPair;
std::unordered_map<uint32_t, RxStatus> m_status;
friend class packetdata::P25PacketData;
packetdata::P25PacketData *m_packetData;
packetdata::P25PacketData* m_packetData;
bool m_debug;

Loading…
Cancel
Save

Powered by TurnKey Linux.