std::thread is not std::future

pull/1/head
Tom Early 5 years ago
parent bfa204a5c5
commit a4621db57a

@ -39,7 +39,6 @@
CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16 uiId, uint8 uiCodecIn, uint8 uiCodecOut)
{
keep_running = true;
m_pThread = nullptr;
m_uiStreamId = uiId;
m_uiPid = 0;
m_uiCodecIn = uiCodecIn;
@ -64,11 +63,9 @@ CCodecStream::~CCodecStream()
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
// empty local queue
@ -130,7 +127,7 @@ bool CCodecStream::Init(uint16 uiPort)
}
keep_running = m_bConnected = true;
m_pThread = new std::thread(CCodecStream::Thread, this);
m_Future = std::async(std::launch::async, &CCodecStream::Thread, this);
return true;
}
@ -142,11 +139,9 @@ void CCodecStream::Close(void)
m_Socket.Close();
// kill threads
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
@ -161,11 +156,11 @@ bool CCodecStream::IsEmpty(void) const
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CCodecStream::Thread(CCodecStream *This)
void CCodecStream::Thread()
{
while (This->keep_running)
while (keep_running)
{
This->Task();
Task();
}
}

@ -67,7 +67,7 @@ public:
bool IsEmpty(void) const;
// task
static void Thread(CCodecStream *);
void Thread(void);
void Task(void);
@ -98,7 +98,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
CTimePoint m_TimeoutTimer;
CTimePoint m_StatsTimer;

@ -35,19 +35,11 @@
CDmridDir::CDmridDir()
{
keep_running = true;
m_pThread = nullptr;
}
CDmridDir::~CDmridDir()
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
}
Close();
}
@ -63,7 +55,7 @@ bool CDmridDir::Init(void)
keep_running = true;
// start thread;
m_pThread = new std::thread(CDmridDir::Thread, this);
m_Future = std::async(std::launch::async, &CDmridDir::Thread, this);
return true;
}
@ -71,29 +63,27 @@ bool CDmridDir::Init(void)
void CDmridDir::Close(void)
{
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CDmridDir::Thread(CDmridDir *This)
void CDmridDir::Thread()
{
while (This->keep_running)
while (keep_running)
{
// Wait DMRIDDB_REFRESH_RATE minutes
for (int i=0; i<30*DMRIDDB_REFRESH_RATE && This->keep_running; i++)
for (int i=0; i<30*DMRIDDB_REFRESH_RATE && keep_running; i++)
CTimePoint::TaskSleepFor(2000);
// have lists files changed ?
if ( This->NeedReload() )
if ( NeedReload() )
{
This->Reload();
Reload();
}
}
}

@ -70,7 +70,7 @@ public:
protected:
// thread
static void Thread(CDmridDir *);
void Thread();
// reload helpers
bool Reload(void);
@ -87,7 +87,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
};

