CRflector too

pull/1/head
Tom Early 5 years ago
parent a4621db57a
commit b5ed9b55c3

@ -39,12 +39,6 @@
CReflector::CReflector()
{
keep_running = true;
m_XmlReportThread = nullptr;
m_JsonReportThread = nullptr;
for ( int i = 0; i < NB_OF_MODULES; i++ )
{
m_RouterThreads[i] = nullptr;
}
#ifdef DEBUG_DUMPFILE
m_DebugFile.open("/Users/jeanluc/Desktop/xlxdebug.txt");
#endif
@ -56,12 +50,6 @@ CReflector::CReflector(const CCallsign &callsign)
m_DebugFile.close();
#endif
keep_running = true;
m_XmlReportThread = nullptr;
m_JsonReportThread = nullptr;
for ( int i = 0; i < NB_OF_MODULES; i++ )
{
m_RouterThreads[i] = nullptr;
}
m_Callsign = callsign;
}
@ -71,22 +59,21 @@ CReflector::CReflector(const CCallsign &callsign)
CReflector::~CReflector()
{
keep_running = false;
if ( m_XmlReportThread != nullptr )
if ( m_XmlReportFuture.valid() )
{
m_XmlReportThread->join();
delete m_XmlReportThread;
m_XmlReportFuture.get();
}
if ( m_JsonReportThread != nullptr )
#ifdef JSON_MONITOR
if ( m_JsonReportFuture.valid() )
{
m_JsonReportThread->join();
delete m_JsonReportThread;
m_JsonReportFuture.get();
}
#endif
for ( int i = 0; i < NB_OF_MODULES; i++ )
{
if ( m_RouterThreads[i] != nullptr )
if ( m_RouterFuture[i].valid() )
{
m_RouterThreads[i]->join();
delete m_RouterThreads[i];
m_RouterFuture[i].get();
}
}
}
@ -127,13 +114,13 @@ bool CReflector::Start(void)
// start one thread per reflector module
for ( int i = 0; i < NB_OF_MODULES; i++ )
{
m_RouterThreads[i] = new std::thread(CReflector::RouterThread, this, &(m_Streams[i]));
m_RouterFuture[i] = std::async(std::launch::async, &CReflector::RouterThread, this, &(m_Streams[i]));
}
// start the reporting threads
m_XmlReportThread = new std::thread(CReflector::XmlReportThread, this);
m_XmlReportFuture = std::async(std::launch::async, &CReflector::XmlReportThread, this);
#ifdef JSON_MONITOR
m_JsonReportThread = new std::thread(CReflector::JsonReportThread, this);
m_JsonReportFuture = std::async(std::launch::async, &CReflector::JsonReportThread, this);
#endif
return true;
@ -145,27 +132,23 @@ void CReflector::Stop(void)
keep_running = false;
// stop & delete report threads
if ( m_XmlReportThread != nullptr )
if ( m_XmlReportFuture.valid() )
{
m_XmlReportThread->join();
delete m_XmlReportThread;
m_XmlReportThread = nullptr;
m_XmlReportFuture.get();
}
if ( m_JsonReportThread != nullptr )
#ifdef JSON_MONITOR
if ( m_JsonReportFuture.valid() )
{
m_JsonReportThread->join();
delete m_JsonReportThread;
m_JsonReportThread = nullptr;
m_JsonReportFuture.get();
}
#endif
// stop & delete all router thread
for ( int i = 0; i < NB_OF_MODULES; i++ )
{
if ( m_RouterThreads[i] != nullptr )
if ( m_RouterFuture[i].valid() )
{
m_RouterThreads[i]->join();
delete m_RouterThreads[i];
m_RouterThreads[i] = nullptr;
m_RouterFuture[i].get();
}
}
@ -317,15 +300,15 @@ void CReflector::CloseStream(CPacketStream *stream)
////////////////////////////////////////////////////////////////////////////////////////
// router threads
void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
void CReflector::RouterThread(CPacketStream *streamIn)
{
// get our module
uint8 uiModuleId = This->GetStreamModule(streamIn);
uint8 uiModuleId = GetStreamModule(streamIn);
// get on input queue
CPacket *packet;
while (This->keep_running)
while (keep_running)
{
// any packet in our input queue ?
streamIn->Lock();
@ -348,8 +331,8 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
packet->SetModuleId(uiModuleId);
// iterate on all protocols
This->m_Protocols.Lock();
for ( auto it=This->m_Protocols.begin(); it!=This->m_Protocols.end(); it++ )
m_Protocols.Lock();
for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ )
{
// duplicate packet
CPacket *packetClone = packet->Duplicate();
@ -359,7 +342,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
{
// get our callsign
CCallsign csRPT = (*it)->GetReflectorCallsign();
csRPT.SetModule(This->GetStreamModule(streamIn));
csRPT.SetModule(GetStreamModule(streamIn));
((CDvHeaderPacket *)packetClone)->SetRpt2Callsign(csRPT);
}
@ -368,7 +351,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
queue->push(packetClone);
(*it)->ReleaseQueue();
}
This->m_Protocols.Unlock();
m_Protocols.Unlock();
// done
delete packet;
packet = nullptr;
@ -384,9 +367,9 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
////////////////////////////////////////////////////////////////////////////////////////
// report threads
void CReflector::XmlReportThread(CReflector *This)
void CReflector::XmlReportThread()
{
while (This->keep_running)
while (keep_running)
{
// report to xml file
std::ofstream xmlFile;
@ -394,7 +377,7 @@ void CReflector::XmlReportThread(CReflector *This)
if ( xmlFile.is_open() )
{
// write xml file
This->WriteXmlFile(xmlFile);
WriteXmlFile(xmlFile);
// and close file
xmlFile.close();
@ -407,13 +390,13 @@ void CReflector::XmlReportThread(CReflector *This)
#endif
// and wait a bit
for (int i=0; i< XML_UPDATE_PERIOD && This->keep_running; i++)
for (int i=0; i< XML_UPDATE_PERIOD && keep_running; i++)
CTimePoint::TaskSleepFor(1000);
}
}
#ifdef JSON_MONITOR
void CReflector::JsonReportThread(CReflector *This)
void CReflector::JsonReportThread()
{
CUdpSocket Socket;
CBuffer Buffer;
@ -427,7 +410,7 @@ void CReflector::JsonReportThread(CReflector *This)
if ( Socket.Open(JSON_PORT) )
{
// and loop
while (This->keep_running)
while (keep_running)
{
// any command ?
if ( Socket.Receive(Buffer, Ip, 50) )
@ -441,11 +424,11 @@ void CReflector::JsonReportThread(CReflector *This)
bOn = true;
// announce ourselves
This->SendJsonReflectorObject(Socket, Ip);
SendJsonReflectorObject(Socket, Ip);
// dump tables
This->SendJsonNodesObject(Socket, Ip);
This->SendJsonStationsObject(Socket, Ip);
SendJsonNodesObject(Socket, Ip);
SendJsonStationsObject(Socket, Ip);
}
else if ( Buffer.Compare((uint8 *)"bye", 3) == 0 )
{
@ -458,14 +441,14 @@ void CReflector::JsonReportThread(CReflector *This)
// any notifications ?
CNotification notification;
This->m_Notifications.Lock();
if ( !This->m_Notifications.empty() )
m_Notifications.Lock();
if ( !m_Notifications.empty() )
{
// get the packet
notification = This->m_Notifications.front();
This->m_Notifications.pop();
notification = m_Notifications.front();
m_Notifications.pop();
}
This->m_Notifications.Unlock();
m_Notifications.Unlock();
// handle it
if ( bOn )
@ -475,20 +458,20 @@ void CReflector::JsonReportThread(CReflector *This)
case NOTIFICATION_CLIENTS:
case NOTIFICATION_PEERS:
//std::cout << "Monitor updating nodes table" << std::endl;
This->SendJsonNodesObject(Socket, Ip);
SendJsonNodesObject(Socket, Ip);
break;
case NOTIFICATION_USERS:
//std::cout << "Monitor updating stations table" << std::endl;
This->SendJsonStationsObject(Socket, Ip);
SendJsonStationsObject(Socket, Ip);
break;
case NOTIFICATION_STREAM_OPEN:
//std::cout << "Monitor notify station " << notification.GetCallsign() << "going ON air" << std::endl;
This->SendJsonStationsObject(Socket, Ip);
This->SendJsonOnairObject(Socket, Ip, notification.GetCallsign());
SendJsonStationsObject(Socket, Ip);
SendJsonOnairObject(Socket, Ip, notification.GetCallsign());
break;
case NOTIFICATION_STREAM_CLOSE:
//std::cout << "Monitor notify station " << notification.GetCallsign() << "going OFF air" << std::endl;
This->SendJsonOffairObject(Socket, Ip, notification.GetCallsign());
SendJsonOffairObject(Socket, Ip, notification.GetCallsign());
break;
case NOTIFICATION_NONE:
default:

@ -109,10 +109,10 @@ public:
protected:
// threads
static void RouterThread(CReflector *, CPacketStream *);
static void XmlReportThread(CReflector *);
void RouterThread(CPacketStream *);
void XmlReportThread(void);
#ifdef JSON_MONITOR
static void JsonReportThread(CReflector *);
void JsonReportThread(void);
#endif
// streams
@ -156,9 +156,8 @@ protected:
// threads
std::atomic<bool> keep_running;
std::array<std::thread *, NB_OF_MODULES> m_RouterThreads;
std::thread *m_XmlReportThread;
std::thread *m_JsonReportThread;
std::array<std::future<void>, NB_OF_MODULES> m_RouterFuture;
std::future<void> m_XmlReportFuture, m_JsonReportFuture;
// notifications
CNotificationQueue m_Notifications;

Loading…
Cancel
Save

Powered by TurnKey Linux.