diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index d755267..ce47687 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -25,13 +25,12 @@ //////////////////////////////////////////////////////////////////////////////////////// // constructor -CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecType type) +CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t streamid, ECodecType type, std::shared_ptr reader) { keep_running = true; - m_uiStreamId = uiId; + m_uiStreamId = streamid; m_uiPid = 0; m_eCodecIn = type; - m_bConnected = false; m_fPingMin = -1; m_fPingMax = -1; m_fPingSum = 0; @@ -39,6 +38,7 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecTyp m_uiTotalPackets = 0; m_uiTimeoutPackets = 0; m_PacketStream = PacketStream; + m_TCReader = reader; } //////////////////////////////////////////////////////////////////////////////////////// @@ -47,7 +47,7 @@ CCodecStream::CCodecStream(CPacketStream *PacketStream, uint16_t uiId, ECodecTyp CCodecStream::~CCodecStream() { // close socket - m_Socket.Close(); + m_TCReader->Close(); // kill threads keep_running = false; @@ -60,48 +60,10 @@ CCodecStream::~CCodecStream() //////////////////////////////////////////////////////////////////////////////////////// // initialization -bool CCodecStream::Init(uint16_t uiPort) +bool CCodecStream::Init() { - m_bConnected = keep_running = false; // prepare for the worst - - // create the send to address - m_uiPort = uiPort; - auto s = g_Reflector.GetTranscoderIp(); - m_Ip.Initialize(strchr(s, ':') ? AF_INET6 : AF_INET, m_uiPort, s); - - if (0 == strncasecmp(s, "none", 4)) - { - return false; // the user has disabled the transcoder - } - - // create socket address, family based on transcoder listen address -#ifdef LISTEN_IPV4 -#ifdef LISTEN_IPV6 - auto paddr = (AF_INET == m_Ip.GetFamily()) ? LISTEN_IPV4 : LISTEN_IPV6; -#else - auto paddr = LISTEN_IPV4; -#endif -#else - auto paddr = LISTEN_IPV6; -#endif - CIp ip(m_Ip.GetFamily(), m_uiPort, paddr); - - // create our socket - if (ip.IsSet()) - { - if (! m_Socket.Open(ip)) - { - std::cerr << "Error opening socket on IP address " << m_Ip << std::endl; - return false; - } - } - else - { - std::cerr << "Could not initialize Codec Stream on " << paddr << std::endl; - return false; - } - - keep_running = m_bConnected = true; + m_TCWriter.SetUp("ReftoTC"); + keep_running = true; m_Future = std::async(std::launch::async, &CCodecStream::Thread, this); return true; @@ -110,8 +72,8 @@ bool CCodecStream::Init(uint16_t uiPort) void CCodecStream::Close(void) { // close socket - keep_running = m_bConnected = false; - m_Socket.Close(); + keep_running = false; + m_TCReader->Close(); // kill threads if ( m_Future.valid() ) @@ -141,59 +103,47 @@ void CCodecStream::Thread() void CCodecStream::Task(void) { - CBuffer Buffer; - CIp Ip; - uint8_t Ambe[9]; - uint8_t DStarSync[] = { 0x55,0x2D,0x16 }; + STCPacket pack; // any packet from transcoder - if ( m_Socket.Receive(Buffer, Ip, 5) ) + if ( ! m_TCReader->Receive(&pack, 5) ) { - // crack - if ( IsValidAmbePacket(Buffer, Ambe) ) + // tickle + m_TimeoutTimer.start(); + + // update statistics + double ping = m_StatsTimer.time(); + if ( m_fPingMin == -1 ) { - // tickle - m_TimeoutTimer.start(); - - // update statistics - double ping = m_StatsTimer.time(); - if ( m_fPingMin == -1 ) - { - m_fPingMin = ping; - m_fPingMax = ping; - - } - else - { - m_fPingMin = MIN(m_fPingMin, ping); - m_fPingMax = MAX(m_fPingMax, ping); - - } - m_fPingSum += ping; - m_fPingCount += 1; - - // pop the original packet - if ( !m_LocalQueue.empty() ) - { - auto Packet = m_LocalQueue.pop(); - auto Frame = (CDvFramePacket *)Packet.get(); - // todo: check the PID - // update content with transcoded ambe - Frame->SetAmbe(m_uiCodecOut, Ambe); - // tag syncs in DvData - if ( (m_uiCodecOut == CODEC_AMBEPLUS) && (Frame->GetPacketId() % 21) == 0 ) - { - Frame->SetDvData(DStarSync); - } - // and push it back to client - m_PacketStream->Lock(); - m_PacketStream->push(Packet); - m_PacketStream->Unlock(); - } - else - { - std::cout << "Unexpected transcoded packet received from transcoder" << std::endl; - } + m_fPingMin = ping; + m_fPingMax = ping; + + } + else + { + m_fPingMin = MIN(m_fPingMin, ping); + m_fPingMax = MAX(m_fPingMax, ping); + + } + m_fPingSum += ping; + m_fPingCount += 1; + + // pop the original packet + if ( !m_LocalQueue.empty() ) + { + auto Packet = m_LocalQueue.pop(); + auto Frame = (CDvFramePacket *)Packet.get(); + // todo: check the PID + // update content with transcoded data + Frame->SetCodecData(&pack); + // and push it back to client + m_PacketStream->Lock(); + m_PacketStream->push(Packet); + m_PacketStream->Unlock(); + } + else + { + std::cout << "Unexpected transcoded packet received from transcoder" << std::endl; } } @@ -204,14 +154,13 @@ void CCodecStream::Task(void) auto Packet = pop(); auto Frame = (CDvFramePacket *)Packet.get(); - // yes, send to ambed + // yes, send to transcoder // this assume that thread pushing the Packet // have verified that the CodecStream is connected // and that the packet needs transcoding m_StatsTimer.start(); m_uiTotalPackets++; - EncodeAmbePacket(&Buffer, Frame->GetAmbe(m_uiCodecIn)); - m_Socket.Send(Buffer, m_Ip, m_uiPort); + m_TCWriter.Send(Frame->GetCodecPacket()); // and push to our local queue m_LocalQueue.push(Packet); @@ -220,33 +169,7 @@ void CCodecStream::Task(void) // handle timeout if ( !m_LocalQueue.empty() && (m_TimeoutTimer.time() >= (TRANSCODER_AMBEPACKET_TIMEOUT/1000.0f)) ) { - //std::cout << "ambed packet timeout" << std::endl; + //std::cout << "transcoder packet timeout" << std::endl; m_uiTimeoutPackets++; } } - -//////////////////////////////////////////////////////////////////////////////////////// -/// packet decoding helpers - -bool CCodecStream::IsValidAmbePacket(const CBuffer &Buffer, uint8_t *Ambe) -{ - bool valid = false; - - if ( (Buffer.size() == 11) && (Buffer.data()[0] == m_uiCodecOut) ) - { - memcpy(Ambe, &(Buffer.data()[2]), 9); - valid = true; - } - return valid; -} - -//////////////////////////////////////////////////////////////////////////////////////// -/// packet encoding helpers - -void CCodecStream::EncodeAmbePacket(CBuffer *Buffer, const uint8_t *Ambe) -{ - Buffer->clear(); - Buffer->Append(m_uiCodecIn); - Buffer->Append(m_uiPid); - Buffer->Append((uint8_t *)Ambe, 9); -} diff --git a/reflector/CodecStream.h b/reflector/CodecStream.h index 4193ba7..19995d2 100644 --- a/reflector/CodecStream.h +++ b/reflector/CodecStream.h @@ -18,7 +18,7 @@ #pragma once -#include "UDPSocket.h" +#include "UnixDgramSocket.h" #include "PacketQueue.h" //////////////////////////////////////////////////////////////////////////////////////// @@ -30,17 +30,16 @@ class CCodecStream : public CPacketQueue { public: // constructor - CCodecStream(CPacketStream *, uint16_t, ECodecType); + CCodecStream(CPacketStream *packetstream, uint16_t streamid, ECodecType codectype, std::shared_ptr reader); // destructor virtual ~CCodecStream(); // initialization - bool Init(uint16_t); + bool Init(); void Close(void); // get - bool IsConnected(void) const { return m_bConnected; } uint16_t GetStreamId(void) const { return m_uiStreamId; } double GetPingMin(void) const { return m_fPingMin; } double GetPingMax(void) const { return m_fPingMax; } @@ -53,26 +52,16 @@ public: void Thread(void); void Task(void); - -protected: - // packet decoding helpers - bool IsValidAmbePacket(const CBuffer &, uint8_t *); - - // packet encoding helpers - void EncodeAmbePacket(CBuffer *, const uint8_t *); - - protected: // data - uint16_t m_uiStreamId; - uint16_t m_uiPort; - uint8_t m_uiPid; - ECodecType m_eCodecIn; - - // socket - CIp m_Ip; - CUdpSocket m_Socket; - bool m_bConnected; + uint16_t m_uiStreamId; + uint16_t m_uiPort; + uint8_t m_uiPid; + ECodecType m_eCodecIn; + + // sockets + std::shared_ptr m_TCReader; + CUnixDgramWriter m_TCWriter; // associated packet stream CPacketStream *m_PacketStream; diff --git a/reflector/DCSProtocol.cpp b/reflector/DCSProtocol.cpp index 52103cd..34b8b2c 100644 --- a/reflector/DCSProtocol.cpp +++ b/reflector/DCSProtocol.cpp @@ -396,12 +396,12 @@ bool CDcsProtocol::IsValidDvPacket(const CBuffer &Buffer, std::unique_ptr(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45])); + frame = std::unique_ptr(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45])); } else { // it's a regular DV frame - frame = std::unique_ptr(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45])); + frame = std::unique_ptr(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[46]), *((uint16_t *)&(Buffer.data()[43])), Buffer.data()[45])); } // check validity of packets diff --git a/reflector/DExtraProtocol.cpp b/reflector/DExtraProtocol.cpp index 49d4a87..ef60895 100644 --- a/reflector/DExtraProtocol.cpp +++ b/reflector/DExtraProtocol.cpp @@ -507,7 +507,7 @@ bool CDextraProtocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_pt if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && 0U==(Buffer.data()[14] & 0x40U) ) { // create packet - dvframe = std::unique_ptr(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); + dvframe = std::unique_ptr(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; @@ -520,7 +520,7 @@ bool CDextraProtocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::uniqu if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && (Buffer.data()[14] & 0x40) ) { // create packet - dvframe = std::unique_ptr(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); + dvframe = std::unique_ptr(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; diff --git a/reflector/DPlusProtocol.cpp b/reflector/DPlusProtocol.cpp index 6cb8544..7cf3e02 100644 --- a/reflector/DPlusProtocol.cpp +++ b/reflector/DPlusProtocol.cpp @@ -431,7 +431,7 @@ bool CDplusProtocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_ptr if ( 29==Buffer.size() && 0x1DU==Buffer.data()[0] && 0x80U==Buffer.data()[1] && 0==Buffer.Compare((uint8_t *)"DSVT", 2, 4) && 0x20U==Buffer.data()[6] && 0x20U==Buffer.data()[10] ) { // create packet - dvframe = std::unique_ptr(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16])); + dvframe = std::unique_ptr(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; @@ -444,7 +444,7 @@ bool CDplusProtocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::unique if ( 32==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 2, 4) && 0x20U==Buffer.data()[0] && 0x80U==Buffer.data()[1] && 0x20U==Buffer.data()[6] && 0x20U==Buffer.data()[10] ) { // create packet - dvframe = std::unique_ptr(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16])); + dvframe = std::unique_ptr(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[17]), *((uint16_t *)&(Buffer.data()[14])), Buffer.data()[16])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; diff --git a/reflector/DVFramePacket.cpp b/reflector/DVFramePacket.cpp index 1975054..53a985d 100644 --- a/reflector/DVFramePacket.cpp +++ b/reflector/DVFramePacket.cpp @@ -25,25 +25,27 @@ CDvFramePacket::CDvFramePacket() { - memset(m_uiAmbe, 0, AMBE_SIZE); - memset(m_uiDvData, 0, DVDATA_SIZE); - memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE); - memset(m_uiDvSync, 0, DVSYNC_SIZE); - memset(m_uiCodec2, 0, 16); + memset(m_TCPack.dstar, 0, 9); + memset(m_uiDvData, 0, 3); + memset(m_TCPack.dmr, 0, 9); + memset(m_uiDvSync, 0, 7); + memset(m_TCPack.m17, 0, 16); memset(m_Nonce, 0, 14); + m_TCPack.codec_in = ECodecType::none; }; // dstar constructor -CDvFramePacket::CDvFramePacket(const struct dstar_dvframe *dvframe, uint16_t sid, uint8_t pid) +CDvFramePacket::CDvFramePacket(const SDStarFrame *dvframe, uint16_t sid, uint8_t pid) : CPacket(sid, pid) { - memcpy(m_uiAmbe, dvframe->AMBE, AMBE_SIZE); - memcpy(m_uiDvData, dvframe->DVDATA, DVDATA_SIZE); - memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE); - memset(m_uiDvSync, 0, DVSYNC_SIZE); - memset(m_uiCodec2, 0, 16); + memcpy(m_TCPack.dstar, dvframe->AMBE, 9); + memcpy(m_uiDvData, dvframe->DVDATA, 3); + memset(m_TCPack.dmr, 0, 9); + memset(m_uiDvSync, 0, 7); + memset(m_TCPack.m17, 0, 16); memset(m_Nonce, 0, 14); + m_TCPack.codec_in = ECodecType::dstar; } // dmr constructor @@ -51,12 +53,13 @@ CDvFramePacket::CDvFramePacket(const struct dstar_dvframe *dvframe, uint16_t sid CDvFramePacket::CDvFramePacket(const uint8_t *ambe, const uint8_t *sync, uint16_t sid, uint8_t pid, uint8_t spid) : CPacket(sid, pid, spid) { - memcpy(m_uiAmbePlus, ambe, AMBEPLUS_SIZE); - memcpy(m_uiDvSync, sync, DVSYNC_SIZE); - memset(m_uiAmbe, 0, AMBE_SIZE); - memset(m_uiDvData, 0, DVDATA_SIZE); - memset(m_uiCodec2, 0, 16); + memcpy(m_TCPack.dmr, ambe, 9); + memcpy(m_uiDvSync, sync, 7); + memset(m_TCPack.dstar, 0, 9); + memset(m_uiDvData, 0, 3); + memset(m_TCPack.m17, 0, 16); memset(m_Nonce, 0, 14); + m_TCPack.codec_in = ECodecType::dmr; } // ysf constructor @@ -64,12 +67,13 @@ CDvFramePacket::CDvFramePacket(const uint8_t *ambe, const uint8_t *sync, uint16_ CDvFramePacket::CDvFramePacket(const uint8_t *ambe, uint16_t sid, uint8_t pid, uint8_t spid, uint8_t fid) : CPacket(sid, pid, spid, fid) { - memcpy(m_uiAmbePlus, ambe, AMBEPLUS_SIZE); - memset(m_uiDvSync, 0, DVSYNC_SIZE); - memset(m_uiAmbe, 0, AMBE_SIZE); - memset(m_uiDvData, 0, DVDATA_SIZE); - memset(m_uiCodec2, 0, 16); + memcpy(m_TCPack.dmr, ambe, 9); + memset(m_uiDvSync, 0, 7); + memset(m_TCPack.dstar, 0, 9); + memset(m_uiDvData, 0, 3); + memset(m_TCPack.m17, 0, 16); memset(m_Nonce, 0, 14); + m_TCPack.codec_in = ECodecType::dmr; } // xlx constructor @@ -80,22 +84,35 @@ CDvFramePacket::CDvFramePacket uint8_t dmrpid, uint8_t dprspid, const uint8_t *dmrambe, const uint8_t *dmrsync, ECodecType codecInType, const uint8_t *codec2, const uint8_t * nonce) : CPacket(sid, dstarpid, dmrpid, dprspid, 0xFF, 0xFF, 0xFF, codecInType) { - memcpy(m_uiAmbe, dstarambe, AMBE_SIZE); - memcpy(m_uiDvData, dstardvdata, DVDATA_SIZE); - memcpy(m_uiAmbePlus, dmrambe, AMBEPLUS_SIZE); - memcpy(m_uiDvSync, dmrsync, DVSYNC_SIZE); - memcpy(m_uiCodec2, codec2, 16); + memcpy(m_TCPack.dstar, dstarambe, 9); + memcpy(m_uiDvData, dstardvdata, 3); + memcpy(m_TCPack.dmr, dmrambe, 9); + memcpy(m_uiDvSync, dmrsync, 7); + memcpy(m_TCPack.m17, codec2, 16); memcpy(m_Nonce, nonce, 14); + m_TCPack.codec_in = codecInType; } CDvFramePacket::CDvFramePacket(const CM17Packet &m17) : CPacket(m17) { - memset(m_uiAmbe, 0, AMBE_SIZE); - memset(m_uiDvData, 0, DVDATA_SIZE); - memset(m_uiAmbePlus, 0, AMBEPLUS_SIZE); - memset(m_uiDvSync, 0, DVSYNC_SIZE); - memcpy(m_uiCodec2, m17.GetPayload(), 16); + memset(m_TCPack.dstar, 0, 9); + memset(m_uiDvData, 0, 3); + memset(m_TCPack.dmr, 0, 9); + memset(m_uiDvSync, 0, 7); + memcpy(m_TCPack.m17, m17.GetPayload(), 16); memcpy(m_Nonce, m17.GetNonce(), 14); + switch (0x6U & m17.GetFrameType()) + { + case 0x4U: + m_TCPack.codec_in = ECodecType::c2_3200; + break; + case 0x6U: + m_TCPack.codec_in = ECodecType::c2_1600; + break; + default: + m_TCPack.codec_in = ECodecType::none; + break; + } } //////////////////////////////////////////////////////////////////////////////////////// @@ -114,12 +131,12 @@ const uint8_t *CDvFramePacket::GetCodecData(ECodecType type) const switch (type) { case ECodecType::dstar: - return m_uiAmbe; + return m_TCPack.dstar; case ECodecType::dmr: - return m_uiAmbePlus; + return m_TCPack.dmr; case ECodecType::c2_1600: case ECodecType::c2_3200: - return m_uiCodec2; + return m_TCPack.m17; default: return nullptr; } @@ -130,39 +147,42 @@ const uint8_t *CDvFramePacket::GetCodecData(ECodecType type) const void CDvFramePacket::SetDvData(uint8_t *DvData) { - memcpy(m_uiDvData, DvData, DVDATA_SIZE); + memcpy(m_uiDvData, DvData, 3); } -void CDvFramePacket::SetCodecData(ECodecType type, uint8_t *data) +void CDvFramePacket::SetCodecData(ECodecType type, const uint8_t *data) { switch (type) { case ECodecType::dstar: - memcpy(m_uiAmbe, data, AMBE_SIZE); + memcpy(m_TCPack.dstar, data, 9); break; case ECodecType::dmr: - memcpy(m_uiAmbePlus, data, DVDATA_SIZE); + memcpy(m_TCPack.dmr, data, 3); break; case ECodecType::c2_1600: - memcpy(m_uiCodec2, data, 8); + memcpy(m_TCPack.m17, data, 8); break; case ECodecType::c2_3200: - memcpy(m_uiCodec2, data, 16); + memcpy(m_TCPack.m17, data, 16); break; } } +void CDvFramePacket::SetCodecData(const STCPacket *pack) +{ + memcpy(&m_TCPack, pack, sizeof(STCPacket)); +} + //////////////////////////////////////////////////////////////////////////////////////// // operators bool CDvFramePacket::operator ==(const CDvFramePacket &DvFrame) const { - return ( (memcmp(m_uiAmbe, DvFrame.m_uiAmbe, AMBE_SIZE) == 0) - && (memcmp(m_uiDvData, DvFrame.m_uiDvData, DVDATA_SIZE) == 0) -#ifndef NO_XLX - && (memcmp(m_uiAmbePlus, DvFrame.m_uiAmbePlus, AMBEPLUS_SIZE) == 0) - && (memcmp(m_uiDvSync, DvFrame.m_uiDvSync, DVSYNC_SIZE) == 0) -#endif + return ( (memcmp(m_TCPack.dstar, DvFrame.m_TCPack.dstar, 9) == 0) + && (memcmp(m_uiDvData, DvFrame.m_uiDvData, 3) == 0) + && (memcmp(m_TCPack.dmr, DvFrame.m_TCPack.dmr, 9) == 0) + && (memcmp(m_uiDvSync, DvFrame.m_uiDvSync, 7) == 0) ); } diff --git a/reflector/DVFramePacket.h b/reflector/DVFramePacket.h index 19387a6..2d13045 100644 --- a/reflector/DVFramePacket.h +++ b/reflector/DVFramePacket.h @@ -23,18 +23,12 @@ //////////////////////////////////////////////////////////////////////////////////////// // defines -#define AMBE_SIZE 9 -#define DVDATA_SIZE 3 - -#define AMBEPLUS_SIZE 9 -#define DVSYNC_SIZE 7 - // typedef & structures -struct __attribute__ ((__packed__))dstar_dvframe +using SDStarFrame = struct __attribute__ ((__packed__))dstar_dvframe_tag { - uint8_t AMBE[AMBE_SIZE]; - uint8_t DVDATA[DVDATA_SIZE]; + uint8_t AMBE[9]; + uint8_t DVDATA[3]; }; //////////////////////////////////////////////////////////////////////////////////////// @@ -46,7 +40,7 @@ class CDvFramePacket : public CPacket public: // constructor CDvFramePacket(); - CDvFramePacket(const struct dstar_dvframe *, uint16_t, uint8_t); + CDvFramePacket(const SDStarFrame *, uint16_t, uint8_t); CDvFramePacket(const uint8_t *, const uint8_t *, uint16_t, uint8_t, uint8_t); CDvFramePacket(const uint8_t *, uint16_t, uint8_t, uint8_t, uint8_t); CDvFramePacket(uint16_t, uint8_t, const uint8_t *, const uint8_t *, uint8_t, uint8_t, const uint8_t *, const uint8_t *, ECodecType, const uint8_t *, const uint8_t *); @@ -60,6 +54,7 @@ public: bool HasTranscodableAmbe(void) const { return true; } // get + const STCPacket *GetCodecPacket() const { return &m_TCPack; } const uint8_t *GetCodecData(ECodecType) const; const uint8_t *GetDvSync(void) const { return m_uiDvSync; } const uint8_t *GetDvData(void) const { return m_uiDvData; } @@ -67,19 +62,19 @@ public: // set void SetDvData(uint8_t *); - void SetCodecData(ECodecType, uint8_t *); + void SetCodecData(ECodecType, const uint8_t *); + void SetCodecData(const STCPacket *pack); // operators bool operator ==(const CDvFramePacket &) const; protected: // data (dstar) - uint8_t m_uiAmbe[AMBE_SIZE]; - uint8_t m_uiDvData[DVDATA_SIZE]; + uint8_t m_uiDvData[3]; // data (dmr) - uint8_t m_uiAmbePlus[AMBEPLUS_SIZE]; - uint8_t m_uiDvSync[DVSYNC_SIZE]; - - uint8_t m_uiCodec2[16]; + uint8_t m_uiDvSync[7]; + // m17 uint8_t m_Nonce[14]; + // the transcoder packet + STCPacket m_TCPack; }; diff --git a/reflector/DVLastFramePacket.cpp b/reflector/DVLastFramePacket.cpp index 2bffede..c9f5edf 100644 --- a/reflector/DVLastFramePacket.cpp +++ b/reflector/DVLastFramePacket.cpp @@ -29,7 +29,7 @@ CDvLastFramePacket::CDvLastFramePacket() // dstar constructor -CDvLastFramePacket::CDvLastFramePacket(const struct dstar_dvframe *DvFrame, uint16_t sid, uint8_t pid) +CDvLastFramePacket::CDvLastFramePacket(const SDstarFrame *DvFrame, uint16_t sid, uint8_t pid) : CDvFramePacket(DvFrame, sid, pid) { } diff --git a/reflector/DVLastFramePacket.h b/reflector/DVLastFramePacket.h index 44c2f51..6073178 100644 --- a/reflector/DVLastFramePacket.h +++ b/reflector/DVLastFramePacket.h @@ -32,7 +32,7 @@ class CDvLastFramePacket : public CDvFramePacket public: // constructor CDvLastFramePacket(); - CDvLastFramePacket(const struct dstar_dvframe *, uint16_t, uint8_t); + CDvLastFramePacket(const SDstarFrame *, uint16_t, uint8_t); CDvLastFramePacket(const uint8_t *, const uint8_t *, uint16_t, uint8_t, uint8_t); CDvLastFramePacket(const uint8_t *, uint16_t, uint8_t, uint8_t, uint8_t); CDvLastFramePacket(uint16_t, uint8_t, const uint8_t *, const uint8_t *, uint8_t, uint8_t, const uint8_t *, const uint8_t *, ECodecType, const uint8_t *); diff --git a/reflector/Defines.h b/reflector/Defines.h new file mode 100644 index 0000000..ce2e07d --- /dev/null +++ b/reflector/Defines.h @@ -0,0 +1,32 @@ +#pragma once + +// urfd -- The universal reflector +// Copyright © 2021 Thomas A. Early N7TAE +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#include + +enum class ECodecType { none, dstar, dmr, c2_1600, c2_3200 }; + +using STCPacket = struct tcpacket_tag { + char module; + bool is_last_packet; + bool has_valid_m17; + uint16_t streamid; + ECodecType codec_in; + uint8_t dstar[9]; + uint8_t dmr[9]; + uint8_t m17[16]; +}; diff --git a/reflector/G3Protocol.cpp b/reflector/G3Protocol.cpp index ec3ee4e..bbbc471 100644 --- a/reflector/G3Protocol.cpp +++ b/reflector/G3Protocol.cpp @@ -606,7 +606,7 @@ bool CG3Protocol::IsValidDvFramePacket(const CBuffer &Buffer, std::unique_ptr(new CDvFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); + dvframe = std::unique_ptr(new CDvFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; @@ -619,7 +619,7 @@ bool CG3Protocol::IsValidDvLastFramePacket(const CBuffer &Buffer, std::unique_pt if ( 27==Buffer.size() && 0==Buffer.Compare((uint8_t *)"DSVT", 4) && 0x20U==Buffer.data()[4] && 0x20U==Buffer.data()[8] && (Buffer.data()[14] & 0x40U) ) { // create packet - dvframe = std::unique_ptr(new CDvLastFramePacket((struct dstar_dvframe *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); + dvframe = std::unique_ptr(new CDvLastFramePacket((SDstarFrame *)&(Buffer.data()[15]), *((uint16_t *)&(Buffer.data()[12])), Buffer.data()[14])); // check validity of packet if ( dvframe && dvframe->IsValid() ) return true; diff --git a/reflector/Packet.h b/reflector/Packet.h index 4335e92..a232024 100644 --- a/reflector/Packet.h +++ b/reflector/Packet.h @@ -20,11 +20,10 @@ // Origin Id -enum class ECodecType { none, dstar, dmr, c2_1600, c2_3200 }; - #define ORIGIN_LOCAL 0 #define ORIGIN_PEER 1 +#include "Defines.h" #include "M17Packet.h" class CPacket diff --git a/reflector/PacketStream.cpp b/reflector/PacketStream.cpp index 1705c45..4c4e808 100644 --- a/reflector/PacketStream.cpp +++ b/reflector/PacketStream.cpp @@ -22,7 +22,7 @@ //////////////////////////////////////////////////////////////////////////////////////// // constructor -CPacketStream::CPacketStream() +CPacketStream::CPacketStream(std::shared_ptr reader) { m_bOpen = false; m_uiStreamId = 0; @@ -30,6 +30,7 @@ CPacketStream::CPacketStream() m_OwnerClient = nullptr; #ifdef TRANSCODED_MODULES m_CodecStream = nullptr; + m_TCReader = reader; #endif } @@ -52,9 +53,7 @@ bool CPacketStream::OpenPacketStream(const CDvHeaderPacket &DvHeader, std::share m_LastPacketTime.start(); #ifdef TRANSCODED_MODULES if (std::string::npos != std::string(TRANSCODED_MODULES).find(DvHeader.GetRpt2Module())) - m_CodecStream = g_Transcoder.GetCodecStream(this, client->GetCodec()); - else - m_CodecStream = g_Transcoder.GetCodecStream(this, ECodecType::none); + m_CodecStream = std::make_shared(this, client->GetCodec(), m_TCReader); #endif ok = true; } @@ -89,7 +88,7 @@ void CPacketStream::Push(std::unique_ptr Packet) m_CodecStream->Lock(); { // transcoder ready & frame need transcoding ? - if ( m_CodecStream->IsConnected() && Packet->HasTranscodableAmbe() ) + if (Packet->HasTranscodableAmbe()) { // yes, push packet to trancoder queue // trancoder will push it after transcoding diff --git a/reflector/PacketStream.h b/reflector/PacketStream.h index ebc1883..4ebbd72 100644 --- a/reflector/PacketStream.h +++ b/reflector/PacketStream.h @@ -31,11 +31,13 @@ //////////////////////////////////////////////////////////////////////////////////////// // class +#include "UnixDgramSocket.h" + class CPacketStream : public CPacketQueue { public: // constructor - CPacketStream(); + CPacketStream(std::shared_ptr); // open / close bool OpenPacketStream(const CDvHeaderPacket &, std::shared_ptr); @@ -63,6 +65,7 @@ protected: CDvHeaderPacket m_DvHeader; std::shared_ptr m_OwnerClient; #ifdef TRANSCODED_MODULES + std::shared_ptr m_TCReader; std::shared_ptr m_CodecStream; #endif }; diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index c2bf409..57a08dc 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -93,7 +93,15 @@ bool CReflector::Start(void) // start one thread per reflector module for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++) { - m_Stream[*it] = std::make_shared(); + m_TCReader[*it] = std::make_shared(); + std::string readername("TCtoRef"); + readername.append(1, *it); + if (m_TCReader[*it]->Open(readername.c_str())) + { + std::cerr << "ERROR: Reflector can't open " << readername << std::endl; + m_TCReader[*it].reset(); + } + m_Stream[*it] = std::make_shared(m_TCReader[*it]); m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, *it); } diff --git a/reflector/Reflector.h b/reflector/Reflector.h index e83bd91..2f95fc0 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -24,6 +24,9 @@ #include "Protocols.h" #include "PacketStream.h" #include "NotificationQueue.h" +#ifdef TRANSCODED_MODULES +#include "UnixDgramSocket.h" +#endif //////////////////////////////////////////////////////////////////////////////////////// @@ -126,6 +129,7 @@ protected: // queues std::unordered_map> m_Stream; + std::unordered_map> m_TCReader; // threads std::atomic keep_running; diff --git a/reflector/UnixDgramSocket.cpp b/reflector/UnixDgramSocket.cpp new file mode 100644 index 0000000..c4ea4ba --- /dev/null +++ b/reflector/UnixDgramSocket.cpp @@ -0,0 +1,161 @@ +// urfd -- The universal reflector +// Copyright © 2021 Thomas A. Early N7TAE +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "UnixDgramSocket.h" + +CUnixDgramReader::CUnixDgramReader() : fd(-1) {} + +CUnixDgramReader::~CUnixDgramReader() +{ + Close(); +} + +bool CUnixDgramReader::Open(const char *path) // returns true on failure +{ + fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if (fd < 0) + { + std::cerr << "socket() failed for " << path << ": " << strerror(errno) << std::endl; + return true; + } + //fcntl(fd, F_SETFL, O_NONBLOCK); + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2); + + int rval = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); + if (rval < 0) + { + std::cerr << "bind() failed for " << path << ": " << strerror(errno) << std::endl; + close(fd); + fd = -1; + return true; + } + return false; +} + +bool CUnixDgramReader::Receive(STCPacket *pack, unsigned timeout) const +{ + // socket valid ? + if ( 0 > fd ) + return true; + + // control socket + fd_set FdSet; + FD_ZERO(&FdSet); + FD_SET(fd, &FdSet); + struct timeval tv; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + + auto rval = select(fd + 1, &FdSet, 0, 0, &tv); + if (rval <= 0) { + if (rval < 0) { + std::cerr << "select() error on transcoder socket: " << strerror(errno) << std::endl; + } + return true; + } + + auto len = read(fd, pack, sizeof(STCPacket)); + if (len != sizeof(STCPacket)) { + std::cerr << "Received transcoder packet is wrong size: " << len << " but should be " << sizeof(STCPacket) << std::endl; + return true; + } + + return false; +} + +void CUnixDgramReader::Close() +{ + if (fd >= 0) + close(fd); + fd = -1; +} + +int CUnixDgramReader::GetFD() const +{ + return fd; +} + +CUnixDgramWriter::CUnixDgramWriter() {} + +CUnixDgramWriter::~CUnixDgramWriter() {} + +void CUnixDgramWriter::SetUp(const char *path) // returns true on failure +{ + // setup the socket address + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2); +} + +bool CUnixDgramWriter::Send(const STCPacket *pack) const +{ + auto len = Write(pack, sizeof(STCPacket)); + + if (len != sizeof(STCPacket)) + return true; + + return false; +} + +ssize_t CUnixDgramWriter::Write(const void *buf, ssize_t size) const +{ + // open the socket + int fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if (fd < 0) + { + std::cerr << "socket() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl; + return -1; + } + // connect to the receiver + int rval = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); + if (rval < 0) + { + std::cerr << "connect() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl; + close(fd); + return -1; + } + + auto written = write(fd, buf, size); + if (written != size) { + std::cerr << "write on " << addr.sun_path+1; + if (written < 0) + std::cerr << " returned error: " << strerror(errno) << std::endl; + else if (written == 0) + std::cerr << " returned zero" << std::endl; + else + std::cerr << " only wrote " << written << " bytes, should be " << size << std::endl; + } + + close(fd); + return written; +} diff --git a/reflector/UnixDgramSocket.h b/reflector/UnixDgramSocket.h new file mode 100644 index 0000000..ad84d51 --- /dev/null +++ b/reflector/UnixDgramSocket.h @@ -0,0 +1,50 @@ +#pragma once + +// urfd -- The universal reflector +// Copyright © 2021 Thomas A. Early N7TAE +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include + +#include "Defines.h" + +class CUnixDgramReader +{ +public: + CUnixDgramReader(); + ~CUnixDgramReader(); + bool Open(const char *path); + bool Receive(STCPacket *pack, unsigned timeout) const; + void Close(); + int GetFD() const; +private: + int fd; +}; + +class CUnixDgramWriter +{ +public: + CUnixDgramWriter(); + ~CUnixDgramWriter(); + void SetUp(const char *path); + bool Send(const STCPacket *pack) const; +private: + ssize_t Write(const void *buf, ssize_t size) const; + + struct sockaddr_un addr; +}; diff --git a/reflector/UnixPacketSocket.cpp b/reflector/UnixPacketSocket.cpp deleted file mode 100644 index ba105c0..0000000 --- a/reflector/UnixPacketSocket.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright (C) 2020 by Thomas Early N7TAE - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include -#include -#include -#include -#include -#include - -#include "UnixPacketSock.h" - -CUnixPacket::CUnixPacket() : m_fd(-1), m_host(NULL) {} - -ssize_t CUnixPacket::Read(void *buffer, const ssize_t size) -{ - if (0 > m_fd) - return -1; - ssize_t len = read(m_fd, buffer, size); - if (len < 1) - { - if (-1 == len) - { - std::cerr << "Read error on '" << m_name << "': " << strerror(errno) << std::endl; - } - else if (0 == len) - { - std::cerr << "Read error on '" << m_name << "': EOF" << std::endl; - } - if (Restart()) - return -1; - else - return 0; - } - return len; -} - -bool CUnixPacket::Write(const void *buffer, const ssize_t size) -{ - if (0 > m_fd) - return true; - ssize_t written = write(m_fd, buffer, size); - if (written != size) - { - if (-1 == written) - { - std::cerr << "Write error on '" << m_name << "': " << strerror(errno) << std::endl; - } - else - { - std::cout << "Write error on '" << m_name << "': Only wrote " << written << " of " << size << " bytes" << std::endl; - } - return Restart(); - } - return false; -} - -bool CUnixPacket::Restart() -{ - if (! m_host->IsRunning()) - return true; - std::cout << "Restarting '" << m_name << "'... " << std::endl; - Close(); - std::string name(m_name); - return Open(name.c_str(), m_host); -} - -int CUnixPacket::GetFD() -{ - return m_fd; -} - -CUnixPacketServer::CUnixPacketServer() : m_server(-1) {} - -CUnixPacketServer::~CUnixPacketServer() -{ - Close(); -} - -bool CUnixPacketServer::Open(const char *name, CKRBase *host) -{ - m_server = socket(AF_UNIX, SOCK_SEQPACKET, 0); - m_host = host; - if (m_server < 0) - { - std::cerr << "Cannot open '" << name << "' socket: " << strerror(errno) << std::endl; - return true; - } - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - memcpy(addr.sun_path+1, name, strlen(name)); - if (-1 == bind(m_server, (struct sockaddr *)&addr, sizeof(addr))) - { - std::cerr << "Cannot bind '" << name << "' socket: " << strerror(errno) << std::endl; - Close(); - return true; - } - - if (-1 == listen(m_server, 1)) - { - std::cerr << "Cannot listen on '" << name << "' socket: " << strerror(errno) << std::endl; - Close(); - return true; - } - - m_fd = accept(m_server, nullptr, 0); - if (m_fd < 0) - { - std::cerr << "Cannot accept on '" << name << "' socket: " << strerror(errno) << std::endl; - Close(); - return true; - } - - strncpy(m_name, name, 108); - return false; -} - -void CUnixPacketServer::Close() -{ - if (m_server >= 0) - { - close(m_server); - m_server = -1; - } - if (m_fd >= 0) - { - close(m_fd); - m_fd = -1; - } -} - -CUnixPacketClient::~CUnixPacketClient() -{ - Close(); -} - -bool CUnixPacketClient::Open(const char *name, CKRBase *host) -{ - m_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0); - if (m_fd < 0) - { - std::cerr << "Cannot open unix client socket " << name << std::endl; - return true; - } - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - memcpy(addr.sun_path+1, name, strlen(name)); - int rval = -1; - int tries = 0; - while (rval < 0) - { - rval = connect(m_fd, (struct sockaddr *)&addr, sizeof(addr)); - if (rval < 0) - { - if (ECONNREFUSED == errno) - { - if (0 == tries++ % 20) - std::cout << "Waiting for " << name << " server to start..." << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - else - { - std::cerr << "Cannot connect '" << name << "' socket: " << strerror(errno) << std::endl; - Close(); - return true; - } - } - if (! m_host->IsRunning()) - { - Close(); - return true; - } - } - - m_host = host; - strncpy(m_name, name, 108); - return false; -} - -void CUnixPacketClient::Close() -{ - if (m_fd >= 0) - { - close(m_fd); - m_fd = -1; - } -} diff --git a/reflector/UnixPacketSocket.h b/reflector/UnixPacketSocket.h deleted file mode 100644 index 6d7dacd..0000000 --- a/reflector/UnixPacketSocket.h +++ /dev/null @@ -1,58 +0,0 @@ -#pragma once - -/* - * Copyright (C) 2020 by Thomas Early N7TAE - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include - -#include "KRBase.h" - -class CUnixPacket -{ -public: - CUnixPacket(); - virtual bool Open(const char *name, CKRBase *host) = 0; - virtual void Close() = 0; - bool Write(const void *buffer, const ssize_t size); - ssize_t Read(void *buffer, const ssize_t size); - int GetFD(); -protected: - bool Restart(); - int m_fd; - CKRBase *m_host; - char m_name[108]; -}; - -class CUnixPacketServer : public CUnixPacket -{ -public: - CUnixPacketServer(); - ~CUnixPacketServer(); - bool Open(const char *name, CKRBase *host); - void Close(); -protected: - int m_server; -}; - -class CUnixPacketClient : public CUnixPacket -{ -public: - ~CUnixPacketClient(); - bool Open(const char *name, CKRBase *host); - void Close(); -};