works (in idle) without crashing

pull/1/head
Tom Early 3 years ago
parent 939773f54b
commit 10e4972040

@ -190,11 +190,10 @@ void CBMProtocol::Task(void)
void CBMProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// encode it
CBuffer buffer;
@ -237,7 +236,6 @@ void CBMProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -134,14 +134,14 @@ void CCodecStream::Task(void)
m_RTSum += rt;
m_RTCount++;
if ( m_LocalQueue.empty() )
if ( m_LocalQueue.IsEmpty() )
{
std::cout << "Unexpected transcoded packet received from transcoder" << std::endl;
}
else
{
// pop the original packet
auto Packet = m_LocalQueue.pop();
auto Packet = m_LocalQueue.Pop();
auto Frame = (CDvFramePacket *)Packet.get();
// do things look okay?
@ -160,31 +160,27 @@ void CCodecStream::Task(void)
}
// and push it back to client
m_PacketStream->Lock();
m_PacketStream->push(Packet);
m_PacketStream->Unlock();
m_PacketStream->ReturnPacket(std::move(Packet));
}
}
// anything in our queue
while ( !empty() )
auto Packet = m_Queue.Pop();
while (Packet)
{
// yes, pop it from queue
auto Packet = pop();
// we need a CDvFramePacket pointer to access Frame stuff
auto Frame = (CDvFramePacket *)Packet.get();
// push to our local queue so it can wait for the transcoder
m_LocalQueue.push(Packet);
// push to our local queue where it can wait for the transcoder
m_LocalQueue.Push(std::move(Packet));
// update important stuff in Frame->m_TCPack for the transcoder
Frame->SetTCParams(m_uiTotalPackets++);
Frame->SetTCParams(m_uiTotalPackets++); // Frame still points to the packet
// now send to transcoder
// this assume that thread pushing the Packet
// have verified that the CodecStream is connected
// and that the packet needs transcoding
m_TCWriter.Send(Frame->GetCodecPacket());
// get the next packet
Packet = m_Queue.Pop();
}
}

@ -21,15 +21,16 @@
#include <atomic>
#include <future>
#include "DVFramePacket.h"
#include "UnixDgramSocket.h"
#include "PacketQueue.h"
#include "SafePacketQueue.h"
////////////////////////////////////////////////////////////////////////////////////////
// class
class CPacketStream;
class CCodecStream : public CPacketQueue
class CCodecStream
{
public:
// constructor
@ -49,6 +50,9 @@ public:
void Thread(void);
void Task(void);
// pass-thru
void Push(std::unique_ptr<CPacket> p) { m_Queue.Push(std::move(p)); }
protected:
// initialization
// data
@ -63,7 +67,7 @@ protected:
// associated packet stream
CPacketStream *m_PacketStream;
CPacketQueue m_LocalQueue;
CSafePacketQueue<std::unique_ptr<CPacket>> m_LocalQueue, m_Queue;
// thread
std::atomic<bool> keep_running;

@ -218,11 +218,10 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CDcsProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto module = packet->GetPacketModule();
@ -269,7 +268,6 @@ void CDcsProtocol::HandleQueue(void)
}
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -193,11 +193,10 @@ void CDextraProtocol::Task(void)
void CDextraProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty() )
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// encode it
CBuffer buffer;
@ -223,7 +222,6 @@ void CDextraProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -346,12 +346,10 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Hea
void CDmrmmdvmProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto mod = packet->GetPacketModule();
@ -419,7 +417,6 @@ void CDmrmmdvmProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -218,12 +218,10 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Head
void CDmrplusProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto mod = packet->GetPacketModule();
@ -285,7 +283,6 @@ void CDmrplusProtocol::HandleQueue(void)
//buffer.DebugDump(g_Refl.m_DebugFile);
}
}
m_Queue.Unlock();
}
void CDmrplusProtocol::SendBufferToClients(const CBuffer &buffer, uint8_t module)

@ -228,11 +228,10 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header
void CDplusProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto mod = packet->GetPacketModule();
@ -292,7 +291,6 @@ void CDplusProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
void CDplusProtocol::SendDvHeader(CDvHeaderPacket *packet, CDplusClient *client)

@ -213,14 +213,6 @@ void CDvFramePacket::EncodeInterlinkPacket(CBuffer &buf) const
memcpy(data+off, m_TCPack.m17, 16);
}
////////////////////////////////////////////////////////////////////////////////////////
// virtual duplication
std::unique_ptr<CPacket> CDvFramePacket::Duplicate(void) const
{
return std::unique_ptr<CPacket>(new CDvFramePacket(*this));
}
////////////////////////////////////////////////////////////////////////////////////////
// get

