changed CProtocol::m_Streams from a list to a unordered_map keyed by the streamID

unstable
Tom Early 4 years ago
parent 436e0b00d9
commit 065be9bf90

@ -28,7 +28,7 @@
////////////////////////////////////////////////////////////////////////////////////////
// operation
bool CBMProtocol::Initialize(const char *type, EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6)
bool CBMProtocol::Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6)
{
if (! CProtocol::Initialize(type, ptype, port, has_ipv4, has_ipv6))
return false;
@ -234,15 +234,10 @@ void CBMProtocol::HandleQueue(void)
case EProtoRev::ambe:
default:
#ifdef TRANSCODED_MODULES
if ( g_Transcoder.IsConnected() )
{
Send(buffer, client->GetIp());
}
else
#endif
{
#else
Send(bufferLegacy, client->GetIp());
}
#endif
break;
}
}
@ -382,7 +377,7 @@ void CBMProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
// get origin
peer = client->GetCallsign();

@ -34,7 +34,7 @@ class CBMProtocol : public CProtocol
{
public:
// initialization
bool Initialize(const char *type, EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6);
bool Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6);
// task
void Task(void);

@ -24,8 +24,7 @@
#include "Callsign.h"
// if a client is using special characters ',', '-' or '/', he's out of luck!
//#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."
#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."
////////////////////////////////////////////////////////////////////////////////////////
// constructors

@ -24,7 +24,7 @@
#include "Buffer.h"
#include "Packet.h"
enum class EProtoRev { original, revised, ambe, urf };
enum class EProtoRev { original, revised, ambe };
class CClient
{

@ -202,7 +202,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release

@ -25,20 +25,20 @@
CDextraClient::CDextraClient()
{
m_ProtRev = 0;
m_ProtRev = EProtoRev::ambe;
}
CDextraClient::CDextraClient(const CCallsign &callsign, const CIp &ip, char reflectorModule, int protRev)
CDextraClient::CDextraClient(const CCallsign &callsign, const CIp &ip, char reflectorModule, EProtoRev protRev)
: CClient(callsign, ip, reflectorModule)
{
m_ProtRev = protRev;
}
CDextraClient::CDextraClient(const CDextraClient &client)
: CClient(client)
{
m_ProtRev = client.m_ProtRev;
}
//CDextraClient::CDextraClient(const CDextraClient &client)
// : CClient(client)
//{
// m_ProtRev = client.m_ProtRev;
//}
////////////////////////////////////////////////////////////////////////////////////////
// status

@ -32,15 +32,15 @@ class CDextraClient : public CClient
public:
// constructors
CDextraClient();
CDextraClient(const CCallsign &, const CIp &, char = ' ', int = 0);
CDextraClient(const CDextraClient &);
CDextraClient(const CCallsign &, const CIp &, char, EProtoRev);
// CDextraClient(const CDextraClient &);
// destructor
virtual ~CDextraClient() {};
// identity
EProtocol GetProtocol(void) const { return EProtocol::dextra; }
int GetProtocolRevision(void) const { return m_ProtRev; }
EProtoRev GetProtocolRevision(void) const { return m_ProtRev; }
const char *GetProtocolName(void) const { return "DExtra"; }
ECodecType GetCodec(void) const { return ECodecType::dstar; }
bool IsNode(void) const { return true; }
@ -50,5 +50,5 @@ public:
protected:
// data
int m_ProtRev;
EProtoRev m_ProtRev;
};

@ -40,8 +40,8 @@ CDextraPeer::CDextraPeer(const CCallsign &callsign, const CIp &ip, const char *m
// and construct the DExtra clients
for ( unsigned i = 0; i < ::strlen(modules); i++ )
{
// create and append to vector
m_Clients.push_back(std::make_shared<CDextraClient>(callsign, ip, modules[i], version.GetMajor()));
// create and append to list
m_Clients.push_back(std::make_shared<CDextraClient>(callsign, ip, modules[i], EProtoRev::ambe));
}
}

