minor cleanup

unstable
Tom Early 4 years ago
parent 55deafb30c
commit d013a3d532

@ -22,11 +22,6 @@
#include "DVFramePacket.h" #include "DVFramePacket.h"
#include "Reflector.h" #include "Reflector.h"
////////////////////////////////////////////////////////////////////////////////////////
// define
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// constructor // constructor
@ -198,7 +193,7 @@ void CCodecStream::Task(void)
} }
else else
{ {
std::cout << "Unexpected transcoded packet received from ambed" << std::endl; std::cout << "Unexpected transcoded packet received from transcoder" << std::endl;
} }
} }
} }

@ -18,7 +18,6 @@
#pragma once #pragma once
#include "Semaphore.h"
#include "UDPSocket.h" #include "UDPSocket.h"
#include "PacketQueue.h" #include "PacketQueue.h"
@ -50,14 +49,14 @@ public:
void Close(void); void Close(void);
// get // get
bool IsConnected(void) const { return m_bConnected; } bool IsConnected(void) const { return m_bConnected; }
uint16_t GetStreamId(void) const { return m_uiStreamId; } uint16_t GetStreamId(void) const { return m_uiStreamId; }
double GetPingMin(void) const { return m_fPingMin; } double GetPingMin(void) const { return m_fPingMin; }
double GetPingMax(void) const { return m_fPingMax; } double GetPingMax(void) const { return m_fPingMax; }
double GetPingAve(void) const { return (m_fPingCount != 0) ? m_fPingSum/m_fPingCount : 0; } double GetPingAve(void) const { return (m_fPingCount != 0) ? m_fPingSum/m_fPingCount : 0; }
uint32_t GetTotalPackets(void) const { return m_uiTotalPackets; } uint32_t GetTotalPackets(void) const { return m_uiTotalPackets; }
uint32_t GetTimeoutPackets(void) const { return m_uiTimeoutPackets; } uint32_t GetTimeoutPackets(void) const { return m_uiTimeoutPackets; }
bool IsEmpty(void) const; bool IsEmpty(void) const;
// task // task
void Thread(void); void Thread(void);
@ -100,6 +99,6 @@ protected:
double m_fPingMax; double m_fPingMax;
double m_fPingSum; double m_fPingSum;
double m_fPingCount; double m_fPingCount;
uint32_t m_uiTotalPackets; uint32_t m_uiTotalPackets;
uint32_t m_uiTimeoutPackets; uint32_t m_uiTimeoutPackets;
}; };

@ -69,7 +69,7 @@ $(EXE) : $(OBJS)
g++ $(CFLAGS) $< -o $@ g++ $(CFLAGS) $< -o $@
clean : clean :
$(RM) *.o *.d ulxd xrfd $(RM) *.o *.d urfd
-include $(DEPS) -include $(DEPS)

@ -49,9 +49,9 @@ public:
// get // get
std::shared_ptr<CClient> GetOwnerClient(void) { return m_OwnerClient; } std::shared_ptr<CClient> GetOwnerClient(void) { return m_OwnerClient; }
const CIp *GetOwnerIp(void); const CIp *GetOwnerIp(void);
bool IsExpired(void) const { return (m_LastPacketTime.time() > STREAM_TIMEOUT); } bool IsExpired(void) const { return (m_LastPacketTime.time() > STREAM_TIMEOUT); }
bool IsOpen(void) const { return m_bOpen; } bool IsOpen(void) const { return m_bOpen; }
uint16_t GetStreamId(void) const { return m_uiStreamId; } uint16_t GetStreamId(void) const { return m_uiStreamId; }
const CCallsign &GetUserCallsign(void) const { return m_DvHeader.GetMyCallsign(); } const CCallsign &GetUserCallsign(void) const { return m_DvHeader.GetMyCallsign(); }
protected: protected:

@ -94,7 +94,7 @@ bool CReflector::Start(void)
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++) for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
{ {
m_Stream[*it] = std::make_shared<CPacketStream>(); m_Stream[*it] = std::make_shared<CPacketStream>();
m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, m_Stream[*it]); m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, *it);
} }
// start the reporting threads // start the reporting threads
@ -255,16 +255,12 @@ void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// router threads // router threads
void CReflector::RouterThread(std::shared_ptr<CPacketStream> streamIn) void CReflector::RouterThread(const char ThisModule)
{ {
// get our module
const auto module = GetStreamModule(streamIn);
// get on input queue
std::unique_ptr<CPacket> packet;
while (keep_running) while (keep_running)
{ {
std::unique_ptr<CPacket> packet;
auto streamIn = m_Stream[ThisModule];
// any packet in our input queue ? // any packet in our input queue ?
streamIn->Lock(); streamIn->Lock();
if ( !streamIn->empty() ) if ( !streamIn->empty() )
@ -282,7 +278,7 @@ void CReflector::RouterThread(std::shared_ptr<CPacketStream> streamIn)
if ( packet != nullptr ) if ( packet != nullptr )
{ {
// set origin // set origin
packet->SetModule(module); packet->SetModule(ThisModule);
// iterate on all protocols // iterate on all protocols
m_Protocols.Lock(); m_Protocols.Lock();
@ -296,7 +292,7 @@ void CReflector::RouterThread(std::shared_ptr<CPacketStream> streamIn)
{ {
// get our callsign // get our callsign
CCallsign csRPT = (*it)->GetReflectorCallsign(); CCallsign csRPT = (*it)->GetReflectorCallsign();
csRPT.SetModule(GetStreamModule(streamIn)); csRPT.SetModule(ThisModule);
(dynamic_cast<CDvHeaderPacket *>(packetClone.get()))->SetRpt2Callsign(csRPT); (dynamic_cast<CDvHeaderPacket *>(packetClone.get()))->SetRpt2Callsign(csRPT);
} }

@ -87,7 +87,7 @@ public:
protected: protected:
// threads // threads
void RouterThread(std::shared_ptr<CPacketStream>); void RouterThread(const char);
void XmlReportThread(void); void XmlReportThread(void);
#ifdef JSON_MONITOR #ifdef JSON_MONITOR
void JsonReportThread(void); void JsonReportThread(void);
@ -130,7 +130,10 @@ protected:
// threads // threads
std::atomic<bool> keep_running; std::atomic<bool> keep_running;
std::unordered_map<char, std::future<void>> m_RouterFuture; std::unordered_map<char, std::future<void>> m_RouterFuture;
std::future<void> m_XmlReportFuture /*, m_JsonReportFuture*/; std::future<void> m_XmlReportFuture;
#ifdef JSON_MONITOR
std::future<void> m_JsonReportFuture;
#endif
// notifications // notifications
CNotificationQueue m_Notifications; CNotificationQueue m_Notifications;

@ -74,8 +74,8 @@ protected:
// sync objects for Openstream // sync objects for Openstream
CSemaphore m_SemaphoreOpenStream; CSemaphore m_SemaphoreOpenStream;
bool m_bStreamOpened; bool m_bStreamOpened;
uint16_t m_StreamidOpenStream; uint16_t m_StreamidOpenStream;
uint16_t m_PortOpenStream; uint16_t m_PortOpenStream;
// thread // thread
std::atomic<bool> keep_running; std::atomic<bool> keep_running;

Loading…
Cancel
Save

Powered by TurnKey Linux.