@ -62,9 +62,6 @@ public:
static unsigned int GetNetworkSize();
void EncodeInterlinkPacket(CBuffer &buf) const;
// virtual duplication
std::unique_ptr<CPacket> Duplicate(void) const;
// identity
bool IsDvHeader(void) const { return false; }
bool IsDvFrame(void) const { return true; }

@ -165,14 +165,6 @@ CDvHeaderPacket::CDvHeaderPacket(const CM17Packet &m17) : CPacket(m17)
m_csRPT1.SetCSModule('G');
}
////////////////////////////////////////////////////////////////////////////////////////
// virtual duplication
std::unique_ptr<CPacket> CDvHeaderPacket::Duplicate(void) const
{
return std::unique_ptr<CPacket>(new CDvHeaderPacket(*this));
}
////////////////////////////////////////////////////////////////////////////////////////
// conversion

@ -65,9 +65,6 @@ public:
static unsigned int GetNetworkSize();
void EncodeInterlinkPacket(CBuffer &buf) const;
// virtual duplication
std::unique_ptr<CPacket> Duplicate(void) const;
// identity
bool IsDvHeader(void) const { return true; }
bool IsDvFrame(void) const { return false; }

@ -442,14 +442,13 @@ void CG3Protocol::Task(void)
void CG3Protocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// supress host checks
m_LastKeepaliveTime.start();
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// encode it
CBuffer buffer;
@ -475,7 +474,6 @@ void CG3Protocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -84,11 +84,6 @@ void CM17Packet::SetCRC(uint16_t crc)
m17.crc = htons(crc);
}
std::unique_ptr<CM17Packet> CM17Packet::Duplicate(void) const
{
return std::unique_ptr<CM17Packet>(new CM17Packet(*this));
}
bool CM17Packet::IsLastPacket() const
{
return ((0x8000u & ntohs(m17.framenumber)) == 0x8000u);

@ -69,7 +69,6 @@ public:
uint16_t GetStreamId() const;
uint16_t GetCRC() const;
void SetCRC(uint16_t crc);
std::unique_ptr<CM17Packet> Duplicate(void) const;
bool IsLastPacket() const;
private:

@ -221,11 +221,10 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CM17Protocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto module = packet->GetPacketModule();
@ -270,7 +269,6 @@ void CM17Protocol::HandleQueue(void)
m_StreamsCache[module].m_iSeqCounter++;
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -246,11 +246,10 @@ void CNXDNProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CNXDNProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto mod = packet->GetPacketModule();
@ -310,7 +309,6 @@ void CNXDNProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -229,11 +229,10 @@ void CP25Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CP25Protocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto module = packet->GetPacketModule();
@ -276,7 +275,6 @@ void CP25Protocol::HandleQueue(void)
}
}
}
m_Queue.Unlock();
}

@ -40,12 +40,6 @@ public:
CPacket(uint16_t sid, uint8_t dstarpid, uint8_t dmrpid, uint8_t dmrsubpid, uint8_t ysfpid, uint8_t ysfsubpid, uint8_t ysfsubpidmax, ECodecType, bool lastpacket);
CPacket(const CM17Packet &);
// destructor
virtual ~CPacket() {}
// virtual duplication
virtual std::unique_ptr<CPacket> Duplicate(void) const = 0;
// identity
virtual bool IsDvHeader(void) const = 0;
virtual bool IsDvFrame(void) const = 0;

@ -1,70 +0,0 @@
// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved.
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <mutex>
#include <queue>
#include "Packet.h"
class CClient;
class CPacketQueue
{
public:
// destructor
virtual ~CPacketQueue() {}
// lock
void Lock()
{
m_Mutex.lock();
}
void Unlock()
{
m_Mutex.unlock();
}
// pass thru
std::unique_ptr<CPacket> pop()
{
auto pack = std::move(queue.front());
queue.pop();
return std::move(pack);
}
bool empty() const
{
return queue.empty();
}
void push(std::unique_ptr<CPacket> &packet)
{
queue.push(std::move(packet));
}
protected:
// status
bool m_bOpen;
uint16_t m_uiStreamId;
std::mutex m_Mutex;
// the queue
std::queue<std::unique_ptr<CPacket>> queue;
};

@ -83,32 +83,18 @@ void CPacketStream::Push(std::unique_ptr<CPacket> Packet)
{
Packet->UpdatePids(m_uiPacketCntr++);
}
// transcoder avaliable ?
if ( m_CodecStream )
// transcoder avaliable and is this a DvFramePacket?
if ( m_CodecStream && Packet->IsDvFrame())
{
// todo: verify no possibilty of double lock here
m_CodecStream->Lock();
{
// transcoder ready & frame need transcoding ?
if (Packet->IsDvFrame())
{
// yes, push packet to trancoder queue
// trancoder will push it after transcoding
// is completed
m_CodecStream->push(Packet);
}
else
{
// no, just bypass transcoder
push(Packet);
}
}
m_CodecStream->Unlock();
// yes, push packet to trancoder queue
// trancoder will push it after transcoding
// is completed
m_CodecStream->Push(std::move(Packet));
}
else
{
// otherwise, push direct push
push(Packet);
// no, just bypass transcoder
Push(std::move(Packet));
}
}

