a few more msgs

pull/1/head
Tom Early 3 years ago
parent 10e4972040
commit a0123ceca9

@ -78,14 +78,14 @@ void CCodecStream::ReportStats()
bool CCodecStream::InitCodecStream(char module) bool CCodecStream::InitCodecStream(char module)
{ {
#ifdef DEBUG
std::cout << "Initializing codec stream for module '" << module << "'" << std::endl;
#endif
m_TCWriter.SetUp(REF2TC); m_TCWriter.SetUp(REF2TC);
std::string name(TC2REF); std::string name(TC2REF);
name.append(1, module); name.append(1, module);
if (m_TCReader.Open(name.c_str())) if (m_TCReader.Open(name.c_str()))
return true; return true;
#ifdef DEBUG
std::cout << "Initialized unix socket " << name << std::endl;
#endif
keep_running = true; keep_running = true;
try try
{ {

@ -37,7 +37,10 @@ bool CPacketStream::InitCodecStream()
if (m_CodecStream) if (m_CodecStream)
return m_CodecStream->InitCodecStream(m_PSModule); return m_CodecStream->InitCodecStream(m_PSModule);
else else
{
std::cerr << "Could not create a CCodecStream for module '" << m_PSModule << "'" << std::endl;
return true; return true;
}
} }
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////

@ -85,9 +85,11 @@ bool CReflector::Start(void)
if (stream->InitCodecStream()) if (stream->InitCodecStream())
return true; return true;
} }
m_Stream[c] = stream;
} }
else else
{ {
std::cerr << "Could not make a CPacketStream for module '" << c << "'" << std::endl;
return true; return true;
} }
try try
@ -253,14 +255,19 @@ void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
void CReflector::RouterThread(const char ThisModule) void CReflector::RouterThread(const char ThisModule)
{ {
auto pitem = m_Stream.find(ThisModule);
if (m_Stream.end() == pitem)
{
std::cerr << "Module '" << ThisModule << " CPacketStream doesn't exist! aborting RouterThread()" << std::endl;
return;
}
const auto streamIn = pitem->second;
while (keep_running) while (keep_running)
{ {
auto streamIn = m_Stream[ThisModule]; // wait until something shows up
auto uptmp = streamIn->PopWait();
// convert the incoming packet to a shared_ptr // convert the incoming packet to a shared_ptr
// wait until s std::shared_ptr<CPacket> packet = std::move(uptmp);
std::shared_ptr<CPacket> packet = std::move(streamIn->PopWait());
// set origin // set origin
packet->SetPacketModule(ThisModule); packet->SetPacketModule(ThisModule);
@ -276,8 +283,9 @@ void CReflector::RouterThread(const char ThisModule)
csRPT.SetCSModule(ThisModule); csRPT.SetCSModule(ThisModule);
(dynamic_cast<CDvHeaderPacket *>(packet.get()))->SetRpt2Callsign(csRPT); (dynamic_cast<CDvHeaderPacket *>(packet.get()))->SetRpt2Callsign(csRPT);
} }
// make a copy! after the Push(tmp), tmp will be nullptr!
(*it)->Push(packet); auto tmp = packet;
(*it)->Push(tmp);
} }
m_Protocols.Unlock(); m_Protocols.Unlock();
} }

@ -20,7 +20,14 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
// A threadsafe-queue. /************************************************************
* THIS IS IMPORTANT
* This template is primarly designed for std::unique_ptr!
* If you are going to use it for std::shared_ptr, then
* please consider that when you Push(), what you pushed
* from will be nullptr after the Push()!
\************************************************************/
template <class T> template <class T>
class CSafePacketQueue class CSafePacketQueue
{ {
@ -49,7 +56,7 @@ public:
} }
} }
// If the queue is empty, wait till a element is avaiable. // If the queue is empty, wait until an element is avaiable.
T PopWait(void) T PopWait(void)
{ {
std::unique_lock<std::mutex> lock(m); std::unique_lock<std::mutex> lock(m);

Loading…
Cancel
Save

Powered by TurnKey Linux.