ambed up on std::future

pull/1/head
Tom Early 5 years ago
parent a0e615c459
commit 177f1d7e07

@ -34,8 +34,7 @@
CController::CController() CController::CController()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL;
m_uiLastStreamId = 0; m_uiLastStreamId = 0;
} }
@ -59,12 +58,8 @@ CController::~CController()
} }
m_Mutex.unlock(); m_Mutex.unlock();
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) Close();
{
m_pThread->join();
delete m_pThread;
}
} }
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -73,7 +68,7 @@ CController::~CController()
bool CController::Init(void) bool CController::Init(void)
{ {
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// create our socket // create our socket
CIp ip(strchr(m_addr, ':') ? AF_INET6 : AF_INET, TRANSCODER_PORT, m_addr); CIp ip(strchr(m_addr, ':') ? AF_INET6 : AF_INET, TRANSCODER_PORT, m_addr);
@ -88,30 +83,17 @@ bool CController::Init(void)
return false; return false;
} }
// start thread; // start thread;
m_pThread = new std::thread(CController::Thread, this); m_Future = std::async(std::launch::async, &CController::Task, this);
return true; return true;
} }
void CController::Close(void) void CController::Close(void)
{ {
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if (m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = NULL;
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CController::Thread(CController *This)
{
while ( !This->m_bStopThread )
{ {
This->Task(); m_Future.get();
} }
} }
@ -120,49 +102,51 @@ void CController::Thread(CController *This)
void CController::Task(void) void CController::Task(void)
{ {
CBuffer Buffer; while (keep_running) {
CIp Ip; CBuffer Buffer;
CCallsign Callsign; CIp Ip;
uint8 CodecIn; CCallsign Callsign;
uint8 CodecOut; uint8 CodecIn;
uint16 StreamId; uint8 CodecOut;
CStream *Stream; uint16 StreamId;
CStream *Stream;
// anything coming in from codec client ?
if ( m_Socket.Receive(Buffer, Ip, 20) ) // anything coming in from codec client ?
{ if ( m_Socket.Receive(Buffer, Ip, 20) )
// crack packet
if ( IsValidOpenstreamPacket(Buffer, &Callsign, &CodecIn, &CodecOut) )
{ {
std::cout << "Stream Open from " << Callsign << std::endl; // crack packet
if ( IsValidOpenstreamPacket(Buffer, &Callsign, &CodecIn, &CodecOut) )
// try create the stream
Stream = OpenStream(Callsign, Ip, CodecIn, CodecOut);
// send back details
if ( Stream != NULL )
{ {
EncodeStreamDescrPacket(&Buffer, *Stream); std::cout << "Stream Open from " << Callsign << std::endl;
// try create the stream
Stream = OpenStream(Callsign, Ip, CodecIn, CodecOut);
// send back details
if ( Stream != NULL )
{
EncodeStreamDescrPacket(&Buffer, *Stream);
}
else
{
EncodeNoStreamAvailablePacket(&Buffer);
}
m_Socket.Send(Buffer, Ip);
} }
else else if ( IsValidClosestreamPacket(Buffer, &StreamId) )
{ {
EncodeNoStreamAvailablePacket(&Buffer); // close the stream
} CloseStream(StreamId);
m_Socket.Send(Buffer, Ip);
}
else if ( IsValidClosestreamPacket(Buffer, &StreamId) )
{
// close the stream
CloseStream(StreamId);
std::cout << "Stream " << (int)StreamId << " closed" << std::endl; std::cout << "Stream " << (int)StreamId << " closed" << std::endl;
} }
else if ( IsValidKeepAlivePacket(Buffer, &Callsign) ) else if ( IsValidKeepAlivePacket(Buffer, &Callsign) )
{ {
//std::cout << "ping - " << Callsign << std::endl; //std::cout << "ping - " << Callsign << std::endl;
// pong back // pong back
EncodeKeepAlivePacket(&Buffer); EncodeKeepAlivePacket(&Buffer);
m_Socket.Send(Buffer, Ip); m_Socket.Send(Buffer, Ip);
}
} }
} }

@ -56,7 +56,6 @@ public:
void SetListenIp(const char *str) { memset(m_addr, 0, INET6_ADDRSTRLEN); strncpy(m_addr, str, INET6_ADDRSTRLEN-1); } void SetListenIp(const char *str) { memset(m_addr, 0, INET6_ADDRSTRLEN); strncpy(m_addr, str, INET6_ADDRSTRLEN-1); }
// task // task
static void Thread(CController *);
void Task(void); void Task(void);
protected: protected:
@ -88,8 +87,8 @@ protected:
std::list<CStream *> m_Streams; std::list<CStream *> m_Streams;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::future<void> m_Future;
}; };

@ -43,8 +43,7 @@ CStream::CStream()
{ {
m_uiId = 0; m_uiId = 0;
m_uiPort = 0; m_uiPort = 0;
m_bStopThread = false; keep_running = true;
m_pThread = NULL;
m_VocodecChannel = NULL; m_VocodecChannel = NULL;
m_LastActivity.Now(); m_LastActivity.Now();
m_iTotalPackets = 0; m_iTotalPackets = 0;
@ -59,8 +58,7 @@ CStream::CStream(uint16 uiId, const CCallsign &Callsign, const CIp &Ip, uint8 ui
m_uiPort = 0; m_uiPort = 0;
m_uiCodecIn = uiCodecIn; m_uiCodecIn = uiCodecIn;
m_uiCodecOut = uiCodecOut; m_uiCodecOut = uiCodecOut;
m_bStopThread = false; keep_running = true;
m_pThread = NULL;
m_VocodecChannel = NULL; m_VocodecChannel = NULL;
m_LastActivity.Now(); m_LastActivity.Now();
m_iTotalPackets = 0; m_iTotalPackets = 0;
@ -73,12 +71,10 @@ CStream::CStream(uint16 uiId, const CCallsign &Callsign, const CIp &Ip, uint8 ui
CStream::~CStream() CStream::~CStream()
{ {
// stop thread first // stop thread first
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_Future.valid() )
{ {
m_pThread->join(); m_Future.get();
delete m_pThread;
m_pThread = NULL;
} }
// then close everything // then close everything
@ -95,7 +91,7 @@ CStream::~CStream()
bool CStream::Init(uint16 uiPort) bool CStream::Init(uint16 uiPort)
{ {
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// create our socket // create our socket
auto s = g_AmbeServer.GetListenIp(); auto s = g_AmbeServer.GetListenIp();
@ -125,7 +121,7 @@ bool CStream::Init(uint16 uiPort)
m_uiPort = uiPort; m_uiPort = uiPort;
// start thread; // start thread;
m_pThread = new std::thread(CStream::Thread, this); m_Future = std::async(std::launch::async, &CStream::Task, this);
// init timeout system // init timeout system
m_LastActivity.Now(); m_LastActivity.Now();
@ -141,12 +137,10 @@ bool CStream::Init(uint16 uiPort)
void CStream::Close(void) void CStream::Close(void)
{ {
// stop thread first // stop thread first
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_Future.valid() )
{ {
m_pThread->join(); m_Future.get();
delete m_pThread;
m_pThread = NULL;
} }
// then close everything // then close everything
@ -161,61 +155,52 @@ void CStream::Close(void)
std::cout << m_iLostPackets << " of " << m_iTotalPackets << " packets lost" << std::endl; std::cout << m_iLostPackets << " of " << m_iTotalPackets << " packets lost" << std::endl;
} }
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CStream::Thread(CStream *This)
{
while ( !This->m_bStopThread )
{
This->Task();
}
}
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// task // task
void CStream::Task(void) void CStream::Task(void)
{ {
CBuffer Buffer; while (keep_running) {
static CIp Ip; CBuffer Buffer;
uint8 uiPid; static CIp Ip;
uint8 Ambe[AMBE_FRAME_SIZE]; uint8 uiPid;
CAmbePacket *packet; uint8 Ambe[AMBE_FRAME_SIZE];
CPacketQueue *queue; CAmbePacket *packet;
CPacketQueue *queue;
// anything coming in from codec client ?
if ( m_Socket.Receive(Buffer, Ip, 1) ) // anything coming in from codec client ?
{ if ( m_Socket.Receive(Buffer, Ip, 1) )
// crack packet
if ( IsValidDvFramePacket(Buffer, &uiPid, Ambe) )
{ {
// transcode AMBE here // crack packet
m_LastActivity.Now(); if ( IsValidDvFramePacket(Buffer, &uiPid, Ambe) )
m_iTotalPackets++; {
// transcode AMBE here
// post packet to VocoderChannel m_LastActivity.Now();
packet = new CAmbePacket(uiPid, m_uiCodecIn, Ambe); m_iTotalPackets++;
queue = m_VocodecChannel->GetPacketQueueIn();
queue->push(packet); // post packet to VocoderChannel
m_VocodecChannel->ReleasePacketQueueIn(); packet = new CAmbePacket(uiPid, m_uiCodecIn, Ambe);
queue = m_VocodecChannel->GetPacketQueueIn();
queue->push(packet);
m_VocodecChannel->ReleasePacketQueueIn();
}
} }
}
// anything in our queue ? // anything in our queue ?
queue = m_VocodecChannel->GetPacketQueueOut(); queue = m_VocodecChannel->GetPacketQueueOut();
while ( !queue->empty() ) while ( !queue->empty() )
{ {
// get the packet // get the packet
packet = (CAmbePacket *)queue->front(); packet = (CAmbePacket *)queue->front();
queue->pop(); queue->pop();
// send it to client // send it to client
EncodeDvFramePacket(&Buffer, packet->GetPid(), packet->GetAmbe()); EncodeDvFramePacket(&Buffer, packet->GetPid(), packet->GetAmbe());
m_Socket.Send(Buffer, Ip, m_uiPort); m_Socket.Send(Buffer, Ip, m_uiPort);
// and done // and done
delete packet; delete packet;
}
m_VocodecChannel->ReleasePacketQueueOut();
} }
m_VocodecChannel->ReleasePacketQueueOut();
} }
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////

@ -56,7 +56,6 @@ public:
bool IsActive(void) const { return m_LastActivity.DurationSinceNow() <= STREAM_ACTIVITY_TIMEOUT; } bool IsActive(void) const { return m_LastActivity.DurationSinceNow() <= STREAM_ACTIVITY_TIMEOUT; }
// task // task
static void Thread(CStream *);
void Task(void); void Task(void);
protected: protected:
@ -88,8 +87,8 @@ protected:
CTimePoint m_LastActivity; CTimePoint m_LastActivity;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::future<void> m_Future;
}; };

@ -34,8 +34,7 @@
CVocodecInterface::CVocodecInterface() CVocodecInterface::CVocodecInterface()
{ {
m_Channels.reserve(5); m_Channels.reserve(5);
m_bStopThread = false; keep_running = true;
m_pThread = NULL;
} }
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -48,11 +47,10 @@ CVocodecInterface::~CVocodecInterface()
m_Channels.clear(); m_Channels.clear();
// stop thread // stop thread
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_Future.valid() )
{ {
m_pThread->join(); m_Future.get();
delete m_pThread;
} }
} }
@ -62,10 +60,10 @@ CVocodecInterface::~CVocodecInterface()
bool CVocodecInterface::Init(void) bool CVocodecInterface::Init(void)
{ {
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// start thread; // start thread;
m_pThread = new std::thread(CVocodecInterface::Thread, this); m_Future = std::async(std::launch::async, &CVocodecInterface::Thread, this);
// done // done
return true; return true;
@ -75,11 +73,11 @@ bool CVocodecInterface::Init(void)
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// thread // thread
void CVocodecInterface::Thread(CVocodecInterface *This) void CVocodecInterface::Thread()
{ {
while ( !This->m_bStopThread ) while ( keep_running )
{ {
This->Task(); Task();
} }
} }
@ -91,5 +89,3 @@ void CVocodecInterface::AddChannel(CVocodecChannel *Channel)
{ {
m_Channels.push_back(Channel); m_Channels.push_back(Channel);
} }

@ -55,8 +55,8 @@ public:
virtual CVocodecChannel *GetChannelWithChannelOut(int) { return NULL; } virtual CVocodecChannel *GetChannelWithChannelOut(int) { return NULL; }
// task // task
static void Thread(CVocodecInterface *); void Thread(void);
virtual void Task(void) {}; virtual void Task(void) {}
// operators // operators
virtual bool operator ==(const CVocodecInterface &) const { return false; } virtual bool operator ==(const CVocodecInterface &) const { return false; }
@ -66,8 +66,8 @@ protected:
std::vector<CVocodecChannel *> m_Channels; std::vector<CVocodecChannel *> m_Channels;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::future<void> m_Future;
}; };

@ -31,7 +31,8 @@
#include <map> #include <map>
#include <queue> #include <queue>
#include <chrono> #include <chrono>
#include <thread> #include <future>
#include <atomic>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>

Loading…
Cancel
Save

Powered by TurnKey Linux.