@ -18,7 +18,6 @@
#pragma once
#include "PacketQueue.h"
#include "Timer.h"
#include "DVHeaderPacket.h"
#include "Client.h"
@ -32,7 +31,7 @@
////////////////////////////////////////////////////////////////////////////////////////
// class
class CPacketStream : public CPacketQueue
class CPacketStream
{
public:
CPacketStream(char module);
@ -43,6 +42,7 @@ public:
void ClosePacketStream(void);
// push & pop
void ReturnPacket(std::unique_ptr<CPacket> p) { m_Queue.Push(std::move(p)); }
void Push(std::unique_ptr<CPacket> packet);
void Tickle(void) { m_LastPacketTime.start(); }
@ -55,8 +55,14 @@ public:
const CCallsign &GetUserCallsign(void) const { return m_DvHeader.GetMyCallsign(); }
char GetRpt2Module(void) const { return m_DvHeader.GetRpt2Module(); }
// pass-through
std::unique_ptr<CPacket> Pop() { return m_Queue.Pop(); }
std::unique_ptr<CPacket> PopWait() { return m_Queue.PopWait(); }
bool IsEmpty() { return m_Queue.IsEmpty(); }
protected:
// data
CSafePacketQueue<std::unique_ptr<CPacket>> m_Queue;
const char m_PSModule;
bool m_bOpen;
uint16_t m_uiStreamId;

@ -37,12 +37,10 @@ CProtocol::~CProtocol()
Close();
// empty queue
m_Queue.Lock();
while ( !m_Queue.empty() )
while ( !m_Queue.IsEmpty() )
{
m_Queue.pop();
m_Queue.Pop();
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////
@ -138,9 +136,7 @@ void CProtocol::OnDvFramePacketIn(std::unique_ptr<CDvFramePacket> &Frame, const
// set the packet module, the transcoder needs this
Frame->SetPacketModule(stream->GetOwnerClient()->GetReflectorModule());
// and push
stream->Lock();
stream->Push(std::move(Frame));
stream->Unlock();
}
#ifdef DEBUG
else
@ -176,18 +172,15 @@ void CProtocol::CheckStreamsTimeout(void)
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); )
{
// time out ?
it->second->Lock();
if ( it->second->IsExpired() )
{
// yes, close it
it->second->Unlock();
g_Refl.CloseStream(it->second);
// and remove it from the m_Streams map
it = m_Streams.erase(it);
}
else
{
it->second->Unlock();
it++;
}
}

@ -71,10 +71,6 @@ public:
virtual bool Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6);
virtual void Close(void);
// queue
CPacketQueue *GetQueue(void) { m_Queue.Lock(); return &m_Queue; }
void ReleaseQueue(void) { m_Queue.Unlock(); }
// get
const CCallsign &GetReflectorCallsign(void)const { return m_ReflectorCallsign; }
@ -82,6 +78,9 @@ public:
void Thread(void);
virtual void Task(void) = 0;
// pass-through
void Push(std::shared_ptr<CPacket> p) { m_Queue.Push(p); }
protected:
// stream helpers
virtual void OnDvFramePacketIn(std::unique_ptr<CDvFramePacket> &, const CIp * = nullptr);
@ -118,7 +117,6 @@ protected:
void Dump(const char *title, const uint8_t *data, int length);
#endif
// socket
CUdpSocket m_Socket4;
CUdpSocket m_Socket6;
@ -127,7 +125,7 @@ protected:
std::unordered_map<uint16_t, std::shared_ptr<CPacketStream>> m_Streams;
// queue
CPacketQueue m_Queue;
CSafePacketQueue<std::shared_ptr<CPacket>> m_Queue;
// thread
std::atomic<bool> keep_running;

