From b5ed9b55c33035e7b675f6f79fc0e2bf4abad5e8 Mon Sep 17 00:00:00 2001 From: Tom Early Date: Fri, 24 Jul 2020 13:28:29 -0700 Subject: [PATCH] CRflector too --- src/creflector.cpp | 107 +++++++++++++++++++-------------------------- src/creflector.h | 11 +++-- 2 files changed, 50 insertions(+), 68 deletions(-) diff --git a/src/creflector.cpp b/src/creflector.cpp index b010748..e55c8e9 100644 --- a/src/creflector.cpp +++ b/src/creflector.cpp @@ -39,12 +39,6 @@ CReflector::CReflector() { keep_running = true; - m_XmlReportThread = nullptr; - m_JsonReportThread = nullptr; - for ( int i = 0; i < NB_OF_MODULES; i++ ) - { - m_RouterThreads[i] = nullptr; - } #ifdef DEBUG_DUMPFILE m_DebugFile.open("/Users/jeanluc/Desktop/xlxdebug.txt"); #endif @@ -56,12 +50,6 @@ CReflector::CReflector(const CCallsign &callsign) m_DebugFile.close(); #endif keep_running = true; - m_XmlReportThread = nullptr; - m_JsonReportThread = nullptr; - for ( int i = 0; i < NB_OF_MODULES; i++ ) - { - m_RouterThreads[i] = nullptr; - } m_Callsign = callsign; } @@ -71,22 +59,21 @@ CReflector::CReflector(const CCallsign &callsign) CReflector::~CReflector() { keep_running = false; - if ( m_XmlReportThread != nullptr ) + if ( m_XmlReportFuture.valid() ) { - m_XmlReportThread->join(); - delete m_XmlReportThread; + m_XmlReportFuture.get(); } - if ( m_JsonReportThread != nullptr ) +#ifdef JSON_MONITOR + if ( m_JsonReportFuture.valid() ) { - m_JsonReportThread->join(); - delete m_JsonReportThread; + m_JsonReportFuture.get(); } +#endif for ( int i = 0; i < NB_OF_MODULES; i++ ) { - if ( m_RouterThreads[i] != nullptr ) + if ( m_RouterFuture[i].valid() ) { - m_RouterThreads[i]->join(); - delete m_RouterThreads[i]; + m_RouterFuture[i].get(); } } } @@ -127,13 +114,13 @@ bool CReflector::Start(void) // start one thread per reflector module for ( int i = 0; i < NB_OF_MODULES; i++ ) { - m_RouterThreads[i] = new std::thread(CReflector::RouterThread, this, &(m_Streams[i])); + m_RouterFuture[i] = std::async(std::launch::async, &CReflector::RouterThread, this, &(m_Streams[i])); } // start the reporting threads - m_XmlReportThread = new std::thread(CReflector::XmlReportThread, this); + m_XmlReportFuture = std::async(std::launch::async, &CReflector::XmlReportThread, this); #ifdef JSON_MONITOR - m_JsonReportThread = new std::thread(CReflector::JsonReportThread, this); + m_JsonReportFuture = std::async(std::launch::async, &CReflector::JsonReportThread, this); #endif return true; @@ -145,27 +132,23 @@ void CReflector::Stop(void) keep_running = false; // stop & delete report threads - if ( m_XmlReportThread != nullptr ) + if ( m_XmlReportFuture.valid() ) { - m_XmlReportThread->join(); - delete m_XmlReportThread; - m_XmlReportThread = nullptr; + m_XmlReportFuture.get(); } - if ( m_JsonReportThread != nullptr ) +#ifdef JSON_MONITOR + if ( m_JsonReportFuture.valid() ) { - m_JsonReportThread->join(); - delete m_JsonReportThread; - m_JsonReportThread = nullptr; + m_JsonReportFuture.get(); } +#endif // stop & delete all router thread for ( int i = 0; i < NB_OF_MODULES; i++ ) { - if ( m_RouterThreads[i] != nullptr ) + if ( m_RouterFuture[i].valid() ) { - m_RouterThreads[i]->join(); - delete m_RouterThreads[i]; - m_RouterThreads[i] = nullptr; + m_RouterFuture[i].get(); } } @@ -317,15 +300,15 @@ void CReflector::CloseStream(CPacketStream *stream) //////////////////////////////////////////////////////////////////////////////////////// // router threads -void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) +void CReflector::RouterThread(CPacketStream *streamIn) { // get our module - uint8 uiModuleId = This->GetStreamModule(streamIn); + uint8 uiModuleId = GetStreamModule(streamIn); // get on input queue CPacket *packet; - while (This->keep_running) + while (keep_running) { // any packet in our input queue ? streamIn->Lock(); @@ -348,8 +331,8 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) packet->SetModuleId(uiModuleId); // iterate on all protocols - This->m_Protocols.Lock(); - for ( auto it=This->m_Protocols.begin(); it!=This->m_Protocols.end(); it++ ) + m_Protocols.Lock(); + for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ ) { // duplicate packet CPacket *packetClone = packet->Duplicate(); @@ -359,7 +342,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) { // get our callsign CCallsign csRPT = (*it)->GetReflectorCallsign(); - csRPT.SetModule(This->GetStreamModule(streamIn)); + csRPT.SetModule(GetStreamModule(streamIn)); ((CDvHeaderPacket *)packetClone)->SetRpt2Callsign(csRPT); } @@ -368,7 +351,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) queue->push(packetClone); (*it)->ReleaseQueue(); } - This->m_Protocols.Unlock(); + m_Protocols.Unlock(); // done delete packet; packet = nullptr; @@ -384,9 +367,9 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) //////////////////////////////////////////////////////////////////////////////////////// // report threads -void CReflector::XmlReportThread(CReflector *This) +void CReflector::XmlReportThread() { - while (This->keep_running) + while (keep_running) { // report to xml file std::ofstream xmlFile; @@ -394,7 +377,7 @@ void CReflector::XmlReportThread(CReflector *This) if ( xmlFile.is_open() ) { // write xml file - This->WriteXmlFile(xmlFile); + WriteXmlFile(xmlFile); // and close file xmlFile.close(); @@ -407,13 +390,13 @@ void CReflector::XmlReportThread(CReflector *This) #endif // and wait a bit - for (int i=0; i< XML_UPDATE_PERIOD && This->keep_running; i++) + for (int i=0; i< XML_UPDATE_PERIOD && keep_running; i++) CTimePoint::TaskSleepFor(1000); } } #ifdef JSON_MONITOR -void CReflector::JsonReportThread(CReflector *This) +void CReflector::JsonReportThread() { CUdpSocket Socket; CBuffer Buffer; @@ -427,7 +410,7 @@ void CReflector::JsonReportThread(CReflector *This) if ( Socket.Open(JSON_PORT) ) { // and loop - while (This->keep_running) + while (keep_running) { // any command ? if ( Socket.Receive(Buffer, Ip, 50) ) @@ -441,11 +424,11 @@ void CReflector::JsonReportThread(CReflector *This) bOn = true; // announce ourselves - This->SendJsonReflectorObject(Socket, Ip); + SendJsonReflectorObject(Socket, Ip); // dump tables - This->SendJsonNodesObject(Socket, Ip); - This->SendJsonStationsObject(Socket, Ip); + SendJsonNodesObject(Socket, Ip); + SendJsonStationsObject(Socket, Ip); } else if ( Buffer.Compare((uint8 *)"bye", 3) == 0 ) { @@ -458,14 +441,14 @@ void CReflector::JsonReportThread(CReflector *This) // any notifications ? CNotification notification; - This->m_Notifications.Lock(); - if ( !This->m_Notifications.empty() ) + m_Notifications.Lock(); + if ( !m_Notifications.empty() ) { // get the packet - notification = This->m_Notifications.front(); - This->m_Notifications.pop(); + notification = m_Notifications.front(); + m_Notifications.pop(); } - This->m_Notifications.Unlock(); + m_Notifications.Unlock(); // handle it if ( bOn ) @@ -475,20 +458,20 @@ void CReflector::JsonReportThread(CReflector *This) case NOTIFICATION_CLIENTS: case NOTIFICATION_PEERS: //std::cout << "Monitor updating nodes table" << std::endl; - This->SendJsonNodesObject(Socket, Ip); + SendJsonNodesObject(Socket, Ip); break; case NOTIFICATION_USERS: //std::cout << "Monitor updating stations table" << std::endl; - This->SendJsonStationsObject(Socket, Ip); + SendJsonStationsObject(Socket, Ip); break; case NOTIFICATION_STREAM_OPEN: //std::cout << "Monitor notify station " << notification.GetCallsign() << "going ON air" << std::endl; - This->SendJsonStationsObject(Socket, Ip); - This->SendJsonOnairObject(Socket, Ip, notification.GetCallsign()); + SendJsonStationsObject(Socket, Ip); + SendJsonOnairObject(Socket, Ip, notification.GetCallsign()); break; case NOTIFICATION_STREAM_CLOSE: //std::cout << "Monitor notify station " << notification.GetCallsign() << "going OFF air" << std::endl; - This->SendJsonOffairObject(Socket, Ip, notification.GetCallsign()); + SendJsonOffairObject(Socket, Ip, notification.GetCallsign()); break; case NOTIFICATION_NONE: default: diff --git a/src/creflector.h b/src/creflector.h index 8ae51e6..c1ad3c3 100644 --- a/src/creflector.h +++ b/src/creflector.h @@ -109,10 +109,10 @@ public: protected: // threads - static void RouterThread(CReflector *, CPacketStream *); - static void XmlReportThread(CReflector *); + void RouterThread(CPacketStream *); + void XmlReportThread(void); #ifdef JSON_MONITOR - static void JsonReportThread(CReflector *); + void JsonReportThread(void); #endif // streams @@ -156,9 +156,8 @@ protected: // threads std::atomic keep_running; - std::array m_RouterThreads; - std::thread *m_XmlReportThread; - std::thread *m_JsonReportThread; + std::array, NB_OF_MODULES> m_RouterFuture; + std::future m_XmlReportFuture, m_JsonReportFuture; // notifications CNotificationQueue m_Notifications;