From 71a9f785ea7f35a6e90bf992e567928cfefd13fb Mon Sep 17 00:00:00 2001 From: Tom Early Date: Fri, 3 Jul 2020 15:46:36 -0700 Subject: [PATCH] CTranscoder:m_Streams is list --- src/ctranscoder.cpp | 104 ++++++++++++++++++-------------------------- src/ctranscoder.h | 17 ++++---- 2 files changed, 51 insertions(+), 70 deletions(-) diff --git a/src/ctranscoder.cpp b/src/ctranscoder.cpp index 6b810c9..6e48ccc 100644 --- a/src/ctranscoder.cpp +++ b/src/ctranscoder.cpp @@ -48,7 +48,6 @@ CTranscoder::CTranscoder() { m_bStopThread = false; m_pThread = NULL; - m_Streams.reserve(12); m_bConnected = false; m_LastKeepaliveTime.Now(); m_LastActivityTime.Now(); @@ -62,25 +61,7 @@ CTranscoder::CTranscoder() CTranscoder::~CTranscoder() { - // close all streams - m_Mutex.lock(); - { - for ( int i = 0; i < m_Streams.size(); i++ ) - { - delete m_Streams[i]; - } - m_Streams.clear(); - - } - m_Mutex.unlock(); - - // kill threads - m_bStopThread = true; - if ( m_pThread != NULL ) - { - m_pThread->join(); - delete m_pThread; - } + Close(); } //////////////////////////////////////////////////////////////////////////////////////// @@ -89,13 +70,13 @@ CTranscoder::~CTranscoder() bool CTranscoder::Init(void) { bool ok; - + // reset stop flag m_bStopThread = false; // create server's IP m_Ip = g_Reflector.GetTranscoderIp(); - + // create our socket ok = m_Socket.Open(TRANSCODER_PORT); if ( ok ) @@ -116,19 +97,19 @@ void CTranscoder::Close(void) { // close socket m_Socket.Close(); - + // close all streams m_Mutex.lock(); { - for ( int i = 0; i < m_Streams.size(); i++ ) + for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ ) { - delete m_Streams[i]; + delete *it; } m_Streams.clear(); - + } m_Mutex.unlock(); - + // kill threads m_bStopThread = true; if ( m_pThread != NULL ) @@ -156,13 +137,13 @@ void CTranscoder::Task(void) CIp Ip; uint16 StreamId; uint16 Port; - + // anything coming in from codec server ? //if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) ) if ( m_Socket.Receive(&Buffer, &Ip, 20) != -1 ) { m_LastActivityTime.Now(); - + // crack packet if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) ) { @@ -185,15 +166,15 @@ void CTranscoder::Task(void) } m_bConnected = true; } - + } - + // keep client alive if ( m_LastKeepaliveTime.DurationSinceNow() > TRANSCODER_KEEPALIVE_PERIOD ) { // HandleKeepalives(); - + // update time m_LastKeepaliveTime.Now(); } @@ -205,9 +186,9 @@ void CTranscoder::Task(void) CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecIn) { CBuffer Buffer; - + CCodecStream *stream = NULL; - + // do we need transcoding if ( uiCodecIn != CODEC_NONE ) { @@ -217,17 +198,17 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI // yes, post openstream request EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT); - + // wait relpy here if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) ) { if ( m_bStreamOpened ) { std::cout << "ambed openstream ok" << std::endl; - + // create stream object stream = new CCodecStream(PacketStream, m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); - + // init it if ( stream->Init(m_PortOpenStream) ) { @@ -255,7 +236,7 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI { std::cout << "ambed openstream timeout" << std::endl; } - + } } return stream; @@ -264,46 +245,46 @@ CCodecStream *CTranscoder::GetStream(CPacketStream *PacketStream, uint8 uiCodecI void CTranscoder::ReleaseStream(CCodecStream *stream) { CBuffer Buffer; - + if ( stream != NULL ) { // look for the stream bool found = false; Lock(); { - for ( int i = 0; (i < m_Streams.size()) && !found; i++ ) + for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ ) { // compare object pointers - if ( (m_Streams[i]) == stream ) + if ( (*it) == stream ) { // send close packet - EncodeClosestreamPacket(&Buffer, m_Streams[i]->GetStreamId()); + EncodeClosestreamPacket(&Buffer, (*it)->GetStreamId()); m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT); - + // display stats - if ( m_Streams[i]->GetPingMin() >= 0.0 ) + if ( (*it)->GetPingMin() >= 0.0 ) { char sz[256]; sprintf(sz, "ambed stats (ms) : %.1f/%.1f/%.1f", - m_Streams[i]->GetPingMin() * 1000.0, - m_Streams[i]->GetPingAve() * 1000.0, - m_Streams[i]->GetPingMax() * 1000.0); + (*it)->GetPingMin() * 1000.0, + (*it)->GetPingAve() * 1000.0, + (*it)->GetPingMax() * 1000.0); std::cout << sz << std::endl; } - if ( m_Streams[i]->GetTimeoutPackets() > 0 ) + if ( (*it)->GetTimeoutPackets() > 0 ) { char sz[256]; sprintf(sz, "ambed %d of %d packets timed out", - m_Streams[i]->GetTimeoutPackets(), - m_Streams[i]->GetTotalPackets()); + (*it)->GetTimeoutPackets(), + (*it)->GetTotalPackets()); std::cout << sz << std::endl; } // and close it - m_Streams[i]->Close(); - delete m_Streams[i]; - m_Streams.erase(m_Streams.begin()+i); - found = true; + (*it)->Close(); + delete (*it); + m_Streams.erase(it); + break; } } } @@ -317,11 +298,11 @@ void CTranscoder::ReleaseStream(CCodecStream *stream) void CTranscoder::HandleKeepalives(void) { CBuffer keepalive; - + // send keepalive EncodeKeepAlivePacket(&keepalive); m_Socket.Send(keepalive, m_Ip, TRANSCODER_PORT); - + // check if still with us if ( m_bConnected && (m_LastActivityTime.DurationSinceNow() >= TRANSCODER_KEEPALIVE_TIMEOUT) ) { @@ -337,7 +318,7 @@ void CTranscoder::HandleKeepalives(void) bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) { uint8 tag[] = { 'A','M','B','E','D','P','O','N','G' }; - + bool valid = false; if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) { @@ -349,7 +330,7 @@ bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, uint16 *Port) { uint8 tag[] = { 'A','M','B','E','D','S','T','D' }; - + bool valid = false; if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) { @@ -365,7 +346,7 @@ bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, ui bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) { uint8 tag[] = { 'A','M','B','E','D','B','U','S','Y' }; - + return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ); } @@ -376,7 +357,7 @@ bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer) { uint8 tag[] = { 'A','M','B','E','D','P','I','N','G' }; - + Buffer->Set(tag, sizeof(tag)); Buffer->Append((uint8 *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN); } @@ -394,8 +375,7 @@ void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8 uiCodecIn, uint8 void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16 uiStreamId) { uint8 tag[] = { 'A','M','B','E','D','C','S' }; - + Buffer->Set(tag, sizeof(tag)); Buffer->Append((uint16)uiStreamId); } - diff --git a/src/ctranscoder.h b/src/ctranscoder.h index 7d54c58..f40d07c 100644 --- a/src/ctranscoder.h +++ b/src/ctranscoder.h @@ -4,6 +4,7 @@ // // Created by Jean-Luc Deltombe (LX3JL) on 13/04/2017. // Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved. +// Copyright © 2020 Thomas A. Early N7TAE // // ---------------------------------------------------------------------------- // This file is part of xlxd. @@ -43,7 +44,7 @@ class CTranscoder public: // constructor CTranscoder(); - + // destructor virtual ~CTranscoder(); @@ -54,14 +55,14 @@ public: // locks void Lock(void) { m_Mutex.lock(); } void Unlock(void) { m_Mutex.unlock(); } - + // status bool IsConnected(void) const { return m_bConnected; } - + // manage streams CCodecStream *GetStream(CPacketStream *, uint8); void ReleaseStream(CCodecStream *); - + // task static void Thread(CTranscoder *); void Task(void); @@ -79,18 +80,18 @@ protected: void EncodeKeepAlivePacket(CBuffer *); void EncodeOpenstreamPacket(CBuffer *, uint8, uint8); void EncodeClosestreamPacket(CBuffer *, uint16); - + protected: // streams - std::mutex m_Mutex; - std::vector m_Streams; + std::mutex m_Mutex; + std::list m_Streams; // sync objects for Openstream CSemaphore m_SemaphoreOpenStream; bool m_bStreamOpened; uint16 m_StreamidOpenStream; uint16 m_PortOpenStream; - + // thread bool m_bStopThread; std::thread *m_pThread;