From a0123ceca90c55228de397328e171c4a41446721 Mon Sep 17 00:00:00 2001 From: Tom Early Date: Thu, 23 Feb 2023 11:08:54 -0700 Subject: [PATCH] a few more msgs --- reflector/CodecStream.cpp | 6 +++--- reflector/PacketStream.cpp | 3 +++ reflector/Reflector.cpp | 22 +++++++++++++++------- reflector/SafePacketQueue.h | 11 +++++++++-- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 914d72d..0c30791 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -78,14 +78,14 @@ void CCodecStream::ReportStats() bool CCodecStream::InitCodecStream(char module) { -#ifdef DEBUG - std::cout << "Initializing codec stream for module '" << module << "'" << std::endl; -#endif m_TCWriter.SetUp(REF2TC); std::string name(TC2REF); name.append(1, module); if (m_TCReader.Open(name.c_str())) return true; +#ifdef DEBUG + std::cout << "Initialized unix socket " << name << std::endl; +#endif keep_running = true; try { diff --git a/reflector/PacketStream.cpp b/reflector/PacketStream.cpp index 3e17cb5..7e2043f 100644 --- a/reflector/PacketStream.cpp +++ b/reflector/PacketStream.cpp @@ -37,7 +37,10 @@ bool CPacketStream::InitCodecStream() if (m_CodecStream) return m_CodecStream->InitCodecStream(m_PSModule); else + { + std::cerr << "Could not create a CCodecStream for module '" << m_PSModule << "'" << std::endl; return true; + } } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 4c235cb..23637a1 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -85,9 +85,11 @@ bool CReflector::Start(void) if (stream->InitCodecStream()) return true; } + m_Stream[c] = stream; } else { + std::cerr << "Could not make a CPacketStream for module '" << c << "'" << std::endl; return true; } try @@ -253,14 +255,19 @@ void CReflector::CloseStream(std::shared_ptr stream) void CReflector::RouterThread(const char ThisModule) { + auto pitem = m_Stream.find(ThisModule); + if (m_Stream.end() == pitem) + { + std::cerr << "Module '" << ThisModule << " CPacketStream doesn't exist! aborting RouterThread()" << std::endl; + return; + } + const auto streamIn = pitem->second; while (keep_running) { - auto streamIn = m_Stream[ThisModule]; - + // wait until something shows up + auto uptmp = streamIn->PopWait(); // convert the incoming packet to a shared_ptr - // wait until s - std::shared_ptr packet = std::move(streamIn->PopWait()); - + std::shared_ptr packet = std::move(uptmp); // set origin packet->SetPacketModule(ThisModule); @@ -276,8 +283,9 @@ void CReflector::RouterThread(const char ThisModule) csRPT.SetCSModule(ThisModule); (dynamic_cast(packet.get()))->SetRpt2Callsign(csRPT); } - - (*it)->Push(packet); + // make a copy! after the Push(tmp), tmp will be nullptr! + auto tmp = packet; + (*it)->Push(tmp); } m_Protocols.Unlock(); } diff --git a/reflector/SafePacketQueue.h b/reflector/SafePacketQueue.h index f2a89bb..fd51b40 100644 --- a/reflector/SafePacketQueue.h +++ b/reflector/SafePacketQueue.h @@ -20,7 +20,14 @@ #include #include -// A threadsafe-queue. +/************************************************************ + * THIS IS IMPORTANT + * This template is primarly designed for std::unique_ptr! + * If you are going to use it for std::shared_ptr, then + * please consider that when you Push(), what you pushed + * from will be nullptr after the Push()! +\************************************************************/ + template class CSafePacketQueue { @@ -49,7 +56,7 @@ public: } } - // If the queue is empty, wait till a element is avaiable. + // If the queue is empty, wait until an element is avaiable. T PopWait(void) { std::unique_lock lock(m);