@ -51,7 +51,7 @@ void CDextraProtocol::Task(void)
CIp Ip;
CCallsign Callsign;
char ToLinkModule;
int ProtRev;
EProtoRev ProtRev;
std::unique_ptr<CDvHeaderPacket> Header;
std::unique_ptr<CDvFramePacket> Frame;
@ -79,9 +79,23 @@ void CDextraProtocol::Task(void)
OnDvHeaderPacketIn(Header, Ip);
}
}
else if ( IsValidConnectPacket(Buffer, &Callsign, &ToLinkModule, &ProtRev) )
else if ( IsValidConnectPacket(Buffer, Callsign, ToLinkModule, ProtRev) )
{
std::cout << "DExtra connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << " rev " << ProtRev << std::endl;
std::cout << "DExtra connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << " rev ";
switch (ProtRev) {
case EProtoRev::original:
std::cout << "Original" << std::endl;
break;
case EProtoRev::revised:
std::cout << "Revised" << std::endl;
break;
case EProtoRev::ambe:
std::cout << "AMBE" << std::endl;
break;
default:
std::cout << "UNEXPECTED Revision" << std::endl;
break;
}
// callsign authorized?
if ( g_GateKeeper.MayLink(Callsign, Ip, EProtocol::dextra) )
@ -419,7 +433,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Heade
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release
@ -434,28 +448,27 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Heade
////////////////////////////////////////////////////////////////////////////////////////
// packet decoding helpers
bool CDextraProtocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign *callsign, char *reflectormodule, int *revision)
bool CDextraProtocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign &callsign, char &module, EProtoRev &protrev)
{
bool valid = false;
if ((Buffer.size() == 11) && (Buffer.data()[9] != ' '))
{
callsign->SetCallsign(Buffer.data(), 8);
callsign->SetModule(Buffer.data()[8]);
*reflectormodule = Buffer.data()[9];
*revision = (Buffer.data()[10] == 11) ? 1 : 0;
valid = (callsign->IsValid() && IsLetter(*reflectormodule));
callsign.SetCallsign(Buffer.data(), 8);
callsign.SetModule(Buffer.data()[8]);
module = Buffer.data()[9];
valid = (callsign.IsValid() && IsLetter(module));
// detect revision
if ( (Buffer.data()[10] == 11) )
{
*revision = 1;
protrev = EProtoRev::revised;
}
else if ( callsign->HasSameCallsignWithWildcard(CCallsign("XRF*")) )
else if ( callsign.HasSameCallsignWithWildcard(CCallsign("XRF*")) )
{
*revision = 2;
protrev = EProtoRev::ambe;
}
else
{
*revision = 0;
EProtoRev::original;
}
}
return valid;
@ -529,10 +542,10 @@ void CDextraProtocol::EncodeConnectPacket(CBuffer *Buffer, const char *Modules)
Buffer->Append((uint8_t)0);
}
void CDextraProtocol::EncodeConnectAckPacket(CBuffer *Buffer, int ProtRev)
void CDextraProtocol::EncodeConnectAckPacket(CBuffer *Buffer, EProtoRev ProtRev)
{
// is it for a XRF or repeater
if ( ProtRev == 2 )
if ( ProtRev == EProtoRev::ambe )
{
// XRFxxx
uint8_t rm = (Buffer->data())[8];

@ -40,7 +40,7 @@
// the protocol is detected by looking at "XRF" in connect packet callsign
// the protocol require a specific connect ack packet
// the protocol also implement a workaround for detecting stream's module
// as dxrfd soes not set DV header RPT2 properly.
// as dxrfd does not set DV header RPT2 properly.
// the protocol assumes that a dxrfd can only be linked to one module at a time
@ -68,7 +68,7 @@ protected:
void OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &, const CIp &);
// packet decoding helpers
bool IsValidConnectPacket( const CBuffer &, CCallsign *, char *, int *);
bool IsValidConnectPacket( const CBuffer &, CCallsign &, char &, EProtoRev &);
bool IsValidDisconnectPacket( const CBuffer &, CCallsign *);
bool IsValidKeepAlivePacket( const CBuffer &, CCallsign *);
bool IsValidDvHeaderPacket( const CBuffer &, std::unique_ptr<CDvHeaderPacket> &);
@ -77,7 +77,7 @@ protected:
// packet encoding helpers
void EncodeKeepAlivePacket(CBuffer *);
void EncodeConnectPacket(CBuffer *, const char *);
void EncodeConnectAckPacket(CBuffer *, int);
void EncodeConnectAckPacket(CBuffer *, EProtoRev);
void EncodeConnectNackPacket(CBuffer *);
void EncodeDisconnectPacket(CBuffer *, char);
void EncodeDisconnectedPacket(CBuffer *);

@ -319,7 +319,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Hea
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
lastheard = true;
}
}

