diff --git a/src/common/network/RawFrameQueue.cpp b/src/common/network/RawFrameQueue.cpp index 1f85d1d8..c0f2643f 100644 --- a/src/common/network/RawFrameQueue.cpp +++ b/src/common/network/RawFrameQueue.cpp @@ -12,6 +12,7 @@ #include "network/udp/Socket.h" #include "Log.h" #include "Utils.h" +#include "Thread.h" using namespace network; @@ -22,7 +23,8 @@ using namespace network; // Static Class Members // --------------------------------------------------------------------------- -std::mutex RawFrameQueue::m_flushMutex; +std::mutex RawFrameQueue::m_queueMutex; +bool RawFrameQueue::m_queueFlushing = false; // --------------------------------------------------------------------------- // Public Class Members @@ -114,6 +116,13 @@ void RawFrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, sock assert(message != nullptr); assert(length > 0U); + // if the queue is flushing -- don't attempt to enqueue any messages + if (m_queueFlushing) { + LogWarning(LOG_NET, "RawFrameQueue::enqueueMessage() -- queue is flushing, waiting to enqueue message"); + while (m_queueFlushing) + Thread::sleep(1U); + } + uint8_t* buffer = new uint8_t[length]; ::memset(buffer, 0x00U, length); ::memcpy(buffer, message, length); @@ -135,7 +144,8 @@ void RawFrameQueue::enqueueMessage(const uint8_t* message, uint32_t length, sock bool RawFrameQueue::flushQueue() { bool ret = true; - std::lock_guard lock(m_flushMutex); + std::lock_guard lock(m_queueMutex); + m_queueFlushing = true; if (m_buffers.empty()) { return false; @@ -156,6 +166,7 @@ bool RawFrameQueue::flushQueue() } deleteBuffers(); + m_queueFlushing = false; return ret; } diff --git a/src/common/network/RawFrameQueue.h b/src/common/network/RawFrameQueue.h index f0d6671f..b7317afb 100644 --- a/src/common/network/RawFrameQueue.h +++ b/src/common/network/RawFrameQueue.h @@ -94,7 +94,8 @@ namespace network uint32_t m_addrLen; udp::Socket* m_socket; - static std::mutex m_flushMutex; + static std::mutex m_queueMutex; + static bool m_queueFlushing; udp::BufferVector m_buffers; uint32_t m_failedReadCnt; diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index 721a231c..527c2176 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -2594,7 +2594,7 @@ bool FNENetwork::writePeerACK(uint32_t peerId, uint32_t streamId, const uint8_t* } return writePeer(peerId, { NET_FUNC::ACK, NET_SUBFUNC::NOP }, buffer, length + 10U, RTP_END_OF_CALL_SEQ, streamId, - false); + false, false, true); } /* Helper to log a warning specifying which NAK reason is being sent a peer. */