CCodecStream redone with Unix sockets

unstable
Tom Early 4 years ago
parent a7e52fc330
commit 60bc3ab68f

@ -25,13 +25,12 @@
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecType type)
CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t streamid, ECodecType type, std::shared_ptr<CUnixDgramReader> reader)
{
keep_running = true;
m_uiStreamId = uiId;
m_uiStreamId = streamid;
m_uiPid = 0;
m_eCodecIn = type;
m_bConnected = false;
m_fPingMin = -1;
m_fPingMax = -1;
m_fPingSum = 0;
@ -39,6 +38,7 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecTyp
m_uiTotalPackets = 0;
m_uiTimeoutPackets = 0;
m_PacketStream = PacketStream;
m_TCReader = reader;
}
////////////////////////////////////////////////////////////////////////////////////////
@ -47,7 +47,7 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecTyp
CCodecStream::~CCodecStream()
{
// close socket
m_Socket.Close();
m_TCReader->Close();
// kill threads
keep_running = false;
@ -60,48 +60,10 @@ CCodecStream::~CCodecStream()
////////////////////////////////////////////////////////////////////////////////////////
// initialization
bool CCodecStream::Init(uint16_t uiPort)
bool CCodecStream::Init()
{
m_bConnected = keep_running = false; // prepare for the worst
// create the send to address
m_uiPort = uiPort;
auto s = g_Reflector.GetTranscoderIp();
m_Ip.Initialize(strchr(s, ':') ? AF_INET6 : AF_INET, m_uiPort, s);
if (0 == strncasecmp(s, "none", 4))
{
return false; // the user has disabled the transcoder
}
// create socket address, family based on transcoder listen address
#ifdef LISTEN_IPV4
#ifdef LISTEN_IPV6
auto paddr = (AF_INET == m_Ip.GetFamily()) ? LISTEN_IPV4 : LISTEN_IPV6;
#else
auto paddr = LISTEN_IPV4;
#endif
#else
auto paddr = LISTEN_IPV6;
#endif
CIp ip(m_Ip.GetFamily(), m_uiPort, paddr);
// create our socket
if (ip.IsSet())
{
if (! m_Socket.Open(ip))
{
std::cerr << "Error opening socket on IP address " << m_Ip << std::endl;
return false;
}
}
else
{
std::cerr << "Could not initialize Codec Stream on " << paddr << std::endl;
return false;
}
keep_running = m_bConnected = true;
m_TCWriter.SetUp("ReftoTC");
keep_running = true;
m_Future = std::async(std::launch::async, &CCodecStream::Thread, this);
return true;
@ -110,8 +72,8 @@ bool CCodecStream::Init(uint16_t uiPort)
void CCodecStream::Close(void)
{
// close socket
keep_running = m_bConnected = false;
m_Socket.Close();
keep_running = false;
m_TCReader->Close();
// kill threads
if ( m_Future.valid() )
@ -141,59 +103,47 @@ void CCodecStream::Thread()
void CCodecStream::Task(void)
{
CBuffer Buffer;
CIp Ip;
uint8_t Ambe[9];
uint8_t DStarSync[] = { 0x55,0x2D,0x16 };
STCPacket pack;
// any packet from transcoder
if ( m_Socket.Receive(Buffer, Ip, 5) )
if ( ! m_TCReader->Receive(&pack, 5) )
{
// crack
if ( IsValidAmbePacket(Buffer, Ambe) )
// tickle
m_TimeoutTimer.start();
// update statistics
double ping = m_StatsTimer.time();
if ( m_fPingMin == -1 )
{
// tickle
m_TimeoutTimer.start();
// update statistics
double ping = m_StatsTimer.time();
if ( m_fPingMin == -1 )
{
m_fPingMin = ping;
m_fPingMax = ping;
}
else
{
m_fPingMin = MIN(m_fPingMin, ping);
m_fPingMax = MAX(m_fPingMax, ping);
}
m_fPingSum += ping;
m_fPingCount += 1;
// pop the original packet
if ( !m_LocalQueue.empty() )
{
auto Packet = m_LocalQueue.pop();
auto Frame = (CDvFramePacket *)Packet.get();
// todo: check the PID
// update content with transcoded ambe
Frame->SetAmbe(m_uiCodecOut, Ambe);
// tag syncs in DvData
if ( (m_uiCodecOut == CODEC_AMBEPLUS) && (Frame->GetPacketId() % 21) == 0 )
{
Frame->SetDvData(DStarSync);
}
// and push it back to client
m_PacketStream->Lock();
m_PacketStream->push(Packet);
m_PacketStream->Unlock();
}
else
{
std::cout << "Unexpected transcoded packet received from transcoder" << std::endl;
}
m_fPingMin = ping;
m_fPingMax = ping;
}
else
{
m_fPingMin = MIN(m_fPingMin, ping);
m_fPingMax = MAX(m_fPingMax, ping);
}
m_fPingSum += ping;
m_fPingCount += 1;
// pop the original packet
if ( !m_LocalQueue.empty() )
{
auto Packet = m_LocalQueue.pop();
auto Frame = (CDvFramePacket *)Packet.get();
// todo: check the PID
// update content with transcoded data
Frame->SetCodecData(&pack);
// and push it back to client
m_PacketStream->Lock();
m_PacketStream->push(Packet);
m_PacketStream->Unlock();
}
else
{
std::cout << "Unexpected transcoded packet received from transcoder" << std::endl;
}
}
@ -204,14 +154,13 @@ void CCodecStream::Task(void)
auto Packet = pop();
auto Frame = (CDvFramePacket *)Packet.get();
// yes, send to ambed
// yes, send to transcoder
// this assume that thread pushing the Packet
// have verified that the CodecStream is connected
// and that the packet needs transcoding
m_StatsTimer.start();
m_uiTotalPackets++;
EncodeAmbePacket(&Buffer, Frame->GetAmbe(m_uiCodecIn));
m_Socket.Send(Buffer, m_Ip, m_uiPort);
m_TCWriter.Send(Frame->GetCodecPacket());
// and push to our local queue
m_LocalQueue.push(Packet);
@ -220,33 +169,7 @@ void CCodecStream::Task(void)
// handle timeout
if ( !m_LocalQueue.empty() && (m_TimeoutTimer.time() >= (TRANSCODER_AMBEPACKET_TIMEOUT/1000.0f)) )
{
//std::cout << "ambed packet timeout" << std::endl;
//std::cout << "transcoder packet timeout" << std::endl;
m_uiTimeoutPackets++;
}
}
////////////////////////////////////////////////////////////////////////////////////////
/// packet decoding helpers
bool CCodecStream::IsValidAmbePacket(const CBuffer &Buffer, uint8_t *Ambe)
{
bool valid = false;
if ( (Buffer.size() == 11) && (Buffer.data()[0] == m_uiCodecOut) )
{
memcpy(Ambe, &(Buffer.data()[2]), 9);
valid = true;
}
return valid;
}
////////////////////////////////////////////////////////////////////////////////////////
/// packet encoding helpers
void CCodecStream::EncodeAmbePacket(CBuffer *Buffer, const uint8_t *Ambe)
{
Buffer->clear();
Buffer->Append(m_uiCodecIn);
Buffer->Append(m_uiPid);
Buffer->Append((uint8_t *)Ambe, 9);
}

@ -18,7 +18,7 @@
#pragma once
#include "UDPSocket.h"
#include "UnixDgramSocket.h"
#include "PacketQueue.h"
////////////////////////////////////////////////////////////////////////////////////////
@ -30,17 +30,16 @@ class CCodecStream : public CPacketQueue
{
public:
// constructor
CCodecStream(CPacketStream *, uint16_t, ECodecType);
CCodecStream(CPacketStream *packetstream, uint16_t streamid, ECodecType codectype, std::shared_ptr<CUnixDgramReader> reader);
// destructor
virtual ~CCodecStream();
// initialization
bool Init(uint16_t);
bool Init();
void Close(void);
// get
bool IsConnected(void) const { return m_bConnected; }
uint16_t GetStreamId(void) const { return m_uiStreamId; }
double GetPingMin(void) const { return m_fPingMin; }
double GetPingMax(void) const { return m_fPingMax; }
@ -53,26 +52,16 @@ public:
void Thread(void);
void Task(void);
protected:
// packet decoding helpers
bool IsValidAmbePacket(const CBuffer &, uint8_t *);
// packet encoding helpers
void EncodeAmbePacket(CBuffer *, const uint8_t *);
protected:
// data
uint16_t m_uiStreamId;
uint16_t m_uiPort;
uint8_t m_uiPid;
ECodecType m_eCodecIn;
// socket
CIp m_Ip;
CUdpSocket m_Socket;
bool m_bConnected;
uint16_t m_uiStreamId;
uint16_t m_uiPort;
uint8_t m_uiPid;
ECodecType m_eCodecIn;
// sockets
std::shared_ptr<CUnixDgramReader> m_TCReader;
CUnixDgramWriter m_TCWriter;
// associated packet stream
CPacketStream *m_PacketStream;

@ -396,12 +396,12 @@ bool CDcsProtocol::IsValidDvPacket(const CBuffer &Buffer, std::unique_ptr<CDvHea
if ( Buffer.data()[45] & 0x40U )
{
// it's the last frame
frame = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45]));
frame = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45]));
}
else
{
// it's a regular DV frame
frame = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45]));
frame = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45]));
}
// check validity of packets

