From 76497403e0ac0c4b285c1a86b5b488e6a98f1ae4 Mon Sep 17 00:00:00 2001 From: "Mark Landis (N6AZX)" Date: Sun, 1 Jan 2023 16:53:57 -0800 Subject: [PATCH 1/2] Use signals for program termination This commit is the first in a small series to improve the shutdown behavior of xlxd. (1) Ensures worker threads block signals by masking all signals prior to the main reflector startup via CReflector::Start(). (2) Uses sigwaitinfo() in the main thread to wait on a few normal termination signals. This works for both daemon and non-daemon operation. --- src/main.cpp | 67 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 723ab5b..017af80 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,6 +26,7 @@ #include "creflector.h" #include "syslog.h" +#include #include @@ -39,6 +40,37 @@ CReflector g_Reflector; #include "cusers.h" +// Returns caught termination signal or -1 on error +static int wait_for_termination() +{ + sigset_t waitset; + siginfo_t siginfo; + + sigemptyset(&waitset); + sigaddset(&waitset, SIGTERM); + sigaddset(&waitset, SIGINT); + sigaddset(&waitset, SIGQUIT); + sigaddset(&waitset, SIGHUP); + pthread_sigmask(SIG_BLOCK, &waitset, nullptr); + + // Now wait for termination signal + int result = -1; + while (result < 0) + { + result = sigwaitinfo(&waitset, &siginfo); + if (result == -1 && errno == EINTR) + { + // try again + if (errno == EINTR) + continue; + + // an unexpected error occurred, consider it fatal + break; + } + } + return result; +} + int main(int argc, const char * argv[]) { #ifdef RUN_AS_DAEMON @@ -101,37 +133,30 @@ int main(int argc, const char * argv[]) g_Reflector.SetCallsign(argv[1]); g_Reflector.SetListenIp(CIp(argv[2])); g_Reflector.SetTranscoderIp(CIp(CIp(argv[3]))); - + + // Block all signals while starting up the reflector -- we don't + // want any of the worker threads handling them. + sigset_t sigblockall, sigorig; + sigfillset(&sigblockall); + pthread_sigmask(SIG_SETMASK, &sigblockall, &sigorig); + // and let it run if ( !g_Reflector.Start() ) { std::cout << "Error starting reflector" << std::endl; exit(EXIT_FAILURE); } + + // Restore main thread default signal state + pthread_sigmask(SIG_SETMASK, &sigorig, nullptr); + std::cout << "Reflector " << g_Reflector.GetCallsign() << "started and listening on " << g_Reflector.GetListenIp() << std::endl; -#ifdef RUN_AS_DAEMON - // run forever - while ( true ) - { - // sleep 60 seconds - CTimePoint::TaskSleepFor(60000); - } -#else - // wait any key - for (;;) - { - // sleep 60 seconds - CTimePoint::TaskSleepFor(60000); -#ifdef DEBUG_DUMPFILE - g_Reflector.m_DebugFile.close(); -#endif - } -#endif - // and wait for end - g_Reflector.Stop(); + wait_for_termination(); + + g_Reflector->Stop(); std::cout << "Reflector stopped" << std::endl; // done From 9a37f2c680f702de9bc4797d164a4c963bc89cfb Mon Sep 17 00:00:00 2001 From: "Mark Landis (N6AZX)" Date: Sun, 1 Jan 2023 17:09:26 -0800 Subject: [PATCH 2/2] Replace thread sleep with condition variables Second patch in the improve-shutdown series. This replaces blocking sleeps with the use of condition variables for signaling thread shutdown. The details are: (1) Create CSimpleCondition class to provide a very basic (simple) condition variable that can be used in situations where external mutex control is not required, i.e., the class contains both a managed mutex and condition variable. Instances of this class can be used for basic signaling, and waiters can specify user defined predicates. (2) Replace instances of large sleeps in worker threads with use of CSimpleCondition. This allows for very quick response times from worker threads when a shutdown has been initiated. (3) Change stop thread booleans to atomic_bool. (4) Fixes small whitespace discprencies. --- src/ccodecstream.h | 2 +- src/cdmriddir.cpp | 18 ++++++++++-------- src/cdmriddir.h | 4 +++- src/cg3protocol.cpp | 5 +++-- src/cg3protocol.h | 8 +++++++- src/cgatekeeper.cpp | 11 +++++++---- src/cgatekeeper.h | 5 ++++- src/cprotocol.h | 2 +- src/creflector.cpp | 11 ++++++++--- src/creflector.h | 5 ++++- src/csimplecondition.h | 41 +++++++++++++++++++++++++++++++++++++++++ src/ctranscoder.h | 2 +- src/cwiresxcmdhandler.h | 2 +- src/cysfnodedir.cpp | 10 +++++----- src/cysfnodedir.h | 6 ++++-- src/main.cpp | 4 ++-- 16 files changed, 102 insertions(+), 34 deletions(-) create mode 100644 src/csimplecondition.h diff --git a/src/ccodecstream.h b/src/ccodecstream.h index c22e864..e11f520 100644 --- a/src/ccodecstream.h +++ b/src/ccodecstream.h @@ -97,7 +97,7 @@ protected: CPacketQueue m_LocalQueue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; CTimePoint m_TimeoutTimer; CTimePoint m_StatsTimer; diff --git a/src/cdmriddir.cpp b/src/cdmriddir.cpp index 5d62db3..17b5400 100644 --- a/src/cdmriddir.cpp +++ b/src/cdmriddir.cpp @@ -29,6 +29,8 @@ #include "cdmriddirfile.h" #include "cdmriddirhttp.h" +#include + //////////////////////////////////////////////////////////////////////////////////////// // constructor & destructor @@ -42,6 +44,7 @@ CDmridDir::~CDmridDir() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -55,12 +58,12 @@ CDmridDir::~CDmridDir() bool CDmridDir::Init(void) { - // load content - Reload(); + // load content + Reload(); // reset stop flag m_bStopThread = false; - + // start thread; m_pThread = new std::thread(CDmridDir::Thread, this); @@ -70,6 +73,7 @@ bool CDmridDir::Init(void) void CDmridDir::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -83,15 +87,13 @@ void CDmridDir::Close(void) void CDmridDir::Thread(CDmridDir *This) { - while ( !This->m_bStopThread ) + std::chrono::minutes timeout(DMRIDDB_REFRESH_RATE); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000); - // have lists files changed ? if ( This->NeedReload() ) { - This->Reload(); + This->Reload(); } } } diff --git a/src/cdmriddir.h b/src/cdmriddir.h index 23334e3..f98b9fd 100644 --- a/src/cdmriddir.h +++ b/src/cdmriddir.h @@ -31,6 +31,7 @@ #include #include "cbuffer.h" #include "ccallsign.h" +#include "csimplecondition.h" // compare function for std::map::find @@ -84,9 +85,10 @@ protected: // Lock() std::mutex m_Mutex; + CSimpleCondition m_cv; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/cg3protocol.cpp b/src/cg3protocol.cpp index 9f85420..3ae72eb 100755 --- a/src/cg3protocol.cpp +++ b/src/cg3protocol.cpp @@ -80,8 +80,8 @@ bool CG3Protocol::Init(void) { // start helper threads m_pPresenceThread = new std::thread(PresenceThread, this); - m_pPresenceThread = new std::thread(ConfigThread, this); - m_pPresenceThread = new std::thread(IcmpThread, this); + m_pConfigThread = new std::thread(ConfigThread, this); + m_pIcmpThread = new std::thread(IcmpThread, this); } #endif @@ -94,6 +94,7 @@ bool CG3Protocol::Init(void) void CG3Protocol::Close(void) { + m_bStopThread = true; if (m_pPresenceThread != NULL) { m_pPresenceThread->join(); diff --git a/src/cg3protocol.h b/src/cg3protocol.h index 5ffdb79..53d5d6a 100644 --- a/src/cg3protocol.h +++ b/src/cg3protocol.h @@ -64,7 +64,13 @@ class CG3Protocol : public CProtocol { public: // constructor - CG3Protocol() : m_GwAddress(0u), m_Modules("*"), m_LastModTime(0) {}; + CG3Protocol() : + m_pPresenceThread(nullptr), + m_pConfigThread(nullptr), + m_pIcmpThread(nullptr), + m_GwAddress(0u), + m_Modules("*"), + m_LastModTime(0) {} // destructor virtual ~CG3Protocol() {}; diff --git a/src/cgatekeeper.cpp b/src/cgatekeeper.cpp index 6bbacca..2bc4eb9 100644 --- a/src/cgatekeeper.cpp +++ b/src/cgatekeeper.cpp @@ -25,6 +25,9 @@ #include "main.h" #include "ctimepoint.h" #include "cgatekeeper.h" +#include "csimplecondition.h" + +#include //////////////////////////////////////////////////////////////////////////////////////// @@ -47,6 +50,7 @@ CGateKeeper::~CGateKeeper() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -78,6 +82,7 @@ bool CGateKeeper::Init(void) void CGateKeeper::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -179,11 +184,9 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot void CGateKeeper::Thread(CGateKeeper *This) { - while ( !This->m_bStopThread ) + std::chrono::seconds timeout(30); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(30000); - // have lists files changed ? if ( This->m_NodeWhiteList.NeedReload() ) { diff --git a/src/cgatekeeper.h b/src/cgatekeeper.h index 9ee7504..fe57456 100644 --- a/src/cgatekeeper.h +++ b/src/cgatekeeper.h @@ -30,6 +30,7 @@ #include "cip.h" #include "ccallsignlist.h" #include "cpeercallsignlist.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// // class @@ -71,8 +72,10 @@ protected: CPeerCallsignList m_PeerList; // thread - bool m_bStopThread; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThread; std::thread *m_pThread; + }; diff --git a/src/cprotocol.h b/src/cprotocol.h index 1b43371..a6365c1 100644 --- a/src/cprotocol.h +++ b/src/cprotocol.h @@ -132,7 +132,7 @@ protected: CPacketQueue m_Queue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; // identity diff --git a/src/creflector.cpp b/src/creflector.cpp index a813df9..7e6ef29 100644 --- a/src/creflector.cpp +++ b/src/creflector.cpp @@ -32,6 +32,8 @@ #include "cysfnodedirfile.h" #include "cysfnodedirhttp.h" +#include + //////////////////////////////////////////////////////////////////////////////////////// // constructor @@ -72,6 +74,7 @@ CReflector::CReflector(const CCallsign &callsign) CReflector::~CReflector() { m_bStopThreads = true; + m_cv.signal(); if ( m_XmlReportThread != NULL ) { m_XmlReportThread->join(); @@ -146,6 +149,7 @@ void CReflector::Stop(void) { // stop & delete all threads m_bStopThreads = true; + m_cv.signal(); // stop & delete report threads if ( m_XmlReportThread != NULL ) @@ -388,6 +392,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) void CReflector::XmlReportThread(CReflector *This) { + const std::chrono::minutes timeout(XML_UPDATE_PERIOD); while ( !This->m_bStopThreads ) { // report to xml file @@ -408,8 +413,7 @@ void CReflector::XmlReportThread(CReflector *This) } #endif - // and wait a bit - CTimePoint::TaskSleepFor(XML_UPDATE_PERIOD * 1000); + This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;}); } } @@ -493,7 +497,8 @@ void CReflector::JsonReportThread(CReflector *This) case NOTIFICATION_NONE: default: // nothing to do, just sleep a bit - CTimePoint::TaskSleepFor(250); + std::chrono::milliseconds timeout(250); + This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;}); break; } } diff --git a/src/creflector.h b/src/creflector.h index ece983e..e809328 100644 --- a/src/creflector.h +++ b/src/creflector.h @@ -31,6 +31,8 @@ #include "cprotocols.h" #include "cpacketstream.h" #include "cnotificationqueue.h" +#include "cysfnodedir.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// @@ -137,7 +139,8 @@ protected: std::array m_Streams; // threads - bool m_bStopThreads; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThreads; std::array m_RouterThreads; std::thread *m_XmlReportThread; std::thread *m_JsonReportThread; diff --git a/src/csimplecondition.h b/src/csimplecondition.h new file mode 100644 index 0000000..0845c06 --- /dev/null +++ b/src/csimplecondition.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +class CSimpleCondition final +{ +public: + CSimpleCondition() : m_Mutex(), m_Condition() {} + CSimpleCondition(const CSimpleCondition&) = delete; + CSimpleCondition& operator=(const CSimpleCondition&) = delete; + CSimpleCondition(CSimpleCondition&&) = delete; + ~CSimpleCondition() {}; + + // Wait up to @duration to be signaled, or until @predicate is true. + // Returns result of predicate after timing out or being signaled. + template + bool wait(Duration, Predicate); + + // Signal waiters. If @all is true, all waiters will be woken up. + void signal(bool all=true) + { + if (all) + m_Condition.notify_all(); + else + m_Condition.notify_one(); + } + +private: + std::mutex m_Mutex; + std::condition_variable m_Condition; +}; + +// Note: @timeout is a relative duration, e.g., "30s". +template +bool CSimpleCondition::wait(Duration timeout, Predicate predicate) +{ + std::unique_lock lock(m_Mutex); + auto bound = std::chrono::system_clock::now() + timeout; + return m_Condition.wait_until(lock, bound, predicate); +} diff --git a/src/ctranscoder.h b/src/ctranscoder.h index 7d54c58..470e9bf 100644 --- a/src/ctranscoder.h +++ b/src/ctranscoder.h @@ -92,7 +92,7 @@ protected: uint16 m_PortOpenStream; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; // socket diff --git a/src/cwiresxcmdhandler.h b/src/cwiresxcmdhandler.h index 3a3b7a3..429bc08 100644 --- a/src/cwiresxcmdhandler.h +++ b/src/cwiresxcmdhandler.h @@ -86,7 +86,7 @@ protected: CWiresxPacketQueue m_PacketQueue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/cysfnodedir.cpp b/src/cysfnodedir.cpp index a8722af..74e50d9 100644 --- a/src/cysfnodedir.cpp +++ b/src/cysfnodedir.cpp @@ -26,7 +26,7 @@ #include "main.h" #include "creflector.h" #include "cysfnodedir.h" - +#include //////////////////////////////////////////////////////////////////////////////////////// // constructor & destructor @@ -41,6 +41,7 @@ CYsfNodeDir::~CYsfNodeDir() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -69,6 +70,7 @@ bool CYsfNodeDir::Init(void) void CYsfNodeDir::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -82,11 +84,9 @@ void CYsfNodeDir::Close(void) void CYsfNodeDir::Thread(CYsfNodeDir *This) { - while ( !This->m_bStopThread ) + const std::chrono::minutes timeout(YSFNODEDB_REFRESH_RATE); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000); - // have lists files changed ? if ( This->NeedReload() ) { diff --git a/src/cysfnodedir.h b/src/cysfnodedir.h index b4e3804..916b2c2 100644 --- a/src/cysfnodedir.h +++ b/src/cysfnodedir.h @@ -32,6 +32,7 @@ #include "cbuffer.h" #include "ccallsign.h" #include "cysfnode.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// // define @@ -84,9 +85,10 @@ protected: protected: // Lock() std::mutex m_Mutex; - + // thread - bool m_bStopThread; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/main.cpp b/src/main.cpp index 017af80..6d8a17e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -53,7 +53,7 @@ static int wait_for_termination() sigaddset(&waitset, SIGHUP); pthread_sigmask(SIG_BLOCK, &waitset, nullptr); - // Now wait for termination signal + // Wait for a termination signal int result = -1; while (result < 0) { @@ -156,7 +156,7 @@ int main(int argc, const char * argv[]) // and wait for end wait_for_termination(); - g_Reflector->Stop(); + g_Reflector.Stop(); std::cout << "Reflector stopped" << std::endl; // done