diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 53c1e80..20c7f68 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -22,11 +22,6 @@ #include "DVFramePacket.h" #include "Reflector.h" -//////////////////////////////////////////////////////////////////////////////////////// -// define - - - //////////////////////////////////////////////////////////////////////////////////////// // constructor @@ -198,7 +193,7 @@ void CCodecStream::Task(void) } else { - std::cout << "Unexpected transcoded packet received from ambed" << std::endl; + std::cout << "Unexpected transcoded packet received from transcoder" << std::endl; } } } diff --git a/reflector/CodecStream.h b/reflector/CodecStream.h index 1c55b6f..a1f9288 100644 --- a/reflector/CodecStream.h +++ b/reflector/CodecStream.h @@ -18,7 +18,6 @@ #pragma once -#include "Semaphore.h" #include "UDPSocket.h" #include "PacketQueue.h" @@ -50,14 +49,14 @@ public: void Close(void); // get - bool IsConnected(void) const { return m_bConnected; } + bool IsConnected(void) const { return m_bConnected; } uint16_t GetStreamId(void) const { return m_uiStreamId; } - double GetPingMin(void) const { return m_fPingMin; } - double GetPingMax(void) const { return m_fPingMax; } - double GetPingAve(void) const { return (m_fPingCount != 0) ? m_fPingSum/m_fPingCount : 0; } + double GetPingMin(void) const { return m_fPingMin; } + double GetPingMax(void) const { return m_fPingMax; } + double GetPingAve(void) const { return (m_fPingCount != 0) ? m_fPingSum/m_fPingCount : 0; } uint32_t GetTotalPackets(void) const { return m_uiTotalPackets; } uint32_t GetTimeoutPackets(void) const { return m_uiTimeoutPackets; } - bool IsEmpty(void) const; + bool IsEmpty(void) const; // task void Thread(void); @@ -100,6 +99,6 @@ protected: double m_fPingMax; double m_fPingSum; double m_fPingCount; - uint32_t m_uiTotalPackets; - uint32_t m_uiTimeoutPackets; + uint32_t m_uiTotalPackets; + uint32_t m_uiTimeoutPackets; }; diff --git a/reflector/Makefile b/reflector/Makefile index ca467a8..a878fbe 100644 --- a/reflector/Makefile +++ b/reflector/Makefile @@ -69,7 +69,7 @@ $(EXE) : $(OBJS) g++ $(CFLAGS) $< -o $@ clean : - $(RM) *.o *.d ulxd xrfd + $(RM) *.o *.d urfd -include $(DEPS) diff --git a/reflector/PacketStream.h b/reflector/PacketStream.h index 3765c66..fca9098 100644 --- a/reflector/PacketStream.h +++ b/reflector/PacketStream.h @@ -49,9 +49,9 @@ public: // get std::shared_ptr GetOwnerClient(void) { return m_OwnerClient; } const CIp *GetOwnerIp(void); - bool IsExpired(void) const { return (m_LastPacketTime.time() > STREAM_TIMEOUT); } - bool IsOpen(void) const { return m_bOpen; } - uint16_t GetStreamId(void) const { return m_uiStreamId; } + bool IsExpired(void) const { return (m_LastPacketTime.time() > STREAM_TIMEOUT); } + bool IsOpen(void) const { return m_bOpen; } + uint16_t GetStreamId(void) const { return m_uiStreamId; } const CCallsign &GetUserCallsign(void) const { return m_DvHeader.GetMyCallsign(); } protected: diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 0c040b3..5ce575a 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -94,7 +94,7 @@ bool CReflector::Start(void) for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++) { m_Stream[*it] = std::make_shared(); - m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, m_Stream[*it]); + m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, *it); } // start the reporting threads @@ -255,16 +255,12 @@ void CReflector::CloseStream(std::shared_ptr stream) //////////////////////////////////////////////////////////////////////////////////////// // router threads -void CReflector::RouterThread(std::shared_ptr streamIn) +void CReflector::RouterThread(const char ThisModule) { - // get our module - const auto module = GetStreamModule(streamIn); - - // get on input queue - std::unique_ptr packet; - while (keep_running) { + std::unique_ptr packet; + auto streamIn = m_Stream[ThisModule]; // any packet in our input queue ? streamIn->Lock(); if ( !streamIn->empty() ) @@ -282,7 +278,7 @@ void CReflector::RouterThread(std::shared_ptr streamIn) if ( packet != nullptr ) { // set origin - packet->SetModule(module); + packet->SetModule(ThisModule); // iterate on all protocols m_Protocols.Lock(); @@ -296,7 +292,7 @@ void CReflector::RouterThread(std::shared_ptr streamIn) { // get our callsign CCallsign csRPT = (*it)->GetReflectorCallsign(); - csRPT.SetModule(GetStreamModule(streamIn)); + csRPT.SetModule(ThisModule); (dynamic_cast(packetClone.get()))->SetRpt2Callsign(csRPT); } diff --git a/reflector/Reflector.h b/reflector/Reflector.h index 7255a0f..9e43b8c 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -87,7 +87,7 @@ public: protected: // threads - void RouterThread(std::shared_ptr); + void RouterThread(const char); void XmlReportThread(void); #ifdef JSON_MONITOR void JsonReportThread(void); @@ -130,7 +130,10 @@ protected: // threads std::atomic keep_running; std::unordered_map> m_RouterFuture; - std::future m_XmlReportFuture /*, m_JsonReportFuture*/; + std::future m_XmlReportFuture; +#ifdef JSON_MONITOR + std::future m_JsonReportFuture; +#endif // notifications CNotificationQueue m_Notifications; diff --git a/reflector/Transcoder.h b/reflector/Transcoder.h index 06e7d7e..45e3533 100644 --- a/reflector/Transcoder.h +++ b/reflector/Transcoder.h @@ -74,8 +74,8 @@ protected: // sync objects for Openstream CSemaphore m_SemaphoreOpenStream; bool m_bStreamOpened; - uint16_t m_StreamidOpenStream; - uint16_t m_PortOpenStream; + uint16_t m_StreamidOpenStream; + uint16_t m_PortOpenStream; // thread std::atomic keep_running;