@ -507,7 +507,7 @@ bool CDextraProtocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_pt
if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && 0U==(Buffer.data()[14] & 0x40U) )
{
// create packet
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;
@ -520,7 +520,7 @@ bool CDextraProtocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::uniqu
if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && (Buffer.data()[14] & 0x40) )
{
// create packet
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;

@ -431,7 +431,7 @@ bool CDplusProtocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_ptr
if ( 29==Buffer.size() && 0x1DU==Buffer.data()[0] && 0x80U==Buffer.data()[1] && 0==Buffer.Compare((uint8_t *)"DSVT", 2, 4) && 0x20U==Buffer.data()[6] && 0x20U==Buffer.data()[10] )
{
// create packet
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16]));
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;
@ -444,7 +444,7 @@ bool CDplusProtocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::unique
if ( 32==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 2, 4) && 0x20U==Buffer.data()[0] && 0x80U==Buffer.data()[1] && 0x20U==Buffer.data()[6] && 0x20U==Buffer.data()[10] )
{
// create packet
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16]));
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;

@ -25,25 +25,27 @@
CDvFramePacket::CDvFramePacket()
{
memset(m_uiAmbe, 0, AMBE_SIZE);
memset(m_uiDvData, 0, DVDATA_SIZE);
memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE);
memset(m_uiDvSync, 0, DVSYNC_SIZE);
memset(m_uiCodec2, 0, 16);
memset(m_TCPack.dstar, 0, 9);
memset(m_uiDvData, 0, 3);
memset(m_TCPack.dmr, 0, 9);
memset(m_uiDvSync, 0, 7);
memset(m_TCPack.m17, 0, 16);
memset(m_Nonce, 0, 14);
m_TCPack.codec_in = ECodecType::none;
};
// dstar constructor
CDvFramePacket::CDvFramePacket(const struct dstar_dvframe *dvframe, uint16_t sid, uint8_t pid)
CDvFramePacket::CDvFramePacket(const SDStarFrame *dvframe, uint16_t sid, uint8_t pid)
: CPacket(sid, pid)
{
memcpy(m_uiAmbe, dvframe->AMBE, AMBE_SIZE);
memcpy(m_uiDvData, dvframe->DVDATA, DVDATA_SIZE);
memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE);
memset(m_uiDvSync, 0, DVSYNC_SIZE);
memset(m_uiCodec2, 0, 16);
memcpy(m_TCPack.dstar, dvframe->AMBE, 9);
memcpy(m_uiDvData, dvframe->DVDATA, 3);
memset(m_TCPack.dmr, 0, 9);
memset(m_uiDvSync, 0, 7);
memset(m_TCPack.m17, 0, 16);
memset(m_Nonce, 0, 14);
m_TCPack.codec_in = ECodecType::dstar;
}
// dmr constructor
@ -51,12 +53,13 @@ CDvFramePacket::CDvFramePacket(const struct dstar_dvframe *dvframe, uint16_t sid
CDvFramePacket::CDvFramePacket(const uint8_t *ambe, const uint8_t *sync, uint16_t sid, uint8_t pid, uint8_t spid)
: CPacket(sid, pid, spid)
{
memcpy(m_uiAmbePlus, ambe, AMBEPLUS_SIZE);
memcpy(m_uiDvSync, sync, DVSYNC_SIZE);
memset(m_uiAmbe, 0, AMBE_SIZE);
memset(m_uiDvData, 0, DVDATA_SIZE);
memset(m_uiCodec2, 0, 16);
memcpy(m_TCPack.dmr, ambe, 9);
memcpy(m_uiDvSync, sync, 7);
memset(m_TCPack.dstar, 0, 9);
memset(m_uiDvData, 0, 3);
memset(m_TCPack.m17, 0, 16);
memset(m_Nonce, 0, 14);
m_TCPack.codec_in = ECodecType::dmr;
}
// ysf constructor
@ -64,12 +67,13 @@ CDvFramePacket::CDvFramePacket(const uint8_t *ambe, const uint8_t *sync, uint16_
CDvFramePacket::CDvFramePacket(const uint8_t *ambe, uint16_t sid, uint8_t pid, uint8_t spid, uint8_t fid)
: CPacket(sid, pid, spid, fid)
{
memcpy(m_uiAmbePlus, ambe, AMBEPLUS_SIZE);
memset(m_uiDvSync, 0, DVSYNC_SIZE);
memset(m_uiAmbe, 0, AMBE_SIZE);
memset(m_uiDvData, 0, DVDATA_SIZE);
memset(m_uiCodec2, 0, 16);
memcpy(m_TCPack.dmr, ambe, 9);
memset(m_uiDvSync, 0, 7);
memset(m_TCPack.dstar, 0, 9);
memset(m_uiDvData, 0, 3);
memset(m_TCPack.m17, 0, 16);
memset(m_Nonce, 0, 14);
m_TCPack.codec_in = ECodecType::dmr;
}
// xlx constructor
@ -80,22 +84,35 @@ CDvFramePacket::CDvFramePacket
uint8_t dmrpid, uint8_t dprspid, const uint8_t *dmrambe, const uint8_t *dmrsync, ECodecType codecInType, const uint8_t *codec2, const uint8_t * nonce)
: CPacket(sid, dstarpid, dmrpid, dprspid, 0xFF, 0xFF, 0xFF, codecInType)
{
memcpy(m_uiAmbe, dstarambe, AMBE_SIZE);
memcpy(m_uiDvData, dstardvdata, DVDATA_SIZE);
memcpy(m_uiAmbePlus, dmrambe, AMBEPLUS_SIZE);
memcpy(m_uiDvSync, dmrsync, DVSYNC_SIZE);
memcpy(m_uiCodec2, codec2, 16);
memcpy(m_TCPack.dstar, dstarambe, 9);
memcpy(m_uiDvData, dstardvdata, 3);
memcpy(m_TCPack.dmr, dmrambe, 9);
memcpy(m_uiDvSync, dmrsync, 7);
memcpy(m_TCPack.m17, codec2, 16);
memcpy(m_Nonce, nonce, 14);
m_TCPack.codec_in = codecInType;
}
CDvFramePacket::CDvFramePacket(const CM17Packet &m17) : CPacket(m17)
{
memset(m_uiAmbe, 0, AMBE_SIZE);
memset(m_uiDvData, 0, DVDATA_SIZE);
memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE);
memset(m_uiDvSync, 0, DVSYNC_SIZE);
memcpy(m_uiCodec2, m17.GetPayload(), 16);
memset(m_TCPack.dstar, 0, 9);
memset(m_uiDvData, 0, 3);
memset(m_TCPack.dmr, 0, 9);
memset(m_uiDvSync, 0, 7);
memcpy(m_TCPack.m17, m17.GetPayload(), 16);
memcpy(m_Nonce, m17.GetNonce(), 14);
switch (0x6U & m17.GetFrameType())
{
case 0x4U:
m_TCPack.codec_in = ECodecType::c2_3200;
break;
case 0x6U:
m_TCPack.codec_in = ECodecType::c2_1600;
break;
default:
m_TCPack.codec_in = ECodecType::none;
break;
}
}
////////////////////////////////////////////////////////////////////////////////////////
@ -114,12 +131,12 @@ const uint8_t *CDvFramePacket::GetCodecData(ECodecType type) const
switch (type)
{
case ECodecType::dstar:
return m_uiAmbe;
return m_TCPack.dstar;
case ECodecType::dmr:
return m_uiAmbePlus;
return m_TCPack.dmr;
case ECodecType::c2_1600:
case ECodecType::c2_3200:
return m_uiCodec2;
return m_TCPack.m17;
default:
return nullptr;
}
@ -130,39 +147,42 @@ const uint8_t *CDvFramePacket::GetCodecData(ECodecType type) const
void CDvFramePacket::SetDvData(uint8_t *DvData)
{
memcpy(m_uiDvData, DvData, DVDATA_SIZE);
memcpy(m_uiDvData, DvData, 3);
}
void CDvFramePacket::SetCodecData(ECodecType type, uint8_t *data)
void CDvFramePacket::SetCodecData(ECodecType type, const uint8_t *data)
{
switch (type)
{
case ECodecType::dstar:
memcpy(m_uiAmbe, data, AMBE_SIZE);
memcpy(m_TCPack.dstar, data, 9);
break;
case ECodecType::dmr:
memcpy(m_uiAmbePlus, data, DVDATA_SIZE);
memcpy(m_TCPack.dmr, data, 3);
break;
case ECodecType::c2_1600:
memcpy(m_uiCodec2, data, 8);
memcpy(m_TCPack.m17, data, 8);
break;
case ECodecType::c2_3200:
memcpy(m_uiCodec2, data, 16);
memcpy(m_TCPack.m17, data, 16);
break;
}
}
void CDvFramePacket::SetCodecData(const STCPacket *pack)
{
memcpy(&m_TCPack, pack, sizeof(STCPacket));
}
////////////////////////////////////////////////////////////////////////////////////////
// operators
bool CDvFramePacket::operator ==(const CDvFramePacket &DvFrame) const
{
return ( (memcmp(m_uiAmbe, DvFrame.m_uiAmbe, AMBE_SIZE) == 0)
&& (memcmp(m_uiDvData, DvFrame.m_uiDvData, DVDATA_SIZE) == 0)
#ifndef NO_XLX
&& (memcmp(m_uiAmbePlus, DvFrame.m_uiAmbePlus, AMBEPLUS_SIZE) == 0)
&& (memcmp(m_uiDvSync, DvFrame.m_uiDvSync, DVSYNC_SIZE) == 0)
#endif
return ( (memcmp(m_TCPack.dstar, DvFrame.m_TCPack.dstar, 9) == 0)
&& (memcmp(m_uiDvData, DvFrame.m_uiDvData, 3) == 0)
&& (memcmp(m_TCPack.dmr, DvFrame.m_TCPack.dmr, 9) == 0)
&& (memcmp(m_uiDvSync, DvFrame.m_uiDvSync, 7) == 0)
);
}

@ -23,18 +23,12 @@
////////////////////////////////////////////////////////////////////////////////////////
// defines
#define AMBE_SIZE 9
#define DVDATA_SIZE 3
#define AMBEPLUS_SIZE 9
#define DVSYNC_SIZE 7
// typedef & structures
struct __attribute__ ((__packed__))dstar_dvframe
using SDStarFrame = struct __attribute__ ((__packed__))dstar_dvframe_tag
{
uint8_t AMBE[AMBE_SIZE];
uint8_t DVDATA[DVDATA_SIZE];
uint8_t AMBE[9];
uint8_t DVDATA[3];
};
////////////////////////////////////////////////////////////////////////////////////////
@ -46,7 +40,7 @@ class CDvFramePacket : public CPacket
public:
// constructor
CDvFramePacket();
CDvFramePacket(const struct dstar_dvframe *, uint16_t, uint8_t);
CDvFramePacket(const SDStarFrame *, uint16_t, uint8_t);
CDvFramePacket(const uint8_t *, const uint8_t *, uint16_t, uint8_t, uint8_t);
CDvFramePacket(const uint8_t *, uint16_t, uint8_t, uint8_t, uint8_t);
CDvFramePacket(uint16_t, uint8_t, const uint8_t *, const uint8_t *, uint8_t, uint8_t, const uint8_t *, const uint8_t *, ECodecType, const uint8_t *, const uint8_t *);
@ -60,6 +54,7 @@ public:
bool HasTranscodableAmbe(void) const { return true; }
// get
const STCPacket *GetCodecPacket() const { return &m_TCPack; }
const uint8_t *GetCodecData(ECodecType) const;
const uint8_t *GetDvSync(void) const { return m_uiDvSync; }
const uint8_t *GetDvData(void) const { return m_uiDvData; }
@ -67,19 +62,19 @@ public:
// set
void SetDvData(uint8_t *);
void SetCodecData(ECodecType, uint8_t *);
void SetCodecData(ECodecType, const uint8_t *);
void SetCodecData(const STCPacket *pack);
// operators
bool operator ==(const CDvFramePacket &) const;
protected:
// data (dstar)
uint8_t m_uiAmbe[AMBE_SIZE];
uint8_t m_uiDvData[DVDATA_SIZE];
uint8_t m_uiDvData[3];
// data (dmr)
uint8_t m_uiAmbePlus[AMBEPLUS_SIZE];
uint8_t m_uiDvSync[DVSYNC_SIZE];
uint8_t m_uiCodec2[16];
uint8_t m_uiDvSync[7];
// m17
uint8_t m_Nonce[14];
// the transcoder packet
STCPacket m_TCPack;
};

