diff --git a/reflector/BMProtocol.cpp b/reflector/BMProtocol.cpp index d348695..4749573 100644 --- a/reflector/BMProtocol.cpp +++ b/reflector/BMProtocol.cpp @@ -28,7 +28,7 @@ //////////////////////////////////////////////////////////////////////////////////////// // operation -bool CBMProtocol::Initialize(const char *type, EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6) +bool CBMProtocol::Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6) { if (! CProtocol::Initialize(type, ptype, port, has_ipv4, has_ipv6)) return false; @@ -234,15 +234,10 @@ void CBMProtocol::HandleQueue(void) case EProtoRev::ambe: default: #ifdef TRANSCODED_MODULES - if ( g_Transcoder.IsConnected() ) - { - Send(buffer, client->GetIp()); - } - else + Send(buffer, client->GetIp()); +#else + Send(bufferLegacy, client->GetIp()); #endif - { - Send(bufferLegacy, client->GetIp()); - } break; } } @@ -382,7 +377,7 @@ void CBMProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, c if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } // get origin peer = client->GetCallsign(); diff --git a/reflector/BMProtocol.h b/reflector/BMProtocol.h index 930a5d0..94ebe43 100644 --- a/reflector/BMProtocol.h +++ b/reflector/BMProtocol.h @@ -34,7 +34,7 @@ class CBMProtocol : public CProtocol { public: // initialization - bool Initialize(const char *type, EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6); + bool Initialize(const char *type, const EProtocol ptype, const uint16_t port, const bool has_ipv4, const bool has_ipv6); // task void Task(void); diff --git a/reflector/Callsign.cpp b/reflector/Callsign.cpp index 2ce6047..2d1aafe 100644 --- a/reflector/Callsign.cpp +++ b/reflector/Callsign.cpp @@ -24,8 +24,7 @@ #include "Callsign.h" // if a client is using special characters ',', '-' or '/', he's out of luck! -//#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/." -#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +#define M17CHARACTERS " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/." //////////////////////////////////////////////////////////////////////////////////////// // constructors diff --git a/reflector/Client.h b/reflector/Client.h index 18f354d..ec66e11 100644 --- a/reflector/Client.h +++ b/reflector/Client.h @@ -24,7 +24,7 @@ #include "Buffer.h" #include "Packet.h" -enum class EProtoRev { original, revised, ambe, urf }; +enum class EProtoRev { original, revised, ambe }; class CClient { diff --git a/reflector/DCSProtocol.cpp b/reflector/DCSProtocol.cpp index a5d68bc..46bd810 100644 --- a/reflector/DCSProtocol.cpp +++ b/reflector/DCSProtocol.cpp @@ -202,7 +202,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release diff --git a/reflector/DExtraClient.cpp b/reflector/DExtraClient.cpp index 5710be7..579352b 100644 --- a/reflector/DExtraClient.cpp +++ b/reflector/DExtraClient.cpp @@ -25,20 +25,20 @@ CDextraClient::CDextraClient() { - m_ProtRev = 0; + m_ProtRev = EProtoRev::ambe; } -CDextraClient::CDextraClient(const CCallsign &callsign, const CIp &ip, char reflectorModule, int protRev) +CDextraClient::CDextraClient(const CCallsign &callsign, const CIp &ip, char reflectorModule, EProtoRev protRev) : CClient(callsign, ip, reflectorModule) { m_ProtRev = protRev; } -CDextraClient::CDextraClient(const CDextraClient &client) - : CClient(client) -{ - m_ProtRev = client.m_ProtRev; -} +//CDextraClient::CDextraClient(const CDextraClient &client) +// : CClient(client) +//{ +// m_ProtRev = client.m_ProtRev; +//} //////////////////////////////////////////////////////////////////////////////////////// // status diff --git a/reflector/DExtraClient.h b/reflector/DExtraClient.h index d8d0353..8777fee 100644 --- a/reflector/DExtraClient.h +++ b/reflector/DExtraClient.h @@ -32,15 +32,15 @@ class CDextraClient : public CClient public: // constructors CDextraClient(); - CDextraClient(const CCallsign &, const CIp &, char = ' ', int = 0); - CDextraClient(const CDextraClient &); + CDextraClient(const CCallsign &, const CIp &, char, EProtoRev); +// CDextraClient(const CDextraClient &); // destructor virtual ~CDextraClient() {}; // identity EProtocol GetProtocol(void) const { return EProtocol::dextra; } - int GetProtocolRevision(void) const { return m_ProtRev; } + EProtoRev GetProtocolRevision(void) const { return m_ProtRev; } const char *GetProtocolName(void) const { return "DExtra"; } ECodecType GetCodec(void) const { return ECodecType::dstar; } bool IsNode(void) const { return true; } @@ -50,5 +50,5 @@ public: protected: // data - int m_ProtRev; + EProtoRev m_ProtRev; }; diff --git a/reflector/DExtraPeer.cpp b/reflector/DExtraPeer.cpp index 7e58723..e8c04a5 100644 --- a/reflector/DExtraPeer.cpp +++ b/reflector/DExtraPeer.cpp @@ -40,8 +40,8 @@ CDextraPeer::CDextraPeer(const CCallsign &callsign, const CIp &ip, const char *m // and construct the DExtra clients for ( unsigned i = 0; i < ::strlen(modules); i++ ) { - // create and append to vector - m_Clients.push_back(std::make_shared(callsign, ip, modules[i], version.GetMajor())); + // create and append to list + m_Clients.push_back(std::make_shared(callsign, ip, modules[i], EProtoRev::ambe)); } } diff --git a/reflector/DExtraProtocol.cpp b/reflector/DExtraProtocol.cpp index c7ef032..87b40fc 100644 --- a/reflector/DExtraProtocol.cpp +++ b/reflector/DExtraProtocol.cpp @@ -51,7 +51,7 @@ void CDextraProtocol::Task(void) CIp Ip; CCallsign Callsign; char ToLinkModule; - int ProtRev; + EProtoRev ProtRev; std::unique_ptr Header; std::unique_ptr Frame; @@ -79,9 +79,23 @@ void CDextraProtocol::Task(void) OnDvHeaderPacketIn(Header, Ip); } } - else if ( IsValidConnectPacket(Buffer, &Callsign, &ToLinkModule, &ProtRev) ) + else if ( IsValidConnectPacket(Buffer, Callsign, ToLinkModule, ProtRev) ) { - std::cout << "DExtra connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << " rev " << ProtRev << std::endl; + std::cout << "DExtra connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << " rev "; + switch (ProtRev) { + case EProtoRev::original: + std::cout << "Original" << std::endl; + break; + case EProtoRev::revised: + std::cout << "Revised" << std::endl; + break; + case EProtoRev::ambe: + std::cout << "AMBE" << std::endl; + break; + default: + std::cout << "UNEXPECTED Revision" << std::endl; + break; + } // callsign authorized? if ( g_GateKeeper.MayLink(Callsign, Ip, EProtocol::dextra) ) @@ -419,7 +433,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr &Heade if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release @@ -434,28 +448,27 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr &Heade //////////////////////////////////////////////////////////////////////////////////////// // packet decoding helpers -bool CDextraProtocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign *callsign, char *reflectormodule, int *revision) +bool CDextraProtocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign &callsign, char &module, EProtoRev &protrev) { bool valid = false; if ((Buffer.size() == 11) && (Buffer.data()[9] != ' ')) { - callsign->SetCallsign(Buffer.data(), 8); - callsign->SetModule(Buffer.data()[8]); - *reflectormodule = Buffer.data()[9]; - *revision = (Buffer.data()[10] == 11) ? 1 : 0; - valid = (callsign->IsValid() && IsLetter(*reflectormodule)); + callsign.SetCallsign(Buffer.data(), 8); + callsign.SetModule(Buffer.data()[8]); + module = Buffer.data()[9]; + valid = (callsign.IsValid() && IsLetter(module)); // detect revision if ( (Buffer.data()[10] == 11) ) { - *revision = 1; + protrev = EProtoRev::revised; } - else if ( callsign->HasSameCallsignWithWildcard(CCallsign("XRF*")) ) + else if ( callsign.HasSameCallsignWithWildcard(CCallsign("XRF*")) ) { - *revision = 2; + protrev = EProtoRev::ambe; } else { - *revision = 0; + EProtoRev::original; } } return valid; @@ -529,10 +542,10 @@ void CDextraProtocol::EncodeConnectPacket(CBuffer *Buffer, const char *Modules) Buffer->Append((uint8_t)0); } -void CDextraProtocol::EncodeConnectAckPacket(CBuffer *Buffer, int ProtRev) +void CDextraProtocol::EncodeConnectAckPacket(CBuffer *Buffer, EProtoRev ProtRev) { // is it for a XRF or repeater - if ( ProtRev == 2 ) + if ( ProtRev == EProtoRev::ambe ) { // XRFxxx uint8_t rm = (Buffer->data())[8]; diff --git a/reflector/DExtraProtocol.h b/reflector/DExtraProtocol.h index 88dc746..4e401cd 100644 --- a/reflector/DExtraProtocol.h +++ b/reflector/DExtraProtocol.h @@ -40,7 +40,7 @@ // the protocol is detected by looking at "XRF" in connect packet callsign // the protocol require a specific connect ack packet // the protocol also implement a workaround for detecting stream's module -// as dxrfd soes not set DV header RPT2 properly. +// as dxrfd does not set DV header RPT2 properly. // the protocol assumes that a dxrfd can only be linked to one module at a time @@ -68,7 +68,7 @@ protected: void OnDvHeaderPacketIn(std::unique_ptr &, const CIp &); // packet decoding helpers - bool IsValidConnectPacket( const CBuffer &, CCallsign *, char *, int *); + bool IsValidConnectPacket( const CBuffer &, CCallsign &, char &, EProtoRev &); bool IsValidDisconnectPacket( const CBuffer &, CCallsign *); bool IsValidKeepAlivePacket( const CBuffer &, CCallsign *); bool IsValidDvHeaderPacket( const CBuffer &, std::unique_ptr &); @@ -77,7 +77,7 @@ protected: // packet encoding helpers void EncodeKeepAlivePacket(CBuffer *); void EncodeConnectPacket(CBuffer *, const char *); - void EncodeConnectAckPacket(CBuffer *, int); + void EncodeConnectAckPacket(CBuffer *, EProtoRev); void EncodeConnectNackPacket(CBuffer *); void EncodeDisconnectPacket(CBuffer *, char); void EncodeDisconnectedPacket(CBuffer *); diff --git a/reflector/DMRMMDVMProtocol.cpp b/reflector/DMRMMDVMProtocol.cpp index 983acc1..8355c0a 100644 --- a/reflector/DMRMMDVMProtocol.cpp +++ b/reflector/DMRMMDVMProtocol.cpp @@ -319,7 +319,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr &Hea if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; lastheard = true; } } diff --git a/reflector/DMRPlusProtocol.cpp b/reflector/DMRPlusProtocol.cpp index e87df22..0c6d5f2 100644 --- a/reflector/DMRPlusProtocol.cpp +++ b/reflector/DMRPlusProtocol.cpp @@ -203,7 +203,7 @@ void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Head if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release diff --git a/reflector/DPlusProtocol.cpp b/reflector/DPlusProtocol.cpp index 260467e..1526107 100644 --- a/reflector/DPlusProtocol.cpp +++ b/reflector/DPlusProtocol.cpp @@ -207,7 +207,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release diff --git a/reflector/G3Protocol.cpp b/reflector/G3Protocol.cpp index dc5a821..e13bb33 100644 --- a/reflector/G3Protocol.cpp +++ b/reflector/G3Protocol.cpp @@ -566,7 +566,7 @@ void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, c if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } // update last heard diff --git a/reflector/GateKeeper.cpp b/reflector/GateKeeper.cpp index a4a8cee..208839f 100644 --- a/reflector/GateKeeper.cpp +++ b/reflector/GateKeeper.cpp @@ -98,7 +98,7 @@ bool CGateKeeper::MayLink(const CCallsign &callsign, const CIp &ip, EProtocol pr break; // XLX interlinks - case EProtocol::ulx: + case EProtocol::urf: ok &= IsPeerListedOk(callsign, ip, modules); break; @@ -143,7 +143,7 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, const EP break; // XLX interlinks - case EProtocol::ulx: + case EProtocol::urf: ok = ok && IsPeerListedOk(callsign, ip, module); break; @@ -280,8 +280,8 @@ const std::string CGateKeeper::ProtocolName(const EProtocol p) const return "MMDVM DMR"; case EProtocol::dmrplus: return "DMR+"; - case EProtocol::ulx: - return "ULX"; + case EProtocol::urf: + return "URF"; case EProtocol::ysf: return "YSF"; #ifndef NO_G3 diff --git a/reflector/M17Protocol.cpp b/reflector/M17Protocol.cpp index 4e8176b..a6e98ee 100644 --- a/reflector/M17Protocol.cpp +++ b/reflector/M17Protocol.cpp @@ -199,7 +199,7 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release diff --git a/reflector/Main.h b/reflector/Main.h index ce0a832..2c2f53c 100644 --- a/reflector/Main.h +++ b/reflector/Main.h @@ -225,6 +225,3 @@ extern CYsfNodeDirHttp g_YsfNodeDir; class CYsfNodeDirFile; extern CYsfNodeDirFile g_YsfNodeDir; #endif - -class CTranscoder; -extern CTranscoder g_Transcoder; diff --git a/reflector/Makefile b/reflector/Makefile index 6e64c97..e070ce2 100644 --- a/reflector/Makefile +++ b/reflector/Makefile @@ -36,23 +36,21 @@ endif LDFLAGS=-pthread -XRFSRCS = Buffer.cpp Callsign.cpp CallsignList.cpp CallsignListItem.cpp Client.cpp Clients.cpp DCSClient.cpp DCSProtocol.cpp DExtraClient.cpp DExtraPeer.cpp DExtraProtocol.cpp DPlusClient.cpp DPlusProtocol.cpp DVFramePacket.cpp DVHeaderPacket.cpp DVLastFramePacket.cpp GateKeeper.cpp IP.cpp Notification.cpp Packet.cpp PacketStream.cpp PeerCallsignList.cpp Peer.cpp Peers.cpp Protocol.cpp Protocols.cpp Reflector.cpp UDPSocket.cpp User.cpp Users.cpp Version.cpp Main.cpp -XLXSRCS = BMClient.cpp BMPeer.cpp BPTC19696.cpp CRC.cpp DMRIdDir.cpp DMRIdDirFile.cpp DMRIdDirHttp.cpp DMRMMDVMClient.cpp DMRMMDVMProtocol.cpp DMRPlusClient.cpp DMRPlusProtocol.cpp Golay2087.cpp Golay24128.cpp Hamming.cpp QR1676.cpp RS129.cpp Semaphore.cpp Utils.cpp WiresXCmd.cpp WiresXCmdHandler.cpp WiresXInfo.cpp XLXClient.cpp XLXProtocol.cpp XLXPeer.cpp YSFClient.cpp YSFConvolution.cpp YSFFich.cpp YSFNode.cpp YSFNodeDir.cpp YSFNodeDirFile.cpp YSFNodeDirHttp.cpp YSFPayload.cpp YSFProtocol.cpp YSFUtils.cpp +XRFSRCS = Buffer.cpp Callsign.cpp CallsignList.cpp CallsignListItem.cpp Client.cpp Clients.cpp DCSClient.cpp DCSProtocol.cpp DExtraClient.cpp DExtraPeer.cpp DExtraProtocol.cpp DPlusClient.cpp DPlusProtocol.cpp DVFramePacket.cpp DVHeaderPacket.cpp GateKeeper.cpp IP.cpp M17Client.cpp M17CRC.cpp M17Packet.cpp M17Client.cpp Notification.cpp Packet.cpp PacketStream.cpp PeerCallsignList.cpp Peer.cpp Peers.cpp Protocol.cpp Protocols.cpp Reflector.cpp UDPSocket.cpp User.cpp Users.cpp Version.cpp Main.cpp +XLXSRCS = BMClient.cpp BMPeer.cpp BPTC19696.cpp CRC.cpp DMRIdDir.cpp DMRIdDirFile.cpp DMRIdDirHttp.cpp DMRMMDVMClient.cpp DMRMMDVMProtocol.cpp DMRPlusClient.cpp DMRPlusProtocol.cpp Golay2087.cpp Golay24128.cpp Hamming.cpp QR1676.cpp RS129.cpp Semaphore.cpp Utils.cpp WiresXCmd.cpp WiresXCmdHandler.cpp WiresXInfo.cpp URFClient.cpp URFProtocol.cpp URFPeer.cpp YSFClient.cpp YSFConvolution.cpp YSFFich.cpp YSFNode.cpp YSFNodeDir.cpp YSFNodeDirFile.cpp YSFNodeDirHttp.cpp YSFPayload.cpp YSFProtocol.cpp YSFUtils.cpp G3SRCS = G3Client.cpp G3Protocol.cpp RawSocket.cpp UDPMsgSocket.cpp -SRCS = $(XRFSRCS) +SRCS = $(XRFSRCS) $(XLXSRCS) +ifeq ($(use_g3), true) +SRCS += $(G3SRCS) +endif -SRCS += $(XLXSRCS) ifeq ($(ysf_db), true) LDFLAGS += `mysql_config --libs` endif ifdef need_tc -SRCS += CodecStream.cpp -endif - -ifeq ($(use_g3), true) -SRCS += $(G3SRCS) +SRCS += CodecStream.cpp UnixDgramSocket.cpp endif OBJS = $(SRCS:.cpp=.o) diff --git a/reflector/PacketStream.cpp b/reflector/PacketStream.cpp index 3d73728..f8d8d5a 100644 --- a/reflector/PacketStream.cpp +++ b/reflector/PacketStream.cpp @@ -53,7 +53,7 @@ bool CPacketStream::OpenPacketStream(const CDvHeaderPacket &DvHeader, std::share m_LastPacketTime.start(); #ifdef TRANSCODED_MODULES if (std::string::npos != std::string(TRANSCODED_MODULES).find(DvHeader.GetRpt2Module())) - m_CodecStream = std::make_shared(this, client->GetCodec(), m_TCReader); + m_CodecStream = std::make_shared(this, m_uiStreamId, client->GetCodec(), m_TCReader); #endif ok = true; } @@ -67,7 +67,6 @@ void CPacketStream::ClosePacketStream(void) m_uiStreamId = 0; m_OwnerClient = nullptr; #ifdef TRANSCODED_MODULES - g_Transcoder.ReleaseStream(m_CodecStream); m_CodecStream = nullptr; #endif } diff --git a/reflector/PacketStream.h b/reflector/PacketStream.h index 4ebbd72..ad2ff0f 100644 --- a/reflector/PacketStream.h +++ b/reflector/PacketStream.h @@ -21,7 +21,8 @@ #include "PacketQueue.h" #include "Timer.h" #include "DVHeaderPacket.h" -#include "Transcoder.h" +#include "UnixDgramSocket.h" +#include "CodecStream.h" //////////////////////////////////////////////////////////////////////////////////////// @@ -31,8 +32,6 @@ //////////////////////////////////////////////////////////////////////////////////////// // class -#include "UnixDgramSocket.h" - class CPacketStream : public CPacketQueue { public: diff --git a/reflector/Peer.h b/reflector/Peer.h index 2d17387..d39a146 100644 --- a/reflector/Peer.h +++ b/reflector/Peer.h @@ -47,7 +47,7 @@ public: // identity virtual EProtocol GetProtocol(void) const { return EProtocol::none; } - virtual int GetProtocolRevision(void) const { return 0; } + virtual EProtoRev GetProtocolRevision(void) const { return EProtoRev::original; } virtual const char *GetProtocolName(void) const { return "NONE"; } // status diff --git a/reflector/Protocol.cpp b/reflector/Protocol.cpp index bd7d660..a90810c 100644 --- a/reflector/Protocol.cpp +++ b/reflector/Protocol.cpp @@ -166,21 +166,18 @@ void CProtocol::OnDvFramePacketIn(std::unique_ptr &Frame, const std::shared_ptr CProtocol::GetStream(uint16_t uiStreamId, const CIp *Ip) { - for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ ) + auto it = m_Streams.find(uiStreamId); + if (it == m_Streams.end()) + return nullptr; + + if (it->second->GetOwnerIp() != nullptr) { - if ( (*it)->GetStreamId() == uiStreamId ) + if (*Ip == *it->second->GetOwnerIp()) { - // if Ip not nullptr, also check if IP match - if ( (Ip != nullptr) && ((*it)->GetOwnerIp() != nullptr) ) - { - if ( *Ip == *((*it)->GetOwnerIp()) ) - { - return *it; - } - } + return it->second; } } - // done + return nullptr; } @@ -189,18 +186,19 @@ void CProtocol::CheckStreamsTimeout(void) for ( auto it=m_Streams.begin(); it!=m_Streams.end(); ) { // time out ? - (*it)->Lock(); - if ( (*it)->IsExpired() ) + it->second->Lock(); + if ( it->second->IsExpired() ) { // yes, close it - (*it)->Unlock(); - g_Reflector.CloseStream(*it); - // and remove it + it->second->Unlock(); + g_Reflector.CloseStream(it->second); + // and remove it from the m_Streams map it = m_Streams.erase(it); } else { - (*it++)->Unlock(); + it->second->Unlock(); + it++; } } } diff --git a/reflector/Protocol.h b/reflector/Protocol.h index 5b5da32..0be010c 100644 --- a/reflector/Protocol.h +++ b/reflector/Protocol.h @@ -125,7 +125,7 @@ protected: CUdpSocket m_Socket6; // streams - std::list> m_Streams; + std::unordered_map> m_Streams; // queue CPacketQueue m_Queue; diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 00ad405..5a2ddf9 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -22,7 +22,6 @@ #include "GateKeeper.h" #include "DMRIdDirFile.h" #include "DMRIdDirHttp.h" -#include "Transcoder.h" #include "YSFNodeDirFile.h" #include "YSFNodeDirHttp.h" @@ -77,12 +76,6 @@ bool CReflector::Start(void) // init wiresx node directory. Likewise with the return vale. g_YsfNodeDir.Init(); -#ifdef TRANSCODER_IP - // init the transcoder - if (! g_Transcoder.Init()) - return false; -#endif - // create protocols if (! m_Protocols.Init()) { @@ -144,11 +137,6 @@ void CReflector::Stop(void) // close gatekeeper g_GateKeeper.Close(); -#ifdef TRANSCODER_IP - // close transcoder - g_Transcoder.Close(); -#endif - // close databases g_DmridDir.Close(); g_YsfNodeDir.Close(); @@ -441,6 +429,7 @@ void CReflector::JsonReportThread() std::cout << "Error creating monitor socket" << std::endl; } } +#endif //////////////////////////////////////////////////////////////////////////////////////// // notifications @@ -489,7 +478,6 @@ void CReflector::OnStreamClose(const CCallsign &callsign) m_Notifications.push(notification); m_Notifications.Unlock(); } -#endif //////////////////////////////////////////////////////////////////////////////////////// // modules & queues diff --git a/reflector/Reflector.h b/reflector/Reflector.h index 352bd9d..f3fda53 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -24,9 +24,6 @@ #include "Protocols.h" #include "PacketStream.h" #include "NotificationQueue.h" -#ifdef TRANSCODED_MODULES -//#include "UnixDgramSocket.h" -#endif //////////////////////////////////////////////////////////////////////////////////////// @@ -82,6 +79,7 @@ public: bool IsValidModule(char c) const { return m_Modules.npos!=m_Modules.find(c); } // notifications + void OnPeersChanged(void); void OnClientsChanged(void); void OnUsersChanged(void); @@ -137,9 +135,9 @@ protected: std::future m_XmlReportFuture; #ifdef JSON_MONITOR std::future m_JsonReportFuture; +#endif // notifications CNotificationQueue m_Notifications; -#endif public: #ifdef DEBUG_DUMPFILE diff --git a/reflector/Transcoder.cpp b/reflector/Transcoder.cpp deleted file mode 100644 index d226a89..0000000 --- a/reflector/Transcoder.cpp +++ /dev/null @@ -1,386 +0,0 @@ -// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved. - -// urfd -- The universal reflector -// Copyright © 2021 Thomas A. Early N7TAE -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#include "Main.h" -#include "Reflector.h" -#include "Transcoder.h" - -//////////////////////////////////////////////////////////////////////////////////////// -// define - -// status -#define STATUS_IDLE 0 -#define STATUS_LOGGED 1 - -// timeout -#define AMBED_OPENSTREAM_TIMEOUT 200 // in ms - -//////////////////////////////////////////////////////////////////////////////////////// - -CTranscoder g_Transcoder; - - -//////////////////////////////////////////////////////////////////////////////////////// -// constructor - -CTranscoder::CTranscoder() -{ - keep_running = true; - m_bConnected = false; - m_LastKeepaliveTime.start(); - m_LastActivityTime.start(); - m_bStreamOpened = false; - m_StreamidOpenStream = 0; - m_PortOpenStream = 0; -} - -//////////////////////////////////////////////////////////////////////////////////////// -// destructor - -CTranscoder::~CTranscoder() -{ - Close(); -} - -//////////////////////////////////////////////////////////////////////////////////////// -// initialization - -bool CTranscoder::Init(void) -{ - // create address to the transcoder - auto s = g_Reflector.GetTranscoderIp(); - m_Ip.Initialize(strchr(s, ':') ? AF_INET6 : AF_INET, TRANSCODER_PORT, s); - - // does the user not want to use a transcoder? - if (0 == strncasecmp(s, "none", 4)) - { - std::cout << "Transcoder will not be enabled because the transcoder IP addess is 'none'" << std::endl; - return true; - } - - // now open the transcoder port -#ifdef LISTEN_IPV4 -#ifdef LISTEN_IPV6 - auto paddr = (AF_INET == m_Ip.GetFamily()) ? LISTEN_IPV4 : LISTEN_IPV6; -#else - auto paddr = LISTEN_IPV4; -#endif -#else - auto paddr = LISTEN_IPV6; -#endif - CIp tc(m_Ip.GetFamily(), TRANSCODER_PORT, paddr); - - // create our socket - if (tc.IsSet()) - { - if (! m_Socket.Open(tc)) - { - std::cerr << "Error opening socket on port UDP" << TRANSCODER_PORT << " on ip " << m_Ip << std::endl; - return false; - } - } - else - { - // something bad was specified for the transcoder IP? - std::cerr << "Error initializing transcoder port using " << ((AF_INET == m_Ip.GetFamily()) ? "IPv4" : "IPv6") << " on " << paddr << std::endl; - return false; - } - - // start thread - keep_running = true; - m_Future = std::async(std::launch::async, &CTranscoder::Thread, this); - - return true; -} - -void CTranscoder::Close(void) -{ - // close socket - m_Socket.Close(); - - // close all streams - m_Mutex.lock(); - m_Streams.clear(); - m_Mutex.unlock(); - - // kill threads - keep_running = false; - if ( m_Future.valid() ) - { - m_Future.get(); - } -} - -//////////////////////////////////////////////////////////////////////////////////////// -// thread - -void CTranscoder::Thread() -{ - while (keep_running) - { - Task(); - } -} - -void CTranscoder::Task(void) -{ - CBuffer Buffer; - CIp Ip; - uint16_t StreamId; - uint16_t Port; - - // anything coming in from codec server ? - //if ( (m_Socket.Receive(&Buffer, &Ip, 20) != -1) && (Ip == m_Ip) ) - if ( m_Socket.Receive(Buffer, Ip, 20) ) - { - m_LastActivityTime.start(); - - // crack packet - if ( IsValidStreamDescrPacket(Buffer, &StreamId, &Port) ) - { - //std::cout << "Transcoder stream " << (int) StreamId << " descr packet " << std::endl; - m_bStreamOpened = true; - m_StreamidOpenStream = StreamId; - m_PortOpenStream = Port; - m_SemaphoreOpenStream.Notify(); - } - else if ( IsValidNoStreamAvailablePacket(Buffer) ) - { - m_bStreamOpened = false; - m_SemaphoreOpenStream.Notify(); - } - else if ( IsValidKeepAlivePacket(Buffer) ) - { - if ( !m_bConnected ) - { - std::cout << "Transcoder connected at " << Ip << std::endl; - } - m_bConnected = true; - } - - } - - // keep client alive - if ( m_LastKeepaliveTime.time() > TRANSCODER_KEEPALIVE_PERIOD ) - { - // - HandleKeepalives(); - - // update time - m_LastKeepaliveTime.start(); - } -} - -//////////////////////////////////////////////////////////////////////////////////////// -// manage streams - -std::shared_ptr CTranscoder::GetCodecStream(CPacketStream *PacketStream, uint8_t uiCodecIn) -{ - CBuffer Buffer; - - // do we need transcoding - if ( uiCodecIn != CODEC_NONE ) - { - // are we connected to server - if ( m_bConnected ) - { - // yes, post openstream request - EncodeOpenstreamPacket(&Buffer, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); - m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT); - - // wait relpy here - if ( m_SemaphoreOpenStream.WaitFor(AMBED_OPENSTREAM_TIMEOUT) ) - { - if ( m_bStreamOpened ) - { - std::cout << "ambed openstream ok" << std::endl; - - // create stream object - auto stream = std::make_shared(PacketStream, m_StreamidOpenStream, uiCodecIn, (uiCodecIn == CODEC_AMBEPLUS) ? CODEC_AMBE2PLUS : CODEC_AMBEPLUS); - - if ( stream ) { - if ( stream->Init(m_PortOpenStream) ) - { - // and append to list - Lock(); - m_Streams.push_back(stream); - Unlock(); - return stream; - } - else - { - // send close packet - EncodeClosestreamPacket(&Buffer, stream->GetStreamId()); - m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT); - stream.reset(); - } - } - else - std::cerr << "ERROR: Unable to make a new CCodecStream" << std::endl; - } - else - { - std::cerr << "ERROR: Ambed openstream failed (no suitable channel available)" << std::endl; - } - } - else - { - std::cout << "ambed openstream timeout" << std::endl; - } - - } - } - return nullptr; -} - -void CTranscoder::ReleaseStream(std::shared_ptr stream) -{ - CBuffer Buffer; - - if ( stream ) - { - // look for the stream - bool found = false; - Lock(); - { - for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ ) - { - // compare object pointers - if ( (*it) == stream ) - { - // send close packet - EncodeClosestreamPacket(&Buffer, (*it)->GetStreamId()); - m_Socket.Send(Buffer, m_Ip, TRANSCODER_PORT); - - // display stats - if ( (*it)->GetPingMin() >= 0.0 ) - { - char sz[256]; - sprintf(sz, "ambed stats (ms) : %.1f/%.1f/%.1f", - (*it)->GetPingMin() * 1000.0, - (*it)->GetPingAve() * 1000.0, - (*it)->GetPingMax() * 1000.0); - std::cout << sz << std::endl; - } - if ( (*it)->GetTimeoutPackets() > 0 ) - { - char sz[256]; - sprintf(sz, "ambed %d of %d packets timed out", - (*it)->GetTimeoutPackets(), - (*it)->GetTotalPackets()); - std::cout << sz << std::endl; - } - - // and close it - (*it)->Close(); - m_Streams.erase(it); - break; - } - } - } - Unlock(); - } -} - -//////////////////////////////////////////////////////////////////////////////////////// -// keepalive helpers - -void CTranscoder::HandleKeepalives(void) -{ - CBuffer keepalive; - - // send keepalive - EncodeKeepAlivePacket(&keepalive); - m_Socket.Send(keepalive, m_Ip, TRANSCODER_PORT); - - // check if still with us - if ( m_bConnected && (m_LastActivityTime.time() >= TRANSCODER_KEEPALIVE_TIMEOUT) ) - { - // no, disconnect - m_bConnected = false; - std::cout << "Transcoder keepalive timeout" << std::endl; - } -} - -//////////////////////////////////////////////////////////////////////////////////////// -// packet decoding helpers - -bool CTranscoder::IsValidKeepAlivePacket(const CBuffer &Buffer) -{ - uint8_t tag[] = { 'A','M','B','E','D','P','O','N','G' }; - - bool valid = false; - if ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) - { - valid = true; - } - return valid; -} - -bool CTranscoder::IsValidStreamDescrPacket(const CBuffer &Buffer, uint16_t *Id, uint16_t *Port) -{ - uint8_t tag[] = { 'A','M','B','E','D','S','T','D' }; - - bool valid = false; - if ( (Buffer.size() == 14) && (Buffer.Compare(tag, sizeof(tag)) == 0) ) - { - *Id = *(uint16_t *)(&Buffer.data()[8]); - *Port = *(uint16_t *)(&Buffer.data()[10]); - // uint8_t CodecIn = Buffer.data()[12]; - // uint8_t CodecOut = Buffer.data()[13]; - valid = true; - } - return valid; -} - -bool CTranscoder::IsValidNoStreamAvailablePacket(const CBuffer&Buffer) -{ - uint8_t tag[] = { 'A','M','B','E','D','B','U','S','Y' }; - - return ( (Buffer.size() == 9) && (Buffer.Compare(tag, sizeof(tag)) == 0) ); -} - - -//////////////////////////////////////////////////////////////////////////////////////// -// packet encoding helpers - -void CTranscoder::EncodeKeepAlivePacket(CBuffer *Buffer) -{ - uint8_t tag[] = { 'A','M','B','E','D','P','I','N','G' }; - - Buffer->Set(tag, sizeof(tag)); - Buffer->Append((uint8_t *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN); -} - -void CTranscoder::EncodeOpenstreamPacket(CBuffer *Buffer, uint8_t uiCodecIn, uint8_t uiCodecOut) -{ - uint8_t tag[] = { 'A','M','B','E','D','O','S' }; - - Buffer->Set(tag, sizeof(tag)); - Buffer->Append((uint8_t *)(const char *)g_Reflector.GetCallsign(), CALLSIGN_LEN); - Buffer->Append((uint8_t)uiCodecIn); - Buffer->Append((uint8_t)uiCodecOut); -} - -void CTranscoder::EncodeClosestreamPacket(CBuffer *Buffer, uint16_t uiStreamId) -{ - uint8_t tag[] = { 'A','M','B','E','D','C','S' }; - - Buffer->Set(tag, sizeof(tag)); - Buffer->Append((uint16_t)uiStreamId); -} diff --git a/reflector/Transcoder.h b/reflector/Transcoder.h deleted file mode 100644 index aa57997..0000000 --- a/reflector/Transcoder.h +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved. - -// urfd -- The universal reflector -// Copyright © 2021 Thomas A. Early N7TAE -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#pragma once - -#include "Semaphore.h" -#include "CodecStream.h" -#include "UDPSocket.h" - -class CPacketStream; - -class CTranscoder -{ -public: - // constructor - CTranscoder(); - - // destructor - ~CTranscoder(); - - // initialization - bool Init(void); - void Close(void); - - // locks - void Lock(void) { m_Mutex.lock(); } - void Unlock(void) { m_Mutex.unlock(); } - - // status - bool IsConnected(void) const { return m_bConnected; } - - // manage streams - std::shared_ptr GetCodecStream(CPacketStream *, uint8_t); - void ReleaseStream(std::shared_ptr); - - // task - void Thread(void); - void Task(void); - -protected: - // keepalive helpers - void HandleKeepalives(void); - - // packet decoding helpers - bool IsValidKeepAlivePacket(const CBuffer &); - bool IsValidStreamDescrPacket(const CBuffer &, uint16_t *, uint16_t *); - bool IsValidNoStreamAvailablePacket(const CBuffer&); - - // packet encoding helpers - void EncodeKeepAlivePacket(CBuffer *); - void EncodeOpenstreamPacket(CBuffer *, uint8_t, uint8_t); - void EncodeClosestreamPacket(CBuffer *, uint16_t); - -protected: - // streams - std::mutex m_Mutex; - std::list> m_Streams; - - // sync objects for Openstream - CSemaphore m_SemaphoreOpenStream; - bool m_bStreamOpened; - uint16_t m_StreamidOpenStream; - uint16_t m_PortOpenStream; - - // thread - std::atomic keep_running; - std::future m_Future; - - // socket - CIp m_Ip; - CUdpSocket m_Socket; - bool m_bConnected; - - // time - CTimer m_LastKeepaliveTime; - CTimer m_LastActivityTime; -}; diff --git a/reflector/URFPeer.cpp b/reflector/URFPeer.cpp index d3a2634..68b7327 100644 --- a/reflector/URFPeer.cpp +++ b/reflector/URFPeer.cpp @@ -59,5 +59,5 @@ bool CURFPeer::IsAlive(void) const EProtoRev CURFPeer::GetProtocolRevision(const CVersion &/*version*/) { - return EProtoRev::urf; + return EProtoRev::original; } diff --git a/reflector/URFProtocol.cpp b/reflector/URFProtocol.cpp index efd8b84..103d237 100644 --- a/reflector/URFProtocol.cpp +++ b/reflector/URFProtocol.cpp @@ -87,7 +87,7 @@ void CURFProtocol::Task(void) { // acknowledge connecting request // following is version dependent - if (EProtoRev::urf == CURFPeer::GetProtocolRevision(Version)) + if (EProtoRev::original == CURFPeer::GetProtocolRevision(Version)) { // already connected ? CPeers *peers = g_Reflector.GetPeers(); @@ -235,7 +235,7 @@ void CURFProtocol::HandleQueue(void) { // no, send the packet // this is protocol revision dependent - if (EProtoRev::urf == client->GetProtocolRevision()) + if (EProtoRev::original == client->GetProtocolRevision()) { Send(buffer, client->GetIp()); } @@ -376,7 +376,7 @@ void CURFProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } // get origin peer = client->GetCallsign(); diff --git a/reflector/UnixDgramSocket.cpp b/reflector/UnixDgramSocket.cpp index c4ea4ba..b760ac6 100644 --- a/reflector/UnixDgramSocket.cpp +++ b/reflector/UnixDgramSocket.cpp @@ -14,8 +14,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -#pragma once - #include #include #include diff --git a/reflector/YSFProtocol.cpp b/reflector/YSFProtocol.cpp index 6a6a49a..f742aa2 100644 --- a/reflector/YSFProtocol.cpp +++ b/reflector/YSFProtocol.cpp @@ -254,7 +254,7 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, if ( (stream = g_Reflector.OpenStream(Header, client)) != nullptr ) { // keep the handle - m_Streams.push_back(stream); + m_Streams[stream->GetStreamId()] = stream; } } // release