@ -203,7 +203,7 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Head
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release

@ -207,7 +207,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release

@ -566,7 +566,7 @@ void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, c
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
// update last heard

@ -98,7 +98,7 @@ bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, EProtocol pr
break;
// XLX interlinks
case EProtocol::ulx:
case EProtocol::urf:
ok &= IsPeerListedOk(callsign, ip, modules);
break;
@ -143,7 +143,7 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, const EP
break;
// XLX interlinks
case EProtocol::ulx:
case EProtocol::urf:
ok = ok && IsPeerListedOk(callsign, ip, module);
break;
@ -280,8 +280,8 @@ const std::string CGateKeeper::ProtocolName(const EProtocol p) const
return "MMDVM DMR";
case EProtocol::dmrplus:
return "DMR+";
case EProtocol::ulx:
return "ULX";
case EProtocol::urf:
return "URF";
case EProtocol::ysf:
return "YSF";
#ifndef NO_G3

@ -199,7 +199,7 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release

@ -225,6 +225,3 @@ extern CYsfNodeDirHttp g_YsfNodeDir;
class CYsfNodeDirFile;
extern CYsfNodeDirFile g_YsfNodeDir;
#endif
class CTranscoder;
extern CTranscoder g_Transcoder;

@ -36,23 +36,21 @@ endif
LDFLAGS=-pthread
XRFSRCS = Buffer.cpp Callsign.cpp CallsignList.cpp CallsignListItem.cpp Client.cpp Clients.cpp DCSClient.cpp DCSProtocol.cpp DExtraClient.cpp DExtraPeer.cpp DExtraProtocol.cpp DPlusClient.cpp DPlusProtocol.cpp DVFramePacket.cpp DVHeaderPacket.cpp DVLastFramePacket.cpp GateKeeper.cpp IP.cpp Notification.cpp Packet.cpp PacketStream.cpp PeerCallsignList.cpp Peer.cpp Peers.cpp Protocol.cpp Protocols.cpp Reflector.cpp UDPSocket.cpp User.cpp Users.cpp Version.cpp Main.cpp
XLXSRCS = BMClient.cpp BMPeer.cpp BPTC19696.cpp CRC.cpp DMRIdDir.cpp DMRIdDirFile.cpp DMRIdDirHttp.cpp DMRMMDVMClient.cpp DMRMMDVMProtocol.cpp DMRPlusClient.cpp DMRPlusProtocol.cpp Golay2087.cpp Golay24128.cpp Hamming.cpp QR1676.cpp RS129.cpp Semaphore.cpp Utils.cpp WiresXCmd.cpp WiresXCmdHandler.cpp WiresXInfo.cpp XLXClient.cpp XLXProtocol.cpp XLXPeer.cpp YSFClient.cpp YSFConvolution.cpp YSFFich.cpp YSFNode.cpp YSFNodeDir.cpp YSFNodeDirFile.cpp YSFNodeDirHttp.cpp YSFPayload.cpp YSFProtocol.cpp YSFUtils.cpp
XRFSRCS = Buffer.cpp Callsign.cpp CallsignList.cpp CallsignListItem.cpp Client.cpp Clients.cpp DCSClient.cpp DCSProtocol.cpp DExtraClient.cpp DExtraPeer.cpp DExtraProtocol.cpp DPlusClient.cpp DPlusProtocol.cpp DVFramePacket.cpp DVHeaderPacket.cpp GateKeeper.cpp IP.cpp M17Client.cpp M17CRC.cpp M17Packet.cpp M17Client.cpp Notification.cpp Packet.cpp PacketStream.cpp PeerCallsignList.cpp Peer.cpp Peers.cpp Protocol.cpp Protocols.cpp Reflector.cpp UDPSocket.cpp User.cpp Users.cpp Version.cpp Main.cpp
XLXSRCS = BMClient.cpp BMPeer.cpp BPTC19696.cpp CRC.cpp DMRIdDir.cpp DMRIdDirFile.cpp DMRIdDirHttp.cpp DMRMMDVMClient.cpp DMRMMDVMProtocol.cpp DMRPlusClient.cpp DMRPlusProtocol.cpp Golay2087.cpp Golay24128.cpp Hamming.cpp QR1676.cpp RS129.cpp Semaphore.cpp Utils.cpp WiresXCmd.cpp WiresXCmdHandler.cpp WiresXInfo.cpp URFClient.cpp URFProtocol.cpp URFPeer.cpp YSFClient.cpp YSFConvolution.cpp YSFFich.cpp YSFNode.cpp YSFNodeDir.cpp YSFNodeDirFile.cpp YSFNodeDirHttp.cpp YSFPayload.cpp YSFProtocol.cpp YSFUtils.cpp
G3SRCS = G3Client.cpp G3Protocol.cpp RawSocket.cpp UDPMsgSocket.cpp
SRCS = $(XRFSRCS)
SRCS = $(XRFSRCS) $(XLXSRCS)
ifeq ($(use_g3), true)
SRCS += $(G3SRCS)
endif
SRCS += $(XLXSRCS)
ifeq ($(ysf_db), true)
LDFLAGS += `mysql_config --libs`
endif
ifdef need_tc
SRCS += CodecStream.cpp
endif
ifeq ($(use_g3), true)
SRCS += $(G3SRCS)
SRCS += CodecStream.cpp UnixDgramSocket.cpp
endif
OBJS = $(SRCS:.cpp=.o)