@ -83,10 +83,10 @@ bool CG3Protocol::Initalize(const char */*type*/, const uint16 /*port*/, const b
}
// start helper threads
m_pThread = new std::thread(CProtocol::Thread, this);
m_pPresenceThread = new std::thread(PresenceThread, this);
m_pPresenceThread = new std::thread(ConfigThread, this);
m_pPresenceThread = new std::thread(IcmpThread, this);
m_Future = std::async(std::launch::async, &CProtocol::Thread, this);
m_PresenceFuture = std::async(std::launch::async, &CG3Protocol::PresenceThread, this);
m_PresenceFuture = std::async(std::launch::async, &CG3Protocol::ConfigThread, this);
m_PresenceFuture = std::async(std::launch::async, &CG3Protocol::IcmpThread, this);
// update time
m_LastKeepaliveTime.Now();
@ -97,25 +97,19 @@ bool CG3Protocol::Initalize(const char */*type*/, const uint16 /*port*/, const b
void CG3Protocol::Close(void)
{
if (m_pPresenceThread != nullptr)
if (m_PresenceFuture.valid())
{
m_pPresenceThread->join();
delete m_pPresenceThread;
m_pPresenceThread = nullptr;
m_PresenceFuture.get();
}
if (m_pConfigThread != nullptr)
if (m_ConfigFuture.valid())
{
m_pConfigThread->join();
delete m_pConfigThread;
m_pConfigThread = nullptr;
m_ConfigFuture.get();
}
if (m_pIcmpThread != nullptr)
if (m_IcmpFuture.valid())
{
m_pIcmpThread->join();
delete m_pIcmpThread;
m_pIcmpThread = nullptr;
m_IcmpFuture.get();
}
}
@ -123,27 +117,27 @@ void CG3Protocol::Close(void)
////////////////////////////////////////////////////////////////////////////////////////
// private threads
void CG3Protocol::PresenceThread(CG3Protocol *This)
void CG3Protocol::PresenceThread()
{
while (This->keep_running)
while (keep_running)
{
This->PresenceTask();
PresenceTask();
}
}
void CG3Protocol::ConfigThread(CG3Protocol *This)
void CG3Protocol::ConfigThread()
{
while (This->keep_running)
while (keep_running)
{
This->ConfigTask();
ConfigTask();
}
}
void CG3Protocol::IcmpThread(CG3Protocol *This)
void CG3Protocol::IcmpThread()
{
while (This->keep_running)
while (keep_running)
{
This->IcmpTask();
IcmpTask();
}
}
@ -216,11 +210,8 @@ void CG3Protocol::PresenceTask(void)
}
}
// create new client
CG3Client *client = new CG3Client(Terminal, Ip);
// and append
clients->AddClient(client);
// create new client and append
clients->AddClient(std::make_shared<CG3Client>(Terminal, Ip));
}
else
{
@ -230,11 +221,8 @@ void CG3Protocol::PresenceTask(void)
//delete old client
clients->RemoveClient(extant);
// create new client
CG3Client *client = new CG3Client(Terminal, Ip);
// and append
clients->AddClient(client);
// create new client and append
clients->AddClient(std::make_shared<CG3Client>(Terminal, Ip));
}
}
g_Reflector.ReleaseClients();
@ -600,7 +588,7 @@ bool CG3Protocol::OnDvHeaderPacketIn(CDvHeaderPacket *Header, const CIp &Ip)
// drop if invalid module
delete Header;
g_Reflector.ReleaseClients();
return nullptr;
return false;
}
}

@ -77,9 +77,9 @@ public:
protected:
// threads
static void PresenceThread(CG3Protocol *);
static void ConfigThread(CG3Protocol *);
static void IcmpThread(CG3Protocol *);
void PresenceThread(void);
void ConfigThread(void);
void IcmpThread(void);
// helper tasks
void PresenceTask(void);
@ -113,9 +113,9 @@ protected:
bool EncodeDvLastFramePacket(const CDvLastFramePacket &, CBuffer *) const;
protected:
std::thread *m_pPresenceThread;
std::thread *m_pConfigThread;
std::thread *m_pIcmpThread;
std::future<void> m_PresenceFuture;
std::future<void> m_ConfigFuture;
std::future<void> m_IcmpFuture;
// time
CTimePoint m_LastKeepaliveTime;

@ -37,7 +37,6 @@ CGateKeeper g_GateKeeper;
CGateKeeper::CGateKeeper()
{
keep_running = true;
m_pThread = nullptr;
}
////////////////////////////////////////////////////////////////////////////////////////
@ -45,14 +44,7 @@ CGateKeeper::CGateKeeper()
CGateKeeper::~CGateKeeper()
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
}
Close();
}
@ -71,19 +63,18 @@ bool CGateKeeper::Init(void)
keep_running = true;
// start thread;
m_pThread = new std::thread(CGateKeeper::Thread, this);
m_Future = std::async(std::launch::async, &CGateKeeper::Thread, this);
return true;
}
void CGateKeeper::Close(void)
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
@ -157,14 +148,14 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
case PROTOCOL_G3:
#endif
// first check is IP & callsigned listed OK
ok &= IsNodeListedOk(callsign, ip, module);
ok = ok && IsNodeListedOk(callsign, ip, module);
// todo: then apply any protocol specific authorisation for the operation
break;
#ifndef NO_XLX
// XLX interlinks
case PROTOCOL_XLX:
ok &= IsPeerListedOk(callsign, ip, module);
ok = ok && IsPeerListedOk(callsign, ip, module);
break;
#endif
@ -188,26 +179,26 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CGateKeeper::Thread(CGateKeeper *This)
void CGateKeeper::Thread()
{
while (This->keep_running)
while (keep_running)
{
// Wait 30 seconds
for (int i=0; i<15 && This->keep_running; i++)
for (int i=0; i<15 && keep_running; i++)
CTimePoint::TaskSleepFor(2000);
// have lists files changed ?
if ( This->m_NodeWhiteList.NeedReload() )
if ( m_NodeWhiteList.NeedReload() )
{
This->m_NodeWhiteList.ReloadFromFile();
m_NodeWhiteList.ReloadFromFile();
}
if ( This->m_NodeBlackList.NeedReload() )
if ( m_NodeBlackList.NeedReload() )
{
This->m_NodeBlackList.ReloadFromFile();
m_NodeBlackList.ReloadFromFile();
}
if ( This->m_PeerList.NeedReload() )
if ( m_PeerList.NeedReload() )
{
This->m_PeerList.ReloadFromFile();
m_PeerList.ReloadFromFile();
}
}
}

