diff --git a/ambedtest/ccodecstream.cpp b/ambedtest/ccodecstream.cpp index 5c5fcf3..90fca6c 100644 --- a/ambedtest/ccodecstream.cpp +++ b/ambedtest/ccodecstream.cpp @@ -36,7 +36,7 @@ CCodecStream::CCodecStream(uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut) { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; m_uiStreamId = uiId; m_uiPid = 0; @@ -59,9 +59,9 @@ CCodecStream::~CCodecStream() { // close socket m_Socket.Close(); - + // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -75,10 +75,10 @@ CCodecStream::~CCodecStream() bool CCodecStream::Init(uint16 uiPort) { bool ok; - + // reset stop flag - m_bStopThread = false; - + keep_running = true; + // copy our test data if ( m_uiCodecIn == CODEC_AMBE2PLUS ) { @@ -108,11 +108,11 @@ bool CCodecStream::Init(uint16 uiPort) m_AmbeDest.push_back(ambe); } } - + // create server's IP m_Ip = g_Transcoder.GetAmbedIp(); m_uiPort = uiPort; - + // create our socket ok = m_Socket.Open(uiPort); if ( ok ) @@ -129,7 +129,7 @@ bool CCodecStream::Init(uint16 uiPort) std::cout << "Error opening socket on port UDP" << uiPort << " on ip " << m_Ip << std::endl; m_bConnected = false; } - + // done return ok; } @@ -139,9 +139,9 @@ void CCodecStream::Close(void) // close socket m_bConnected = false; m_Socket.Close(); - + // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -156,7 +156,7 @@ void CCodecStream::Close(void) void CCodecStream::Thread(CCodecStream *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->Task(); } @@ -167,7 +167,7 @@ void CCodecStream::Task(void) CBuffer Buffer; CIp Ip; uint8 Ambe[AMBE_SIZE]; - + // connected ? if ( m_bConnected ) { @@ -176,17 +176,17 @@ void CCodecStream::Task(void) { // yes m_FrameTimer.Now(); - + // encode packet @ send it EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData()); m_Socket.Send(Buffer, m_Ip, m_uiPort); - + // and increment pointer m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size(); m_uiNbTotalPacketSent++; m_uiNbPacketSent++; }*/ - + // any packt to send to trancoder ? uint32 uiNbPacketToSend = (uint32)(m_FrameTimer.DurationSinceNow() * 50.0) - m_uiNbTotalPacketSent; if ( uiNbPacketToSend > 0 ) @@ -196,14 +196,14 @@ void CCodecStream::Task(void) // encode packet @ send it EncodeAmbePacket(&Buffer, m_AmbeSrc[m_iAmbeSrcPtr]->GetData()); m_Socket.Send(Buffer, m_Ip, m_uiPort); - + // and increment pointer m_iAmbeSrcPtr = (m_iAmbeSrcPtr + 1) % m_AmbeSrc.size(); m_uiNbTotalPacketSent++; m_uiNbPacketSent++; } } - + // any packet from transcoder if ( m_Socket.Receive(&Buffer, &Ip, 1) != -1 ) { @@ -211,7 +211,7 @@ void CCodecStream::Task(void) if ( IsValidAmbePacket(Buffer, Ambe) ) { m_TimeoutTimer.Now(); - + // check the PID // check the transcoded packet /*if ( ::memcmp(Ambe, m_AmbeDest[m_iAmbeDestPtr]->GetData(), AMBE_SIZE) != 0 ) @@ -219,15 +219,15 @@ void CCodecStream::Task(void) m_uiNbPacketBad++; ::memcpy((void *)m_AmbeDest[m_iAmbeDestPtr]->GetData(), Ambe, AMBE_SIZE); }*/ - + // and increment pointer m_iAmbeDestPtr = (m_iAmbeDestPtr + 1) % m_AmbeDest.size(); m_uiNbPacketReceived++; - + } } } - + // display stats if ( m_DisplayStatsTimer.DurationSinceNow() >= 2.0 ) { @@ -249,7 +249,7 @@ void CCodecStream::Task(void) bool CCodecStream::IsValidAmbePacket(const CBuffer &Buffer, uint8 *Ambe) { bool valid = false; - + if ( (Buffer.size() == 11) && (Buffer.data()[0] == m_uiCodecOut) ) { ::memcpy(Ambe, &(Buffer.data()[2]), 9); @@ -282,7 +282,7 @@ void CCodecStream::ResetStats(void) m_uiNbPacketReceived = 0; m_uiNbPacketBad = 0; m_uiNbPacketTimeout = 0; - + } void CCodecStream::DisplayStats(void) @@ -292,10 +292,10 @@ void CCodecStream::DisplayStats(void) uint32 uiReceived = m_uiNbPacketReceived; uint32 uiBad = m_uiNbPacketBad; double fps = (double)uiReceived / m_StatsTimer.DurationSinceNow(); - + // resets ResetStats(); - + // displays char sz[256]; sprintf(sz, "Stream %d (%d->%d) : %u / %u / %u : %.1f fps", diff --git a/ambedtest/ccodecstream.h b/ambedtest/ccodecstream.h index 4619648..7c840c9 100644 --- a/ambedtest/ccodecstream.h +++ b/ambedtest/ccodecstream.h @@ -47,14 +47,14 @@ class CCodecStream public: // constructor CCodecStream(uint16, uint8, uint8); - + // destructor virtual ~CCodecStream(); - + // initialization bool Init(uint16); void Close(void); - + // get bool IsConnected(void) const { return m_bConnected; } uint16 GetStreamId(void) const { return m_uiStreamId; } @@ -62,26 +62,26 @@ public: // task static void Thread(CCodecStream *); void Task(void); - + protected: // packet decoding helpers bool IsValidAmbePacket(const CBuffer &, uint8 *); - + // packet encoding helpers void EncodeAmbePacket(CBuffer *, const uint8 *); // stats helpers void ResetStats(void); void DisplayStats(void); - + protected: // test data std::vector m_AmbeSrc; int m_iAmbeSrcPtr; std::vector m_AmbeDest; int m_iAmbeDestPtr; - + // data uint16 m_uiStreamId; uint16 m_uiPort; @@ -93,14 +93,14 @@ protected: CIp m_Ip; CUdpSocket m_Socket; bool m_bConnected; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; CTimePoint m_TimeoutTimer; CTimePoint m_FrameTimer; uint32 m_uiNbTotalPacketSent; - + // stats CTimePoint m_StatsTimer; CTimePoint m_DisplayStatsTimer; diff --git a/ambedtest/ctranscoder.cpp b/ambedtest/ctranscoder.cpp index 5d7805b..65bc28f 100644 --- a/ambedtest/ctranscoder.cpp +++ b/ambedtest/ctranscoder.cpp @@ -45,7 +45,7 @@ CTranscoder g_Transcoder; CTranscoder::CTranscoder() { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; m_Streams.reserve(12); m_bConnected = false; @@ -69,17 +69,17 @@ CTranscoder::~CTranscoder() delete m_Streams[i]; } m_Streams.clear(); - + } m_Mutex.unlock(); - + // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); delete m_pThread; - } + } } //////////////////////////////////////////////////////////////////////////////////////// @@ -88,14 +88,14 @@ CTranscoder::~CTranscoder() bool CTranscoder::Init(const CIp &ListenIp, const CIp &AmbedIp) { bool ok; - + // reset stop flag - m_bStopThread = false; + keep_running = true; // create server's IP m_ListenIp = ListenIp; m_AmbedIp = AmbedIp; - + // create our socket ok = m_Socket.Open(TRANSCODER_PORT); if ( ok ) @@ -116,7 +116,7 @@ void CTranscoder::Close(void) { // close socket m_Socket.Close(); - + // close all streams m_Mutex.lock(); { @@ -125,12 +125,12 @@ void CTranscoder::Close(void) delete m_Streams[i]; } m_Streams.clear(); - + } m_Mutex.unlock(); - + // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -144,7 +144,7 @@ void CTranscoder::Close(void) void CTranscoder::Thread(CTranscoder *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->Task(); } @@ -156,13 +156,13 @@ void CTranscoder::Task(void) CIp Ip; uint16 StreamId; uint16 Port; - + // anything coming in from codec server ? //if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) ) if ( m_Socket.Receive(&Buffer, &Ip, 20) != -1 ) { m_LastActivityTime.Now(); - + // crack packet if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) ) { @@ -185,21 +185,21 @@ void CTranscoder::Task(void) } m_bConnected = true; } - + } - + // handle end of streaming timeout //CheckStreamsTimeout(); - + // handle queue from reflector //HandleQueue(); - + // keep client alive if ( m_LastKeepaliveTime.DurationSinceNow() > TRANSCODER_KEEPALIVE_PERIOD ) { // HandleKeepalives(); - + // update time m_LastKeepaliveTime.Now(); } @@ -211,9 +211,9 @@ void CTranscoder::Task(void) CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn) { CBuffer Buffer; - + CCodecStream *stream = NULL; - + // do we need transcoding if ( uiCodecIn != CODEC_NONE ) { @@ -223,17 +223,17 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn) // yes, post openstream request EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT); - + // wait relpy here if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) ) { if ( m_bStreamOpened ) { std::cout << "ambed openstream(" << m_StreamidOpenStream << ") ok" << std::endl; - + // create stream object stream = new CCodecStream(m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); - + // init it if ( stream->Init(m_PortOpenStream) ) { @@ -261,7 +261,7 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn) { std::cout << "ambed openstream timeout" << std::endl; } - + } } return stream; @@ -270,7 +270,7 @@ CCodecStream *CTranscoder::GetStream(uint8 uiCodecIn) void CTranscoder::ReleaseStream(CCodecStream *stream) { CBuffer Buffer; - + if ( stream != NULL ) { // look for the stream @@ -285,7 +285,7 @@ void CTranscoder::ReleaseStream(CCodecStream *stream) // send close packet EncodeClosestreamPacket(&Buffer, m_Streams[i]->GetStreamId()); m_Socket.Send(Buffer, m_AmbedIp, TRANSCODER_PORT); - + // and close it m_Streams[i]->Close(); delete m_Streams[i]; @@ -304,11 +304,11 @@ void CTranscoder::ReleaseStream(CCodecStream *stream) void CTranscoder::HandleKeepalives(void) { CBuffer keepalive; - + // send keepalive EncodeKeepAlivePacket(&keepalive); m_Socket.Send(keepalive, m_AmbedIp, TRANSCODER_PORT); - + // check if still with us if ( m_bConnected && (m_LastActivityTime.DurationSinceNow() >= TRANSCODER_KEEPALIVE_TIMEOUT) ) { @@ -324,7 +324,7 @@ void CTranscoder::HandleKeepalives(void) bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) { uint8 tag[] = { 'A','M','B','E','D','P','O','N','G' }; - + bool valid = false; if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) { @@ -336,7 +336,7 @@ bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, uint16 *Port) { uint8 tag[] = { 'A','M','B','E','D','S','T','D' }; - + bool valid = false; if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) { @@ -352,7 +352,7 @@ bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16 *Id, ui bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) { uint8 tag[] = { 'A','M','B','E','D','B','U','S','Y' }; - + return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ); } @@ -363,7 +363,7 @@ bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer) { uint8 tag[] = { 'A','M','B','E','D','P','I','N','G' }; - + Buffer->Set(tag, sizeof(tag)); Buffer->Append((uint8 *)(const char *)"XLX000 ", 8); } @@ -381,8 +381,7 @@ void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8 uiCodecIn, uint8 void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16 uiStreamId) { uint8 tag[] = { 'A','M','B','E','D','C','S' }; - + Buffer->Set(tag, sizeof(tag)); Buffer->Append((uint16)uiStreamId); } - diff --git a/ambedtest/ctranscoder.h b/ambedtest/ctranscoder.h index 2df670d..838fc9d 100644 --- a/ambedtest/ctranscoder.h +++ b/ambedtest/ctranscoder.h @@ -41,7 +41,7 @@ class CTranscoder public: // constructor CTranscoder(); - + // destructor virtual ~CTranscoder(); @@ -52,16 +52,16 @@ public: // locks void Lock(void) { m_Mutex.lock(); } void Unlock(void) { m_Mutex.unlock(); } - + // get const CIp &GetListenIp(void) const { return m_ListenIp; } const CIp &GetAmbedIp(void) const { return m_AmbedIp; } bool IsAmbedConnected(void) const { return m_bConnected; } - + // manage streams CCodecStream *GetStream(uint8); void ReleaseStream(CCodecStream *); - + // task static void Thread(CTranscoder *); void Task(void); @@ -79,12 +79,12 @@ protected: void EncodeKeepAlivePacket(CBuffer *); void EncodeOpenstreamPacket(CBuffer *, uint8, uint8); void EncodeClosestreamPacket(CBuffer *, uint16); - + protected: // IP's CIp m_ListenIp; CIp m_AmbedIp; - + // streams std::mutex m_Mutex; std::vector m_Streams; @@ -94,9 +94,9 @@ protected: bool m_bStreamOpened; uint16 m_StreamidOpenStream; uint16 m_PortOpenStream; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; // socket diff --git a/src/ccodecstream.cpp b/src/ccodecstream.cpp index cd4c224..9bd008c 100644 --- a/src/ccodecstream.cpp +++ b/src/ccodecstream.cpp @@ -38,7 +38,7 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut) { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; m_uiStreamId = uiId; m_uiPid = 0; @@ -63,7 +63,7 @@ CCodecStream::~CCodecStream() m_Socket.Close(); // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -90,7 +90,7 @@ CCodecStream::~CCodecStream() bool CCodecStream::Init(uint16 uiPort) { // reset stop flag - m_bConnected = m_bStopThread = false; + m_bConnected = keep_running = true; // create server's IP m_uiPort = uiPort; @@ -124,7 +124,7 @@ void CCodecStream::Close(void) m_Socket.Close(); // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -146,7 +146,7 @@ bool CCodecStream::IsEmpty(void) const void CCodecStream::Thread(CCodecStream *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->Task(); } diff --git a/src/ccodecstream.h b/src/ccodecstream.h index c22e864..1b9e6e0 100644 --- a/src/ccodecstream.h +++ b/src/ccodecstream.h @@ -48,14 +48,14 @@ class CCodecStream : public CPacketQueue public: // constructor CCodecStream(CPacketStream *, uint16, uint8, uint8); - + // destructor virtual ~CCodecStream(); - + // initialization bool Init(uint16); void Close(void); - + // get bool IsConnected(void) const { return m_bConnected; } uint16 GetStreamId(void) const { return m_uiStreamId; } @@ -69,16 +69,16 @@ public: // task static void Thread(CCodecStream *); void Task(void); - + protected: // packet decoding helpers bool IsValidAmbePacket(const CBuffer &, uint8 *); - + // packet encoding helpers void EncodeAmbePacket(CBuffer *, const uint8 *); - + protected: // data uint16 m_uiStreamId; @@ -91,17 +91,17 @@ protected: CIp m_Ip; CUdpSocket m_Socket; bool m_bConnected; - + // associated packet stream CPacketStream *m_PacketStream; CPacketQueue m_LocalQueue; // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; CTimePoint m_TimeoutTimer; CTimePoint m_StatsTimer; - + // statistics double m_fPingMin; double m_fPingMax; diff --git a/src/cdmriddir.cpp b/src/cdmriddir.cpp index 5d62db3..eff0973 100644 --- a/src/cdmriddir.cpp +++ b/src/cdmriddir.cpp @@ -34,14 +34,14 @@ CDmridDir::CDmridDir() { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; } CDmridDir::~CDmridDir() { // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -57,10 +57,10 @@ bool CDmridDir::Init(void) { // load content Reload(); - + // reset stop flag - m_bStopThread = false; - + keep_running = true; + // start thread; m_pThread = new std::thread(CDmridDir::Thread, this); @@ -69,7 +69,7 @@ bool CDmridDir::Init(void) void CDmridDir::Close(void) { - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -83,7 +83,7 @@ void CDmridDir::Close(void) void CDmridDir::Thread(CDmridDir *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { // Wait 30 seconds CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000); @@ -103,7 +103,7 @@ bool CDmridDir::Reload(void) { CBuffer buffer; bool ok = false; - + if ( LoadContent(&buffer) ) { Lock(); @@ -157,4 +157,3 @@ bool CDmridDir::IsValidDmrid(const char *sz) } return ok; } - diff --git a/src/cdmriddir.h b/src/cdmriddir.h index 23334e3..8794b5d 100644 --- a/src/cdmriddir.h +++ b/src/cdmriddir.h @@ -48,26 +48,26 @@ class CDmridDir public: // constructor CDmridDir(); - + // destructor ~CDmridDir(); - + // init & close virtual bool Init(void); virtual void Close(void); - + // locks void Lock(void) { m_Mutex.lock(); } void Unlock(void) { m_Mutex.unlock(); } - + // refresh virtual bool LoadContent(CBuffer *) { return false; } virtual bool RefreshContent(const CBuffer &) { return false; } - + // find const CCallsign *FindCallsign(uint32); uint32 FindDmrid(const CCallsign &); - + protected: // thread static void Thread(CDmridDir *); @@ -76,17 +76,17 @@ protected: bool Reload(void); virtual bool NeedReload(void) { return false; } bool IsValidDmrid(const char *); - + protected: // data std::map m_CallsignMap; std::map m_DmridMap; - + // Lock() std::mutex m_Mutex; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; }; diff --git a/src/cg3protocol.cpp b/src/cg3protocol.cpp index 68ef97f..95bc307 100644 --- a/src/cg3protocol.cpp +++ b/src/cg3protocol.cpp @@ -46,7 +46,7 @@ bool CG3Protocol::Init(void) m_ReflectorCallsign = g_Reflector.GetCallsign(); // reset stop flag - m_bStopThread = false; + keep_running = true; // update the reflector callsign m_ReflectorCallsign.PatchCallsign(0, (const uint8 *)"XLX", 3); @@ -124,7 +124,7 @@ void CG3Protocol::Close(void) void CG3Protocol::PresenceThread(CG3Protocol *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->PresenceTask(); } @@ -132,7 +132,7 @@ void CG3Protocol::PresenceThread(CG3Protocol *This) void CG3Protocol::ConfigThread(CG3Protocol *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->ConfigTask(); } @@ -140,7 +140,7 @@ void CG3Protocol::ConfigThread(CG3Protocol *This) void CG3Protocol::IcmpThread(CG3Protocol *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->IcmpTask(); } diff --git a/src/cgatekeeper.cpp b/src/cgatekeeper.cpp index 50c079f..a7fecc6 100644 --- a/src/cgatekeeper.cpp +++ b/src/cgatekeeper.cpp @@ -19,7 +19,7 @@ // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License -// along with Foobar. If not, see . +// along with Foobar. If not, see . // ---------------------------------------------------------------------------- #include "main.h" @@ -36,7 +36,7 @@ CGateKeeper g_GateKeeper; CGateKeeper::CGateKeeper() { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; } @@ -46,7 +46,7 @@ CGateKeeper::CGateKeeper() CGateKeeper::~CGateKeeper() { // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -60,15 +60,15 @@ CGateKeeper::~CGateKeeper() bool CGateKeeper::Init(void) { - + // load lists from files m_NodeWhiteList.LoadFromFile(WHITELIST_PATH); m_NodeBlackList.LoadFromFile(BLACKLIST_PATH); m_PeerList.LoadFromFile(INTERLINKLIST_PATH); - + // reset stop flag - m_bStopThread = false; - + keep_running = true; + // start thread; m_pThread = new std::thread(CGateKeeper::Thread, this); @@ -77,7 +77,7 @@ bool CGateKeeper::Init(void) void CGateKeeper::Close(void) { - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -92,7 +92,7 @@ void CGateKeeper::Close(void) bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, int protocol, char *modules) const { bool ok = true; - + switch (protocol) { // repeaters @@ -107,33 +107,33 @@ bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, int protocol ok &= IsNodeListedOk(callsign, ip); // todo: then apply any protocol specific authorisation for the operation break; - + // XLX interlinks case PROTOCOL_XLX: ok &= IsPeerListedOk(callsign, ip, modules); break; - + // unsupported case PROTOCOL_NONE: default: ok = false; break; } - + // report if ( !ok ) { std::cout << "Gatekeeper blocking linking of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl; } - + // done return ok; } - + bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int protocol, char module) const { bool ok = true; - + switch (protocol) { // repeaters, protocol specific @@ -149,25 +149,25 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot ok &= IsNodeListedOk(callsign, ip, module); // todo: then apply any protocol specific authorisation for the operation break; - + // XLX interlinks case PROTOCOL_XLX: ok &= IsPeerListedOk(callsign, ip, module); break; - + // unsupported case PROTOCOL_NONE: default: ok = false; break; } - + // report if ( !ok ) { std::cout << "Gatekeeper blocking transmitting of " << callsign << " @ " << ip << " using protocol " << protocol << std::endl; } - + // done return ok; } @@ -177,7 +177,7 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot void CGateKeeper::Thread(CGateKeeper *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { // Wait 30 seconds CTimePoint::TaskSleepFor(30000); @@ -204,9 +204,9 @@ void CGateKeeper::Thread(CGateKeeper *This) bool CGateKeeper::IsNodeListedOk(const CCallsign &callsign, const CIp &ip, char module) const { bool ok = true; - + // first check IP - + // next, check callsign if ( ok ) { @@ -218,24 +218,24 @@ bool CGateKeeper::IsNodeListedOk(const CCallsign &callsign, const CIp &ip, char ok = m_NodeWhiteList.IsCallsignListedWithWildcard(callsign, module); } const_cast(m_NodeWhiteList).Unlock(); - + // then check if not blacklisted const_cast(m_NodeBlackList).Lock(); ok &= !m_NodeBlackList.IsCallsignListedWithWildcard(callsign); const_cast(m_NodeBlackList).Unlock(); } - + // done return ok; - + } bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char module) const { bool ok = true; - + // first check IP - + // next, check callsign if ( ok ) { @@ -247,7 +247,7 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char } const_cast(m_PeerList).Unlock(); } - + // done return ok; } @@ -255,9 +255,9 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char *modules) const { bool ok = true; - + // first check IP - + // next, check callsign if ( ok ) { @@ -269,8 +269,7 @@ bool CGateKeeper::IsPeerListedOk(const CCallsign &callsign, const CIp &ip, char } const_cast(m_PeerList).Unlock(); } - + // done return ok; } - diff --git a/src/cgatekeeper.h b/src/cgatekeeper.h index 9ee7504..eef20f7 100644 --- a/src/cgatekeeper.h +++ b/src/cgatekeeper.h @@ -19,7 +19,7 @@ // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License -// along with Foobar. If not, see . +// along with Foobar. If not, see . // ---------------------------------------------------------------------------- #ifndef cgatekeeper_h @@ -39,22 +39,22 @@ class CGateKeeper public: // constructor CGateKeeper(); - + // destructor virtual ~CGateKeeper(); - + // init & clode bool Init(void); void Close(void); - + // authorizations bool MayLink(const CCallsign &, const CIp &, int, char * = NULL) const; bool MayTransmit(const CCallsign &, const CIp &, int = PROTOCOL_ANY, char = ' ') const; - + // peer list handeling CPeerCallsignList *GetPeerList(void) { m_PeerList.Lock(); return &m_PeerList; } void ReleasePeerList(void) { m_PeerList.Unlock(); } - + protected: // thread static void Thread(CGateKeeper *); @@ -63,15 +63,15 @@ protected: bool IsNodeListedOk(const CCallsign &, const CIp &, char = ' ') const; bool IsPeerListedOk(const CCallsign &, const CIp &, char) const; bool IsPeerListedOk(const CCallsign &, const CIp &, char *) const; - + protected: // data CCallsignList m_NodeWhiteList; CCallsignList m_NodeBlackList; CPeerCallsignList m_PeerList; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; }; diff --git a/src/cprotocol.cpp b/src/cprotocol.cpp index 968be53..b6aee24 100644 --- a/src/cprotocol.cpp +++ b/src/cprotocol.cpp @@ -33,7 +33,7 @@ // constructor -CProtocol::CProtocol() : m_bStopThread(false), m_pThread(NULL) {} +CProtocol::CProtocol() : keep_running(true), m_pThread(NULL) {} //////////////////////////////////////////////////////////////////////////////////////// @@ -42,7 +42,7 @@ CProtocol::CProtocol() : m_bStopThread(false), m_pThread(NULL) {} CProtocol::~CProtocol() { // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -71,7 +71,7 @@ bool CProtocol::Initialize(const char *type, uint16 port) m_ReflectorCallsign = g_Reflector.GetCallsign(); // reset stop flag - m_bStopThread = false; + keep_running = true; // update the reflector callsign if (type) @@ -110,7 +110,7 @@ bool CProtocol::Initialize(const char *type, uint16 port) void CProtocol::Thread(CProtocol *This) { - while (! This->m_bStopThread) + while (This->keep_running) { This->Task(); } @@ -118,7 +118,7 @@ void CProtocol::Thread(CProtocol *This) void CProtocol::Close(void) { - m_bStopThread = true; + keep_running = true; if ( m_pThread != NULL ) { m_pThread->join(); diff --git a/src/cprotocol.h b/src/cprotocol.h index b7a9fef..0e5f659 100644 --- a/src/cprotocol.h +++ b/src/cprotocol.h @@ -137,7 +137,7 @@ protected: CPacketQueue m_Queue; // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; // identity diff --git a/src/creflector.cpp b/src/creflector.cpp index 0d8da87..f86c2f7 100644 --- a/src/creflector.cpp +++ b/src/creflector.cpp @@ -38,7 +38,7 @@ CReflector::CReflector() { - m_bStopThreads = false; + keep_running = true; m_XmlReportThread = NULL; m_JsonReportThread = NULL; for ( int i = 0; i < NB_OF_MODULES; i++ ) @@ -55,7 +55,7 @@ CReflector::CReflector(const CCallsign &callsign) #ifdef DEBUG_DUMPFILE m_DebugFile.close(); #endif - m_bStopThreads = false; + keep_running = true; m_XmlReportThread = NULL; m_JsonReportThread = NULL; for ( int i = 0; i < NB_OF_MODULES; i++ ) @@ -70,7 +70,7 @@ CReflector::CReflector(const CCallsign &callsign) CReflector::~CReflector() { - m_bStopThreads = true; + keep_running = false; if ( m_XmlReportThread != NULL ) { m_XmlReportThread->join(); @@ -100,7 +100,7 @@ bool CReflector::Start(void) bool ok = true; // reset stop flag - m_bStopThreads = false; + keep_running = true; // init gate keeper ok &= g_GateKeeper.Init(); @@ -144,7 +144,7 @@ bool CReflector::Start(void) void CReflector::Stop(void) { // stop & delete all threads - m_bStopThreads = true; + keep_running = false; // stop & delete report threads if ( m_XmlReportThread != NULL ) @@ -325,7 +325,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) // get on input queue CPacket *packet; - while ( !This->m_bStopThreads ) + while (This->keep_running) { // any packet in our input queue ? streamIn->Lock(); @@ -384,7 +384,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) void CReflector::XmlReportThread(CReflector *This) { - while ( !This->m_bStopThreads ) + while (This->keep_running) { // report to xml file std::ofstream xmlFile; @@ -423,7 +423,7 @@ void CReflector::JsonReportThread(CReflector *This) if ( Socket.Open(JSON_PORT) ) { // and loop - while ( !This->m_bStopThreads ) + while (This->keep_running) { // any command ? if ( Socket.Receive(Buffer, Ip, 50) ) diff --git a/src/creflector.h b/src/creflector.h index bd6c913..eea1c35 100644 --- a/src/creflector.h +++ b/src/creflector.h @@ -136,7 +136,7 @@ protected: std::array m_Streams; // threads - bool m_bStopThreads; + std::atomic keep_running; std::array m_RouterThreads; std::thread *m_XmlReportThread; std::thread *m_JsonReportThread; diff --git a/src/ctranscoder.cpp b/src/ctranscoder.cpp index bf545d7..1991f5c 100644 --- a/src/ctranscoder.cpp +++ b/src/ctranscoder.cpp @@ -46,7 +46,7 @@ CTranscoder g_Transcoder; CTranscoder::CTranscoder() { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; m_bConnected = false; m_LastKeepaliveTime.Now(); @@ -72,7 +72,7 @@ bool CTranscoder::Init(void) bool ok; // reset stop flag - m_bStopThread = false; + keep_running = true; // create server's IP auto s = g_Reflector.GetTranscoderIp(); @@ -116,7 +116,7 @@ void CTranscoder::Close(void) m_Mutex.unlock(); // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -130,7 +130,7 @@ void CTranscoder::Close(void) void CTranscoder::Thread(CTranscoder *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->Task(); } diff --git a/src/ctranscoder.h b/src/ctranscoder.h index f40d07c..cf9f8ca 100644 --- a/src/ctranscoder.h +++ b/src/ctranscoder.h @@ -93,7 +93,7 @@ protected: uint16 m_PortOpenStream; // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; // socket diff --git a/src/cwiresxcmdhandler.cpp b/src/cwiresxcmdhandler.cpp index 0f7af53..ed22a02 100644 --- a/src/cwiresxcmdhandler.cpp +++ b/src/cwiresxcmdhandler.cpp @@ -40,7 +40,7 @@ CWiresxCmdHandler::CWiresxCmdHandler() { m_seqNo = 0; - m_bStopThread = false; + keep_running = true; m_pThread = NULL; } @@ -51,13 +51,13 @@ CWiresxCmdHandler::CWiresxCmdHandler() CWiresxCmdHandler::~CWiresxCmdHandler() { // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); delete m_pThread; } - + // empty queue m_CmdQueue.Lock(); while ( !m_CmdQueue.empty() ) @@ -78,18 +78,18 @@ bool CWiresxCmdHandler::Init(void) m_ReflectorWiresxInfo.SetName("Reflector"); // reset stop flag - m_bStopThread = false; + keep_running = true; // start thread; m_pThread = new std::thread(CWiresxCmdHandler::Thread, this); - + // done return true; } void CWiresxCmdHandler::Close(void) { - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -103,7 +103,7 @@ void CWiresxCmdHandler::Close(void) void CWiresxCmdHandler::Thread(CWiresxCmdHandler *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { This->Task(); } @@ -120,7 +120,7 @@ void CWiresxCmdHandler::Task(void) uint32 uiNodeRxFreq; char cModule; bool bCmd; - + // anything to do ? bCmd = false; m_CmdQueue.Lock(); @@ -142,8 +142,8 @@ void CWiresxCmdHandler::Task(void) } } m_CmdQueue.Unlock(); - - + + // handle it if ( bCmd ) { @@ -161,7 +161,7 @@ void CWiresxCmdHandler::Task(void) cModule = client->GetReflectorModule(); } g_Reflector.ReleaseClients(); - + // and crack the cmd switch ( Cmd.GetCmd() ) { @@ -236,12 +236,12 @@ bool CWiresxCmdHandler::ReplyToWiresxDxReqPacket(const CIp &Ip, const CWiresxInf uint8 data[150U]; uint8 RoomId; bool IsLinked; - + // linked module // module A == 0 IsLinked = (Module != ' '); RoomId = (uint8)(Module - 'A'); - + // fill data buffer ::memset(data, 0x00U, 150U); ::memset(data, ' ', 128U); @@ -307,7 +307,7 @@ bool CWiresxCmdHandler::ReplyToWiresxDxReqPacket(const CIp &Ip, const CWiresxInf ::sprintf(freq, "%05u.%03u000%c%03u.%06u", WiresxInfo.GetTxFrequency() / 1000000U, freqkHz, sign, offset / 1000000U, offset % 1000000U); - + ::memcpy(data + 84U, freq, 23U); } @@ -330,7 +330,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn bool ok = false; uint8 ALL_RESP[] = {0x5DU, 0x46U, 0x5FU, 0x29U}; uint8 data[1100U]; - + // fill data buffer ::memset(data, 0x00U, 1100U); // seq no @@ -358,7 +358,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn char item[16U]; // module A == 0 int RoomId = i + Start; - + // prepare ::memset(data + offset, ' ', 50U); data[offset + 0U] = '5'; @@ -394,18 +394,18 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn uint k = 1029U - offset2; ::memset(data+offset2, ' ', k); offset2 += k; - + // EOD + CRC data[offset2 + 0U] = 0x03U; data[offset2 + 1U] = CCRC::addCRC(data, offset2 + 1U); offset2 += 2U; - + // and encode the reply CBuffer Data; Data.Set(data, offset2 + 2U); ok = EncodeAndSendWiresxPacket(Ip, Data, WiresxInfo); } - + // and next repeat with normal frame { @@ -418,7 +418,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn //uint k = 1031U - offset; //::memset(data+offset, ' ', k); //offset += k; - + // and encode the reply CBuffer Data; Data.Set(data, offset + 2U); @@ -441,11 +441,11 @@ bool CWiresxCmdHandler::ReplyToWiresxConnReqPacket(const CIp &Ip, const CWiresxI // linked room // Module A == 0 RoomId = (uint8)(Module - 'A'); - + // prepare buffer ::memset(data, 0x00U, 110U); ::memset(data, ' ', 90U); - + // seq no data[0U] = m_seqNo; // command @@ -499,11 +499,11 @@ bool CWiresxCmdHandler::ReplyToWiresxDiscReqPacket(const CIp &Ip, const CWiresxI uint8 DISC_RESP[] = {0x5DU, 0x41U, 0x5FU, 0x26U}; bool ok = false; uint8 data[110U]; - + // prepare buffer ::memset(data, 0x00U, 110U); ::memset(data, ' ', 90U); - + // seq no data[0U] = m_seqNo; // command @@ -547,12 +547,12 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer & CYSFPayload payload; uint8 buffer[200U]; - + CBuffer Data(DataOrg); - + // seq no uint8 seqNo = 0U; - + // calculate bt and adjust length uint length = (uint)Data.size(); uint8 bt = 0; @@ -577,10 +577,10 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer & { Data.Append((uint8)0x20U, (int)(length - (uint)Data.size())); } - + // ft uint8 ft = WiresxCalcFt(length, 0U); - + // Write the header { //header @@ -675,7 +675,7 @@ bool CWiresxCmdHandler::EncodeAndSendWiresxPacket(const CIp &Ip, const CBuffer & // and post it SendPacket(Ip, buffer); } - + // done return true; } @@ -721,7 +721,7 @@ bool CWiresxCmdHandler::DebugTestDecodePacket(const CBuffer &Buffer) CYSFPayload payload; CBuffer dump; bool valid = false; - + if ( (Buffer.size() == 155) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) { // decode YSH fich @@ -733,7 +733,7 @@ bool CWiresxCmdHandler::DebugTestDecodePacket(const CBuffer &Buffer) << (int)Fich.getBT() << "," << (int)Fich.getFN() << "," << (int)Fich.getFT() << " : "; - + switch ( Fich.getFI() ) { case YSF_FI_HEADER: diff --git a/src/cwiresxcmdhandler.h b/src/cwiresxcmdhandler.h index 3a3b7a3..ae9ce05 100644 --- a/src/cwiresxcmdhandler.h +++ b/src/cwiresxcmdhandler.h @@ -41,10 +41,10 @@ class CWiresxCmdHandler public: // constructor CWiresxCmdHandler(); - + // destructor virtual ~CWiresxCmdHandler(); - + // initialization virtual bool Init(void); virtual void Close(void); @@ -56,18 +56,18 @@ public: void ReleasePacketQueue(void) { m_PacketQueue.Unlock(); } // get - + // task static void Thread(CWiresxCmdHandler *); virtual void Task(void); - + protected: // packet encoding bool ReplyToWiresxDxReqPacket(const CIp &, const CWiresxInfo &, char); bool ReplyToWiresxAllReqPacket(const CIp &, const CWiresxInfo &, int); bool ReplyToWiresxConnReqPacket(const CIp &, const CWiresxInfo &, char); bool ReplyToWiresxDiscReqPacket(const CIp &, const CWiresxInfo &); - + // packet encoding helpers bool EncodeAndSendWiresxPacket(const CIp &, const CBuffer &, const CWiresxInfo &); uint8 WiresxCalcFt(uint, uint) const; @@ -80,13 +80,13 @@ protected: // data CWiresxInfo m_ReflectorWiresxInfo; uint8_t m_seqNo; - + // queues CWiresxCmdQueue m_CmdQueue; CWiresxPacketQueue m_PacketQueue; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; }; diff --git a/src/cysfnodedir.cpp b/src/cysfnodedir.cpp index a8722af..aa11e5d 100644 --- a/src/cysfnodedir.cpp +++ b/src/cysfnodedir.cpp @@ -33,14 +33,14 @@ CYsfNodeDir::CYsfNodeDir() { - m_bStopThread = false; + keep_running = true; m_pThread = NULL; } CYsfNodeDir::~CYsfNodeDir() { // kill threads - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -56,10 +56,10 @@ bool CYsfNodeDir::Init(void) { // load content Reload(); - + // reset stop flag - m_bStopThread = false; - + keep_running = true; + // start thread; m_pThread = new std::thread(CYsfNodeDir::Thread, this); @@ -68,7 +68,7 @@ bool CYsfNodeDir::Init(void) void CYsfNodeDir::Close(void) { - m_bStopThread = true; + keep_running = false; if ( m_pThread != NULL ) { m_pThread->join(); @@ -82,7 +82,7 @@ void CYsfNodeDir::Close(void) void CYsfNodeDir::Thread(CYsfNodeDir *This) { - while ( !This->m_bStopThread ) + while (This->keep_running) { // Wait 30 seconds CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000); @@ -102,7 +102,7 @@ bool CYsfNodeDir::Reload(void) { CBuffer buffer; bool ok = false; - + if ( LoadContent(&buffer) ) { Lock(); diff --git a/src/cysfnodedir.h b/src/cysfnodedir.h index b4e3804..48d976c 100644 --- a/src/cysfnodedir.h +++ b/src/cysfnodedir.h @@ -59,11 +59,11 @@ public: // init & close virtual bool Init(void); virtual void Close(void); - + // locks void Lock(void) { m_Mutex.lock(); } void Unlock(void) { m_Mutex.unlock(); } - + // refresh virtual bool LoadContent(CBuffer *) { return false; } virtual bool RefreshContent(const CBuffer &) { return false; } @@ -79,14 +79,14 @@ protected: bool Reload(void); virtual bool NeedReload(void) { return false; } //bool IsValidDmrid(const char *); - + protected: // Lock() std::mutex m_Mutex; - + // thread - bool m_bStopThread; + std::atomic keep_running; std::thread *m_pThread; }; diff --git a/src/main.h b/src/main.h index 4b6b01c..178a77f 100644 --- a/src/main.h +++ b/src/main.h @@ -25,6 +25,7 @@ #ifndef main_h #define main_h +#include #include #include #include