(this first part is subject to Git revert if it becomes problematic, but because this is a dev branch I am gonna roll with it) revert FrameQueue timestamp list/map changes back to pre-R04J32, the original implementation was far cleaner and faster with O(1) speed for timestamp lookup vs the O(n) lookup, additionally, the newer R04J32+ implementation introduces a pointer that can be dereferenced incorrectly causing a edge case crash; fix the implementation for handling the mutex for timestamp map locking, the implementation used a incorrect instance mutex which could in high-traffic conditions cause a race condition and ultimately a crash;

r05a04_dev
Bryan Biedenkapp 3 weeks ago
parent 782469d788
commit 752213d48e

@ -28,7 +28,8 @@ using namespace network::frame;
// Static Class Members
// ---------------------------------------------------------------------------
std::vector<FrameQueue::Timestamp> FrameQueue::m_streamTimestamps;
std::mutex FrameQueue::s_timestampMtx;
std::unordered_map<uint32_t, uint32_t> FrameQueue::s_streamTimestamps;
// ---------------------------------------------------------------------------
// Public Class Members
@ -37,8 +38,7 @@ std::vector<FrameQueue::Timestamp> FrameQueue::m_streamTimestamps;
/* Initializes a new instance of the FrameQueue class. */
FrameQueue::FrameQueue(udp::Socket* socket, uint32_t peerId, bool debug) : RawFrameQueue(socket, debug),
m_peerId(peerId),
m_timestampMtx()
m_peerId(peerId)
{
assert(peerId < 999999999U);
}
@ -218,8 +218,8 @@ void FrameQueue::enqueueMessage(udp::BufferQueue* queue, const uint8_t* message,
void FrameQueue::clearTimestamps()
{
std::lock_guard<std::mutex> lock(m_timestampMtx);
m_streamTimestamps.clear();
std::lock_guard<std::mutex> lock(s_timestampMtx);
s_streamTimestamps.clear();
}
// ---------------------------------------------------------------------------
@ -228,59 +228,36 @@ void FrameQueue::clearTimestamps()
/* Search for a timestamp entry by stream ID. */
FrameQueue::Timestamp* FrameQueue::findTimestamp(uint32_t streamId)
uint32_t FrameQueue::findTimestamp(uint32_t streamId)
{
std::lock_guard<std::mutex> lock(m_timestampMtx);
for (size_t i = 0; i < m_streamTimestamps.size(); i++) {
if (m_streamTimestamps[i].streamId == streamId)
return &m_streamTimestamps[i];
std::lock_guard<std::mutex> lock(s_timestampMtx);
auto it = s_streamTimestamps.find(streamId);
if (it != s_streamTimestamps.end()) {
return it->second;
}
return nullptr;
return INVALID_TS;
}
/* Insert a timestamp for a stream ID. */
/* Insert/update a timestamp for a stream ID. */
void FrameQueue::insertTimestamp(uint32_t streamId, uint32_t timestamp)
void FrameQueue::setTimestamp(uint32_t streamId, uint32_t timestamp)
{
std::lock_guard<std::mutex> lock(m_timestampMtx);
std::lock_guard<std::mutex> lock(s_timestampMtx);
if (streamId == 0U || timestamp == INVALID_TS) {
LogError(LOG_NET, "FrameQueue::insertTimestamp(), invalid streamId or timestamp");
LogError(LOG_NET, "FrameQueue::setTimestamp(), invalid streamId or timestamp");
return;
}
Timestamp entry = { streamId, timestamp };
m_streamTimestamps.push_back(entry);
}
/* Update a timestamp for a stream ID. */
void FrameQueue::updateTimestamp(uint32_t streamId, uint32_t timestamp)
{
std::lock_guard<std::mutex> lock(m_timestampMtx);
if (streamId == 0U || timestamp == INVALID_TS) {
LogError(LOG_NET, "FrameQueue::updateTimestamp(), invalid streamId or timestamp");
return;
}
// find the timestamp entry and update it
for (size_t i = 0; i < m_streamTimestamps.size(); i++) {
if (m_streamTimestamps[i].streamId == streamId) {
m_streamTimestamps[i].timestamp = timestamp;
break;
}
}
s_streamTimestamps[streamId] = timestamp;
}
/* Erase a timestamp for a stream ID. */
void FrameQueue::eraseTimestamp(uint32_t streamId)
{
std::lock_guard<std::mutex> lock(m_timestampMtx);
m_streamTimestamps.erase(
std::remove_if(m_streamTimestamps.begin(), m_streamTimestamps.end(),
[streamId](const Timestamp& entry) { return entry.streamId == streamId; }),
m_streamTimestamps.end());
std::lock_guard<std::mutex> lock(s_timestampMtx);
s_streamTimestamps.erase(streamId);
}
/* Generate RTP message for the frame queue. */
@ -300,8 +277,8 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui
uint32_t timestamp = INVALID_TS;
if (streamId != 0U) {
auto entry = findTimestamp(streamId);
if (entry != nullptr) {
timestamp = entry->timestamp;
if (entry != INVALID_TS) {
timestamp = entry;
}
if (timestamp != INVALID_TS) {
@ -309,7 +286,7 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui
timestamp += (RTP_GENERIC_CLOCK_RATE / 133);
if (m_debug)
LogDebugEx(LOG_NET, "FrameQueue::generateMessage()", "RTP streamId = %u, previous TS = %u, TS = %u, rtpSeq = %u", streamId, prevTimestamp, timestamp, rtpSeq);
updateTimestamp(streamId, timestamp);
setTimestamp(streamId, timestamp);
}
}
@ -332,14 +309,14 @@ uint8_t* FrameQueue::generateMessage(const uint8_t* message, uint32_t length, ui
timestamp = (uint32_t)system_clock::ntp::now();
header.setTimestamp(timestamp);
insertTimestamp(streamId, timestamp);
setTimestamp(streamId, timestamp);
}
header.encode(buffer);
if (streamId != 0U && rtpSeq == RTP_END_OF_CALL_SEQ) {
auto entry = findTimestamp(streamId);
if (entry != nullptr) {
if (entry != INVALID_TS) {
if (m_debug)
LogDebugEx(LOG_NET, "FrameQueue::generateMessage()", "RTP streamId = %u, rtpSeq = %u", streamId, rtpSeq);
eraseTimestamp(streamId);

@ -45,11 +45,6 @@ namespace network
class HOST_SW_API FrameQueue : public RawFrameQueue {
public: typedef std::pair<const NET_FUNC::ENUM, const NET_SUBFUNC::ENUM> OpcodePair;
public:
typedef struct {
uint32_t streamId;
uint32_t timestamp;
} Timestamp;
auto operator=(FrameQueue&) -> FrameQueue& = delete;
auto operator=(FrameQueue&&) -> FrameQueue& = delete;
FrameQueue(FrameQueue&) = delete;
@ -126,28 +121,21 @@ namespace network
private:
uint32_t m_peerId;
std::mutex m_timestampMtx;
static std::vector<Timestamp> m_streamTimestamps;
static std::mutex s_timestampMtx;
static std::unordered_map<uint32_t, uint32_t> s_streamTimestamps;
/**
* @brief Search for a timestamp entry by stream ID.
* @param streamId Stream ID to find.
* @return Timestamp* Table entry.
*/
Timestamp* findTimestamp(uint32_t streamId);
/**
* @brief Insert a timestamp for a stream ID.
* @param streamId Stream ID.
* @param timestamp Timestamp.
* @return uint32_t Table entry.
*/
void insertTimestamp(uint32_t streamId, uint32_t timestamp);
uint32_t findTimestamp(uint32_t streamId);
/**
* @brief Update a timestamp for a stream ID.
* @brief Insert/update a timestamp for a stream ID.
* @param streamId Stream ID.
* @param timestamp Timestamp.
*/
void updateTimestamp(uint32_t streamId, uint32_t timestamp);
void setTimestamp(uint32_t streamId, uint32_t timestamp);
/**
* @brief Erase a timestamp for a stream ID.
* @param streamId Stream ID.

Loading…
Cancel
Save

Powered by TurnKey Linux.