@ -53,7 +53,7 @@ bool CPacketStream::OpenPacketStream(const CDvHeaderPacket &DvHeader, std::share
m_LastPacketTime.start();
#ifdef TRANSCODED_MODULES
if (std::string::npos != std::string(TRANSCODED_MODULES).find(DvHeader.GetRpt2Module()))
m_CodecStream = std::make_shared<CCodecStream>(this, client->GetCodec(), m_TCReader);
m_CodecStream = std::make_shared<CCodecStream>(this, m_uiStreamId, client->GetCodec(), m_TCReader);
#endif
ok = true;
}
@ -67,7 +67,6 @@ void CPacketStream::ClosePacketStream(void)
m_uiStreamId = 0;
m_OwnerClient = nullptr;
#ifdef TRANSCODED_MODULES
g_Transcoder.ReleaseStream(m_CodecStream);
m_CodecStream = nullptr;
#endif
}

@ -21,7 +21,8 @@
#include "PacketQueue.h"
#include "Timer.h"
#include "DVHeaderPacket.h"
#include "Transcoder.h"
#include "UnixDgramSocket.h"
#include "CodecStream.h"
////////////////////////////////////////////////////////////////////////////////////////
@ -31,8 +32,6 @@
////////////////////////////////////////////////////////////////////////////////////////
// class
#include "UnixDgramSocket.h"
class CPacketStream : public CPacketQueue
{
public:

@ -47,7 +47,7 @@ public:
// identity
virtual EProtocol GetProtocol(void) const { return EProtocol::none; }
virtual int GetProtocolRevision(void) const { return 0; }
virtual EProtoRev GetProtocolRevision(void) const { return EProtoRev::original; }
virtual const char *GetProtocolName(void) const { return "NONE"; }
// status

@ -166,21 +166,18 @@ void CProtocol::OnDvFramePacketIn(std::unique_ptr<CDvFramePacket> &Frame, const
std::shared_ptr<CPacketStream> CProtocol::GetStream(uint16_t uiStreamId, const CIp *Ip)
{
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ )
{
if ( (*it)->GetStreamId() == uiStreamId )
{
// if Ip not nullptr, also check if IP match
if ( (Ip != nullptr) && ((*it)->GetOwnerIp() != nullptr) )
auto it = m_Streams.find(uiStreamId);
if (it == m_Streams.end())
return nullptr;
if (it->second->GetOwnerIp() != nullptr)
{
if ( *Ip == *((*it)->GetOwnerIp()) )
if (*Ip == *it->second->GetOwnerIp())
{
return *it;
return it->second;
}
}
}
}
// done
return nullptr;
}
@ -189,18 +186,19 @@ void CProtocol::CheckStreamsTimeout(void)
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); )
{
// time out ?
(*it)->Lock();
if ( (*it)->IsExpired() )
it->second->Lock();
if ( it->second->IsExpired() )
{
// yes, close it
(*it)->Unlock();
g_Reflector.CloseStream(*it);
// and remove it
it->second->Unlock();
g_Reflector.CloseStream(it->second);
// and remove it from the m_Streams map
it = m_Streams.erase(it);
}
else
{
(*it++)->Unlock();
it->second->Unlock();
it++;
}
}
}

