CTranscoder:m_Streams is list

pull/1/head
Tom Early 6 years ago
parent 4bc72ce09e
commit 71a9f785ea

@ -48,7 +48,6 @@ CTranscoder::CTranscoder()
{
m_bStopThread = false;
m_pThread = NULL;
m_Streams.reserve(12);
m_bConnected = false;
m_LastKeepaliveTime.Now();
m_LastActivityTime.Now();
@ -62,25 +61,7 @@ CTranscoder::CTranscoder()
CTranscoder::~CTranscoder()
{
// close all streams
m_Mutex.lock();
{
for ( int i = 0; i < m_Streams.size(); i++ )
{
delete m_Streams[i];
}
m_Streams.clear();
}
m_Mutex.unlock();
// kill threads
m_bStopThread = true;
if ( m_pThread != NULL )
{
m_pThread->join();
delete m_pThread;
}
Close();
}
////////////////////////////////////////////////////////////////////////////////////////
@ -89,13 +70,13 @@ CTranscoder::~CTranscoder()
bool CTranscoder::Init(void)
{
bool ok;
// reset stop flag
m_bStopThread = false;
// create server's IP
m_Ip = g_Reflector.GetTranscoderIp();
// create our socket
ok = m_Socket.Open(TRANSCODER_PORT);
if ( ok )
@ -116,19 +97,19 @@ void CTranscoder::Close(void)
{
// close socket
m_Socket.Close();
// close all streams
m_Mutex.lock();
{
for ( int i = 0; i < m_Streams.size(); i++ )
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ )
{
delete m_Streams[i];
delete *it;
}
m_Streams.clear();
}
m_Mutex.unlock();
// kill threads
m_bStopThread = true;
if ( m_pThread != NULL )
@ -156,13 +137,13 @@ void CTranscoder::Task(void)
CIp Ip;
uint16 StreamId;
uint16 Port;
// anything coming in from codec server ?
//if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) )
if ( m_Socket.Receive(&Buffer, &Ip, 20) != -1 )
{
m_LastActivityTime.Now();
// crack packet
if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) )
{
@ -185,15 +166,15 @@ void CTranscoder::Task(void)
}
m_bConnected = true;
}
}
// keep client alive
if ( m_LastKeepaliveTime.DurationSinceNow() > TRANSCODER_KEEPALIVE_PERIOD )
{
//
HandleKeepalives();
// update time
m_LastKeepaliveTime.Now();
}
@ -205,9 +186,9 @@ void CTranscoder::Task(void)
CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecIn)
{
CBuffer Buffer;
CCodecStream *stream = NULL;
// do we need transcoding
if ( uiCodecIn != CODEC_NONE )
{
@ -217,17 +198,17 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI
// yes, post openstream request
EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT);
// wait relpy here
if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) )
{
if ( m_bStreamOpened )
{
std::cout << "ambed openstream ok" << std::endl;
// create stream object
stream = new CCodecStream(PacketStream, m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
// init it
if ( stream->Init(m_PortOpenStream) )
{
@ -255,7 +236,7 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI
{
std::cout << "ambed openstream timeout" << std::endl;
}
}
}
return stream;
@ -264,46 +245,46 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI
void CTranscoder::ReleaseStream(CCodecStream *stream)
{
CBuffer Buffer;
if ( stream != NULL )
{
// look for the stream
bool found = false;
Lock();
{
for ( int i = 0; (i < m_Streams.size()) && !found; i++ )
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ )
{
// compare object pointers
if ( (m_Streams[i]) == stream )
if ( (*it) == stream )
{
// send close packet
EncodeClosestreamPacket(&Buffer, m_Streams[i]->GetStreamId());
EncodeClosestreamPacket(&Buffer, (*it)->GetStreamId());
m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT);
// display stats
if ( m_Streams[i]->GetPingMin() >= 0.0 )
if ( (*it)->GetPingMin() >= 0.0 )
{
char sz[256];
sprintf(sz, "ambed stats (ms) : %.1f/%.1f/%.1f",
m_Streams[i]->GetPingMin() * 1000.0,
m_Streams[i]->GetPingAve() * 1000.0,
m_Streams[i]->GetPingMax() * 1000.0);
(*it)->GetPingMin() * 1000.0,
(*it)->GetPingAve() * 1000.0,
(*it)->GetPingMax() * 1000.0);
std::cout << sz << std::endl;
}
if ( m_Streams[i]->GetTimeoutPackets() > 0 )
if ( (*it)->GetTimeoutPackets() > 0 )
{
char sz[256];
sprintf(sz, "ambed %d of %d packets timed out",
m_Streams[i]->GetTimeoutPackets(),
m_Streams[i]->GetTotalPackets());
(*it)->GetTimeoutPackets(),
(*it)->GetTotalPackets());
std::cout << sz << std::endl;
}
// and close it
m_Streams[i]->Close();
delete m_Streams[i];
m_Streams.erase(m_Streams.begin()+i);
found = true;
(*it)->Close();
delete (*it);
m_Streams.erase(it);
break;
}
}
}
@ -317,11 +298,11 @@ void CTranscoder::ReleaseStream(CCodecStream *stream)
void CTranscoder::HandleKeepalives(void)
{
CBuffer keepalive;
// send keepalive
EncodeKeepAlivePacket(&keepalive);
m_Socket.Send(keepalive, m_Ip, TRANSCODER_PORT);
// check if still with us
if ( m_bConnected && (m_LastActivityTime.DurationSinceNow() >= TRANSCODER_KEEPALIVE_TIMEOUT) )
{
@ -337,7 +318,7 @@ void CTranscoder::HandleKeepalives(void)
bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer)
{
uint8 tag[] = { 'A','M','B','E','D','P','O','N','G' };
bool valid = false;
if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{
@ -349,7 +330,7 @@ bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer)
bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, uint16 *Port)
{
uint8 tag[] = { 'A','M','B','E','D','S','T','D' };
bool valid = false;
if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{
@ -365,7 +346,7 @@ bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, ui
bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer)
{
uint8 tag[] = { 'A','M','B','E','D','B','U','S','Y' };
return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) );
}
@ -376,7 +357,7 @@ bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer)
void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer)
{
uint8 tag[] = { 'A','M','B','E','D','P','I','N','G' };
Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint8 *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN);
}
@ -394,8 +375,7 @@ void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8 uiCodecIn, uint8
void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16 uiStreamId)
{
uint8 tag[] = { 'A','M','B','E','D','C','S' };
Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint16)uiStreamId);
}