@ -29,7 +29,7 @@ CDvLastFramePacket::CDvLastFramePacket()
// dstar constructor
CDvLastFramePacket::CDvLastFramePacket(const struct dstar_dvframe *DvFrame, uint16_t sid, uint8_t pid)
CDvLastFramePacket::CDvLastFramePacket(const SDstarFrame *DvFrame, uint16_t sid, uint8_t pid)
: CDvFramePacket(DvFrame, sid, pid)
{
}

@ -32,7 +32,7 @@ class CDvLastFramePacket : public CDvFramePacket
public:
// constructor
CDvLastFramePacket();
CDvLastFramePacket(const struct dstar_dvframe *, uint16_t, uint8_t);
CDvLastFramePacket(const SDstarFrame *, uint16_t, uint8_t);
CDvLastFramePacket(const uint8_t *, const uint8_t *, uint16_t, uint8_t, uint8_t);
CDvLastFramePacket(const uint8_t *, uint16_t, uint8_t, uint8_t, uint8_t);
CDvLastFramePacket(uint16_t, uint8_t, const uint8_t *, const uint8_t *, uint8_t, uint8_t, const uint8_t *, const uint8_t *, ECodecType, const uint8_t *);

@ -0,0 +1,32 @@
#pragma once
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <cstdint>
enum class ECodecType { none, dstar, dmr, c2_1600, c2_3200 };
using STCPacket = struct tcpacket_tag {
char module;
bool is_last_packet;
bool has_valid_m17;
uint16_t streamid;
ECodecType codec_in;
uint8_t dstar[9];
uint8_t dmr[9];
uint8_t m17[16];
};