@ -125,7 +125,7 @@ protected:
CUdpSocket m_Socket6;
// streams
std::list<std::shared_ptr<CPacketStream>> m_Streams;
std::unordered_map<uint16_t, std::shared_ptr<CPacketStream>> m_Streams;
// queue
CPacketQueue m_Queue;

@ -22,7 +22,6 @@
#include "GateKeeper.h"
#include "DMRIdDirFile.h"
#include "DMRIdDirHttp.h"
#include "Transcoder.h"
#include "YSFNodeDirFile.h"
#include "YSFNodeDirHttp.h"
@ -77,12 +76,6 @@ bool CReflector::Start(void)
// init wiresx node directory. Likewise with the return vale.
g_YsfNodeDir.Init();
#ifdef TRANSCODER_IP
// init the transcoder
if (! g_Transcoder.Init())
return false;
#endif
// create protocols
if (! m_Protocols.Init())
{
@ -144,11 +137,6 @@ void CReflector::Stop(void)
// close gatekeeper
g_GateKeeper.Close();
#ifdef TRANSCODER_IP
// close transcoder
g_Transcoder.Close();
#endif
// close databases
g_DmridDir.Close();
g_YsfNodeDir.Close();
@ -441,6 +429,7 @@ void CReflector::JsonReportThread()
std::cout << "Error creating monitor socket" << std::endl;
}
}
#endif
////////////////////////////////////////////////////////////////////////////////////////
// notifications
@ -489,7 +478,6 @@ void CReflector::OnStreamClose(const CCallsign &callsign)
m_Notifications.push(notification);
m_Notifications.Unlock();
}
#endif
////////////////////////////////////////////////////////////////////////////////////////
// modules & queues

@ -24,9 +24,6 @@
#include "Protocols.h"
#include "PacketStream.h"
#include "NotificationQueue.h"
#ifdef TRANSCODED_MODULES
//#include "UnixDgramSocket.h"
#endif
////////////////////////////////////////////////////////////////////////////////////////
@ -82,6 +79,7 @@ public:
bool IsValidModule(char c) const { return m_Modules.npos!=m_Modules.find(c); }
// notifications
void OnPeersChanged(void);
void OnClientsChanged(void);
void OnUsersChanged(void);
@ -137,9 +135,9 @@ protected:
std::future<void> m_XmlReportFuture;
#ifdef JSON_MONITOR
std::future<void> m_JsonReportFuture;
#endif
// notifications
CNotificationQueue m_Notifications;
#endif
public:
#ifdef DEBUG_DUMPFILE

