m_bStopThread -> keep_running

pull/1/head
Tom Early 6 years ago
parent ef252ec37a
commit e5f734266e

@ -36,7 +36,7 @@
CCodecStream::CCodecStream(uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut) CCodecStream::CCodecStream(uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut)
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
m_uiStreamId = uiId; m_uiStreamId = uiId;
m_uiPid = 0; m_uiPid = 0;
@ -59,9 +59,9 @@ CCodecStream::~CCodecStream()
{ {
// close socket // close socket
m_Socket.Close(); m_Socket.Close();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -75,10 +75,10 @@ CCodecStream::~CCodecStream()
bool CCodecStream::Init(uint16 uiPort) bool CCodecStream::Init(uint16 uiPort)
{ {
bool ok; bool ok;
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// copy our test data // copy our test data
if ( m_uiCodecIn == CODEC_AMBE2PLUS ) if ( m_uiCodecIn == CODEC_AMBE2PLUS )
{ {
@ -108,11 +108,11 @@ bool CCodecStream::Init(uint16 uiPort)
m_AmbeDest.push_back(ambe); m_AmbeDest.push_back(ambe);
} }
} }
// create server's IP // create server's IP
m_Ip = g_Transcoder.GetAmbedIp(); m_Ip = g_Transcoder.GetAmbedIp();
m_uiPort = uiPort; m_uiPort = uiPort;
// create our socket // create our socket
ok = m_Socket.Open(uiPort); ok = m_Socket.Open(uiPort);
if ( ok ) if ( ok )
@ -129,7 +129,7 @@ bool CCodecStream::Init(uint16 uiPort)
std::cout << "Error opening socket on port UDP" << uiPort << " on ip " << m_Ip << std::endl; std::cout << "Error opening socket on port UDP" << uiPort << " on ip " << m_Ip << std::endl;
m_bConnected = false; m_bConnected = false;
} }
// done // done
return ok; return ok;
} }
@ -139,9 +139,9 @@ void CCodecStream::Close(void)
// close socket // close socket
m_bConnected = false; m_bConnected = false;
m_Socket.Close(); m_Socket.Close();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -156,7 +156,7 @@ void CCodecStream::Close(void)
void CCodecStream::Thread(CCodecStream *This) void CCodecStream::Thread(CCodecStream *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->Task(); This->Task();
} }
@ -167,7 +167,7 @@ void CCodecStream::Task(void)
CBuffer Buffer; CBuffer Buffer;
CIp Ip; CIp Ip;
uint8 Ambe[AMBE_SIZE]; uint8 Ambe[AMBE_SIZE];
// connected ? // connected ?
if ( m_bConnected ) if ( m_bConnected )
{ {
@ -176,17 +176,17 @@ void CCodecStream::Task(void)
{ {
// yes // yes
m_FrameTimer.Now(); m_FrameTimer.Now();
// encode packet @ send it // encode packet @ send it
EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData()); EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData());
m_Socket.Send(Buffer, m_Ip, m_uiPort); m_Socket.Send(Buffer, m_Ip, m_uiPort);
// and increment pointer // and increment pointer
m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size(); m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size();
m_uiNbTotalPacketSent++; m_uiNbTotalPacketSent++;
m_uiNbPacketSent++; m_uiNbPacketSent++;
}*/ }*/
// any packt to send to trancoder ? // any packt to send to trancoder ?
uint32 uiNbPacketToSend = (uint32)(m_FrameTimer.DurationSinceNow() * 50.0) - m_uiNbTotalPacketSent; uint32 uiNbPacketToSend = (uint32)(m_FrameTimer.DurationSinceNow() * 50.0) - m_uiNbTotalPacketSent;
if ( uiNbPacketToSend > 0 ) if ( uiNbPacketToSend > 0 )
@ -196,14 +196,14 @@ void CCodecStream::Task(void)
// encode packet @ send it // encode packet @ send it
EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData()); EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData());
m_Socket.Send(Buffer, m_Ip, m_uiPort); m_Socket.Send(Buffer, m_Ip, m_uiPort);
// and increment pointer // and increment pointer
m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size(); m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size();
m_uiNbTotalPacketSent++; m_uiNbTotalPacketSent++;
m_uiNbPacketSent++; m_uiNbPacketSent++;
} }
} }
// any packet from transcoder // any packet from transcoder
if ( m_Socket.Receive(&Buffer, &Ip, 1) != -1 ) if ( m_Socket.Receive(&Buffer, &Ip, 1) != -1 )
{ {
@ -211,7 +211,7 @@ void CCodecStream::Task(void)
if ( IsValidAmbePacket(Buffer, Ambe) ) if ( IsValidAmbePacket(Buffer, Ambe) )
{ {
m_TimeoutTimer.Now(); m_TimeoutTimer.Now();
// check the PID // check the PID
// check the transcoded packet // check the transcoded packet
/*if ( ::memcmp(Ambe, m_AmbeDest[m_iAmbeDestPtr]->GetData(), AMBE_SIZE) != 0 ) /*if ( ::memcmp(Ambe, m_AmbeDest[m_iAmbeDestPtr]->GetData(), AMBE_SIZE) != 0 )
@ -219,15 +219,15 @@ void CCodecStream::Task(void)
m_uiNbPacketBad++; m_uiNbPacketBad++;
::memcpy((void *)m_AmbeDest[m_iAmbeDestPtr]->GetData(), Ambe, AMBE_SIZE); ::memcpy((void *)m_AmbeDest[m_iAmbeDestPtr]->GetData(), Ambe, AMBE_SIZE);
}*/ }*/
// and increment pointer // and increment pointer
m_iAmbeDestPtr = (m_iAmbeDestPtr + 1) % m_AmbeDest.size(); m_iAmbeDestPtr = (m_iAmbeDestPtr + 1) % m_AmbeDest.size();
m_uiNbPacketReceived++; m_uiNbPacketReceived++;
} }
} }
} }
// display stats // display stats
if ( m_DisplayStatsTimer.DurationSinceNow() >= 2.0 ) if ( m_DisplayStatsTimer.DurationSinceNow() >= 2.0 )
{ {
@ -249,7 +249,7 @@ void CCodecStream::Task(void)
bool CCodecStream::IsValidAmbePacket(const CBuffer &Buffer, uint8 *Ambe) bool CCodecStream::IsValidAmbePacket(const CBuffer &Buffer, uint8 *Ambe)
{ {
bool valid = false; bool valid = false;
if ( (Buffer.size() == 11) && (Buffer.data()[0] == m_uiCodecOut) ) if ( (Buffer.size() == 11) && (Buffer.data()[0] == m_uiCodecOut) )
{ {
::memcpy(Ambe, &(Buffer.data()[2]), 9); ::memcpy(Ambe, &(Buffer.data()[2]), 9);
@ -282,7 +282,7 @@ void CCodecStream::ResetStats(void)
m_uiNbPacketReceived = 0; m_uiNbPacketReceived = 0;
m_uiNbPacketBad = 0; m_uiNbPacketBad = 0;
m_uiNbPacketTimeout = 0; m_uiNbPacketTimeout = 0;
} }
void CCodecStream::DisplayStats(void) void CCodecStream::DisplayStats(void)
@ -292,10 +292,10 @@ void CCodecStream::DisplayStats(void)
uint32 uiReceived = m_uiNbPacketReceived; uint32 uiReceived = m_uiNbPacketReceived;
uint32 uiBad = m_uiNbPacketBad; uint32 uiBad = m_uiNbPacketBad;
double fps = (double)uiReceived / m_StatsTimer.DurationSinceNow(); double fps = (double)uiReceived / m_StatsTimer.DurationSinceNow();
// resets // resets
ResetStats(); ResetStats();
// displays // displays
char sz[256]; char sz[256];
sprintf(sz, "Stream %d (%d->%d) : %u / %u / %u : %.1f fps", sprintf(sz, "Stream %d (%d->%d) : %u / %u / %u : %.1f fps",

@ -47,14 +47,14 @@ class CCodecStream
public: public:
// constructor // constructor
CCodecStream(uint16, uint8, uint8); CCodecStream(uint16, uint8, uint8);
// destructor // destructor
virtual ~CCodecStream(); virtual ~CCodecStream();
// initialization // initialization
bool Init(uint16); bool Init(uint16);
void Close(void); void Close(void);
// get // get
bool IsConnected(void) const { return m_bConnected; } bool IsConnected(void) const { return m_bConnected; }
uint16 GetStreamId(void) const { return m_uiStreamId; } uint16 GetStreamId(void) const { return m_uiStreamId; }
@ -62,26 +62,26 @@ public:
// task // task
static void Thread(CCodecStream *); static void Thread(CCodecStream *);
void Task(void); void Task(void);
protected: protected:
// packet decoding helpers // packet decoding helpers
bool IsValidAmbePacket(const CBuffer &, uint8 *); bool IsValidAmbePacket(const CBuffer &, uint8 *);
// packet encoding helpers // packet encoding helpers
void EncodeAmbePacket(CBuffer *, const uint8 *); void EncodeAmbePacket(CBuffer *, const uint8 *);
// stats helpers // stats helpers
void ResetStats(void); void ResetStats(void);
void DisplayStats(void); void DisplayStats(void);
protected: protected:
// test data // test data
std::vector<CAmbe *> m_AmbeSrc; std::vector<CAmbe *> m_AmbeSrc;
int m_iAmbeSrcPtr; int m_iAmbeSrcPtr;
std::vector<CAmbe *> m_AmbeDest; std::vector<CAmbe *> m_AmbeDest;
int m_iAmbeDestPtr; int m_iAmbeDestPtr;
// data // data
uint16 m_uiStreamId; uint16 m_uiStreamId;
uint16 m_uiPort; uint16 m_uiPort;
@ -93,14 +93,14 @@ protected:
CIp m_Ip; CIp m_Ip;
CUdpSocket m_Socket; CUdpSocket m_Socket;
bool m_bConnected; bool m_bConnected;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
CTimePoint m_TimeoutTimer; CTimePoint m_TimeoutTimer;
CTimePoint m_FrameTimer; CTimePoint m_FrameTimer;
uint32 m_uiNbTotalPacketSent; uint32 m_uiNbTotalPacketSent;
// stats // stats
CTimePoint m_StatsTimer; CTimePoint m_StatsTimer;
CTimePoint m_DisplayStatsTimer; CTimePoint m_DisplayStatsTimer;

@ -45,7 +45,7 @@ CTranscoder g_Transcoder;
CTranscoder::CTranscoder() CTranscoder::CTranscoder()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
m_Streams.reserve(12); m_Streams.reserve(12);
m_bConnected = false; m_bConnected = false;
@ -69,17 +69,17 @@ CTranscoder::~CTranscoder()
delete m_Streams[i]; delete m_Streams[i];
} }
m_Streams.clear(); m_Streams.clear();
} }
m_Mutex.unlock(); m_Mutex.unlock();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
delete m_pThread; delete m_pThread;
} }
} }
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -88,14 +88,14 @@ CTranscoder::~CTranscoder()
bool CTranscoder::Init(const CIp &ListenIp, const CIp &AmbedIp) bool CTranscoder::Init(const CIp &ListenIp, const CIp &AmbedIp)
{ {
bool ok; bool ok;
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// create server's IP // create server's IP
m_ListenIp = ListenIp; m_ListenIp = ListenIp;
m_AmbedIp = AmbedIp; m_AmbedIp = AmbedIp;
// create our socket // create our socket
ok = m_Socket.Open(TRANSCODER_PORT); ok = m_Socket.Open(TRANSCODER_PORT);
if ( ok ) if ( ok )
@ -116,7 +116,7 @@ void CTranscoder::Close(void)
{ {
// close socket // close socket
m_Socket.Close(); m_Socket.Close();
// close all streams // close all streams
m_Mutex.lock(); m_Mutex.lock();
{ {
@ -125,12 +125,12 @@ void CTranscoder::Close(void)
delete m_Streams[i]; delete m_Streams[i];
} }
m_Streams.clear(); m_Streams.clear();
} }
m_Mutex.unlock(); m_Mutex.unlock();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -144,7 +144,7 @@ void CTranscoder::Close(void)
void CTranscoder::Thread(CTranscoder *This) void CTranscoder::Thread(CTranscoder *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->Task(); This->Task();
} }
@ -156,13 +156,13 @@ void CTranscoder::Task(void)
CIp Ip; CIp Ip;
uint16 StreamId; uint16 StreamId;
uint16 Port; uint16 Port;
// anything coming in from codec server ? // anything coming in from codec server ?
//if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) ) //if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) )
if ( m_Socket.Receive(&Buffer, &Ip, 20) != -1 ) if ( m_Socket.Receive(&Buffer, &Ip, 20) != -1 )
{ {
m_LastActivityTime.Now(); m_LastActivityTime.Now();
// crack packet // crack packet
if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) ) if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) )
{ {
@ -185,21 +185,21 @@ void CTranscoder::Task(void)
} }
m_bConnected = true; m_bConnected = true;
} }
} }
// handle end of streaming timeout // handle end of streaming timeout
//CheckStreamsTimeout(); //CheckStreamsTimeout();
// handle queue from reflector // handle queue from reflector
//HandleQueue(); //HandleQueue();
// keep client alive // keep client alive
if ( m_LastKeepaliveTime.DurationSinceNow() > TRANSCODER_KEEPALIVE_PERIOD ) if ( m_LastKeepaliveTime.DurationSinceNow() > TRANSCODER_KEEPALIVE_PERIOD )
{ {
// //
HandleKeepalives(); HandleKeepalives();
// update time // update time
m_LastKeepaliveTime.Now(); m_LastKeepaliveTime.Now();
} }
@ -211,9 +211,9 @@ void CTranscoder::Task(void)
CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn) CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn)
{ {
CBuffer Buffer; CBuffer Buffer;
CCodecStream *stream = NULL; CCodecStream *stream = NULL;
// do we need transcoding // do we need transcoding
if ( uiCodecIn != CODEC_NONE ) if ( uiCodecIn != CODEC_NONE )
{ {
@ -223,17 +223,17 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn)
// yes, post openstream request // yes, post openstream request
EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT); m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT);
// wait relpy here // wait relpy here
if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) ) if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) )
{ {
if ( m_bStreamOpened ) if ( m_bStreamOpened )
{ {
std::cout << "ambed openstream(" << m_StreamidOpenStream << ") ok" << std::endl; std::cout << "ambed openstream(" << m_StreamidOpenStream << ") ok" << std::endl;
// create stream object // create stream object
stream = new CCodecStream(m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); stream = new CCodecStream(m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
// init it // init it
if ( stream->Init(m_PortOpenStream) ) if ( stream->Init(m_PortOpenStream) )
{ {
@ -261,7 +261,7 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn)
{ {
std::cout << "ambed openstream timeout" << std::endl; std::cout << "ambed openstream timeout" << std::endl;
} }
} }
} }
return stream; return stream;
@ -270,7 +270,7 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn)
void CTranscoder::ReleaseStream(CCodecStream *stream) void CTranscoder::ReleaseStream(CCodecStream *stream)
{ {
CBuffer Buffer; CBuffer Buffer;
if ( stream != NULL ) if ( stream != NULL )
{ {
// look for the stream // look for the stream
@ -285,7 +285,7 @@ void CTranscoder::ReleaseStream(CCodecStream *stream)
// send close packet // send close packet
EncodeClosestreamPacket(&Buffer, m_Streams[i]->GetStreamId()); EncodeClosestreamPacket(&Buffer, m_Streams[i]->GetStreamId());
m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT); m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT);
// and close it // and close it
m_Streams[i]->Close(); m_Streams[i]->Close();
delete m_Streams[i]; delete m_Streams[i];
@ -304,11 +304,11 @@ void CTranscoder::ReleaseStream(CCodecStream *stream)
void CTranscoder::HandleKeepalives(void) void CTranscoder::HandleKeepalives(void)
{ {
CBuffer keepalive; CBuffer keepalive;
// send keepalive // send keepalive
EncodeKeepAlivePacket(&keepalive); EncodeKeepAlivePacket(&keepalive);
m_Socket.Send(keepalive, m_AmbedIp, TRANSCODER_PORT); m_Socket.Send(keepalive, m_AmbedIp, TRANSCODER_PORT);
// check if still with us // check if still with us
if ( m_bConnected && (m_LastActivityTime.DurationSinceNow() >= TRANSCODER_KEEPALIVE_TIMEOUT) ) if ( m_bConnected && (m_LastActivityTime.DurationSinceNow() >= TRANSCODER_KEEPALIVE_TIMEOUT) )
{ {
@ -324,7 +324,7 @@ void CTranscoder::HandleKeepalives(void)
bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer)
{ {
uint8 tag[] = { 'A','M','B','E','D','P','O','N','G' }; uint8 tag[] = { 'A','M','B','E','D','P','O','N','G' };
bool valid = false; bool valid = false;
if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{ {
@ -336,7 +336,7 @@ bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer)
bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, uint16 *Port) bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, uint16 *Port)
{ {
uint8 tag[] = { 'A','M','B','E','D','S','T','D' }; uint8 tag[] = { 'A','M','B','E','D','S','T','D' };
bool valid = false; bool valid = false;
if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{ {
@ -352,7 +352,7 @@ bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, ui
bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer)
{ {
uint8 tag[] = { 'A','M','B','E','D','B','U','S','Y' }; uint8 tag[] = { 'A','M','B','E','D','B','U','S','Y' };
return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ); return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) );
} }
@ -363,7 +363,7 @@ bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer)
void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer) void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer)
{ {
uint8 tag[] = { 'A','M','B','E','D','P','I','N','G' }; uint8 tag[] = { 'A','M','B','E','D','P','I','N','G' };
Buffer->Set(tag, sizeof(tag)); Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint8 *)(const char *)"XLX000 ", 8); Buffer->Append((uint8 *)(const char *)"XLX000 ", 8);
} }
@ -381,8 +381,7 @@ void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8 uiCodecIn, uint8
void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16 uiStreamId) void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16 uiStreamId)
{ {
uint8 tag[] = { 'A','M','B','E','D','C','S' }; uint8 tag[] = { 'A','M','B','E','D','C','S' };
Buffer->Set(tag, sizeof(tag)); Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint16)uiStreamId); Buffer->Append((uint16)uiStreamId);
} }

