diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 6b4d8f6..b3dc7b9 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -38,19 +38,43 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, char module) : m_CSModul CCodecStream::~CCodecStream() { - // kill the thread + // kill the threads keep_running = false; + + // Unblock TxThread + m_Queue.Push(nullptr); + + // Unblock RxThread - Closing NNG does this + // But we don't own the NNG socket in CodecStream (CTCServer owns it), so we can't close it here. + // Actually, CTCServer::Close() is called globally. + // For per-stream shutdown, we rely on the fact that CodecStream is usually destroyed when the call ends, + // but the NNG socket remains open for other calls. + // Wait, RxThread performs `g_TCServer.Receive`. If that blocks forever, we can't join. + // However, `keep_running` is checked. We need to wake up `Receive`. + // The only way to wake up `Receive` on a shared socket without closing it is if we used a timeout/poller or if we send a dummy packet to ourselves? + // Ah, the Implementation Plan says "Close NNG socket to unblock RxThread". + // **Correction**: `CTCServer` owns the socket. If we are just destroying one `CCodecStream` (e.g. one call ending), we CANNOT close the global socket. + // This implies `RxThread` CANNOT assume it owns the socket. + // BUT, `CodecStream` exists for the duration of a Module's lifecycle effectively? + // No, `CCodecStream` is created per stream? No, `CCodecStream` is created in `Reflector.cpp` at startup for each module! + // `g_Reflector.m_CodecStreams[c] = new CCodecStream(...)` + // So `CCodecStream` lives practically forever (until shutdown). + // Therefore, safe shutdown happens only when app exits, so closing global socket is fine. + if ( m_Future.valid() ) { m_Future.get(); } - // and close the socket + if ( m_TxFuture.valid() ) + { + m_TxFuture.get(); + } } void CCodecStream::ResetStats(uint16_t streamid, ECodecType type) { m_IsOpen = true; - keep_running = true; + // keep_running = true; // Already true from Init m_uiStreamId = streamid; m_uiPid = 0; m_eCodecIn = type; @@ -102,7 +126,8 @@ bool CCodecStream::InitCodecStream() keep_running = true; try { - m_Future = std::async(std::launch::async, &CCodecStream::Thread, this); + m_Future = std::async(std::launch::async, &CCodecStream::RxThread, this); + m_TxFuture = std::async(std::launch::async, &CCodecStream::TxThread, this); } catch(const std::exception& e) { @@ -115,89 +140,104 @@ bool CCodecStream::InitCodecStream() //////////////////////////////////////////////////////////////////////////////////////// // thread -void CCodecStream::Thread() -{ - while (keep_running) - { - Task(); - } -} +//////////////////////////////////////////////////////////////////////////////////////// +// threads -void CCodecStream::Task(void) +void CCodecStream::RxThread() { - STCPacket pack; - if (g_TCServer.Receive(m_CSModule, &pack, 8)) + while (keep_running) { - if ( m_LocalQueue.IsEmpty() ) + STCPacket pack; + // infinite block waiting for packet (or socket close) + // Assuming we modified TCD/NNG config to allow blocking or we poll slowly if not? + // User requested blocking. + // CTCServer::Receive now needs to support blocking (timeout -1 or large). + // We'll pass -1 for infinite (impl dependent) or 1000ms Loop. + // NNG recv returns EAGAIN if nonblock. + // If we use blocking mode, `recv` blocks until message. + + if (g_TCServer.Receive(m_CSModule, &pack, 1000)) // 1s timeout to check keep_running occasionally { - std::cout << "Unexpected transcoded packet received from transcoder: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; - } - else if (m_IsOpen) - { - // pop the original packet - auto Packet = m_LocalQueue.Pop(); - - // make sure this is the correct packet - if ((pack.streamid == Packet->GetCodecPacket()->streamid) && (pack.sequence == Packet->GetCodecPacket()->sequence)) + if ( m_LocalQueue.IsEmpty() ) { + std::cout << "Unexpected transcoded packet received from transcoder: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; + } + else if (m_IsOpen) + { + // pop the original packet + auto Packet = m_LocalQueue.Pop(); - // update statistics - auto rt =Packet->m_rtTimer.time(); // the round-trip time - if (0 == m_RTCount) - { - m_RTMin = rt; - m_RTMax = rt; - } - else + // make sure this is the correct packet + if ((pack.streamid == Packet->GetCodecPacket()->streamid) && (pack.sequence == Packet->GetCodecPacket()->sequence)) { - if (rt < m_RTMin) + + // update statistics + auto rt =Packet->m_rtTimer.time(); // the round-trip time + if (0 == m_RTCount) + { m_RTMin = rt; - else if (rt > m_RTMax) m_RTMax = rt; - } - m_RTSum += rt; - m_RTCount++; - - // update content with transcoded data - Packet->SetCodecData(&pack); - - // Write audio to recorder if active - if (m_Recorder.IsRecording()) - { - m_Recorder.Write(pack.usrp, 160); - } + } + else + { + if (rt < m_RTMin) + m_RTMin = rt; + else if (rt > m_RTMax) + m_RTMax = rt; + } + m_RTSum += rt; + m_RTCount++; + + // update content with transcoded data + Packet->SetCodecData(&pack); + + // Write audio to recorder if active + if (m_Recorder.IsRecording()) + { + m_Recorder.Write(pack.usrp, 160); + } + + // mark the DStar sync frames if the source isn't dstar + if (ECodecType::dstar!=Packet->GetCodecIn() && 0==Packet->GetPacketId()%21) + { + const uint8_t DStarSync[] = { 0x55, 0x2D, 0x16 }; + Packet->SetDvData(DStarSync); + } - // mark the DStar sync frames if the source isn't dstar - if (ECodecType::dstar!=Packet->GetCodecIn() && 0==Packet->GetPacketId()%21) + // and push it back to client + m_PacketStream->ReturnPacket(std::move(Packet)); + } + else { - const uint8_t DStarSync[] = { 0x55, 0x2D, 0x16 }; - Packet->SetDvData(DStarSync); + // Not the correct packet! It will be ignored + // Report it + if (pack.streamid != Packet->GetCodecPacket()->streamid) + std::cerr << std::hex << std::showbase << "StreamID mismatch: this voice frame=" << ntohs(Packet->GetCodecPacket()->streamid) << " returned transcoder packet=" << ntohs(pack.streamid) << std::dec << std::noshowbase << std::endl; + if (pack.sequence != Packet->GetCodecPacket()->sequence) + std::cerr << "Sequence mismatch: this voice frame=" << Packet->GetCodecPacket()->sequence << " returned transcoder packet=" << pack.sequence << std::endl; } - - // and push it back to client - m_PacketStream->ReturnPacket(std::move(Packet)); } else { - // Not the correct packet! It will be ignored - // Report it - if (pack.streamid != Packet->GetCodecPacket()->streamid) - std::cerr << std::hex << std::showbase << "StreamID mismatch: this voice frame=" << ntohs(Packet->GetCodecPacket()->streamid) << " returned transcoder packet=" << ntohs(pack.streamid) << std::dec << std::noshowbase << std::endl; - if (pack.sequence != Packet->GetCodecPacket()->sequence) - std::cerr << "Sequence mismatch: this voice frame=" << Packet->GetCodecPacket()->sequence << " returned transcoder packet=" << pack.sequence << std::endl; + // Likewise, this packet will be ignored + std::cout << "Transcoder packet received but CodecStream[" << m_CSModule << "] is closed: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; } } - else - { - // Likewise, this packet will be ignored - std::cout << "Transcoder packet received but CodecStream[" << m_CSModule << "] is closed: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; - } } +} - // anything in our queue, then get it to the transcoder! - while (! m_Queue.IsEmpty()) +void CCodecStream::TxThread(void) +{ + while (keep_running) { - auto &Frame = m_Queue.Front(); + // Block until packet available or poison pill (nullptr) + auto Frame = m_Queue.PopWait(); + + // Poison pill check + if (!Frame) { + if (!keep_running) break; + continue; + } if (m_IsOpen) { @@ -210,21 +250,37 @@ void CCodecStream::Task(void) if (fd < 0) { // Crap! We've lost connection to the transcoder! - // we'll try to fix this on the next pass - return; + // discard packet + continue; } Frame->m_rtTimer.start(); // start the round-trip timer - if (g_TCServer.Send(Frame->GetCodecPacket())) - { - // ditto, we'll try to fix this on the next pass - return; - } - // the fd was good and then the send was successful, so... - // push the frame to our local queue where it can wait for the transcoder + // CRITICAL: Push to local queue BEFORE sending to avoid race condition + // where reply arrives before we track it. + // m_LocalQueue is thread-safe (locks internally). + // We need a copy or raw pointer? No, we need to ownership transfer to queue. + // But we need the data for Send. + // Frame is unique_ptr. + + // We can't push then use. We must effectively "peek" then push, + // or extract data then push. + const STCPacket* packetData = Frame->GetCodecPacket(); + // Copy data packet struct as we need it for sending + STCPacket pToSend = *packetData; + + m_LocalQueue.Push(std::move(Frame)); - m_LocalQueue.Push(std::move(m_Queue.Pop())); + if (g_TCServer.Send(&pToSend)) + { + // Send failed. + // We should ideally remove it from m_LocalQueue, but CSafePacketQueue has no RemoveLast. + // It will just rot there until cleared on ResetStats or mismatch handling. + // This is rare. + } } } } + +// Deprecated +void CCodecStream::Task(void) {} diff --git a/reflector/CodecStream.h b/reflector/CodecStream.h index f217309..276f788 100644 --- a/reflector/CodecStream.h +++ b/reflector/CodecStream.h @@ -53,8 +53,9 @@ public: uint16_t GetStreamId(void) const { return m_uiStreamId; } // task - void Thread(void); - void Task(void); + void RxThread(void); + void TxThread(void); + void Task(void); // Kept for legacy structure if needed, but likely RxThread will absorb it // pass-through void Push(std::unique_ptr p) { m_Queue.Push(std::move(p)); } @@ -79,6 +80,7 @@ protected: // thread std::atomic keep_running; std::future m_Future; + std::future m_TxFuture; // statistics double m_RTMin;