// But we don't own the NNG socket in CodecStream (CTCServer owns it), so we can't close it here.
// Actually, CTCServer::Close() is called globally.
// For per-stream shutdown, we rely on the fact that CodecStream is usually destroyed when the call ends,
// but the NNG socket remains open for other calls.
// Wait, RxThread performs `g_TCServer.Receive`. If that blocks forever, we can't join.
// However, `keep_running` is checked. We need to wake up `Receive`.
// The only way to wake up `Receive` on a shared socket without closing it is if we used a timeout/poller or if we send a dummy packet to ourselves?
// Ah, the Implementation Plan says "Close NNG socket to unblock RxThread".
// **Correction**: `CTCServer` owns the socket. If we are just destroying one `CCodecStream` (e.g. one call ending), we CANNOT close the global socket.
// This implies `RxThread` CANNOT assume it owns the socket.
// BUT, `CodecStream` exists for the duration of a Module's lifecycle effectively?
// No, `CCodecStream` is created per stream? No, `CCodecStream` is created in `Reflector.cpp` at startup for each module!
// `g_Reflector.m_CodecStreams[c] = new CCodecStream(...)`
// So `CCodecStream` lives practically forever (until shutdown).
// Therefore, safe shutdown happens only when app exits, so closing global socket is fine.
// infinite block waiting for packet (or socket close)
// Assuming we modified TCD/NNG config to allow blocking or we poll slowly if not?
// User requested blocking.
// CTCServer::Receive now needs to support blocking (timeout -1 or large).
// We'll pass -1 for infinite (impl dependent) or 1000ms Loop.
// NNG recv returns EAGAIN if nonblock.
// If we use blocking mode, `recv` blocks until message.
if(g_TCServer.Receive(m_CSModule,&pack,1000))// 1s timeout to check keep_running occasionally
{
std::cout<<"Unexpected transcoded packet received from transcoder: Module='"<<pack.module<<"' StreamID="<<std::hex<<std::showbase<<ntohs(pack.streamid)<<std::endl;
std::cout<<"Unexpected transcoded packet received from transcoder: Module='"<<pack.module<<"' StreamID="<<std::hex<<std::showbase<<ntohs(pack.streamid)<<std::endl;
}
elseif(m_IsOpen)
{
// pop the original packet
autoPacket=m_LocalQueue.Pop();
// update statistics
autort=Packet->m_rtTimer.time();// the round-trip time
std::cerr<<std::hex<<std::showbase<<"StreamID mismatch: this voice frame="<<ntohs(Packet->GetCodecPacket()->streamid)<<" returned transcoder packet="<<ntohs(pack.streamid)<<std::dec<<std::noshowbase<<std::endl;
std::cerr<<std::hex<<std::showbase<<"StreamID mismatch: this voice frame="<<ntohs(Packet->GetCodecPacket()->streamid)<<" returned transcoder packet="<<ntohs(pack.streamid)<<std::dec<<std::noshowbase<<std::endl;
std::cerr<<"Sequence mismatch: this voice frame="<<Packet->GetCodecPacket()->sequence<<" returned transcoder packet="<<pack.sequence<<std::endl;
// Likewise, this packet will be ignored
std::cout<<"Transcoder packet received but CodecStream["<<m_CSModule<<"] is closed: Module='"<<pack.module<<"' StreamID="<<std::hex<<std::showbase<<ntohs(pack.streamid)<<std::endl;
}
}
else
{
// Likewise, this packet will be ignored
std::cout<<"Transcoder packet received but CodecStream["<<m_CSModule<<"] is closed: Module='"<<pack.module<<"' StreamID="<<std::hex<<std::showbase<<ntohs(pack.streamid)<<std::endl;
}
}
}
// anything in our queue, then get it to the transcoder!
while(!m_Queue.IsEmpty())
voidCCodecStream::TxThread(void)
{
while(keep_running)
{
auto&Frame=m_Queue.Front();
// Block until packet available or poison pill (nullptr)