@ -41,7 +41,7 @@ class CTranscoder
public: public:
// constructor // constructor
CTranscoder(); CTranscoder();
// destructor // destructor
virtual ~CTranscoder(); virtual ~CTranscoder();
@ -52,16 +52,16 @@ public:
// locks // locks
void Lock(void) { m_Mutex.lock(); } void Lock(void) { m_Mutex.lock(); }
void Unlock(void) { m_Mutex.unlock(); } void Unlock(void) { m_Mutex.unlock(); }
// get // get
const CIp &GetListenIp(void) const { return m_ListenIp; } const CIp &GetListenIp(void) const { return m_ListenIp; }
const CIp &GetAmbedIp(void) const { return m_AmbedIp; } const CIp &GetAmbedIp(void) const { return m_AmbedIp; }
bool IsAmbedConnected(void) const { return m_bConnected; } bool IsAmbedConnected(void) const { return m_bConnected; }
// manage streams // manage streams
CCodecStream *GetStream(uint8); CCodecStream *GetStream(uint8);
void ReleaseStream(CCodecStream *); void ReleaseStream(CCodecStream *);
// task // task
static void Thread(CTranscoder *); static void Thread(CTranscoder *);
void Task(void); void Task(void);
@ -79,12 +79,12 @@ protected:
void EncodeKeepAlivePacket(CBuffer *); void EncodeKeepAlivePacket(CBuffer *);
void EncodeOpenstreamPacket(CBuffer *, uint8, uint8); void EncodeOpenstreamPacket(CBuffer *, uint8, uint8);
void EncodeClosestreamPacket(CBuffer *, uint16); void EncodeClosestreamPacket(CBuffer *, uint16);
protected: protected:
// IP's // IP's
CIp m_ListenIp; CIp m_ListenIp;
CIp m_AmbedIp; CIp m_AmbedIp;
// streams // streams
std::mutex m_Mutex; std::mutex m_Mutex;
std::vector<CCodecStream *> m_Streams; std::vector<CCodecStream *> m_Streams;
@ -94,9 +94,9 @@ protected:
bool m_bStreamOpened; bool m_bStreamOpened;
uint16 m_StreamidOpenStream; uint16 m_StreamidOpenStream;
uint16 m_PortOpenStream; uint16 m_PortOpenStream;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
// socket // socket