@ -4,6 +4,7 @@
//
// Created by Jean-Luc Deltombe (LX3JL) on 13/04/2017.
// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved.
// Copyright © 2020 Thomas A. Early N7TAE
//
// ----------------------------------------------------------------------------
// This file is part of xlxd.
@ -43,7 +44,7 @@ class CTranscoder
public:
// constructor
CTranscoder();
// destructor
virtual ~CTranscoder();
@ -54,14 +55,14 @@ public:
// locks
void Lock(void) { m_Mutex.lock(); }
void Unlock(void) { m_Mutex.unlock(); }
// status
bool IsConnected(void) const { return m_bConnected; }
// manage streams
CCodecStream *GetStream(CPacketStream *, uint8);
void ReleaseStream(CCodecStream *);
// task
static void Thread(CTranscoder *);
void Task(void);
@ -79,18 +80,18 @@ protected:
void EncodeKeepAlivePacket(CBuffer *);
void EncodeOpenstreamPacket(CBuffer *, uint8, uint8);
void EncodeClosestreamPacket(CBuffer *, uint16);
protected:
// streams
std::mutex m_Mutex;
std::vector<CCodecStream *> m_Streams;
std::mutex m_Mutex;
std::list<CCodecStream *> m_Streams;
// sync objects for Openstream
CSemaphore m_SemaphoreOpenStream;
bool m_bStreamOpened;
uint16 m_StreamidOpenStream;
uint16 m_PortOpenStream;
// thread
bool m_bStopThread;
std::thread *m_pThread;

Loading…
Cancel
Save

Powered by TurnKey Linux.