@ -606,7 +606,7 @@ bool CG3Protocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_ptr<CD
if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && 0U==(Buffer.data()[14] & 0x40U) )
{
// create packet
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
dvframe = std::unique_ptr<CDvFramePacket>(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;
@ -619,7 +619,7 @@ bool CG3Protocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::unique_pt
if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && (Buffer.data()[14] & 0x40U) )
{
// create packet
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
dvframe = std::unique_ptr<CDvLastFramePacket>(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14]));
// check validity of packet
if ( dvframe && dvframe->IsValid() )
return true;

@ -20,11 +20,10 @@
// Origin Id
enum class ECodecType { none, dstar, dmr, c2_1600, c2_3200 };
#define ORIGIN_LOCAL 0
#define ORIGIN_PEER 1
#include "Defines.h"
#include "M17Packet.h"
class CPacket

@ -22,7 +22,7 @@
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CPacketStream::CPacketStream()
CPacketStream::CPacketStream(std::shared_ptr<CUnixDgramReader> reader)
{
m_bOpen = false;
m_uiStreamId = 0;
@ -30,6 +30,7 @@ CPacketStream::CPacketStream()
m_OwnerClient = nullptr;
#ifdef TRANSCODED_MODULES
m_CodecStream = nullptr;
m_TCReader = reader;
#endif
}
@ -52,9 +53,7 @@ bool CPacketStream::OpenPacketStream(const CDvHeaderPacket &DvHeader, std::share
m_LastPacketTime.start();
#ifdef TRANSCODED_MODULES
if (std::string::npos != std::string(TRANSCODED_MODULES).find(DvHeader.GetRpt2Module()))
m_CodecStream = g_Transcoder.GetCodecStream(this, client->GetCodec());
else
m_CodecStream = g_Transcoder.GetCodecStream(this, ECodecType::none);
m_CodecStream = std::make_shared<CCodecStream>(this, client->GetCodec(), m_TCReader);
#endif
ok = true;
}
@ -89,7 +88,7 @@ void CPacketStream::Push(std::unique_ptr<CPacket> Packet)
m_CodecStream->Lock();
{
// transcoder ready & frame need transcoding ?
if ( m_CodecStream->IsConnected() && Packet->HasTranscodableAmbe() )
if (Packet->HasTranscodableAmbe())
{
// yes, push packet to trancoder queue
// trancoder will push it after transcoding

@ -31,11 +31,13 @@
////////////////////////////////////////////////////////////////////////////////////////
// class
#include "UnixDgramSocket.h"
class CPacketStream : public CPacketQueue
{
public:
// constructor
CPacketStream();
CPacketStream(std::shared_ptr<CUnixDgramReader>);
// open / close
bool OpenPacketStream(const CDvHeaderPacket &, std::shared_ptr<CClient>);
@ -63,6 +65,7 @@ protected:
CDvHeaderPacket m_DvHeader;
std::shared_ptr<CClient> m_OwnerClient;
#ifdef TRANSCODED_MODULES
std::shared_ptr<CUnixDgramReader> m_TCReader;
std::shared_ptr<CCodecStream> m_CodecStream;
#endif
};

@ -93,7 +93,15 @@ bool CReflector::Start(void)
// start one thread per reflector module
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
{
m_Stream[*it] = std::make_shared<CPacketStream>();
m_TCReader[*it] = std::make_shared<CUnixDgramReader>();
std::string readername("TCtoRef");
readername.append(1, *it);
if (m_TCReader[*it]->Open(readername.c_str()))
{
std::cerr << "ERROR: Reflector can't open " << readername << std::endl;
m_TCReader[*it].reset();
}
m_Stream[*it] = std::make_shared<CPacketStream>(m_TCReader[*it]);
m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, *it);
}

@ -24,6 +24,9 @@
#include "Protocols.h"
#include "PacketStream.h"
#include "NotificationQueue.h"
#ifdef TRANSCODED_MODULES
#include "UnixDgramSocket.h"
#endif
////////////////////////////////////////////////////////////////////////////////////////
@ -126,6 +129,7 @@ protected:
// queues
std::unordered_map<char, std::shared_ptr<CPacketStream>> m_Stream;
std::unordered_map<char, std::shared_ptr<CUnixDgramReader>> m_TCReader;
// threads
std::atomic<bool> keep_running;

@ -0,0 +1,161 @@
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <cstring>
#include <thread>
#include <chrono>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "UnixDgramSocket.h"
CUnixDgramReader::CUnixDgramReader() : fd(-1) {}
CUnixDgramReader::~CUnixDgramReader()
{
Close();
}
bool CUnixDgramReader::Open(const char *path) // returns true on failure
{
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0)
{
std::cerr << "socket() failed for " << path << ": " << strerror(errno) << std::endl;
return true;
}
//fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2);
int rval = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
if (rval < 0)
{
std::cerr << "bind() failed for " << path << ": " << strerror(errno) << std::endl;
close(fd);
fd = -1;
return true;
}
return false;
}
bool CUnixDgramReader::Receive(STCPacket *pack, unsigned timeout) const
{
// socket valid ?
if ( 0 > fd )
return true;
// control socket
fd_set FdSet;
FD_ZERO(&FdSet);
FD_SET(fd, &FdSet);
struct timeval tv;
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
auto rval = select(fd + 1, &FdSet, 0, 0, &tv);
if (rval <= 0) {
if (rval < 0) {
std::cerr << "select() error on transcoder socket: " << strerror(errno) << std::endl;
}
return true;
}
auto len = read(fd, pack, sizeof(STCPacket));
if (len != sizeof(STCPacket)) {
std::cerr << "Received transcoder packet is wrong size: " << len << " but should be " << sizeof(STCPacket) << std::endl;
return true;
}
return false;
}
void CUnixDgramReader::Close()
{
if (fd >= 0)
close(fd);
fd = -1;
}
int CUnixDgramReader::GetFD() const
{
return fd;
}
CUnixDgramWriter::CUnixDgramWriter() {}
CUnixDgramWriter::~CUnixDgramWriter() {}
void CUnixDgramWriter::SetUp(const char *path) // returns true on failure
{
// setup the socket address
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2);
}
bool CUnixDgramWriter::Send(const STCPacket *pack) const
{
auto len = Write(pack, sizeof(STCPacket));
if (len != sizeof(STCPacket))
return true;
return false;
}
ssize_t CUnixDgramWriter::Write(const void *buf, ssize_t size) const
{
// open the socket
int fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0)
{
std::cerr << "socket() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl;
return -1;
}
// connect to the receiver
int rval = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
if (rval < 0)
{
std::cerr << "connect() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl;
close(fd);
return -1;
}
auto written = write(fd, buf, size);
if (written != size) {
std::cerr << "write on " << addr.sun_path+1;
if (written < 0)
std::cerr << " returned error: " << strerror(errno) << std::endl;
else if (written == 0)
std::cerr << " returned zero" << std::endl;
else
std::cerr << " only wrote " << written << " bytes, should be " << size << std::endl;
}
close(fd);
return written;
}