@ -38,7 +38,7 @@
CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut) CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut)
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
m_uiStreamId = uiId; m_uiStreamId = uiId;
m_uiPid = 0; m_uiPid = 0;
@ -63,7 +63,7 @@ CCodecStream::~CCodecStream()
m_Socket.Close(); m_Socket.Close();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -90,7 +90,7 @@ CCodecStream::~CCodecStream()
bool CCodecStream::Init(uint16 uiPort) bool CCodecStream::Init(uint16 uiPort)
{ {
// reset stop flag // reset stop flag
m_bConnected = m_bStopThread = false; m_bConnected = keep_running = true;
// create server's IP // create server's IP
m_uiPort = uiPort; m_uiPort = uiPort;
@ -124,7 +124,7 @@ void CCodecStream::Close(void)
m_Socket.Close(); m_Socket.Close();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -146,7 +146,7 @@ bool CCodecStream::IsEmpty(void) const
void CCodecStream::Thread(CCodecStream *This) void CCodecStream::Thread(CCodecStream *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->Task(); This->Task();
} }

@ -48,14 +48,14 @@ class CCodecStream : public CPacketQueue
public: public:
// constructor // constructor
CCodecStream(CPacketStream *, uint16, uint8, uint8); CCodecStream(CPacketStream *, uint16, uint8, uint8);
// destructor // destructor
virtual ~CCodecStream(); virtual ~CCodecStream();
// initialization // initialization
bool Init(uint16); bool Init(uint16);
void Close(void); void Close(void);
// get // get
bool IsConnected(void) const { return m_bConnected; } bool IsConnected(void) const { return m_bConnected; }
uint16 GetStreamId(void) const { return m_uiStreamId; } uint16 GetStreamId(void) const { return m_uiStreamId; }
@ -69,16 +69,16 @@ public:
// task // task
static void Thread(CCodecStream *); static void Thread(CCodecStream *);
void Task(void); void Task(void);
protected: protected:
// packet decoding helpers // packet decoding helpers
bool IsValidAmbePacket(const CBuffer &, uint8 *); bool IsValidAmbePacket(const CBuffer &, uint8 *);
// packet encoding helpers // packet encoding helpers
void EncodeAmbePacket(CBuffer *, const uint8 *); void EncodeAmbePacket(CBuffer *, const uint8 *);
protected: protected:
// data // data
uint16 m_uiStreamId; uint16 m_uiStreamId;
@ -91,17 +91,17 @@ protected:
CIp m_Ip; CIp m_Ip;
CUdpSocket m_Socket; CUdpSocket m_Socket;
bool m_bConnected; bool m_bConnected;
// associated packet stream // associated packet stream
CPacketStream *m_PacketStream; CPacketStream *m_PacketStream;
CPacketQueue m_LocalQueue; CPacketQueue m_LocalQueue;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
CTimePoint m_TimeoutTimer; CTimePoint m_TimeoutTimer;
CTimePoint m_StatsTimer; CTimePoint m_StatsTimer;
// statistics // statistics
double m_fPingMin; double m_fPingMin;
double m_fPingMax; double m_fPingMax;

@ -34,14 +34,14 @@
CDmridDir::CDmridDir() CDmridDir::CDmridDir()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
} }
CDmridDir::~CDmridDir() CDmridDir::~CDmridDir()
{ {
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -57,10 +57,10 @@ bool CDmridDir::Init(void)
{ {
// load content // load content
Reload(); Reload();
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// start thread; // start thread;
m_pThread = new std::thread(CDmridDir::Thread, this); m_pThread = new std::thread(CDmridDir::Thread, this);
@ -69,7 +69,7 @@ bool CDmridDir::Init(void)
void CDmridDir::Close(void) void CDmridDir::Close(void)
{ {
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -83,7 +83,7 @@ void CDmridDir::Close(void)
void CDmridDir::Thread(CDmridDir *This) void CDmridDir::Thread(CDmridDir *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
// Wait 30 seconds // Wait 30 seconds
CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000); CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000);
@ -103,7 +103,7 @@ bool CDmridDir::Reload(void)
{ {
CBuffer buffer; CBuffer buffer;
bool ok = false; bool ok = false;
if ( LoadContent(&buffer) ) if ( LoadContent(&buffer) )
{ {
Lock(); Lock();
@ -157,4 +157,3 @@ bool CDmridDir::IsValidDmrid(const char *sz)
} }
return ok; return ok;
} }

@ -48,26 +48,26 @@ class CDmridDir
public: public:
// constructor // constructor
CDmridDir(); CDmridDir();
// destructor // destructor
~CDmridDir(); ~CDmridDir();
// init & close // init & close
virtual bool Init(void); virtual bool Init(void);
virtual void Close(void); virtual void Close(void);
// locks // locks
void Lock(void) { m_Mutex.lock(); } void Lock(void) { m_Mutex.lock(); }
void Unlock(void) { m_Mutex.unlock(); } void Unlock(void) { m_Mutex.unlock(); }
// refresh // refresh
virtual bool LoadContent(CBuffer *) { return false; } virtual bool LoadContent(CBuffer *) { return false; }
virtual bool RefreshContent(const CBuffer &) { return false; } virtual bool RefreshContent(const CBuffer &) { return false; }
// find // find
const CCallsign *FindCallsign(uint32); const CCallsign *FindCallsign(uint32);
uint32 FindDmrid(const CCallsign &); uint32 FindDmrid(const CCallsign &);
protected: protected:
// thread // thread
static void Thread(CDmridDir *); static void Thread(CDmridDir *);
@ -76,17 +76,17 @@ protected:
bool Reload(void); bool Reload(void);
virtual bool NeedReload(void) { return false; } virtual bool NeedReload(void) { return false; }
bool IsValidDmrid(const char *); bool IsValidDmrid(const char *);
protected: protected:
// data // data
std::map <uint32, CCallsign> m_CallsignMap; std::map <uint32, CCallsign> m_CallsignMap;
std::map <CCallsign, uint32, CDmridDirCallsignCompare> m_DmridMap; std::map <CCallsign, uint32, CDmridDirCallsignCompare> m_DmridMap;
// Lock() // Lock()
std::mutex m_Mutex; std::mutex m_Mutex;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
}; };

@ -46,7 +46,7 @@ bool CG3Protocol::Init(void)
m_ReflectorCallsign = g_Reflector.GetCallsign(); m_ReflectorCallsign = g_Reflector.GetCallsign();
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// update the reflector callsign // update the reflector callsign
m_ReflectorCallsign.PatchCallsign(0, (const uint8 *)"XLX", 3); m_ReflectorCallsign.PatchCallsign(0, (const uint8 *)"XLX", 3);
@ -124,7 +124,7 @@ void CG3Protocol::Close(void)
void CG3Protocol::PresenceThread(CG3Protocol *This) void CG3Protocol::PresenceThread(CG3Protocol *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->PresenceTask(); This->PresenceTask();
} }
@ -132,7 +132,7 @@ void CG3Protocol::PresenceThread(CG3Protocol *This)
void CG3Protocol::ConfigThread(CG3Protocol *This) void CG3Protocol::ConfigThread(CG3Protocol *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->ConfigTask(); This->ConfigTask();
} }
@ -140,7 +140,7 @@ void CG3Protocol::ConfigThread(CG3Protocol *This)
void CG3Protocol::IcmpThread(CG3Protocol *This) void CG3Protocol::IcmpThread(CG3Protocol *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->IcmpTask(); This->IcmpTask();
} }

