added open boolean to control the CodecStream Task()

pull/1/head
Tom Early 3 years ago
parent 73ac08e34b
commit f6138584e0

@ -25,7 +25,7 @@
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CCodecStream::CCodecStream(CPacketStream *PacketStream)
CCodecStream::CCodecStream(CPacketStream *PacketStream, char module) : m_CSModule(module), m_IsOpen(false)
{
m_PacketStream = PacketStream;
}
@ -47,6 +47,7 @@ CCodecStream::~CCodecStream()
void CCodecStream::ResetStats(uint16_t streamid, ECodecType type)
{
m_IsOpen = true;
keep_running = true;
m_uiStreamId = streamid;
m_uiPid = 0;
@ -60,6 +61,7 @@ void CCodecStream::ResetStats(uint16_t streamid, ECodecType type)
void CCodecStream::ReportStats()
{
m_IsOpen = false;
// display stats
if (m_RTCount > 0)
{
@ -68,7 +70,7 @@ void CCodecStream::ReportStats()
double ave = 1000.0 * m_RTSum / double(m_RTCount);
auto prec = std::cout.precision();
std::cout.precision(1);
std::cout << std::fixed << "TC round-trip time(ms): " << min << "/" << ave << "/" << max << ", " << m_RTCount << " total packets" << std::endl;
std::cout << std::fixed << "TC round-trip time(ms): " << min << '/' << ave << '/' << max << ", " << m_RTCount << " total packets" << std::endl;
std::cout.precision(prec);
}
}
@ -76,16 +78,14 @@ void CCodecStream::ReportStats()
////////////////////////////////////////////////////////////////////////////////////////
// initialization
bool CCodecStream::InitCodecStream(char module)
bool CCodecStream::InitCodecStream()
{
m_TCWriter.SetUp(REF2TC);
std::string name(TC2REF);
name.append(1, module);
name.append(1, m_CSModule);
if (m_TCReader.Open(name.c_str()))
return true;
#ifdef DEBUG
std::cout << "Initialized unix socket " << name << std::endl;
#endif
std::cout << "Initialized CodecStream receive socket " << name << std::endl;
keep_running = true;
try
{
@ -93,7 +93,8 @@ bool CCodecStream::InitCodecStream(char module)
}
catch(const std::exception& e)
{
std::cerr << "Could not start Codec processing on module '" << module << "': " << e.what() << std::endl;
std::cerr << "Could not start Codec processing on module '" << m_CSModule << "': " << e.what() << std::endl;
m_TCReader.Close();
return true;
}
return false;
@ -136,15 +137,17 @@ void CCodecStream::Task(void)
if ( m_LocalQueue.IsEmpty() )
{
std::cout << "Unexpected transcoded packet received from transcoder" << std::endl;
std::cout << "Unexpected transcoded packet received from transcoder: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl;
}
else
else if (m_IsOpen)
{
// pop the original packet
auto Packet = m_LocalQueue.Pop();
auto Frame = (CDvFramePacket *)Packet.get();
// do things look okay?
if (pack.module != m_CSModule)
std::cerr << "CodecStream '" << m_CSModule << "' received a transcoded packet from module '" << pack.module << "'" << std::dec << std::noshowbase << std::endl;
if (pack.sequence != Frame->GetCodecPacket()->sequence)
std::cerr << "Sequence mismatch: this voice frame=" << Frame->GetCodecPacket()->sequence << " returned transcoder packet=" << pack.sequence << std::endl;
if (pack.streamid != Frame->GetCodecPacket()->streamid)
@ -162,6 +165,10 @@ void CCodecStream::Task(void)
// and push it back to client
m_PacketStream->ReturnPacket(std::move(Packet));
}
else
{
std::cout << "Transcoder packet received but CodecStream[" << m_CSModule << "] is closed: Module='" << pack.module << "' StreamID=" << std::hex << std::showbase << ntohs(pack.streamid) << std::endl;
}
}
// anything in our queue
@ -171,7 +178,10 @@ void CCodecStream::Task(void)
// we need a CDvFramePacket pointer to access Frame stuff
auto Frame = (CDvFramePacket *)Packet.get();
if (m_IsOpen)
{
// update important stuff in Frame->m_TCPack for the transcoder
// sets the packet counter, stream id, last_packet, module and start the trip timer
Frame->SetTCParams(m_uiTotalPackets++);
// now send to transcoder
@ -179,6 +189,7 @@ void CCodecStream::Task(void)
// push to our local queue where it can wait for the transcoder
m_LocalQueue.Push(std::move(Packet));
}
// get the next packet, if there is one
Packet = m_Queue.Pop();

@ -34,8 +34,8 @@ class CCodecStream
{
public:
// constructor
CCodecStream(CPacketStream *packetstream);
bool InitCodecStream(char module);
CCodecStream(CPacketStream *packetstream, char module);
bool InitCodecStream();
void ResetStats(uint16_t streamid, ECodecType codectype);
void ReportStats();
@ -54,7 +54,10 @@ public:
void Push(std::unique_ptr<CPacket> p) { m_Queue.Push(std::move(p)); }
protected:
// initialization
// identity
const char m_CSModule;
// state
std::atomic<bool> m_IsOpen;
// data
uint16_t m_uiStreamId;
uint16_t m_uiPort;
@ -67,6 +70,8 @@ protected:
// associated packet stream
CPacketStream *m_PacketStream;
// queues
CSafePacketQueue<std::unique_ptr<CPacket>> m_LocalQueue, m_Queue;
// thread

@ -33,9 +33,9 @@ CPacketStream::CPacketStream(char module) : m_PSModule(module)
bool CPacketStream::InitCodecStream()
{
m_CodecStream = std::unique_ptr<CCodecStream>(new CCodecStream(this));
m_CodecStream = std::unique_ptr<CCodecStream>(new CCodecStream(this, m_PSModule));
if (m_CodecStream)
return m_CodecStream->InitCodecStream(m_PSModule);
return m_CodecStream->InitCodecStream();
else
{
std::cerr << "Could not create a CCodecStream for module '" << m_PSModule << "'" << std::endl;

Loading…
Cancel
Save

Powered by TurnKey Linux.