@ -57,7 +57,7 @@ public:
protected:
// thread
static void Thread(CGateKeeper *);
void Thread();
// operation helpers
bool IsNodeListedOk(const CCallsign &, const CIp &, char = ' ') const;
@ -72,7 +72,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
};

@ -33,7 +33,7 @@
// constructor
CProtocol::CProtocol() : keep_running(true), m_pThread(nullptr) {}
CProtocol::CProtocol() : keep_running(true) {}
////////////////////////////////////////////////////////////////////////////////////////
@ -42,17 +42,7 @@ CProtocol::CProtocol() : keep_running(true), m_pThread(nullptr) {}
CProtocol::~CProtocol()
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
}
// Close sockets
m_Socket6.Close();
m_Socket4.Close();
Close();
// empty queue
m_Queue.Lock();
@ -107,10 +97,12 @@ bool CProtocol::Initialize(const char *type, const uint16 port, const bool has_i
#endif
// start thread;
m_pThread = new std::thread(CProtocol::Thread, this);
if (m_pThread == nullptr)
try {
m_Future = std::async(std::launch::async, &CProtocol::Thread, this);
}
catch (const std::exception &e)
{
std::cerr << "Could not start DCS thread!" << std::endl;
std::cerr << "Could not start thread: " << e.what() << std::endl;
m_Socket4.Close();
m_Socket6.Close();
return false;
@ -120,22 +112,20 @@ bool CProtocol::Initialize(const char *type, const uint16 port, const bool has_i
return true;
}
void CProtocol::Thread(CProtocol *This)
void CProtocol::Thread()
{
while (This->keep_running)
while (keep_running)
{
This->Task();
Task();
}
}
void CProtocol::Close(void)
{
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
m_Socket4.Close();
m_Socket6.Close();

@ -88,7 +88,7 @@ public:
const CCallsign &GetReflectorCallsign(void)const { return m_ReflectorCallsign; }
// task
static void Thread(CProtocol *);
void Thread(void);
virtual void Task(void) = 0;
protected:
@ -143,7 +143,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
// identity
CCallsign m_ReflectorCallsign;

@ -85,7 +85,7 @@ bool CProtocols::Init(void)
#ifndef NO_G3
m_Protocols.emplace_back(std::unique_ptr<CG3Protocol>(new CG3Protocol));
if (! m_Protocols.back()->Initialize("XLX", G3_PORT, DMR_IPV4, DMR_IPV6))
if (! m_Protocols.back()->Initialize("XLX", G3_DV_PORT, DMR_IPV4, DMR_IPV6))
return false;
#endif

@ -47,7 +47,6 @@ CTranscoder g_Transcoder;
CTranscoder::CTranscoder()
{
keep_running = true;
m_pThread = nullptr;
m_bConnected = false;
m_LastKeepaliveTime.Now();
m_LastActivityTime.Now();
@ -110,7 +109,7 @@ bool CTranscoder::Init(void)
// start thread
keep_running = true;
m_pThread = new std::thread(CTranscoder::Thread, this);
m_Future = std::async(std::launch::async, &CTranscoder::Thread, this);
return true;
}
@ -134,22 +133,20 @@ void CTranscoder::Close(void)
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CTranscoder::Thread(CTranscoder *This)
void CTranscoder::Thread()
{
while (This->keep_running)
while (keep_running)
{
This->Task();
Task();
}
}

@ -46,7 +46,7 @@ public:
CTranscoder();
// destructor
virtual ~CTranscoder();
~CTranscoder();
// initialization
bool Init(void);
@ -64,7 +64,7 @@ public:
void ReleaseStream(CCodecStream *);
// task
static void Thread(CTranscoder *);
void Thread(void);
void Task(void);
protected:
@ -94,7 +94,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
// socket
CIp m_Ip;

@ -41,7 +41,6 @@ CWiresxCmdHandler::CWiresxCmdHandler()
{
m_seqNo = 0;
keep_running = true;
m_pThread = nullptr;
}
@ -51,13 +50,7 @@ CWiresxCmdHandler::CWiresxCmdHandler()
CWiresxCmdHandler::~CWiresxCmdHandler()
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
}
Close();
// empty queue
m_CmdQueue.Lock();
@ -82,7 +75,7 @@ bool CWiresxCmdHandler::Init(void)
keep_running = true;
// start thread;
m_pThread = new std::thread(CWiresxCmdHandler::Thread, this);
m_Future = std::async(std::launch::async, &CWiresxCmdHandler::Thread, this);
// done
return true;
@ -91,22 +84,20 @@ bool CWiresxCmdHandler::Init(void)
void CWiresxCmdHandler::Close(void)
{
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CWiresxCmdHandler::Thread(CWiresxCmdHandler *This)
void CWiresxCmdHandler::Thread()
{
while (This->keep_running)
while (keep_running)
{
This->Task();
Task();
}
}

@ -43,11 +43,11 @@ public:
CWiresxCmdHandler();
// destructor
virtual ~CWiresxCmdHandler();
~CWiresxCmdHandler();
// initialization
virtual bool Init(void);
virtual void Close(void);
bool Init(void);
void Close(void);
// queues
CWiresxCmdQueue *GetCmdQueue(void) { m_CmdQueue.Lock(); return &m_CmdQueue; }
@ -58,8 +58,8 @@ public:
// get
// task
static void Thread(CWiresxCmdHandler *);
virtual void Task(void);
void Thread(void);
void Task(void);
protected:
// packet encoding
@ -87,7 +87,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
};
////////////////////////////////////////////////////////////////////////////////////////

@ -34,19 +34,12 @@
CYsfNodeDir::CYsfNodeDir()
{
keep_running = true;
m_pThread = nullptr;
}
CYsfNodeDir::~CYsfNodeDir()
{
// kill threads
keep_running = false;
if ( m_pThread != nullptr )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
}
Close();
}
@ -62,7 +55,7 @@ bool CYsfNodeDir::Init(void)
keep_running = true;
// start thread;
m_pThread = new std::thread(CYsfNodeDir::Thread, this);
m_Future = std::async(std::launch::async, &CYsfNodeDir::Thread, this);
return true;
}
@ -70,29 +63,27 @@ bool CYsfNodeDir::Init(void)
void CYsfNodeDir::Close(void)
{
keep_running = false;
if ( m_pThread != nullptr )
if ( m_Future.valid() )
{
m_pThread->join();
delete m_pThread;
m_pThread = nullptr;
m_Future.get();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CYsfNodeDir::Thread(CYsfNodeDir *This)
void CYsfNodeDir::Thread()
{
while (This->keep_running)
while (keep_running)
{
// Wait YSFNODEDB_REFRESH_RATE minutes
for (int i=0; i<30*YSFNODEDB_REFRESH_RATE && This->keep_running; i++)
for (int i=0; i<30*YSFNODEDB_REFRESH_RATE && keep_running; i++)
CTimePoint::TaskSleepFor(2000);
// have lists files changed ?
if ( This->NeedReload() )
if ( NeedReload() )
{
This->Reload();
Reload();
}
}
}

@ -79,7 +79,7 @@ public:
protected:
// thread
static void Thread(CYsfNodeDir *);
void Thread();
// reload helpers
bool Reload(void);
@ -93,7 +93,7 @@ protected:
// thread
std::atomic<bool> keep_running;
std::thread *m_pThread;
std::future<void> m_Future;
CsNodeMap m_map;
};

@ -33,7 +33,7 @@
#include <map>
#include <queue>
#include <chrono>
#include <thread>
#include <future>
#include <mutex>
#include <memory>
#include <atomic>

Loading…
Cancel
Save

Powered by TurnKey Linux.