@ -19,7 +19,7 @@
// GNU General Public License for more details. // GNU General Public License for more details.
// //
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Foobar. If not, see <http://www.gnu.org/licenses/>. // along with Foobar. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
#include "main.h" #include "main.h"
@ -36,7 +36,7 @@ CGateKeeper g_GateKeeper;
CGateKeeper::CGateKeeper() CGateKeeper::CGateKeeper()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
} }
@ -46,7 +46,7 @@ CGateKeeper::CGateKeeper()
CGateKeeper::~CGateKeeper() CGateKeeper::~CGateKeeper()
{ {
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -60,15 +60,15 @@ CGateKeeper::~CGateKeeper()
bool CGateKeeper::Init(void) bool CGateKeeper::Init(void)
{ {
// load lists from files // load lists from files
m_NodeWhiteList.LoadFromFile(WHITELIST_PATH); m_NodeWhiteList.LoadFromFile(WHITELIST_PATH);
m_NodeBlackList.LoadFromFile(BLACKLIST_PATH); m_NodeBlackList.LoadFromFile(BLACKLIST_PATH);
m_PeerList.LoadFromFile(INTERLINKLIST_PATH); m_PeerList.LoadFromFile(INTERLINKLIST_PATH);
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// start thread; // start thread;
m_pThread = new std::thread(CGateKeeper::Thread, this); m_pThread = new std::thread(CGateKeeper::Thread, this);
@ -77,7 +77,7 @@ bool CGateKeeper::Init(void)
void CGateKeeper::Close(void) void CGateKeeper::Close(void)
{ {
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -92,7 +92,7 @@ void CGateKeeper::Close(void)
bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, int protocol, char *modules) const bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, int protocol, char *modules) const
{ {
bool ok = true; bool ok = true;
switch (protocol) switch (protocol)
{ {
// repeaters // repeaters
@ -107,33 +107,33 @@ bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, int protocol
ok &= IsNodeListedOk(callsign, ip); ok &= IsNodeListedOk(callsign, ip);
// todo: then apply any protocol specific authorisation for the operation // todo: then apply any protocol specific authorisation for the operation
break; break;
// XLX interlinks // XLX interlinks
case PROTOCOL_XLX: case PROTOCOL_XLX:
ok &= IsPeerListedOk(callsign, ip, modules); ok &= IsPeerListedOk(callsign, ip, modules);
break; break;
// unsupported // unsupported
case PROTOCOL_NONE: case PROTOCOL_NONE:
default: default:
ok = false; ok = false;
break; break;
} }
// report // report
if ( !ok ) if ( !ok )
{ {
std::cout << "Gatekeeper blocking linking of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl; std::cout << "Gatekeeper blocking linking of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl;
} }
// done // done
return ok; return ok;
} }
bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int protocol, char module) const bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int protocol, char module) const
{ {
bool ok = true; bool ok = true;
switch (protocol) switch (protocol)
{ {
// repeaters, protocol specific // repeaters, protocol specific
@ -149,25 +149,25 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
ok &= IsNodeListedOk(callsign, ip, module); ok &= IsNodeListedOk(callsign, ip, module);
// todo: then apply any protocol specific authorisation for the operation // todo: then apply any protocol specific authorisation for the operation
break; break;
// XLX interlinks // XLX interlinks
case PROTOCOL_XLX: case PROTOCOL_XLX:
ok &= IsPeerListedOk(callsign, ip, module); ok &= IsPeerListedOk(callsign, ip, module);
break; break;
// unsupported // unsupported
case PROTOCOL_NONE: case PROTOCOL_NONE:
default: default:
ok = false; ok = false;
break; break;
} }
// report // report
if ( !ok ) if ( !ok )
{ {
std::cout << "Gatekeeper blocking transmitting of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl; std::cout << "Gatekeeper blocking transmitting of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl;
} }
// done // done
return ok; return ok;
} }
@ -177,7 +177,7 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
void CGateKeeper::Thread(CGateKeeper *This) void CGateKeeper::Thread(CGateKeeper *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
// Wait 30 seconds // Wait 30 seconds
CTimePoint::TaskSleepFor(30000); CTimePoint::TaskSleepFor(30000);
@ -204,9 +204,9 @@ void CGateKeeper::Thread(CGateKeeper *This)
bool CGateKeeper::IsNodeListedOk(const CCallsign &callsign, const CIp &ip, char module) const bool CGateKeeper::IsNodeListedOk(const CCallsign &callsign, const CIp &ip, char module) const
{ {
bool ok = true; bool ok = true;
// first check IP // first check IP
// next, check callsign // next, check callsign
if ( ok ) if ( ok )
{ {
@ -218,24 +218,24 @@ bool CGateKeeper::IsNodeListedOk(const CCallsign &callsign, const CIp &ip, char
ok = m_NodeWhiteList.IsCallsignListedWithWildcard(callsign, module); ok = m_NodeWhiteList.IsCallsignListedWithWildcard(callsign, module);
} }
const_cast<CCallsignList &>(m_NodeWhiteList).Unlock(); const_cast<CCallsignList &>(m_NodeWhiteList).Unlock();
// then check if not blacklisted // then check if not blacklisted
const_cast<CCallsignList &>(m_NodeBlackList).Lock(); const_cast<CCallsignList &>(m_NodeBlackList).Lock();
ok &= !m_NodeBlackList.IsCallsignListedWithWildcard(callsign); ok &= !m_NodeBlackList.IsCallsignListedWithWildcard(callsign);
const_cast<CCallsignList &>(m_NodeBlackList).Unlock(); const_cast<CCallsignList &>(m_NodeBlackList).Unlock();
} }
// done // done
return ok; return ok;
} }
bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char module) const bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char module) const
{ {
bool ok = true; bool ok = true;
// first check IP // first check IP
// next, check callsign // next, check callsign
if ( ok ) if ( ok )
{ {
@ -247,7 +247,7 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char
} }
const_cast<CPeerCallsignList &>(m_PeerList).Unlock(); const_cast<CPeerCallsignList &>(m_PeerList).Unlock();
} }
// done // done
return ok; return ok;
} }
@ -255,9 +255,9 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char
bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char *modules) const bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char *modules) const
{ {
bool ok = true; bool ok = true;
// first check IP // first check IP
// next, check callsign // next, check callsign
if ( ok ) if ( ok )
{ {
@ -269,8 +269,7 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char
} }
const_cast<CPeerCallsignList &>(m_PeerList).Unlock(); const_cast<CPeerCallsignList &>(m_PeerList).Unlock();
} }
// done // done
return ok; return ok;
} }