@ -1,386 +0,0 @@
// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved.
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include "Main.h"
#include "Reflector.h"
#include "Transcoder.h"
////////////////////////////////////////////////////////////////////////////////////////
// define
// status
#define STATUS_IDLE 0
#define STATUS_LOGGED 1
// timeout
#define AMBED_OPENSTREAM_TIMEOUT 200 // in ms
////////////////////////////////////////////////////////////////////////////////////////
CTranscoder g_Transcoder;
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CTranscoder::CTranscoder()
{
keep_running = true;
m_bConnected = false;
m_LastKeepaliveTime.start();
m_LastActivityTime.start();
m_bStreamOpened = false;
m_StreamidOpenStream = 0;
m_PortOpenStream = 0;
}
////////////////////////////////////////////////////////////////////////////////////////
// destructor
CTranscoder::~CTranscoder()
{
Close();
}
////////////////////////////////////////////////////////////////////////////////////////
// initialization
bool CTranscoder::Init(void)
{
// create address to the transcoder
auto s = g_Reflector.GetTranscoderIp();
m_Ip.Initialize(strchr(s, ':') ? AF_INET6 : AF_INET, TRANSCODER_PORT, s);
// does the user not want to use a transcoder?
if (0 == strncasecmp(s, "none", 4))
{
std::cout << "Transcoder will not be enabled because the transcoder IP addess is 'none'" << std::endl;
return true;
}
// now open the transcoder port
#ifdef LISTEN_IPV4
#ifdef LISTEN_IPV6
auto paddr = (AF_INET == m_Ip.GetFamily()) ? LISTEN_IPV4 : LISTEN_IPV6;
#else
auto paddr = LISTEN_IPV4;
#endif
#else
auto paddr = LISTEN_IPV6;
#endif
CIp tc(m_Ip.GetFamily(), TRANSCODER_PORT, paddr);
// create our socket
if (tc.IsSet())
{
if (! m_Socket.Open(tc))
{
std::cerr << "Error opening socket on port UDP" << TRANSCODER_PORT << " on ip " << m_Ip << std::endl;
return false;
}
}
else
{
// something bad was specified for the transcoder IP?
std::cerr << "Error initializing transcoder port using " << ((AF_INET == m_Ip.GetFamily()) ? "IPv4" : "IPv6") << " on " << paddr << std::endl;
return false;
}
// start thread
keep_running = true;
m_Future = std::async(std::launch::async, &CTranscoder::Thread, this);
return true;
}
void CTranscoder::Close(void)
{
// close socket
m_Socket.Close();
// close all streams
m_Mutex.lock();
m_Streams.clear();
m_Mutex.unlock();
// kill threads
keep_running = false;
if ( m_Future.valid() )
{
m_Future.get();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// thread
void CTranscoder::Thread()
{
while (keep_running)
{
Task();
}
}
void CTranscoder::Task(void)
{
CBuffer Buffer;
CIp Ip;
uint16_t StreamId;
uint16_t Port;
// anything coming in from codec server ?
//if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) )
if ( m_Socket.Receive(Buffer, Ip, 20) )
{
m_LastActivityTime.start();
// crack packet
if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) )
{
//std::cout << "Transcoder stream " << (int) StreamId << " descr packet " << std::endl;
m_bStreamOpened = true;
m_StreamidOpenStream = StreamId;
m_PortOpenStream = Port;
m_SemaphoreOpenStream.Notify();
}
else if ( IsValidNoStreamAvailablePacket(Buffer) )
{
m_bStreamOpened = false;
m_SemaphoreOpenStream.Notify();
}
else if ( IsValidKeepAlivePacket(Buffer) )
{
if ( !m_bConnected )
{
std::cout << "Transcoder connected at " << Ip << std::endl;
}
m_bConnected = true;
}
}
// keep client alive
if ( m_LastKeepaliveTime.time() > TRANSCODER_KEEPALIVE_PERIOD )
{
//
HandleKeepalives();
// update time
m_LastKeepaliveTime.start();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// manage streams
std::shared_ptr<CCodecStream> CTranscoder::GetCodecStream(CPacketStream *PacketStream, uint8_t uiCodecIn)
{
CBuffer Buffer;
// do we need transcoding
if ( uiCodecIn != CODEC_NONE )
{
// are we connected to server
if ( m_bConnected )
{
// yes, post openstream request
EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT);
// wait relpy here
if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) )
{
if ( m_bStreamOpened )
{
std::cout << "ambed openstream ok" << std::endl;
// create stream object
auto stream = std::make_shared<CCodecStream>(PacketStream, m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS);
if ( stream ) {
if ( stream->Init(m_PortOpenStream) )
{
// and append to list
Lock();
m_Streams.push_back(stream);
Unlock();
return stream;
}
else
{
// send close packet
EncodeClosestreamPacket(&Buffer, stream->GetStreamId());
m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT);
stream.reset();
}
}
else
std::cerr << "ERROR: Unable to make a new CCodecStream" << std::endl;
}
else
{
std::cerr << "ERROR: Ambed openstream failed (no suitable channel available)" << std::endl;
}
}
else
{
std::cout << "ambed openstream timeout" << std::endl;
}
}
}
return nullptr;
}
void CTranscoder::ReleaseStream(std::shared_ptr<CCodecStream> stream)
{
CBuffer Buffer;
if ( stream )
{
// look for the stream
bool found = false;
Lock();
{
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ )
{
// compare object pointers
if ( (*it) == stream )
{
// send close packet
EncodeClosestreamPacket(&Buffer, (*it)->GetStreamId());
m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT);
// display stats
if ( (*it)->GetPingMin() >= 0.0 )
{
char sz[256];
sprintf(sz, "ambed stats (ms) : %.1f/%.1f/%.1f",
(*it)->GetPingMin() * 1000.0,
(*it)->GetPingAve() * 1000.0,
(*it)->GetPingMax() * 1000.0);
std::cout << sz << std::endl;
}
if ( (*it)->GetTimeoutPackets() > 0 )
{
char sz[256];
sprintf(sz, "ambed %d of %d packets timed out",
(*it)->GetTimeoutPackets(),
(*it)->GetTotalPackets());
std::cout << sz << std::endl;
}
// and close it
(*it)->Close();
m_Streams.erase(it);
break;
}
}
}
Unlock();
}
}
////////////////////////////////////////////////////////////////////////////////////////
// keepalive helpers
void CTranscoder::HandleKeepalives(void)
{
CBuffer keepalive;
// send keepalive
EncodeKeepAlivePacket(&keepalive);
m_Socket.Send(keepalive, m_Ip, TRANSCODER_PORT);
// check if still with us
if ( m_bConnected && (m_LastActivityTime.time() >= TRANSCODER_KEEPALIVE_TIMEOUT) )
{
// no, disconnect
m_bConnected = false;
std::cout << "Transcoder keepalive timeout" << std::endl;
}
}
////////////////////////////////////////////////////////////////////////////////////////
// packet decoding helpers
bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer)
{
uint8_t tag[] = { 'A','M','B','E','D','P','O','N','G' };
bool valid = false;
if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{
valid = true;
}
return valid;
}
bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16_t *Id, uint16_t *Port)
{
uint8_t tag[] = { 'A','M','B','E','D','S','T','D' };
bool valid = false;
if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) )
{
*Id = *(uint16_t *)(&Buffer.data()[8]);
*Port = *(uint16_t *)(&Buffer.data()[10]);
// uint8_t CodecIn = Buffer.data()[12];
// uint8_t CodecOut = Buffer.data()[13];
valid = true;
}
return valid;
}
bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer)
{
uint8_t tag[] = { 'A','M','B','E','D','B','U','S','Y' };
return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) );
}
////////////////////////////////////////////////////////////////////////////////////////
// packet encoding helpers
void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer)
{
uint8_t tag[] = { 'A','M','B','E','D','P','I','N','G' };
Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint8_t *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN);
}
void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8_t uiCodecIn, uint8_t uiCodecOut)
{
uint8_t tag[] = { 'A','M','B','E','D','O','S' };
Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint8_t *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN);
Buffer->Append((uint8_t)uiCodecIn);
Buffer->Append((uint8_t)uiCodecOut);
}
void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16_t uiStreamId)
{
uint8_t tag[] = { 'A','M','B','E','D','C','S' };
Buffer->Set(tag, sizeof(tag));
Buffer->Append((uint16_t)uiStreamId);
}