@ -0,0 +1,50 @@
#pragma once
// urfd -- The universal reflector
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <stdlib.h>
#include <sys/un.h>
#include "Defines.h"
class CUnixDgramReader
{
public:
CUnixDgramReader();
~CUnixDgramReader();
bool Open(const char *path);
bool Receive(STCPacket *pack, unsigned timeout) const;
void Close();
int GetFD() const;
private:
int fd;
};
class CUnixDgramWriter
{
public:
CUnixDgramWriter();
~CUnixDgramWriter();
void SetUp(const char *path);
bool Send(const STCPacket *pack) const;
private:
ssize_t Write(const void *buf, ssize_t size) const;
struct sockaddr_un addr;
};

@ -1,207 +0,0 @@
/*
* Copyright (C) 2020 by Thomas Early N7TAE
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <unistd.h>
#include <sys/un.h>
#include <sys/socket.h>
#include "UnixPacketSock.h"
CUnixPacket::CUnixPacket() : m_fd(-1), m_host(NULL) {}
ssize_t CUnixPacket::Read(void *buffer, const ssize_t size)
{
if (0 > m_fd)
return -1;
ssize_t len = read(m_fd, buffer, size);
if (len < 1)
{
if (-1 == len)
{
std::cerr << "Read error on '" << m_name << "': " << strerror(errno) << std::endl;
}
else if (0 == len)
{
std::cerr << "Read error on '" << m_name << "': EOF" << std::endl;
}
if (Restart())
return -1;
else
return 0;
}
return len;
}
bool CUnixPacket::Write(const void *buffer, const ssize_t size)
{
if (0 > m_fd)
return true;
ssize_t written = write(m_fd, buffer, size);
if (written != size)
{
if (-1 == written)
{
std::cerr << "Write error on '" << m_name << "': " << strerror(errno) << std::endl;
}
else
{
std::cout << "Write error on '" << m_name << "': Only wrote " << written << " of " << size << " bytes" << std::endl;
}
return Restart();
}
return false;
}
bool CUnixPacket::Restart()
{
if (! m_host->IsRunning())
return true;
std::cout << "Restarting '" << m_name << "'... " << std::endl;
Close();
std::string name(m_name);
return Open(name.c_str(), m_host);
}
int CUnixPacket::GetFD()
{
return m_fd;
}
CUnixPacketServer::CUnixPacketServer() : m_server(-1) {}
CUnixPacketServer::~CUnixPacketServer()
{
Close();
}
bool CUnixPacketServer::Open(const char *name, CKRBase *host)
{
m_server = socket(AF_UNIX, SOCK_SEQPACKET, 0);
m_host = host;
if (m_server < 0)
{
std::cerr << "Cannot open '" << name << "' socket: " << strerror(errno) << std::endl;
return true;
}
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
memcpy(addr.sun_path+1, name, strlen(name));
if (-1 == bind(m_server, (struct sockaddr *)&addr, sizeof(addr)))
{
std::cerr << "Cannot bind '" << name << "' socket: " << strerror(errno) << std::endl;
Close();
return true;
}
if (-1 == listen(m_server, 1))
{
std::cerr << "Cannot listen on '" << name << "' socket: " << strerror(errno) << std::endl;
Close();
return true;
}
m_fd = accept(m_server, nullptr, 0);
if (m_fd < 0)
{
std::cerr << "Cannot accept on '" << name << "' socket: " << strerror(errno) << std::endl;
Close();
return true;
}
strncpy(m_name, name, 108);
return false;
}
void CUnixPacketServer::Close()
{
if (m_server >= 0)
{
close(m_server);
m_server = -1;
}
if (m_fd >= 0)
{
close(m_fd);
m_fd = -1;
}
}
CUnixPacketClient::~CUnixPacketClient()
{
Close();
}
bool CUnixPacketClient::Open(const char *name, CKRBase *host)
{
m_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (m_fd < 0)
{
std::cerr << "Cannot open unix client socket " << name << std::endl;
return true;
}
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
memcpy(addr.sun_path+1, name, strlen(name));
int rval = -1;
int tries = 0;
while (rval < 0)
{
rval = connect(m_fd, (struct sockaddr *)&addr, sizeof(addr));
if (rval < 0)
{
if (ECONNREFUSED == errno)
{
if (0 == tries++ % 20)
std::cout << "Waiting for " << name << " server to start..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
else
{
std::cerr << "Cannot connect '" << name << "' socket: " << strerror(errno) << std::endl;
Close();
return true;
}
}
if (! m_host->IsRunning())
{
Close();
return true;
}
}
m_host = host;
strncpy(m_name, name, 108);
return false;
}
void CUnixPacketClient::Close()
{
if (m_fd >= 0)
{
close(m_fd);
m_fd = -1;
}
}

@ -1,58 +0,0 @@
#pragma once
/*
* Copyright (C) 2020 by Thomas Early N7TAE
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <sys/types.h>
#include "KRBase.h"
class CUnixPacket
{
public:
CUnixPacket();
virtual bool Open(const char *name, CKRBase *host) = 0;
virtual void Close() = 0;
bool Write(const void *buffer, const ssize_t size);
ssize_t Read(void *buffer, const ssize_t size);
int GetFD();
protected:
bool Restart();
int m_fd;
CKRBase *m_host;
char m_name[108];
};
class CUnixPacketServer : public CUnixPacket
{
public:
CUnixPacketServer();
~CUnixPacketServer();
bool Open(const char *name, CKRBase *host);
void Close();
protected:
int m_server;
};
class CUnixPacketClient : public CUnixPacket
{
public:
~CUnixPacketClient();
bool Open(const char *name, CKRBase *host);
void Close();
};
Loading…
Cancel
Save

Powered by TurnKey Linux.