@ -19,7 +19,7 @@
// GNU General Public License for more details. // GNU General Public License for more details.
// //
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Foobar. If not, see <http://www.gnu.org/licenses/>. // along with Foobar. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
#ifndef cgatekeeper_h #ifndef cgatekeeper_h
@ -39,22 +39,22 @@ class CGateKeeper
public: public:
// constructor // constructor
CGateKeeper(); CGateKeeper();
// destructor // destructor
virtual ~CGateKeeper(); virtual ~CGateKeeper();
// init & clode // init & clode
bool Init(void); bool Init(void);
void Close(void); void Close(void);
// authorizations // authorizations
bool MayLink(const CCallsign &, const CIp &, int, char * = NULL) const; bool MayLink(const CCallsign &, const CIp &, int, char * = NULL) const;
bool MayTransmit(const CCallsign &, const CIp &, int = PROTOCOL_ANY, char = ' ') const; bool MayTransmit(const CCallsign &, const CIp &, int = PROTOCOL_ANY, char = ' ') const;
// peer list handeling // peer list handeling
CPeerCallsignList *GetPeerList(void) { m_PeerList.Lock(); return &m_PeerList; } CPeerCallsignList *GetPeerList(void) { m_PeerList.Lock(); return &m_PeerList; }
void ReleasePeerList(void) { m_PeerList.Unlock(); } void ReleasePeerList(void) { m_PeerList.Unlock(); }
protected: protected:
// thread // thread
static void Thread(CGateKeeper *); static void Thread(CGateKeeper *);
@ -63,15 +63,15 @@ protected:
bool IsNodeListedOk(const CCallsign &, const CIp &, char = ' ') const; bool IsNodeListedOk(const CCallsign &, const CIp &, char = ' ') const;
bool IsPeerListedOk(const CCallsign &, const CIp &, char) const; bool IsPeerListedOk(const CCallsign &, const CIp &, char) const;
bool IsPeerListedOk(const CCallsign &, const CIp &, char *) const; bool IsPeerListedOk(const CCallsign &, const CIp &, char *) const;
protected: protected:
// data // data
CCallsignList m_NodeWhiteList; CCallsignList m_NodeWhiteList;
CCallsignList m_NodeBlackList; CCallsignList m_NodeBlackList;
CPeerCallsignList m_PeerList; CPeerCallsignList m_PeerList;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
}; };

