diff --git a/reflector/BMProtocol.cpp b/reflector/BMProtocol.cpp index d733977..0b6018b 100644 --- a/reflector/BMProtocol.cpp +++ b/reflector/BMProtocol.cpp @@ -190,11 +190,10 @@ void CBMProtocol::Task(void) void CBMProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // encode it CBuffer buffer; @@ -237,7 +236,6 @@ void CBMProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index a6258e0..914d72d 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -134,14 +134,14 @@ void CCodecStream::Task(void) m_RTSum += rt; m_RTCount++; - if ( m_LocalQueue.empty() ) + if ( m_LocalQueue.IsEmpty() ) { std::cout << "Unexpected transcoded packet received from transcoder" << std::endl; } else { // pop the original packet - auto Packet = m_LocalQueue.pop(); + auto Packet = m_LocalQueue.Pop(); auto Frame = (CDvFramePacket *)Packet.get(); // do things look okay? @@ -160,31 +160,27 @@ void CCodecStream::Task(void) } // and push it back to client - m_PacketStream->Lock(); - m_PacketStream->push(Packet); - m_PacketStream->Unlock(); + m_PacketStream->ReturnPacket(std::move(Packet)); } } // anything in our queue - while ( !empty() ) + auto Packet = m_Queue.Pop(); + while (Packet) { - // yes, pop it from queue - auto Packet = pop(); - // we need a CDvFramePacket pointer to access Frame stuff auto Frame = (CDvFramePacket *)Packet.get(); - // push to our local queue so it can wait for the transcoder - m_LocalQueue.push(Packet); + // push to our local queue where it can wait for the transcoder + m_LocalQueue.Push(std::move(Packet)); // update important stuff in Frame->m_TCPack for the transcoder - Frame->SetTCParams(m_uiTotalPackets++); + Frame->SetTCParams(m_uiTotalPackets++); // Frame still points to the packet // now send to transcoder - // this assume that thread pushing the Packet - // have verified that the CodecStream is connected - // and that the packet needs transcoding m_TCWriter.Send(Frame->GetCodecPacket()); + + // get the next packet + Packet = m_Queue.Pop(); } } diff --git a/reflector/CodecStream.h b/reflector/CodecStream.h index e4652c4..3b5f3b9 100644 --- a/reflector/CodecStream.h +++ b/reflector/CodecStream.h @@ -21,15 +21,16 @@ #include #include +#include "DVFramePacket.h" #include "UnixDgramSocket.h" -#include "PacketQueue.h" +#include "SafePacketQueue.h" //////////////////////////////////////////////////////////////////////////////////////// // class class CPacketStream; -class CCodecStream : public CPacketQueue +class CCodecStream { public: // constructor @@ -49,6 +50,9 @@ public: void Thread(void); void Task(void); + // pass-thru + void Push(std::unique_ptr p) { m_Queue.Push(std::move(p)); } + protected: // initialization // data @@ -63,7 +67,7 @@ protected: // associated packet stream CPacketStream *m_PacketStream; - CPacketQueue m_LocalQueue; + CSafePacketQueue> m_LocalQueue, m_Queue; // thread std::atomic keep_running; diff --git a/reflector/DCSProtocol.cpp b/reflector/DCSProtocol.cpp index 90eb419..0efecfe 100644 --- a/reflector/DCSProtocol.cpp +++ b/reflector/DCSProtocol.cpp @@ -218,11 +218,10 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CDcsProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto module = packet->GetPacketModule(); @@ -269,7 +268,6 @@ void CDcsProtocol::HandleQueue(void) } } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/DExtraProtocol.cpp b/reflector/DExtraProtocol.cpp index 8a4b046..9e4c64b 100644 --- a/reflector/DExtraProtocol.cpp +++ b/reflector/DExtraProtocol.cpp @@ -193,11 +193,10 @@ void CDextraProtocol::Task(void) void CDextraProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty() ) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // encode it CBuffer buffer; @@ -223,7 +222,6 @@ void CDextraProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/DMRMMDVMProtocol.cpp b/reflector/DMRMMDVMProtocol.cpp index 0515537..e50a943 100644 --- a/reflector/DMRMMDVMProtocol.cpp +++ b/reflector/DMRMMDVMProtocol.cpp @@ -346,12 +346,10 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr &Hea void CDmrmmdvmProtocol::HandleQueue(void) { - - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto mod = packet->GetPacketModule(); @@ -419,7 +417,6 @@ void CDmrmmdvmProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/DMRPlusProtocol.cpp b/reflector/DMRPlusProtocol.cpp index 224635f..a625a74 100644 --- a/reflector/DMRPlusProtocol.cpp +++ b/reflector/DMRPlusProtocol.cpp @@ -218,12 +218,10 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Head void CDmrplusProtocol::HandleQueue(void) { - - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto mod = packet->GetPacketModule(); @@ -285,7 +283,6 @@ void CDmrplusProtocol::HandleQueue(void) //buffer.DebugDump(g_Refl.m_DebugFile); } } - m_Queue.Unlock(); } void CDmrplusProtocol::SendBufferToClients(const CBuffer &buffer, uint8_t module) diff --git a/reflector/DPlusProtocol.cpp b/reflector/DPlusProtocol.cpp index bf00561..cd7ac07 100644 --- a/reflector/DPlusProtocol.cpp +++ b/reflector/DPlusProtocol.cpp @@ -228,11 +228,10 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header void CDplusProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto mod = packet->GetPacketModule(); @@ -292,7 +291,6 @@ void CDplusProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } void CDplusProtocol::SendDvHeader(CDvHeaderPacket *packet, CDplusClient *client) diff --git a/reflector/DVFramePacket.cpp b/reflector/DVFramePacket.cpp index 3bf7a94..6260270 100644 --- a/reflector/DVFramePacket.cpp +++ b/reflector/DVFramePacket.cpp @@ -213,14 +213,6 @@ void CDvFramePacket::EncodeInterlinkPacket(CBuffer &buf) const memcpy(data+off, m_TCPack.m17, 16); } -//////////////////////////////////////////////////////////////////////////////////////// -// virtual duplication - -std::unique_ptr CDvFramePacket::Duplicate(void) const -{ - return std::unique_ptr(new CDvFramePacket(*this)); -} - //////////////////////////////////////////////////////////////////////////////////////// // get diff --git a/reflector/DVFramePacket.h b/reflector/DVFramePacket.h index 10e5eed..bc91aed 100644 --- a/reflector/DVFramePacket.h +++ b/reflector/DVFramePacket.h @@ -62,9 +62,6 @@ public: static unsigned int GetNetworkSize(); void EncodeInterlinkPacket(CBuffer &buf) const; - // virtual duplication - std::unique_ptr Duplicate(void) const; - // identity bool IsDvHeader(void) const { return false; } bool IsDvFrame(void) const { return true; } diff --git a/reflector/DVHeaderPacket.cpp b/reflector/DVHeaderPacket.cpp index 0e09db0..356d2c6 100644 --- a/reflector/DVHeaderPacket.cpp +++ b/reflector/DVHeaderPacket.cpp @@ -165,14 +165,6 @@ CDvHeaderPacket::CDvHeaderPacket(const CM17Packet &m17) : CPacket(m17) m_csRPT1.SetCSModule('G'); } -//////////////////////////////////////////////////////////////////////////////////////// -// virtual duplication - -std::unique_ptr CDvHeaderPacket::Duplicate(void) const -{ - return std::unique_ptr(new CDvHeaderPacket(*this)); -} - //////////////////////////////////////////////////////////////////////////////////////// // conversion diff --git a/reflector/DVHeaderPacket.h b/reflector/DVHeaderPacket.h index 2a6a8cd..7ffd25c 100644 --- a/reflector/DVHeaderPacket.h +++ b/reflector/DVHeaderPacket.h @@ -20,7 +20,7 @@ #include "Callsign.h" #include "Packet.h" - + //////////////////////////////////////////////////////////////////////////////////////// // implementation details @@ -65,9 +65,6 @@ public: static unsigned int GetNetworkSize(); void EncodeInterlinkPacket(CBuffer &buf) const; - // virtual duplication - std::unique_ptr Duplicate(void) const; - // identity bool IsDvHeader(void) const { return true; } bool IsDvFrame(void) const { return false; } diff --git a/reflector/G3Protocol.cpp b/reflector/G3Protocol.cpp index c49ffc1..c075c14 100644 --- a/reflector/G3Protocol.cpp +++ b/reflector/G3Protocol.cpp @@ -442,14 +442,13 @@ void CG3Protocol::Task(void) void CG3Protocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // supress host checks m_LastKeepaliveTime.start(); // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // encode it CBuffer buffer; @@ -475,7 +474,6 @@ void CG3Protocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/M17Packet.cpp b/reflector/M17Packet.cpp index 94e9510..7ed45a8 100644 --- a/reflector/M17Packet.cpp +++ b/reflector/M17Packet.cpp @@ -84,11 +84,6 @@ void CM17Packet::SetCRC(uint16_t crc) m17.crc = htons(crc); } -std::unique_ptr CM17Packet::Duplicate(void) const -{ - return std::unique_ptr(new CM17Packet(*this)); -} - bool CM17Packet::IsLastPacket() const { return ((0x8000u & ntohs(m17.framenumber)) == 0x8000u); diff --git a/reflector/M17Packet.h b/reflector/M17Packet.h index 2b0fb83..3d58ffd 100644 --- a/reflector/M17Packet.h +++ b/reflector/M17Packet.h @@ -69,7 +69,6 @@ public: uint16_t GetStreamId() const; uint16_t GetCRC() const; void SetCRC(uint16_t crc); - std::unique_ptr Duplicate(void) const; bool IsLastPacket() const; private: diff --git a/reflector/M17Protocol.cpp b/reflector/M17Protocol.cpp index fcbfce9..cbbfe74 100644 --- a/reflector/M17Protocol.cpp +++ b/reflector/M17Protocol.cpp @@ -221,11 +221,10 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CM17Protocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto module = packet->GetPacketModule(); @@ -270,7 +269,6 @@ void CM17Protocol::HandleQueue(void) m_StreamsCache[module].m_iSeqCounter++; } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/NXDNProtocol.cpp b/reflector/NXDNProtocol.cpp index a46f488..9bb4434 100644 --- a/reflector/NXDNProtocol.cpp +++ b/reflector/NXDNProtocol.cpp @@ -246,11 +246,10 @@ void CNXDNProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CNXDNProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto mod = packet->GetPacketModule(); @@ -310,7 +309,6 @@ void CNXDNProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/P25Protocol.cpp b/reflector/P25Protocol.cpp index f93da67..449e2e3 100644 --- a/reflector/P25Protocol.cpp +++ b/reflector/P25Protocol.cpp @@ -229,11 +229,10 @@ void CP25Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CP25Protocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto module = packet->GetPacketModule(); @@ -276,7 +275,6 @@ void CP25Protocol::HandleQueue(void) } } } - m_Queue.Unlock(); } diff --git a/reflector/Packet.h b/reflector/Packet.h index c09e419..94e6ab5 100644 --- a/reflector/Packet.h +++ b/reflector/Packet.h @@ -40,12 +40,6 @@ public: CPacket(uint16_t sid, uint8_t dstarpid, uint8_t dmrpid, uint8_t dmrsubpid, uint8_t ysfpid, uint8_t ysfsubpid, uint8_t ysfsubpidmax, ECodecType, bool lastpacket); CPacket(const CM17Packet &); - // destructor - virtual ~CPacket() {} - - // virtual duplication - virtual std::unique_ptr Duplicate(void) const = 0; - // identity virtual bool IsDvHeader(void) const = 0; virtual bool IsDvFrame(void) const = 0; diff --git a/reflector/PacketQueue.h b/reflector/PacketQueue.h deleted file mode 100644 index e53881c..0000000 --- a/reflector/PacketQueue.h +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved. - -// urfd -- The universal reflector -// Copyright © 2021 Thomas A. Early N7TAE -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#pragma once - -#include -#include -#include "Packet.h" - -class CClient; - -class CPacketQueue -{ -public: - // destructor - virtual ~CPacketQueue() {} - - // lock - void Lock() - { - m_Mutex.lock(); - } - - void Unlock() - { - m_Mutex.unlock(); - } - - // pass thru - std::unique_ptr pop() - { - auto pack = std::move(queue.front()); - queue.pop(); - return std::move(pack); - } - - bool empty() const - { - return queue.empty(); - } - - void push(std::unique_ptr &packet) - { - queue.push(std::move(packet)); - } - -protected: - // status - bool m_bOpen; - uint16_t m_uiStreamId; - std::mutex m_Mutex; - - // the queue - std::queue> queue; -}; diff --git a/reflector/PacketStream.cpp b/reflector/PacketStream.cpp index 7774020..3e17cb5 100644 --- a/reflector/PacketStream.cpp +++ b/reflector/PacketStream.cpp @@ -83,32 +83,18 @@ void CPacketStream::Push(std::unique_ptr Packet) { Packet->UpdatePids(m_uiPacketCntr++); } - // transcoder avaliable ? - if ( m_CodecStream ) + // transcoder avaliable and is this a DvFramePacket? + if ( m_CodecStream && Packet->IsDvFrame()) { - // todo: verify no possibilty of double lock here - m_CodecStream->Lock(); - { - // transcoder ready & frame need transcoding ? - if (Packet->IsDvFrame()) - { - // yes, push packet to trancoder queue - // trancoder will push it after transcoding - // is completed - m_CodecStream->push(Packet); - } - else - { - // no, just bypass transcoder - push(Packet); - } - } - m_CodecStream->Unlock(); + // yes, push packet to trancoder queue + // trancoder will push it after transcoding + // is completed + m_CodecStream->Push(std::move(Packet)); } else { - // otherwise, push direct push - push(Packet); + // no, just bypass transcoder + Push(std::move(Packet)); } } diff --git a/reflector/PacketStream.h b/reflector/PacketStream.h index 607d918..e7b7581 100644 --- a/reflector/PacketStream.h +++ b/reflector/PacketStream.h @@ -18,7 +18,6 @@ #pragma once -#include "PacketQueue.h" #include "Timer.h" #include "DVHeaderPacket.h" #include "Client.h" @@ -32,7 +31,7 @@ //////////////////////////////////////////////////////////////////////////////////////// // class -class CPacketStream : public CPacketQueue +class CPacketStream { public: CPacketStream(char module); @@ -43,6 +42,7 @@ public: void ClosePacketStream(void); // push & pop + void ReturnPacket(std::unique_ptr p) { m_Queue.Push(std::move(p)); } void Push(std::unique_ptr packet); void Tickle(void) { m_LastPacketTime.start(); } @@ -55,8 +55,14 @@ public: const CCallsign &GetUserCallsign(void) const { return m_DvHeader.GetMyCallsign(); } char GetRpt2Module(void) const { return m_DvHeader.GetRpt2Module(); } + // pass-through + std::unique_ptr Pop() { return m_Queue.Pop(); } + std::unique_ptr PopWait() { return m_Queue.PopWait(); } + bool IsEmpty() { return m_Queue.IsEmpty(); } + protected: // data + CSafePacketQueue> m_Queue; const char m_PSModule; bool m_bOpen; uint16_t m_uiStreamId; diff --git a/reflector/Protocol.cpp b/reflector/Protocol.cpp index c5fa213..5998127 100644 --- a/reflector/Protocol.cpp +++ b/reflector/Protocol.cpp @@ -37,12 +37,10 @@ CProtocol::~CProtocol() Close(); // empty queue - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while ( !m_Queue.IsEmpty() ) { - m_Queue.pop(); + m_Queue.Pop(); } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// @@ -138,9 +136,7 @@ void CProtocol::OnDvFramePacketIn(std::unique_ptr &Frame, const // set the packet module, the transcoder needs this Frame->SetPacketModule(stream->GetOwnerClient()->GetReflectorModule()); // and push - stream->Lock(); stream->Push(std::move(Frame)); - stream->Unlock(); } #ifdef DEBUG else @@ -176,18 +172,15 @@ void CProtocol::CheckStreamsTimeout(void) for ( auto it=m_Streams.begin(); it!=m_Streams.end(); ) { // time out ? - it->second->Lock(); if ( it->second->IsExpired() ) { // yes, close it - it->second->Unlock(); g_Refl.CloseStream(it->second); // and remove it from the m_Streams map it = m_Streams.erase(it); } else { - it->second->Unlock(); it++; } } diff --git a/reflector/Protocol.h b/reflector/Protocol.h index 201764a..415d98c 100644 --- a/reflector/Protocol.h +++ b/reflector/Protocol.h @@ -71,10 +71,6 @@ public: virtual bool Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6); virtual void Close(void); - // queue - CPacketQueue *GetQueue(void) { m_Queue.Lock(); return &m_Queue; } - void ReleaseQueue(void) { m_Queue.Unlock(); } - // get const CCallsign &GetReflectorCallsign(void)const { return m_ReflectorCallsign; } @@ -82,6 +78,9 @@ public: void Thread(void); virtual void Task(void) = 0; + // pass-through + void Push(std::shared_ptr p) { m_Queue.Push(p); } + protected: // stream helpers virtual void OnDvFramePacketIn(std::unique_ptr &, const CIp * = nullptr); @@ -118,7 +117,6 @@ protected: void Dump(const char *title, const uint8_t *data, int length); #endif - // socket CUdpSocket m_Socket4; CUdpSocket m_Socket6; @@ -127,7 +125,7 @@ protected: std::unordered_map> m_Streams; // queue - CPacketQueue m_Queue; + CSafePacketQueue> m_Queue; // thread std::atomic keep_running; diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 032984f..4c235cb 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -188,7 +188,6 @@ std::shared_ptr CReflector::OpenStream(std::unique_ptrLock(); // is it available ? if ( stream->OpenPacketStream(*DvHeader, client) ) { @@ -211,7 +210,6 @@ std::shared_ptr CReflector::OpenStream(std::unique_ptrGetUserCallsign()); } - stream->Unlock(); return stream; } @@ -220,22 +218,14 @@ void CReflector::CloseStream(std::shared_ptr stream) if ( stream != nullptr ) { // wait queue is empty. this waits forever - bool bEmpty = false; - do + bool bEmpty = stream->IsEmpty(); + while (! bEmpty) { - stream->Lock(); - // do not use stream->IsEmpty() has this "may" never succeed - // and anyway, the DvLastFramPacket short-circuit the transcoder - // loop queues - bEmpty = stream->empty(); - stream->Unlock(); - if ( !bEmpty ) - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + bEmpty = stream->IsEmpty(); } - while (!bEmpty); GetClients(); // lock clients - stream->Lock(); // lock stream // get and check the master std::shared_ptrclient = stream->GetOwnerClient(); @@ -253,11 +243,6 @@ void CReflector::CloseStream(std::shared_ptr stream) // release clients ReleaseClients(); - // unlock before closing - // to avoid double lock in assiociated - // codecstream close/thread-join - stream->Unlock(); - // and stop the queue stream->ClosePacketStream(); } @@ -270,54 +255,31 @@ void CReflector::RouterThread(const char ThisModule) { while (keep_running) { - std::unique_ptr packet; auto streamIn = m_Stream[ThisModule]; - // any packet in our input queue ? - streamIn->Lock(); - if ( !streamIn->empty() ) - { - // get the packet - packet = streamIn->pop(); - } - else - { - packet = nullptr; - } - streamIn->Unlock(); - // route it - if ( packet != nullptr ) - { - // set origin - packet->SetPacketModule(ThisModule); + // convert the incoming packet to a shared_ptr + // wait until s + std::shared_ptr packet = std::move(streamIn->PopWait()); - // iterate on all protocols - m_Protocols.Lock(); - for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ ) + // set origin + packet->SetPacketModule(ThisModule); + + // iterate on all protocols + m_Protocols.Lock(); + for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ ) + { + // if packet is header, update RPT2 according to protocol + if ( packet->IsDvHeader() ) { - // duplicate packet - auto packetClone = packet->Duplicate(); - - // if packet is header, update RPT2 according to protocol - if ( packetClone->IsDvHeader() ) - { - // get our callsign - CCallsign csRPT = (*it)->GetReflectorCallsign(); - csRPT.SetCSModule(ThisModule); - (dynamic_cast(packetClone.get()))->SetRpt2Callsign(csRPT); - } - - // and push it - CPacketQueue *queue = (*it)->GetQueue(); - queue->push(packetClone); - (*it)->ReleaseQueue(); + // get our callsign + CCallsign csRPT = (*it)->GetReflectorCallsign(); + csRPT.SetCSModule(ThisModule); + (dynamic_cast(packet.get()))->SetRpt2Callsign(csRPT); } - m_Protocols.Unlock(); - } - else - { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + (*it)->Push(packet); } + m_Protocols.Unlock(); } } diff --git a/reflector/SafePacketQueue.h b/reflector/SafePacketQueue.h new file mode 100644 index 0000000..f2a89bb --- /dev/null +++ b/reflector/SafePacketQueue.h @@ -0,0 +1,76 @@ +// urfd -- The universal reflector +// Copyright © 2023 Thomas A. Early N7TAE +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include +#include + +// A threadsafe-queue. +template +class CSafePacketQueue +{ +public: + CSafePacketQueue(void) : q() , m() , c() {} + + ~CSafePacketQueue(void) {} + + void Push(T t) + { + std::lock_guard lock(m); + q.push(std::move(t)); + c.notify_one(); + } + + T Pop(void) + { + std::lock_guard lock(m); + if (q.empty()) + return nullptr; + else + { + T val = std::move(q.front()); + q.pop(); + return val; + } + } + + // If the queue is empty, wait till a element is avaiable. + T PopWait(void) + { + std::unique_lock lock(m); + while(q.empty()) + { + // release lock as long as the wait and reaquire it afterwards. + c.wait(lock); + } + T val = std::move(q.front()); + q.pop(); + return val; + } + + bool IsEmpty(void) + { + std::unique_lock lock(m); + return q.empty(); + } + +private: + std::queue q; + mutable std::mutex m; + std::condition_variable c; +}; diff --git a/reflector/URFProtocol.cpp b/reflector/URFProtocol.cpp index ce7cbaf..c9d4f12 100644 --- a/reflector/URFProtocol.cpp +++ b/reflector/URFProtocol.cpp @@ -200,11 +200,10 @@ void CURFProtocol::Task(void) void CURFProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // check if origin of packet is local // if not, do not stream it out as it will cause @@ -236,7 +235,6 @@ void CURFProtocol::HandleQueue(void) } } } - m_Queue.Unlock(); } //////////////////////////////////////////////////////////////////////////////////////// diff --git a/reflector/USRPProtocol.cpp b/reflector/USRPProtocol.cpp index 0c6c794..4a17ea9 100644 --- a/reflector/USRPProtocol.cpp +++ b/reflector/USRPProtocol.cpp @@ -223,11 +223,10 @@ void CUSRPProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CUSRPProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto module = packet->GetPacketModule(); @@ -266,7 +265,6 @@ void CUSRPProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } diff --git a/reflector/YSFProtocol.cpp b/reflector/YSFProtocol.cpp index 68fcfe9..20190e0 100644 --- a/reflector/YSFProtocol.cpp +++ b/reflector/YSFProtocol.cpp @@ -304,11 +304,10 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, void CYsfProtocol::HandleQueue(void) { - m_Queue.Lock(); - while ( !m_Queue.empty() ) + while (! m_Queue.IsEmpty()) { // get the packet - auto packet = m_Queue.pop(); + auto packet = m_Queue.Pop(); // get our sender's id const auto mod = packet->GetPacketModule(); @@ -369,7 +368,6 @@ void CYsfProtocol::HandleQueue(void) g_Refl.ReleaseClients(); } } - m_Queue.Unlock(); } ////////////////////////////////////////////////////////////////////////////////////////