@ -1,92 +0,0 @@
// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved.
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include "Semaphore.h"
#include "CodecStream.h"
#include "UDPSocket.h"
class CPacketStream;
class CTranscoder
{
public:
// constructor
CTranscoder();
// destructor
~CTranscoder();
// initialization
bool Init(void);
void Close(void);
// locks
void Lock(void) { m_Mutex.lock(); }
void Unlock(void) { m_Mutex.unlock(); }
// status
bool IsConnected(void) const { return m_bConnected; }
// manage streams
std::shared_ptr<CCodecStream> GetCodecStream(CPacketStream *, uint8_t);
void ReleaseStream(std::shared_ptr<CCodecStream>);
// task
void Thread(void);
void Task(void);
protected:
// keepalive helpers
void HandleKeepalives(void);
// packet decoding helpers
bool IsValidKeepAlivePacket(const CBuffer &);
bool IsValidStreamDescrPacket(const CBuffer &, uint16_t *, uint16_t *);
bool IsValidNoStreamAvailablePacket(const CBuffer&);
// packet encoding helpers
void EncodeKeepAlivePacket(CBuffer *);
void EncodeOpenstreamPacket(CBuffer *, uint8_t, uint8_t);
void EncodeClosestreamPacket(CBuffer *, uint16_t);
protected:
// streams
std::mutex m_Mutex;
std::list<std::shared_ptr<CCodecStream>> m_Streams;
// sync objects for Openstream
CSemaphore m_SemaphoreOpenStream;
bool m_bStreamOpened;
uint16_t m_StreamidOpenStream;
uint16_t m_PortOpenStream;
// thread
std::atomic<bool> keep_running;
std::future<void> m_Future;
// socket
CIp m_Ip;
CUdpSocket m_Socket;
bool m_bConnected;
// time
CTimer m_LastKeepaliveTime;
CTimer m_LastActivityTime;
};