@ -33,7 +33,7 @@
// constructor // constructor
CProtocol::CProtocol() : m_bStopThread(false), m_pThread(NULL) {} CProtocol::CProtocol() : keep_running(true), m_pThread(NULL) {}
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -42,7 +42,7 @@ CProtocol::CProtocol() : m_bStopThread(false), m_pThread(NULL) {}
CProtocol::~CProtocol() CProtocol::~CProtocol()
{ {
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -71,7 +71,7 @@ bool CProtocol::Initialize(const char *type, uint16 port)
m_ReflectorCallsign = g_Reflector.GetCallsign(); m_ReflectorCallsign = g_Reflector.GetCallsign();
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// update the reflector callsign // update the reflector callsign
if (type) if (type)
@ -110,7 +110,7 @@ bool CProtocol::Initialize(const char *type, uint16 port)
void CProtocol::Thread(CProtocol *This) void CProtocol::Thread(CProtocol *This)
{ {
while (! This->m_bStopThread) while (This->keep_running)
{ {
This->Task(); This->Task();
} }
@ -118,7 +118,7 @@ void CProtocol::Thread(CProtocol *This)
void CProtocol::Close(void) void CProtocol::Close(void)
{ {
m_bStopThread = true; keep_running = true;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();

@ -137,7 +137,7 @@ protected:
CPacketQueue m_Queue; CPacketQueue m_Queue;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
// identity // identity

@ -38,7 +38,7 @@
CReflector::CReflector() CReflector::CReflector()
{ {
m_bStopThreads = false; keep_running = true;
m_XmlReportThread = NULL; m_XmlReportThread = NULL;
m_JsonReportThread = NULL; m_JsonReportThread = NULL;
for ( int i = 0; i < NB_OF_MODULES; i++ ) for ( int i = 0; i < NB_OF_MODULES; i++ )
@ -55,7 +55,7 @@ CReflector::CReflector(const CCallsign &callsign)
#ifdef DEBUG_DUMPFILE #ifdef DEBUG_DUMPFILE
m_DebugFile.close(); m_DebugFile.close();
#endif #endif
m_bStopThreads = false; keep_running = true;
m_XmlReportThread = NULL; m_XmlReportThread = NULL;
m_JsonReportThread = NULL; m_JsonReportThread = NULL;
for ( int i = 0; i < NB_OF_MODULES; i++ ) for ( int i = 0; i < NB_OF_MODULES; i++ )
@ -70,7 +70,7 @@ CReflector::CReflector(const CCallsign &callsign)
CReflector::~CReflector() CReflector::~CReflector()
{ {
m_bStopThreads = true; keep_running = false;
if ( m_XmlReportThread != NULL ) if ( m_XmlReportThread != NULL )
{ {
m_XmlReportThread->join(); m_XmlReportThread->join();
@ -100,7 +100,7 @@ bool CReflector::Start(void)
bool ok = true; bool ok = true;
// reset stop flag // reset stop flag
m_bStopThreads = false; keep_running = true;
// init gate keeper // init gate keeper
ok &= g_GateKeeper.Init(); ok &= g_GateKeeper.Init();
@ -144,7 +144,7 @@ bool CReflector::Start(void)
void CReflector::Stop(void) void CReflector::Stop(void)
{ {
// stop & delete all threads // stop & delete all threads
m_bStopThreads = true; keep_running = false;
// stop & delete report threads // stop & delete report threads
if ( m_XmlReportThread != NULL ) if ( m_XmlReportThread != NULL )
@ -325,7 +325,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
// get on input queue // get on input queue
CPacket *packet; CPacket *packet;
while ( !This->m_bStopThreads ) while (This->keep_running)
{ {
// any packet in our input queue ? // any packet in our input queue ?
streamIn->Lock(); streamIn->Lock();
@ -384,7 +384,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
void CReflector::XmlReportThread(CReflector *This) void CReflector::XmlReportThread(CReflector *This)
{ {
while ( !This->m_bStopThreads ) while (This->keep_running)
{ {
// report to xml file // report to xml file
std::ofstream xmlFile; std::ofstream xmlFile;
@ -423,7 +423,7 @@ void CReflector::JsonReportThread(CReflector *This)
if ( Socket.Open(JSON_PORT) ) if ( Socket.Open(JSON_PORT) )
{ {
// and loop // and loop
while ( !This->m_bStopThreads ) while (This->keep_running)
{ {
// any command ? // any command ?
if ( Socket.Receive(Buffer, Ip, 50) ) if ( Socket.Receive(Buffer, Ip, 50) )

@ -136,7 +136,7 @@ protected:
std::array<CPacketStream, NB_OF_MODULES> m_Streams; std::array<CPacketStream, NB_OF_MODULES> m_Streams;
// threads // threads
bool m_bStopThreads; std::atomic<bool> keep_running;
std::array<std::thread *, NB_OF_MODULES> m_RouterThreads; std::array<std::thread *, NB_OF_MODULES> m_RouterThreads;
std::thread *m_XmlReportThread; std::thread *m_XmlReportThread;
std::thread *m_JsonReportThread; std::thread *m_JsonReportThread;

@ -46,7 +46,7 @@ CTranscoder g_Transcoder;
CTranscoder::CTranscoder() CTranscoder::CTranscoder()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
m_bConnected = false; m_bConnected = false;
m_LastKeepaliveTime.Now(); m_LastKeepaliveTime.Now();
@ -72,7 +72,7 @@ bool CTranscoder::Init(void)
bool ok; bool ok;
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// create server's IP // create server's IP
auto s = g_Reflector.GetTranscoderIp(); auto s = g_Reflector.GetTranscoderIp();
@ -116,7 +116,7 @@ void CTranscoder::Close(void)
m_Mutex.unlock(); m_Mutex.unlock();
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -130,7 +130,7 @@ void CTranscoder::Close(void)
void CTranscoder::Thread(CTranscoder *This) void CTranscoder::Thread(CTranscoder *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->Task(); This->Task();
} }

@ -93,7 +93,7 @@ protected:
uint16 m_PortOpenStream; uint16 m_PortOpenStream;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
// socket // socket

@ -40,7 +40,7 @@
CWiresxCmdHandler::CWiresxCmdHandler() CWiresxCmdHandler::CWiresxCmdHandler()
{ {
m_seqNo = 0; m_seqNo = 0;
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
} }
@ -51,13 +51,13 @@ CWiresxCmdHandler::CWiresxCmdHandler()
CWiresxCmdHandler::~CWiresxCmdHandler() CWiresxCmdHandler::~CWiresxCmdHandler()
{ {
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
delete m_pThread; delete m_pThread;
} }
// empty queue // empty queue
m_CmdQueue.Lock(); m_CmdQueue.Lock();
while ( !m_CmdQueue.empty() ) while ( !m_CmdQueue.empty() )
@ -78,18 +78,18 @@ bool CWiresxCmdHandler::Init(void)
m_ReflectorWiresxInfo.SetName("Reflector"); m_ReflectorWiresxInfo.SetName("Reflector");
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// start thread; // start thread;
m_pThread = new std::thread(CWiresxCmdHandler::Thread, this); m_pThread = new std::thread(CWiresxCmdHandler::Thread, this);
// done // done
return true; return true;
} }
void CWiresxCmdHandler::Close(void) void CWiresxCmdHandler::Close(void)
{ {
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -103,7 +103,7 @@ void CWiresxCmdHandler::Close(void)
void CWiresxCmdHandler::Thread(CWiresxCmdHandler *This) void CWiresxCmdHandler::Thread(CWiresxCmdHandler *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
This->Task(); This->Task();
} }
@ -120,7 +120,7 @@ void CWiresxCmdHandler::Task(void)
uint32 uiNodeRxFreq; uint32 uiNodeRxFreq;
char cModule; char cModule;
bool bCmd; bool bCmd;
// anything to do ? // anything to do ?
bCmd = false; bCmd = false;
m_CmdQueue.Lock(); m_CmdQueue.Lock();
@ -142,8 +142,8 @@ void CWiresxCmdHandler::Task(void)
} }
} }
m_CmdQueue.Unlock(); m_CmdQueue.Unlock();
// handle it // handle it
if ( bCmd ) if ( bCmd )
{ {
@ -161,7 +161,7 @@ void CWiresxCmdHandler::Task(void)
cModule = client->GetReflectorModule(); cModule = client->GetReflectorModule();
} }
g_Reflector.ReleaseClients(); g_Reflector.ReleaseClients();
// and crack the cmd // and crack the cmd
switch ( Cmd.GetCmd() ) switch ( Cmd.GetCmd() )
{ {
@ -236,12 +236,12 @@ bool CWiresxCmdHandler::ReplyToWiresxDxReqPacket(const CIp &Ip, const CWiresxInf
uint8 data[150U]; uint8 data[150U];
uint8 RoomId; uint8 RoomId;
bool IsLinked; bool IsLinked;
// linked module // linked module
// module A == 0 // module A == 0
IsLinked = (Module != ' '); IsLinked = (Module != ' ');
RoomId = (uint8)(Module - 'A'); RoomId = (uint8)(Module - 'A');
// fill data buffer // fill data buffer
::memset(data, 0x00U, 150U); ::memset(data, 0x00U, 150U);
::memset(data, ' ', 128U); ::memset(data, ' ', 128U);
@ -307,7 +307,7 @@ bool CWiresxCmdHandler::ReplyToWiresxDxReqPacket(const CIp &Ip, const CWiresxInf
::sprintf(freq, "%05u.%03u000%c%03u.%06u", ::sprintf(freq, "%05u.%03u000%c%03u.%06u",
WiresxInfo.GetTxFrequency() / 1000000U, WiresxInfo.GetTxFrequency() / 1000000U,
freqkHz, sign, offset / 1000000U, offset % 1000000U); freqkHz, sign, offset / 1000000U, offset % 1000000U);
::memcpy(data + 84U, freq, 23U); ::memcpy(data + 84U, freq, 23U);
} }
@ -330,7 +330,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
bool ok = false; bool ok = false;
uint8 ALL_RESP[] = {0x5DU, 0x46U, 0x5FU, 0x29U}; uint8 ALL_RESP[] = {0x5DU, 0x46U, 0x5FU, 0x29U};
uint8 data[1100U]; uint8 data[1100U];
// fill data buffer // fill data buffer
::memset(data, 0x00U, 1100U); ::memset(data, 0x00U, 1100U);
// seq no // seq no
@ -358,7 +358,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
char item[16U]; char item[16U];
// module A == 0 // module A == 0
int RoomId = i + Start; int RoomId = i + Start;
// prepare // prepare
::memset(data + offset, ' ', 50U); ::memset(data + offset, ' ', 50U);
data[offset + 0U] = '5'; data[offset + 0U] = '5';
@ -394,18 +394,18 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
uint k = 1029U - offset2; uint k = 1029U - offset2;
::memset(data+offset2, ' ', k); ::memset(data+offset2, ' ', k);
offset2 += k; offset2 += k;
// EOD + CRC // EOD + CRC
data[offset2 + 0U] = 0x03U; data[offset2 + 0U] = 0x03U;
data[offset2 + 1U] = CCRC::addCRC(data, offset2 + 1U); data[offset2 + 1U] = CCRC::addCRC(data, offset2 + 1U);
offset2 += 2U; offset2 += 2U;
// and encode the reply // and encode the reply
CBuffer Data; CBuffer Data;
Data.Set(data, offset2 + 2U); Data.Set(data, offset2 + 2U);
ok = EncodeAndSendWiresxPacket(Ip, Data, WiresxInfo); ok = EncodeAndSendWiresxPacket(Ip, Data, WiresxInfo);
} }
// and next repeat with normal frame // and next repeat with normal frame
{ {
@ -418,7 +418,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
//uint k = 1031U - offset; //uint k = 1031U - offset;
//::memset(data+offset, ' ', k); //::memset(data+offset, ' ', k);
//offset += k; //offset += k;
// and encode the reply // and encode the reply
CBuffer Data; CBuffer Data;
Data.Set(data, offset + 2U); Data.Set(data, offset + 2U);
@ -441,11 +441,11 @@ bool CWiresxCmdHandler::ReplyToWiresxConnReqPacket(const CIp &Ip, const CWiresxI
// linked room // linked room
// Module A == 0 // Module A == 0
RoomId = (uint8)(Module - 'A'); RoomId = (uint8)(Module - 'A');
// prepare buffer // prepare buffer
::memset(data, 0x00U, 110U); ::memset(data, 0x00U, 110U);
::memset(data, ' ', 90U); ::memset(data, ' ', 90U);
// seq no // seq no
data[0U] = m_seqNo; data[0U] = m_seqNo;
// command // command
@ -499,11 +499,11 @@ bool CWiresxCmdHandler::ReplyToWiresxDiscReqPacket(const CIp &Ip, const CWiresxI
uint8 DISC_RESP[] = {0x5DU, 0x41U, 0x5FU, 0x26U}; uint8 DISC_RESP[] = {0x5DU, 0x41U, 0x5FU, 0x26U};
bool ok = false; bool ok = false;
uint8 data[110U]; uint8 data[110U];
// prepare buffer // prepare buffer
::memset(data, 0x00U, 110U); ::memset(data, 0x00U, 110U);
::memset(data, ' ', 90U); ::memset(data, ' ', 90U);
// seq no // seq no
data[0U] = m_seqNo; data[0U] = m_seqNo;
// command // command
@ -547,12 +547,12 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer &
CYSFPayload payload; CYSFPayload payload;
uint8 buffer[200U]; uint8 buffer[200U];
CBuffer Data(DataOrg); CBuffer Data(DataOrg);
// seq no // seq no
uint8 seqNo = 0U; uint8 seqNo = 0U;
// calculate bt and adjust length // calculate bt and adjust length
uint length = (uint)Data.size(); uint length = (uint)Data.size();
uint8 bt = 0; uint8 bt = 0;
@ -577,10 +577,10 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer &
{ {
Data.Append((uint8)0x20U, (int)(length - (uint)Data.size())); Data.Append((uint8)0x20U, (int)(length - (uint)Data.size()));
} }
// ft // ft
uint8 ft = WiresxCalcFt(length, 0U); uint8 ft = WiresxCalcFt(length, 0U);
// Write the header // Write the header
{ {
//header //header
@ -675,7 +675,7 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer &
// and post it // and post it
SendPacket(Ip, buffer); SendPacket(Ip, buffer);
} }
// done // done
return true; return true;
} }
@ -721,7 +721,7 @@ bool CWiresxCmdHandler::DebugTestDecodePacket(const CBuffer &Buffer)
CYSFPayload payload; CYSFPayload payload;
CBuffer dump; CBuffer dump;
bool valid = false; bool valid = false;
if ( (Buffer.size() == 155) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) if ( (Buffer.size() == 155) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{ {
// decode YSH fich // decode YSH fich
@ -733,7 +733,7 @@ bool CWiresxCmdHandler::DebugTestDecodePacket(const CBuffer &Buffer)
<< (int)Fich.getBT() << "," << (int)Fich.getBT() << ","
<< (int)Fich.getFN() << "," << (int)Fich.getFN() << ","
<< (int)Fich.getFT() << " : "; << (int)Fich.getFT() << " : ";
switch ( Fich.getFI() ) switch ( Fich.getFI() )
{ {
case YSF_FI_HEADER: case YSF_FI_HEADER:

@ -41,10 +41,10 @@ class CWiresxCmdHandler
public: public:
// constructor // constructor
CWiresxCmdHandler(); CWiresxCmdHandler();
// destructor // destructor
virtual ~CWiresxCmdHandler(); virtual ~CWiresxCmdHandler();
// initialization // initialization
virtual bool Init(void); virtual bool Init(void);
virtual void Close(void); virtual void Close(void);
@ -56,18 +56,18 @@ public:
void ReleasePacketQueue(void) { m_PacketQueue.Unlock(); } void ReleasePacketQueue(void) { m_PacketQueue.Unlock(); }
// get // get
// task // task
static void Thread(CWiresxCmdHandler *); static void Thread(CWiresxCmdHandler *);
virtual void Task(void); virtual void Task(void);
protected: protected:
// packet encoding // packet encoding
bool ReplyToWiresxDxReqPacket(const CIp &, const CWiresxInfo &, char); bool ReplyToWiresxDxReqPacket(const CIp &, const CWiresxInfo &, char);
bool ReplyToWiresxAllReqPacket(const CIp &, const CWiresxInfo &, int); bool ReplyToWiresxAllReqPacket(const CIp &, const CWiresxInfo &, int);
bool ReplyToWiresxConnReqPacket(const CIp &, const CWiresxInfo &, char); bool ReplyToWiresxConnReqPacket(const CIp &, const CWiresxInfo &, char);
bool ReplyToWiresxDiscReqPacket(const CIp &, const CWiresxInfo &); bool ReplyToWiresxDiscReqPacket(const CIp &, const CWiresxInfo &);
// packet encoding helpers // packet encoding helpers
bool EncodeAndSendWiresxPacket(const CIp &, const CBuffer &, const CWiresxInfo &); bool EncodeAndSendWiresxPacket(const CIp &, const CBuffer &, const CWiresxInfo &);
uint8 WiresxCalcFt(uint, uint) const; uint8 WiresxCalcFt(uint, uint) const;
@ -80,13 +80,13 @@ protected:
// data // data
CWiresxInfo m_ReflectorWiresxInfo; CWiresxInfo m_ReflectorWiresxInfo;
uint8_t m_seqNo; uint8_t m_seqNo;
// queues // queues
CWiresxCmdQueue m_CmdQueue; CWiresxCmdQueue m_CmdQueue;
CWiresxPacketQueue m_PacketQueue; CWiresxPacketQueue m_PacketQueue;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
}; };

@ -33,14 +33,14 @@
CYsfNodeDir::CYsfNodeDir() CYsfNodeDir::CYsfNodeDir()
{ {
m_bStopThread = false; keep_running = true;
m_pThread = NULL; m_pThread = NULL;
} }
CYsfNodeDir::~CYsfNodeDir() CYsfNodeDir::~CYsfNodeDir()
{ {
// kill threads // kill threads
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -56,10 +56,10 @@ bool CYsfNodeDir::Init(void)
{ {
// load content // load content
Reload(); Reload();
// reset stop flag // reset stop flag
m_bStopThread = false; keep_running = true;
// start thread; // start thread;
m_pThread = new std::thread(CYsfNodeDir::Thread, this); m_pThread = new std::thread(CYsfNodeDir::Thread, this);
@ -68,7 +68,7 @@ bool CYsfNodeDir::Init(void)
void CYsfNodeDir::Close(void) void CYsfNodeDir::Close(void)
{ {
m_bStopThread = true; keep_running = false;
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -82,7 +82,7 @@ void CYsfNodeDir::Close(void)
void CYsfNodeDir::Thread(CYsfNodeDir *This) void CYsfNodeDir::Thread(CYsfNodeDir *This)
{ {
while ( !This->m_bStopThread ) while (This->keep_running)
{ {
// Wait 30 seconds // Wait 30 seconds
CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000); CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000);
@ -102,7 +102,7 @@ bool CYsfNodeDir::Reload(void)
{ {
CBuffer buffer; CBuffer buffer;
bool ok = false; bool ok = false;
if ( LoadContent(&buffer) ) if ( LoadContent(&buffer) )
{ {
Lock(); Lock();

@ -59,11 +59,11 @@ public:
// init & close // init & close
virtual bool Init(void); virtual bool Init(void);
virtual void Close(void); virtual void Close(void);
// locks // locks
void Lock(void) { m_Mutex.lock(); } void Lock(void) { m_Mutex.lock(); }
void Unlock(void) { m_Mutex.unlock(); } void Unlock(void) { m_Mutex.unlock(); }
// refresh // refresh
virtual bool LoadContent(CBuffer *) { return false; } virtual bool LoadContent(CBuffer *) { return false; }
virtual bool RefreshContent(const CBuffer &) { return false; } virtual bool RefreshContent(const CBuffer &) { return false; }
@ -79,14 +79,14 @@ protected:
bool Reload(void); bool Reload(void);
virtual bool NeedReload(void) { return false; } virtual bool NeedReload(void) { return false; }
//bool IsValidDmrid(const char *); //bool IsValidDmrid(const char *);
protected: protected:
// Lock() // Lock()
std::mutex m_Mutex; std::mutex m_Mutex;
// thread // thread
bool m_bStopThread; std::atomic<bool> keep_running;
std::thread *m_pThread; std::thread *m_pThread;
}; };

@ -25,6 +25,7 @@
#ifndef main_h #ifndef main_h
#define main_h #define main_h
#include <atomic>
#include <vector> #include <vector>
#include <array> #include <array>
#include <list> #include <list>

Loading…
Cancel
Save

Powered by TurnKey Linux.