diff --git a/ambed/ccontroller.cpp b/ambed/ccontroller.cpp index e81f4b9..ef1a012 100644 --- a/ambed/ccontroller.cpp +++ b/ambed/ccontroller.cpp @@ -34,8 +34,7 @@ CController::CController() { - m_bStopThread = false; - m_pThread = NULL; + keep_running = true; m_uiLastStreamId = 0; } @@ -59,12 +58,8 @@ CController::~CController() } m_Mutex.unlock(); - m_bStopThread = true; - if ( m_pThread != NULL ) - { - m_pThread->join(); - delete m_pThread; - } + keep_running = false; + Close(); } //////////////////////////////////////////////////////////////////////////////////////// @@ -73,7 +68,7 @@ CController::~CController() bool CController::Init(void) { // reset stop flag - m_bStopThread = false; + keep_running = true; // create our socket CIp ip(strchr(m_addr, ':') ? AF_INET6 : AF_INET, TRANSCODER_PORT, m_addr); @@ -88,30 +83,17 @@ bool CController::Init(void) return false; } // start thread; - m_pThread = new std::thread(CController::Thread, this); + m_Future = std::async(std::launch::async, &CController::Task, this); return true; } void CController::Close(void) { - m_bStopThread = true; - if ( m_pThread != NULL ) - { - m_pThread->join(); - delete m_pThread; - m_pThread = NULL; - } -} - -//////////////////////////////////////////////////////////////////////////////////////// -// thread - -void CController::Thread(CController *This) -{ - while ( !This->m_bStopThread ) + keep_running = false; + if (m_Future.valid() ) { - This->Task(); + m_Future.get(); } } @@ -120,49 +102,51 @@ void CController::Thread(CController *This) void CController::Task(void) { - CBuffer Buffer; - CIp Ip; - CCallsign Callsign; - uint8 CodecIn; - uint8 CodecOut; - uint16 StreamId; - CStream *Stream; - - // anything coming in from codec client ? - if ( m_Socket.Receive(Buffer, Ip, 20) ) - { - // crack packet - if ( IsValidOpenstreamPacket(Buffer, &Callsign, &CodecIn, &CodecOut) ) + while (keep_running) { + CBuffer Buffer; + CIp Ip; + CCallsign Callsign; + uint8 CodecIn; + uint8 CodecOut; + uint16 StreamId; + CStream *Stream; + + // anything coming in from codec client ? + if ( m_Socket.Receive(Buffer, Ip, 20) ) { - std::cout << "Stream Open from " << Callsign << std::endl; - - // try create the stream - Stream = OpenStream(Callsign, Ip, CodecIn, CodecOut); - - // send back details - if ( Stream != NULL ) + // crack packet + if ( IsValidOpenstreamPacket(Buffer, &Callsign, &CodecIn, &CodecOut) ) { - EncodeStreamDescrPacket(&Buffer, *Stream); + std::cout << "Stream Open from " << Callsign << std::endl; + + // try create the stream + Stream = OpenStream(Callsign, Ip, CodecIn, CodecOut); + + // send back details + if ( Stream != NULL ) + { + EncodeStreamDescrPacket(&Buffer, *Stream); + } + else + { + EncodeNoStreamAvailablePacket(&Buffer); + } + m_Socket.Send(Buffer, Ip); } - else + else if ( IsValidClosestreamPacket(Buffer, &StreamId) ) { - EncodeNoStreamAvailablePacket(&Buffer); - } - m_Socket.Send(Buffer, Ip); - } - else if ( IsValidClosestreamPacket(Buffer, &StreamId) ) - { - // close the stream - CloseStream(StreamId); + // close the stream + CloseStream(StreamId); - std::cout << "Stream " << (int)StreamId << " closed" << std::endl; - } - else if ( IsValidKeepAlivePacket(Buffer, &Callsign) ) - { - //std::cout << "ping - " << Callsign << std::endl; - // pong back - EncodeKeepAlivePacket(&Buffer); - m_Socket.Send(Buffer, Ip); + std::cout << "Stream " << (int)StreamId << " closed" << std::endl; + } + else if ( IsValidKeepAlivePacket(Buffer, &Callsign) ) + { + //std::cout << "ping - " << Callsign << std::endl; + // pong back + EncodeKeepAlivePacket(&Buffer); + m_Socket.Send(Buffer, Ip); + } } } diff --git a/ambed/ccontroller.h b/ambed/ccontroller.h index f4c9c90..3abd70f 100644 --- a/ambed/ccontroller.h +++ b/ambed/ccontroller.h @@ -56,7 +56,6 @@ public: void SetListenIp(const char *str) { memset(m_addr, 0, INET6_ADDRSTRLEN); strncpy(m_addr, str, INET6_ADDRSTRLEN-1); } // task - static void Thread(CController *); void Task(void); protected: @@ -88,8 +87,8 @@ protected: std::list m_Streams; // thread - bool m_bStopThread; - std::thread *m_pThread; + std::atomic keep_running; + std::future m_Future; }; diff --git a/ambed/cstream.cpp b/ambed/cstream.cpp index fe8e037..95dace6 100644 --- a/ambed/cstream.cpp +++ b/ambed/cstream.cpp @@ -43,8 +43,7 @@ CStream::CStream() { m_uiId = 0; m_uiPort = 0; - m_bStopThread = false; - m_pThread = NULL; + keep_running = true; m_VocodecChannel = NULL; m_LastActivity.Now(); m_iTotalPackets = 0; @@ -59,8 +58,7 @@ CStream::CStream(uint16 uiId, const CCallsign &Callsign, const CIp &Ip, uint8 ui m_uiPort = 0; m_uiCodecIn = uiCodecIn; m_uiCodecOut = uiCodecOut; - m_bStopThread = false; - m_pThread = NULL; + keep_running = true; m_VocodecChannel = NULL; m_LastActivity.Now(); m_iTotalPackets = 0; @@ -73,12 +71,10 @@ CStream::CStream(uint16 uiId, const CCallsign &Callsign, const CIp &Ip, uint8 ui CStream::~CStream() { // stop thread first - m_bStopThread = true; - if ( m_pThread != NULL ) + keep_running = false; + if ( m_Future.valid() ) { - m_pThread->join(); - delete m_pThread; - m_pThread = NULL; + m_Future.get(); } // then close everything @@ -95,7 +91,7 @@ CStream::~CStream() bool CStream::Init(uint16 uiPort) { // reset stop flag - m_bStopThread = false; + keep_running = true; // create our socket auto s = g_AmbeServer.GetListenIp(); @@ -125,7 +121,7 @@ bool CStream::Init(uint16 uiPort) m_uiPort = uiPort; // start thread; - m_pThread = new std::thread(CStream::Thread, this); + m_Future = std::async(std::launch::async, &CStream::Task, this); // init timeout system m_LastActivity.Now(); @@ -141,12 +137,10 @@ bool CStream::Init(uint16 uiPort) void CStream::Close(void) { // stop thread first - m_bStopThread = true; - if ( m_pThread != NULL ) + keep_running = false; + if ( m_Future.valid() ) { - m_pThread->join(); - delete m_pThread; - m_pThread = NULL; + m_Future.get(); } // then close everything @@ -161,61 +155,52 @@ void CStream::Close(void) std::cout << m_iLostPackets << " of " << m_iTotalPackets << " packets lost" << std::endl; } -//////////////////////////////////////////////////////////////////////////////////////// -// thread - -void CStream::Thread(CStream *This) -{ - while ( !This->m_bStopThread ) - { - This->Task(); - } -} - //////////////////////////////////////////////////////////////////////////////////////// // task void CStream::Task(void) { - CBuffer Buffer; - static CIp Ip; - uint8 uiPid; - uint8 Ambe[AMBE_FRAME_SIZE]; - CAmbePacket *packet; - CPacketQueue *queue; - - // anything coming in from codec client ? - if ( m_Socket.Receive(Buffer, Ip, 1) ) - { - // crack packet - if ( IsValidDvFramePacket(Buffer, &uiPid, Ambe) ) + while (keep_running) { + CBuffer Buffer; + static CIp Ip; + uint8 uiPid; + uint8 Ambe[AMBE_FRAME_SIZE]; + CAmbePacket *packet; + CPacketQueue *queue; + + // anything coming in from codec client ? + if ( m_Socket.Receive(Buffer, Ip, 1) ) { - // transcode AMBE here - m_LastActivity.Now(); - m_iTotalPackets++; - - // post packet to VocoderChannel - packet = new CAmbePacket(uiPid, m_uiCodecIn, Ambe); - queue = m_VocodecChannel->GetPacketQueueIn(); - queue->push(packet); - m_VocodecChannel->ReleasePacketQueueIn(); + // crack packet + if ( IsValidDvFramePacket(Buffer, &uiPid, Ambe) ) + { + // transcode AMBE here + m_LastActivity.Now(); + m_iTotalPackets++; + + // post packet to VocoderChannel + packet = new CAmbePacket(uiPid, m_uiCodecIn, Ambe); + queue = m_VocodecChannel->GetPacketQueueIn(); + queue->push(packet); + m_VocodecChannel->ReleasePacketQueueIn(); + } } - } - // anything in our queue ? - queue = m_VocodecChannel->GetPacketQueueOut(); - while ( !queue->empty() ) - { - // get the packet - packet = (CAmbePacket *)queue->front(); - queue->pop(); - // send it to client - EncodeDvFramePacket(&Buffer, packet->GetPid(), packet->GetAmbe()); - m_Socket.Send(Buffer, Ip, m_uiPort); - // and done - delete packet; + // anything in our queue ? + queue = m_VocodecChannel->GetPacketQueueOut(); + while ( !queue->empty() ) + { + // get the packet + packet = (CAmbePacket *)queue->front(); + queue->pop(); + // send it to client + EncodeDvFramePacket(&Buffer, packet->GetPid(), packet->GetAmbe()); + m_Socket.Send(Buffer, Ip, m_uiPort); + // and done + delete packet; + } + m_VocodecChannel->ReleasePacketQueueOut(); } - m_VocodecChannel->ReleasePacketQueueOut(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/ambed/cstream.h b/ambed/cstream.h index 8bf44b6..44463bb 100644 --- a/ambed/cstream.h +++ b/ambed/cstream.h @@ -56,7 +56,6 @@ public: bool IsActive(void) const { return m_LastActivity.DurationSinceNow() <= STREAM_ACTIVITY_TIMEOUT; } // task - static void Thread(CStream *); void Task(void); protected: @@ -88,8 +87,8 @@ protected: CTimePoint m_LastActivity; // thread - bool m_bStopThread; - std::thread *m_pThread; + std::atomic keep_running; + std::future m_Future; }; diff --git a/ambed/cvocodecinterface.cpp b/ambed/cvocodecinterface.cpp index 6d1a60d..19ef699 100644 --- a/ambed/cvocodecinterface.cpp +++ b/ambed/cvocodecinterface.cpp @@ -34,8 +34,7 @@ CVocodecInterface::CVocodecInterface() { m_Channels.reserve(5); - m_bStopThread = false; - m_pThread = NULL; + keep_running = true; } //////////////////////////////////////////////////////////////////////////////////////// @@ -48,11 +47,10 @@ CVocodecInterface::~CVocodecInterface() m_Channels.clear(); // stop thread - m_bStopThread = true; - if ( m_pThread != NULL ) + keep_running = false; + if ( m_Future.valid() ) { - m_pThread->join(); - delete m_pThread; + m_Future.get(); } } @@ -62,10 +60,10 @@ CVocodecInterface::~CVocodecInterface() bool CVocodecInterface::Init(void) { // reset stop flag - m_bStopThread = false; + keep_running = true; // start thread; - m_pThread = new std::thread(CVocodecInterface::Thread, this); + m_Future = std::async(std::launch::async, &CVocodecInterface::Thread, this); // done return true; @@ -75,11 +73,11 @@ bool CVocodecInterface::Init(void) //////////////////////////////////////////////////////////////////////////////////////// // thread -void CVocodecInterface::Thread(CVocodecInterface *This) +void CVocodecInterface::Thread() { - while ( !This->m_bStopThread ) + while ( keep_running ) { - This->Task(); + Task(); } } @@ -91,5 +89,3 @@ void CVocodecInterface::AddChannel(CVocodecChannel *Channel) { m_Channels.push_back(Channel); } - - diff --git a/ambed/cvocodecinterface.h b/ambed/cvocodecinterface.h index d73a5c3..9889efa 100644 --- a/ambed/cvocodecinterface.h +++ b/ambed/cvocodecinterface.h @@ -55,8 +55,8 @@ public: virtual CVocodecChannel *GetChannelWithChannelOut(int) { return NULL; } // task - static void Thread(CVocodecInterface *); - virtual void Task(void) {}; + void Thread(void); + virtual void Task(void) {} // operators virtual bool operator ==(const CVocodecInterface &) const { return false; } @@ -66,8 +66,8 @@ protected: std::vector m_Channels; // thread - bool m_bStopThread; - std::thread *m_pThread; + std::atomic keep_running; + std::future m_Future; }; diff --git a/ambed/main.h b/ambed/main.h index ec0b541..b88da3f 100644 --- a/ambed/main.h +++ b/ambed/main.h @@ -31,7 +31,8 @@ #include #include #include -#include +#include +#include #include #include #include