@ -188,7 +188,6 @@ std::shared_ptr<CPacketStream> CReflector::OpenStream(std::unique_ptr<CDvHeaderP
return nullptr;
}
stream->Lock();
// is it available ?
if ( stream->OpenPacketStream(*DvHeader, client) )
{
@ -211,7 +210,6 @@ std::shared_ptr<CPacketStream> CReflector::OpenStream(std::unique_ptr<CDvHeaderP
OnStreamOpen(stream->GetUserCallsign());
}
stream->Unlock();
return stream;
}
@ -220,22 +218,14 @@ void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
if ( stream != nullptr )
{
// wait queue is empty. this waits forever
bool bEmpty = false;
do
bool bEmpty = stream->IsEmpty();
while (! bEmpty)
{
stream->Lock();
// do not use stream->IsEmpty() has this "may" never succeed
// and anyway, the DvLastFramPacket short-circuit the transcoder
// loop queues
bEmpty = stream->empty();
stream->Unlock();
if ( !bEmpty )
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
bEmpty = stream->IsEmpty();
}
while (!bEmpty);
GetClients(); // lock clients
stream->Lock(); // lock stream
// get and check the master
std::shared_ptr<CClient>client = stream->GetOwnerClient();
@ -253,11 +243,6 @@ void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
// release clients
ReleaseClients();
// unlock before closing
// to avoid double lock in assiociated
// codecstream close/thread-join
stream->Unlock();
// and stop the queue
stream->ClosePacketStream();
}
@ -270,54 +255,31 @@ void CReflector::RouterThread(const char ThisModule)
{
while (keep_running)
{
std::unique_ptr<CPacket> packet;
auto streamIn = m_Stream[ThisModule];
// any packet in our input queue ?
streamIn->Lock();
if ( !streamIn->empty() )
{
// get the packet
packet = streamIn->pop();
}
else
{
packet = nullptr;
}
streamIn->Unlock();
// route it
if ( packet != nullptr )
{
// set origin
packet->SetPacketModule(ThisModule);
// convert the incoming packet to a shared_ptr
// wait until s
std::shared_ptr<CPacket> packet = std::move(streamIn->PopWait());
// iterate on all protocols
m_Protocols.Lock();
for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ )
// set origin
packet->SetPacketModule(ThisModule);
// iterate on all protocols
m_Protocols.Lock();
for ( auto it=m_Protocols.begin(); it!=m_Protocols.end(); it++ )
{
// if packet is header, update RPT2 according to protocol
if ( packet->IsDvHeader() )
{
// duplicate packet
auto packetClone = packet->Duplicate();
// if packet is header, update RPT2 according to protocol
if ( packetClone->IsDvHeader() )
{
// get our callsign
CCallsign csRPT = (*it)->GetReflectorCallsign();
csRPT.SetCSModule(ThisModule);
(dynamic_cast<CDvHeaderPacket *>(packetClone.get()))->SetRpt2Callsign(csRPT);
}
// and push it
CPacketQueue *queue = (*it)->GetQueue();
queue->push(packetClone);
(*it)->ReleaseQueue();
// get our callsign
CCallsign csRPT = (*it)->GetReflectorCallsign();
csRPT.SetCSModule(ThisModule);
(dynamic_cast<CDvHeaderPacket *>(packet.get()))->SetRpt2Callsign(csRPT);
}
m_Protocols.Unlock();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
(*it)->Push(packet);
}
m_Protocols.Unlock();
}
}

@ -0,0 +1,76 @@
// urfd -- The universal reflector
// Copyright © 2023 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class CSafePacketQueue
{
public:
CSafePacketQueue(void) : q() , m() , c() {}
~CSafePacketQueue(void) {}
void Push(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(std::move(t));
c.notify_one();
}
T Pop(void)
{
std::lock_guard<std::mutex> lock(m);
if (q.empty())
return nullptr;
else
{
T val = std::move(q.front());
q.pop();
return val;
}
}
// If the queue is empty, wait till a element is avaiable.
T PopWait(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = std::move(q.front());
q.pop();
return val;
}
bool IsEmpty(void)
{
std::unique_lock<std::mutex> lock(m);
return q.empty();
}
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};

@ -200,11 +200,10 @@ void CURFProtocol::Task(void)
void CURFProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// check if origin of packet is local
// if not, do not stream it out as it will cause
@ -236,7 +235,6 @@ void CURFProtocol::HandleQueue(void)
}
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

@ -223,11 +223,10 @@ void CUSRPProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CUSRPProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto module = packet->GetPacketModule();
@ -266,7 +265,6 @@ void CUSRPProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}

@ -304,11 +304,10 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
void CYsfProtocol::HandleQueue(void)
{
m_Queue.Lock();
while ( !m_Queue.empty() )
while (! m_Queue.IsEmpty())
{
// get the packet
auto packet = m_Queue.pop();
auto packet = m_Queue.Pop();
// get our sender's id
const auto mod = packet->GetPacketModule();
@ -369,7 +368,6 @@ void CYsfProtocol::HandleQueue(void)
g_Refl.ReleaseClients();
}
}
m_Queue.Unlock();
}
////////////////////////////////////////////////////////////////////////////////////////

Loading…
Cancel
Save

Powered by TurnKey Linux.