push to local queue before write & remove IsEmpty()

unstable
Tom Early 4 years ago
parent e1c55b4ca8
commit a409a475f0

@ -76,14 +76,6 @@ void CCodecStream::InitCodecStream(void)
m_Future = std::async(std::launch::async, &CCodecStream::Thread, this); m_Future = std::async(std::launch::async, &CCodecStream::Thread, this);
} }
////////////////////////////////////////////////////////////////////////////////////////
// get
bool CCodecStream::IsEmpty(void) const
{
return (m_LocalQueue.empty() && m_PacketStream->empty());
}
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// thread // thread
@ -160,17 +152,16 @@ void CCodecStream::Task(void)
// we need a CDvFramePacket pointer to access Frame stuff // we need a CDvFramePacket pointer to access Frame stuff
auto Frame = (CDvFramePacket *)Packet.get(); auto Frame = (CDvFramePacket *)Packet.get();
// push to our local queue so it can wait for the transcoder
m_LocalQueue.push(Packet);
// update important stuff in Frame->m_TCPack for the transcoder // update important stuff in Frame->m_TCPack for the transcoder
Frame->SetTCParams(m_uiTotalPackets); Frame->SetTCParams(m_uiTotalPackets++);
// now send to transcoder // now send to transcoder
// this assume that thread pushing the Packet // this assume that thread pushing the Packet
// have verified that the CodecStream is connected // have verified that the CodecStream is connected
// and that the packet needs transcoding // and that the packet needs transcoding
m_uiTotalPackets++;
m_TCWriter.Send(Frame->GetCodecPacket()); m_TCWriter.Send(Frame->GetCodecPacket());
// and push to our local queue
m_LocalQueue.push(Packet);
} }
} }

@ -37,7 +37,6 @@ public:
// get // get
uint16_t GetStreamId(void) const { return m_uiStreamId; } uint16_t GetStreamId(void) const { return m_uiStreamId; }
bool IsEmpty(void) const;
// task // task
void Thread(void); void Thread(void);

@ -130,11 +130,6 @@ CDvFramePacket::CDvFramePacket(const CBuffer &buf) : CPacket(buf)
memcpy(m_TCPack.dmr, data+off, 9); off += 9; memcpy(m_TCPack.dmr, data+off, 9); off += 9;
memcpy(m_TCPack.m17, data+off, 16); off += 16; memcpy(m_TCPack.m17, data+off, 16); off += 16;
SetTCParams(seq); SetTCParams(seq);
m_TCPack.rt_timer.start();
m_TCPack.module = m_cModule;
m_TCPack.is_last = m_bLastPacket;
m_TCPack.streamid = m_uiStreamId;
m_TCPack.codec_in = m_eCodecIn;
} }
else else
std::cerr << "CBuffer is too small to initialize a CDvFramePacket" << std::endl; std::cerr << "CBuffer is too small to initialize a CDvFramePacket" << std::endl;

@ -59,6 +59,7 @@ public:
std::unique_ptr<CPacket> Duplicate(void) const; std::unique_ptr<CPacket> Duplicate(void) const;
// identity // identity
bool IsDvHeader(void) const { return false; }
bool IsDvFrame(void) const { return true; } bool IsDvFrame(void) const { return true; }
// get // get

@ -69,6 +69,7 @@ public:
// identity // identity
bool IsDvHeader(void) const { return true; } bool IsDvHeader(void) const { return true; }
bool IsDvFrame(void) const { return false; }
// conversion // conversion
void ConvertToDstarStruct(struct dstar_header *) const; void ConvertToDstarStruct(struct dstar_header *) const;

@ -45,8 +45,8 @@ public:
virtual std::unique_ptr<CPacket> Duplicate(void) const = 0; virtual std::unique_ptr<CPacket> Duplicate(void) const = 0;
// identity // identity
virtual bool IsDvHeader(void) const { return false; } virtual bool IsDvHeader(void) const = 0;
virtual bool IsDvFrame(void) const { return false; } virtual bool IsDvFrame(void) const = 0;
bool IsLastPacket(void) const { return m_bLastPacket; } bool IsLastPacket(void) const { return m_bLastPacket; }
// get // get

@ -19,7 +19,6 @@
#pragma once #pragma once
#include "Packet.h" #include "Packet.h"
#include "Client.h"
class CClient; class CClient;
@ -64,7 +63,6 @@ protected:
uint16_t m_uiStreamId; uint16_t m_uiStreamId;
std::mutex m_Mutex; std::mutex m_Mutex;
// owner // the queue
CClient *m_Client;
std::queue<std::unique_ptr<CPacket>> queue; std::queue<std::unique_ptr<CPacket>> queue;
}; };

@ -131,23 +131,6 @@ void CPacketStream::Push(std::unique_ptr<CPacket> Packet)
} }
} }
bool CPacketStream::IsEmpty(void) const
{
#ifdef TRANSCODED_MODULES
bool bEmpty = empty();
// also check no packets still in Codec stream's queue
if ( bEmpty && (m_CodecStream != nullptr) )
{
bEmpty = m_CodecStream->IsEmpty();
}
// done
return bEmpty;
#else
return empty();
#endif
}
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// get // get

@ -21,10 +21,11 @@
#include "PacketQueue.h" #include "PacketQueue.h"
#include "Timer.h" #include "Timer.h"
#include "DVHeaderPacket.h" #include "DVHeaderPacket.h"
#include "Client.h"
#ifdef TRANSCODED_MODULES #ifdef TRANSCODED_MODULES
#include "UnixDgramSocket.h" #include "UnixDgramSocket.h"
#endif
#include "CodecStream.h" #include "CodecStream.h"
#endif
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -51,7 +52,6 @@ public:
// push & pop // push & pop
void Push(std::unique_ptr<CPacket> packet); void Push(std::unique_ptr<CPacket> packet);
void Tickle(void) { m_LastPacketTime.start(); } void Tickle(void) { m_LastPacketTime.start(); }
bool IsEmpty(void) const;
// get // get
std::shared_ptr<CClient> GetOwnerClient(void) { return m_OwnerClient; } std::shared_ptr<CClient> GetOwnerClient(void) { return m_OwnerClient; }

Loading…
Cancel
Save

Powered by TurnKey Linux.