From 752213d48ee122f5311a01b9f0380e75e6794b0c Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Wed, 14 Jan 2026 21:27:55 -0500 Subject: [PATCH] (this first part is subject to Git revert if it becomes problematic, but because this is a dev branch I am gonna roll with it) revert FrameQueue timestamp list/map changes back to pre-R04J32, the original implementation was far cleaner and faster with O(1) speed for timestamp lookup vs the O(n) lookup, additionally, the newer R04J32+ implementation introduces a pointer that can be dereferenced incorrectly causing a edge case crash; fix the implementation for handling the mutex for timestamp map locking, the implementation used a incorrect instance mutex which could in high-traffic conditions cause a race condition and ultimately a crash; --- src/common/network/FrameQueue.cpp | 69 +++++++++++-------------------- src/common/network/FrameQueue.h | 24 +++-------- 2 files changed, 29 insertions(+), 64 deletions(-) diff --git a/src/common/network/FrameQueue.cpp b/src/common/network/FrameQueue.cpp index d2587138..8799e794 100644 --- a/src/common/network/FrameQueue.cpp +++ b/src/common/network/FrameQueue.cpp @@ -28,7 +28,8 @@ using namespace network::frame; // Static Class Members // --------------------------------------------------------------------------- -std::vector FrameQueue::m_streamTimestamps; +std::mutex FrameQueue::s_timestampMtx; +std::unordered_map FrameQueue::s_streamTimestamps; // --------------------------------------------------------------------------- // Public Class Members @@ -37,8 +38,7 @@ std::vector FrameQueue::m_streamTimestamps; /* Initializes a new instance of the FrameQueue class. */ FrameQueue::FrameQueue(udp::Socket* socket, uint32_t peerId, bool debug) : RawFrameQueue(socket, debug), - m_peerId(peerId), - m_timestampMtx() + m_peerId(peerId) { assert(peerId < 999999999U); } @@ -218,8 +218,8 @@ void FrameQueue::enqueueMessage(udp::BufferQueue* queue, const uint8_t* message, void FrameQueue::clearTimestamps() { - std::lock_guard lock(m_timestampMtx); - m_streamTimestamps.clear(); + std::lock_guard lock(s_timestampMtx); + s_streamTimestamps.clear(); } // --------------------------------------------------------------------------- @@ -228,59 +228,36 @@ void FrameQueue::clearTimestamps() /* Search for a timestamp entry by stream ID. */ -FrameQueue::Timestamp* FrameQueue::findTimestamp(uint32_t streamId) +uint32_t FrameQueue::findTimestamp(uint32_t streamId) { - std::lock_guard lock(m_timestampMtx); - for (size_t i = 0; i < m_streamTimestamps.size(); i++) { - if (m_streamTimestamps[i].streamId == streamId) - return &m_streamTimestamps[i]; + std::lock_guard lock(s_timestampMtx); + auto it = s_streamTimestamps.find(streamId); + if (it != s_streamTimestamps.end()) { + return it->second; } - return nullptr; + return INVALID_TS; } -/* Insert a timestamp for a stream ID. */ +/* Insert/update a timestamp for a stream ID. */ -void FrameQueue::insertTimestamp(uint32_t streamId, uint32_t timestamp) +void FrameQueue::setTimestamp(uint32_t streamId, uint32_t timestamp) { - std::lock_guard lock(m_timestampMtx); + std::lock_guard lock(s_timestampMtx); if (streamId == 0U || timestamp == INVALID_TS) { - LogError(LOG_NET, "FrameQueue::insertTimestamp(), invalid streamId or timestamp"); + LogError(LOG_NET, "FrameQueue::setTimestamp(), invalid streamId or timestamp"); return; } - Timestamp entry = { streamId, timestamp }; - m_streamTimestamps.push_back(entry); -} - -/* Update a timestamp for a stream ID. */ - -void FrameQueue::updateTimestamp(uint32_t streamId, uint32_t timestamp) -{ - std::lock_guard lock(m_timestampMtx); - if (streamId == 0U || timestamp == INVALID_TS) { - LogError(LOG_NET, "FrameQueue::updateTimestamp(), invalid streamId or timestamp"); - return; - } - - // find the timestamp entry and update it - for (size_t i = 0; i < m_streamTimestamps.size(); i++) { - if (m_streamTimestamps[i].streamId == streamId) { - m_streamTimestamps[i].timestamp = timestamp; - break; - } - } + s_streamTimestamps[streamId] = timestamp; } /* Erase a timestamp for a stream ID. */ void FrameQueue::eraseTimestamp(uint32_t streamId) { - std::lock_guard lock(m_timestampMtx); - m_streamTimestamps.erase( - std::remove_if(m_streamTimestamps.begin(), m_streamTimestamps.end(), - [streamId](const Timestamp& entry) { return entry.streamId == streamId; }), - m_streamTimestamps.end()); + std::lock_guard lock(s_timestampMtx); + s_streamTimestamps.erase(streamId); } /* Generate RTP message for the frame queue. */ @@ -300,8 +277,8 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui uint32_t timestamp = INVALID_TS; if (streamId != 0U) { auto entry = findTimestamp(streamId); - if (entry != nullptr) { - timestamp = entry->timestamp; + if (entry != INVALID_TS) { + timestamp = entry; } if (timestamp != INVALID_TS) { @@ -309,7 +286,7 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui timestamp += (RTP_GENERIC_CLOCK_RATE / 133); if (m_debug) LogDebugEx(LOG_NET, "FrameQueue::generateMessage()", "RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, prevTimestamp, timestamp, rtpSeq); - updateTimestamp(streamId, timestamp); + setTimestamp(streamId, timestamp); } } @@ -332,14 +309,14 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui timestamp = (uint32_t)system_clock::ntp::now(); header.setTimestamp(timestamp); - insertTimestamp(streamId, timestamp); + setTimestamp(streamId, timestamp); } header.encode(buffer); if (streamId != 0U && rtpSeq == RTP_END_OF_CALL_SEQ) { auto entry = findTimestamp(streamId); - if (entry != nullptr) { + if (entry != INVALID_TS) { if (m_debug) LogDebugEx(LOG_NET, "FrameQueue::generateMessage()", "RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq); eraseTimestamp(streamId); diff --git a/src/common/network/FrameQueue.h b/src/common/network/FrameQueue.h index 4db7a933..f1f63481 100644 --- a/src/common/network/FrameQueue.h +++ b/src/common/network/FrameQueue.h @@ -45,11 +45,6 @@ namespace network class HOST_SW_API FrameQueue : public RawFrameQueue { public: typedef std::pair OpcodePair; public: - typedef struct { - uint32_t streamId; - uint32_t timestamp; - } Timestamp; - auto operator=(FrameQueue&) -> FrameQueue& = delete; auto operator=(FrameQueue&&) -> FrameQueue& = delete; FrameQueue(FrameQueue&) = delete; @@ -126,28 +121,21 @@ namespace network private: uint32_t m_peerId; - std::mutex m_timestampMtx; - - static std::vector m_streamTimestamps; + static std::mutex s_timestampMtx; + static std::unordered_map s_streamTimestamps; /** * @brief Search for a timestamp entry by stream ID. * @param streamId Stream ID to find. - * @return Timestamp* Table entry. - */ - Timestamp* findTimestamp(uint32_t streamId); - /** - * @brief Insert a timestamp for a stream ID. - * @param streamId Stream ID. - * @param timestamp Timestamp. + * @return uint32_t Table entry. */ - void insertTimestamp(uint32_t streamId, uint32_t timestamp); + uint32_t findTimestamp(uint32_t streamId); /** - * @brief Update a timestamp for a stream ID. + * @brief Insert/update a timestamp for a stream ID. * @param streamId Stream ID. * @param timestamp Timestamp. */ - void updateTimestamp(uint32_t streamId, uint32_t timestamp); + void setTimestamp(uint32_t streamId, uint32_t timestamp); /** * @brief Erase a timestamp for a stream ID. * @param streamId Stream ID.