diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 6109fc0..afd7334 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -25,7 +25,7 @@ //////////////////////////////////////////////////////////////////////////////////////// // constructor -CCodecStream::CCodecStream(CPacketStream *PacketStream) +CCodecStream::CCodecStream(CPacketStream *PacketStream, char module) : m_CSModule(module), m_IsOpen(false) { m_PacketStream = PacketStream; } @@ -47,6 +47,7 @@ CCodecStream::~CCodecStream() void CCodecStream::ResetStats(uint16_t streamid, ECodecType type) { + m_IsOpen = true; keep_running = true; m_uiStreamId = streamid; m_uiPid = 0; @@ -60,6 +61,7 @@ void CCodecStream::ResetStats(uint16_t streamid, ECodecType type) void CCodecStream::ReportStats() { + m_IsOpen = false; // display stats if (m_RTCount > 0) { @@ -68,7 +70,7 @@ void CCodecStream::ReportStats() double ave = 1000.0 * m_RTSum / double(m_RTCount); auto prec = std::cout.precision(); std::cout.precision(1); - std::cout << std::fixed << "TC round-trip time(ms): " << min << "/" << ave << "/" << max << ", " << m_RTCount << " total packets" << std::endl; + std::cout << std::fixed << "TC round-trip time(ms): " << min << '/' << ave << '/' << max << ", " << m_RTCount << " total packets" << std::endl; std::cout.precision(prec); } } @@ -76,16 +78,14 @@ void CCodecStream::ReportStats() //////////////////////////////////////////////////////////////////////////////////////// // initialization -bool CCodecStream::InitCodecStream(char module) +bool CCodecStream::InitCodecStream() { m_TCWriter.SetUp(REF2TC); std::string name(TC2REF); - name.append(1, module); + name.append(1, m_CSModule); if (m_TCReader.Open(name.c_str())) return true; -#ifdef DEBUG - std::cout << "Initialized unix socket " << name << std::endl; -#endif + std::cout << "Initialized CodecStream receive socket " << name << std::endl; keep_running = true; try { @@ -93,7 +93,8 @@ bool CCodecStream::InitCodecStream(char module) } catch(const std::exception& e) { - std::cerr << "Could not start Codec processing on module '" << module << "': " << e.what() << std::endl; + std::cerr << "Could not start Codec processing on module '" << m_CSModule << "': " << e.what() << std::endl; + m_TCReader.Close(); return true; } return false; @@ -136,15 +137,17 @@ void CCodecStream::Task(void) if ( m_LocalQueue.IsEmpty() ) { - std::cout << "Unexpected transcoded packet received from transcoder" << std::endl; + std::cout << "Unexpected transcoded packet received from transcoder: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; } - else + else if (m_IsOpen) { // pop the original packet auto Packet = m_LocalQueue.Pop(); auto Frame = (CDvFramePacket *)Packet.get(); // do things look okay? + if (pack.module != m_CSModule) + std::cerr << "CodecStream '" << m_CSModule << "' received a transcoded packet from module '" << pack.module << "'" << std::dec << std::noshowbase << std::endl; if (pack.sequence != Frame->GetCodecPacket()->sequence) std::cerr << "Sequence mismatch: this voice frame=" << Frame->GetCodecPacket()->sequence << " returned transcoder packet=" << pack.sequence << std::endl; if (pack.streamid != Frame->GetCodecPacket()->streamid) @@ -162,6 +165,10 @@ void CCodecStream::Task(void) // and push it back to client m_PacketStream->ReturnPacket(std::move(Packet)); } + else + { + std::cout << "Transcoder packet received but CodecStream[" << m_CSModule << "] is closed: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl; + } } // anything in our queue @@ -171,14 +178,18 @@ void CCodecStream::Task(void) // we need a CDvFramePacket pointer to access Frame stuff auto Frame = (CDvFramePacket *)Packet.get(); - // update important stuff in Frame->m_TCPack for the transcoder - Frame->SetTCParams(m_uiTotalPackets++); + if (m_IsOpen) + { + // update important stuff in Frame->m_TCPack for the transcoder + // sets the packet counter, stream id, last_packet, module and start the trip timer + Frame->SetTCParams(m_uiTotalPackets++); - // now send to transcoder - m_TCWriter.Send(Frame->GetCodecPacket()); + // now send to transcoder + m_TCWriter.Send(Frame->GetCodecPacket()); - // push to our local queue where it can wait for the transcoder - m_LocalQueue.Push(std::move(Packet)); + // push to our local queue where it can wait for the transcoder + m_LocalQueue.Push(std::move(Packet)); + } // get the next packet, if there is one Packet = m_Queue.Pop(); diff --git a/reflector/CodecStream.h b/reflector/CodecStream.h index 3b5f3b9..74db476 100644 --- a/reflector/CodecStream.h +++ b/reflector/CodecStream.h @@ -34,8 +34,8 @@ class CCodecStream { public: // constructor - CCodecStream(CPacketStream *packetstream); - bool InitCodecStream(char module); + CCodecStream(CPacketStream *packetstream, char module); + bool InitCodecStream(); void ResetStats(uint16_t streamid, ECodecType codectype); void ReportStats(); @@ -54,7 +54,10 @@ public: void Push(std::unique_ptr p) { m_Queue.Push(std::move(p)); } protected: - // initialization + // identity + const char m_CSModule; + // state + std::atomic m_IsOpen; // data uint16_t m_uiStreamId; uint16_t m_uiPort; @@ -67,6 +70,8 @@ protected: // associated packet stream CPacketStream *m_PacketStream; + + // queues CSafePacketQueue> m_LocalQueue, m_Queue; // thread diff --git a/reflector/PacketStream.cpp b/reflector/PacketStream.cpp index a81c90e..15fb4b5 100644 --- a/reflector/PacketStream.cpp +++ b/reflector/PacketStream.cpp @@ -33,9 +33,9 @@ CPacketStream::CPacketStream(char module) : m_PSModule(module) bool CPacketStream::InitCodecStream() { - m_CodecStream = std::unique_ptr(new CCodecStream(this)); + m_CodecStream = std::unique_ptr(new CCodecStream(this, m_PSModule)); if (m_CodecStream) - return m_CodecStream->InitCodecStream(m_PSModule); + return m_CodecStream->InitCodecStream(); else { std::cerr << "Could not create a CCodecStream for module '" << m_PSModule << "'" << std::endl;