@ -59,5 +59,5 @@ bool CURFPeer::IsAlive(void) const
EProtoRev CURFPeer::GetProtocolRevision(const CVersion &/*version*/)
{
return EProtoRev::urf;
return EProtoRev::original;
}

@ -87,7 +87,7 @@ void CURFProtocol::Task(void)
{
// acknowledge connecting request
// following is version dependent
if (EProtoRev::urf == CURFPeer::GetProtocolRevision(Version))
if (EProtoRev::original == CURFPeer::GetProtocolRevision(Version))
{
// already connected ?
CPeers *peers = g_Reflector.GetPeers();
@ -235,7 +235,7 @@ void CURFProtocol::HandleQueue(void)
{
// no, send the packet
// this is protocol revision dependent
if (EProtoRev::urf == client->GetProtocolRevision())
if (EProtoRev::original == client->GetProtocolRevision())
{
Send(buffer, client->GetIp());
}
@ -376,7 +376,7 @@ void CURFProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
// get origin
peer = client->GetCallsign();

@ -14,8 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <iostream>
#include <unistd.h>
#include <string.h>

@ -254,7 +254,7 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr )
{
// keep the handle
m_Streams.push_back(stream);
m_Streams[stream->GetStreamId()] = stream;
}
}
// release

Loading…
Cancel
Save

Powered by TurnKey Linux.