diff --git a/docs/nng.md b/docs/nng.md index 9c078ac..5503406 100644 --- a/docs/nng.md +++ b/docs/nng.md @@ -29,7 +29,7 @@ graph TD %% Internal Flows CC -- "client_connect / client_disconnect" --> NP - CU -- "hearing (activity)" --> NP + CU -- "hearing / closing" --> NP CR -- "periodic state report" --> NP PS -- "IsActive status" --> CR @@ -123,6 +123,21 @@ Triggered when the reflector "hears" an active transmission. This event is sent } ``` +### 4. Transmission End (`closing`) + +Triggered when a transmission stream is closed (user stops talking). + +**Payload Structure:** + +```json +{ + "type": "closing", + "my": "G4XYZ", + "module": "A", + "protocol": "M17" +} +``` + ## Middle Tier Design Considerations 1. **Late Joining**: The `state` message is broadcast periodically to ensure a middle-tier connecting at any time (or reconnecting) can synchronize its internal state without waiting for new events. diff --git a/reflector/DCSProtocol.cpp b/reflector/DCSProtocol.cpp index 0ea47f4..3821dac 100644 --- a/reflector/DCSProtocol.cpp +++ b/reflector/DCSProtocol.cpp @@ -208,7 +208,7 @@ void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dcs); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dcs); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DExtraProtocol.cpp b/reflector/DExtraProtocol.cpp index c698b26..32be8d2 100644 --- a/reflector/DExtraProtocol.cpp +++ b/reflector/DExtraProtocol.cpp @@ -351,7 +351,7 @@ void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr &Heade g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dextra); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dextra); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DMRMMDVMProtocol.cpp b/reflector/DMRMMDVMProtocol.cpp index f2b201e..c36c921 100644 --- a/reflector/DMRMMDVMProtocol.cpp +++ b/reflector/DMRMMDVMProtocol.cpp @@ -335,7 +335,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr &Hea // update last heard if ( lastheard ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dmrmmdvm); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dmrmmdvm); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/DPlusProtocol.cpp b/reflector/DPlusProtocol.cpp index 24b819d..ce908f4 100644 --- a/reflector/DPlusProtocol.cpp +++ b/reflector/DPlusProtocol.cpp @@ -213,7 +213,7 @@ void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::dplus); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::dplus); g_Reflector.ReleaseUsers(); } else diff --git a/reflector/GateKeeper.h b/reflector/GateKeeper.h index 619dd86..d6baabd 100644 --- a/reflector/GateKeeper.h +++ b/reflector/GateKeeper.h @@ -47,6 +47,7 @@ public: // authorizations bool MayLink(const CCallsign &, const CIp &, const EProtocol, char * = nullptr) const; bool MayTransmit(const CCallsign &, const CIp &, EProtocol = EProtocol::any, char = ' ') const; + const std::string ProtocolName(EProtocol) const; protected: // thread @@ -56,7 +57,6 @@ protected: bool IsNodeListedOk(const std::string &) const; bool IsPeerListedOk(const std::string &, char) const; bool IsPeerListedOk(const std::string &, const CIp &, char *) const; - const std::string ProtocolName(EProtocol) const; protected: // data diff --git a/reflector/M17Protocol.cpp b/reflector/M17Protocol.cpp index 15b6954..204909e 100644 --- a/reflector/M17Protocol.cpp +++ b/reflector/M17Protocol.cpp @@ -1,5 +1,5 @@ // Copyright © 2015 Jean-Luc Deltombe (LX3JL). All rights reserved. -// + // urfd -- The universal reflector // Copyright © 2021 Thomas A. Early N7TAE // @@ -20,18 +20,9 @@ #include #include "M17Client.h" #include "M17Protocol.h" -#include "M17Parrot.h" #include "M17Packet.h" #include "Global.h" -//////////////////////////////////////////////////////////////////////////////////////// -// constructors - -CM17Protocol::CM17Protocol() - : CSEProtocol() -{ -} - //////////////////////////////////////////////////////////////////////////////////////// // operation @@ -48,6 +39,8 @@ bool CM17Protocol::Initialize(const char *type, const EProtocol ptype, const uin return true; } + + //////////////////////////////////////////////////////////////////////////////////////// // task @@ -56,7 +49,6 @@ void CM17Protocol::Task(void) CBuffer Buffer; CIp Ip; CCallsign Callsign; - CCallsign DstCallsign; char ToLinkModule; std::unique_ptr Header; std::unique_ptr Frame; @@ -75,24 +67,8 @@ void CM17Protocol::Task(void) // crack the packet if ( IsValidDvPacket(Buffer, Header, Frame) ) { - // find client - std::shared_ptr client = g_Reflector.GetClients()->FindClient(Ip, EProtocol::m17); - bool isListen = false; - if (client) - { - auto m17client = std::dynamic_pointer_cast(client); - if (m17client && m17client->IsListenOnly()) - isListen = true; - } - g_Reflector.ReleaseClients(); - - // parrot? - if ( Header->GetUrCallsign() == "PARROT" ) - { - HandleParrot(Ip, Buffer, true); - } // callsign muted? - else if ( g_GateKeeper.MayTransmit(Header->GetMyCallsign(), Ip, EProtocol::m17, Header->GetRpt2Module()) ) + if ( g_GateKeeper.MayTransmit(Header->GetMyCallsign(), Ip, EProtocol::m17, Header->GetRpt2Module()) ) { OnDvHeaderPacketIn(Header, Ip); @@ -109,10 +85,9 @@ void CM17Protocol::Task(void) OnDvFramePacketIn(secondFrame, &Ip); // push two packet because we need a packet every 20 ms } } - else if ( IsValidConnectPacket(Buffer, Callsign, ToLinkModule) || IsValidListenPacket(Buffer, Callsign, ToLinkModule) ) + else if ( IsValidConnectPacket(Buffer, Callsign, ToLinkModule) ) { - bool isListen = (0 == Buffer.Compare((const uint8_t*)"LSTN", 4)); - std::cout << "M17 " << (isListen ? "listen-only " : "") << "connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << std::endl; + std::cout << "M17 connect packet for module " << ToLinkModule << " from " << Callsign << " at " << Ip << std::endl; // callsign authorized? if ( g_GateKeeper.MayLink(Callsign, Ip, EProtocol::m17) && g_Reflector.IsValidModule(ToLinkModule) ) @@ -124,7 +99,7 @@ void CM17Protocol::Task(void) Send("ACKN", Ip); // create the client and append - g_Reflector.GetClients()->AddClient(std::make_shared(Callsign, Ip, ToLinkModule, isListen)); + g_Reflector.GetClients()->AddClient(std::make_shared(Callsign, Ip, ToLinkModule)); g_Reflector.ReleaseClients(); } else @@ -170,44 +145,6 @@ void CM17Protocol::Task(void) } g_Reflector.ReleaseClients(); } - else if ( IsValidPacketModePacket(Buffer, Callsign, DstCallsign) ) - { - // find client - std::shared_ptr client = g_Reflector.GetClients()->FindClient(Ip, EProtocol::m17); - bool isListen = false; - if (client) - { - auto m17client = std::dynamic_pointer_cast(client); - if (m17client && m17client->IsListenOnly()) - isListen = true; - } - g_Reflector.ReleaseClients(); - - if (!isListen) - { - // parrot? - if ( DstCallsign == "PARROT" ) - { - HandleParrot(Ip, Buffer, false); - } - // repeat to all clients on the module - else if (client) - { - char module = client->GetReflectorModule(); - CClients *clients = g_Reflector.GetClients(); - auto it = clients->begin(); - std::shared_ptr target = nullptr; - while ( (target = clients->FindNextClient(EProtocol::m17, it)) != nullptr ) - { - if (target->GetReflectorModule() == module && target->GetIp() != Ip) - { - Send(Buffer, target->GetIp()); - } - } - g_Reflector.ReleaseClients(); - } - } - } else { // invalid packet @@ -232,24 +169,6 @@ void CM17Protocol::Task(void) // update time m_LastKeepaliveTime.start(); } - - // Handle Parrot timeouts and cleanup - for (auto it = m_ParrotMap.begin(); it != m_ParrotMap.end(); ) - { - if (it->second->GetState() == EParrotState::record && it->second->IsExpired()) - { - it->second->Play(); - it++; - } - else if (it->second->GetState() == EParrotState::done) - { - it = m_ParrotMap.erase(it); - } - else - { - it++; - } - } } //////////////////////////////////////////////////////////////////////////////////////// @@ -290,7 +209,9 @@ void CM17Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::m17); + CCallsign reflectorCall = rpt2; + reflectorCall.SetCSModule(Header->GetRpt2Module()); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, reflectorCall, EProtocol::m17); g_Reflector.ReleaseUsers(); } } @@ -407,19 +328,6 @@ bool CM17Protocol::IsValidConnectPacket(const CBuffer &Buffer, CCallsign &callsi return valid; } -bool CM17Protocol::IsValidListenPacket(const CBuffer &Buffer, CCallsign &callsign, char &mod) -{ - uint8_t tag[] = { 'L', 'S', 'T', 'N' }; - bool valid = false; - if (11 == Buffer.size() && 0 == Buffer.Compare(tag, 4)) - { - callsign.CodeIn(Buffer.data() + 4); - mod = Buffer.data()[10]; - valid = (callsign.IsValid() && IsLetter(mod)); - } - return valid; -} - bool CM17Protocol::IsValidDisconnectPacket(const CBuffer &Buffer, CCallsign &callsign) { uint8_t tag[] = { 'D', 'I', 'S', 'C' }; @@ -434,35 +342,26 @@ bool CM17Protocol::IsValidDisconnectPacket(const CBuffer &Buffer, CCallsign &cal bool CM17Protocol::IsValidKeepAlivePacket(const CBuffer &Buffer, CCallsign &callsign) { + uint8_t tag[] = { 'P', 'O', 'N', 'G' }; bool valid = false; - if (Buffer.size() == 10) + if ( (Buffer.size() == 10) || (0 == Buffer.Compare(tag, 4)) ) { - if (0 == Buffer.Compare((const uint8_t*)"PING", 4) || 0 == Buffer.Compare((const uint8_t*)"PONG", 4)) - { - callsign.CodeIn(Buffer.data() + 4); - valid = callsign.IsValid(); - } + callsign.CodeIn(Buffer.data() + 4); + valid = callsign.IsValid(); } return valid; } -bool CM17Protocol::IsValidPacketModePacket(const CBuffer &Buffer, CCallsign &src, CCallsign &dst) -{ - uint8_t tag[] = { 'M', '1', '7', 'P' }; - if ( (Buffer.size() >= 18) && (0 == Buffer.Compare(tag, 4)) ) - { - dst.CodeIn(Buffer.data() + 4); - src.CodeIn(Buffer.data() + 10); - return (src.IsValid() && (0x0U == (0x1U & Buffer[17]))); // no encryption - } - return false; -} - bool CM17Protocol::IsValidDvPacket(const CBuffer &Buffer, std::unique_ptr &header, std::unique_ptr &frame) { uint8_t tag[] = { 'M', '1', '7', ' ' }; if ( (Buffer.size() == sizeof(SM17Frame)) && (0 == Buffer.Compare(tag, sizeof(tag))) && (0x4U == (0x1CU & Buffer[19])) ) + // Buffer[19] is the low-order byte of the uint16_t frametype. + // the 0x1CU mask (00011100 binary) just lets us see: + // 1. the encryptions bytes (mask 0x18U) which must be zero, and + // 2. the msb of the 2-bit payload type (mask 0x4U) which must be set. This bit set means it's voice or voice+data. + // An masked result of 0x4U means the payload contains Codec2 voice data and there is no encryption. { // Make the M17 header CM17Packet m17(Buffer.data()); @@ -514,63 +413,3 @@ void CM17Protocol::EncodeM17Packet(SM17Frame &frame, const CDvHeaderPacket &Head frame.streamid = Header.GetStreamId(); // no host<--->network byte swapping since we never do any math on this value // the CRC will be set in HandleQueue, after lich.dest is set } - -bool CM17Protocol::EncodeDvHeaderPacket(const CDvHeaderPacket &Header, CBuffer &Buffer) const -{ - (void)Header; - (void)Buffer; - return false; // M17 uses EncodeM17Packet -} - -bool CM17Protocol::EncodeDvFramePacket(const CDvFramePacket &Frame, CBuffer &Buffer) const -{ - (void)Frame; - (void)Buffer; - return false; // M17 uses EncodeM17Packet -} - -void CM17Protocol::HandleParrot(const CIp &Ip, const CBuffer &Buffer, bool isStream) -{ - std::string key = Ip.GetAddress(); - auto it = m_ParrotMap.find(key); - - if (it == m_ParrotMap.end()) - { - std::shared_ptr client = g_Reflector.GetClients()->FindClient(Ip, EProtocol::m17); - auto m17client = std::dynamic_pointer_cast(client); - g_Reflector.ReleaseClients(); - - if (m17client) - { - if (isStream) - { - // Extract frametype from SM17Frame - uint16_t ft = (Buffer.data()[12] << 8) | Buffer.data()[13]; - m_ParrotMap[key] = std::make_shared(m17client->GetCallsign(), m17client, ft, this); - } - else - { - // Extract frametype from SM17P (lich part starts at offset 4, but frametype is at offset 16) - uint16_t ft = (Buffer.data()[16] << 8) | Buffer.data()[17]; - m_ParrotMap[key] = std::make_shared(m17client->GetCallsign(), m17client, ft, this); - } - } - } - - it = m_ParrotMap.find(key); - if (it != m_ParrotMap.end() && it->second->GetState() == EParrotState::record) - { - if (isStream) - { - // streamId at offset 4, fn at 38 - uint16_t sid = (Buffer.data()[4] << 8) | Buffer.data()[5]; - uint16_t fn = (Buffer.data()[38] << 8) | Buffer.data()[39]; - it->second->Add(Buffer, sid, fn); - } - else - { - it->second->AddPacket(Buffer); - it->second->Play(); // Packet mode parrot plays back immediately - } - } -} diff --git a/reflector/NXDNProtocol.cpp b/reflector/NXDNProtocol.cpp index 7c7b832..58638d2 100644 --- a/reflector/NXDNProtocol.cpp +++ b/reflector/NXDNProtocol.cpp @@ -235,7 +235,7 @@ void CNXDNProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // update last heard if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::nxdn); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::nxdn); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/P25Protocol.cpp b/reflector/P25Protocol.cpp index 919fc66..011eec8 100644 --- a/reflector/P25Protocol.cpp +++ b/reflector/P25Protocol.cpp @@ -219,7 +219,7 @@ void CP25Protocol::OnDvHeaderPacketIn(std::unique_ptr &Header, g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::p25); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::p25); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 5ffec10..442fb8b 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -276,6 +276,9 @@ void CReflector::CloseStream(std::shared_ptr stream) // notify //OnStreamClose(stream->GetUserCallsign()); + // dashboard event + GetUsers()->Closing(stream->GetUserCallsign(), GetStreamModule(stream), stream->GetOwnerClient()->GetProtocol()); + std::cout << "Closing stream of module " << GetStreamModule(stream) << std::endl; } diff --git a/reflector/URFProtocol.cpp b/reflector/URFProtocol.cpp index e9d7fcb..cdac493 100644 --- a/reflector/URFProtocol.cpp +++ b/reflector/URFProtocol.cpp @@ -411,7 +411,9 @@ void CURFProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // release g_Reflector.ReleaseClients(); // update last heard - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, peer, EProtocol::urf); + CCallsign xlx = rpt2; + xlx.SetCSModule(Header->GetRpt2Module()); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, xlx, EProtocol::urf); g_Reflector.ReleaseUsers(); } } diff --git a/reflector/Users.cpp b/reflector/Users.cpp index 1284d7a..80b2faf 100644 --- a/reflector/Users.cpp +++ b/reflector/Users.cpp @@ -76,3 +76,14 @@ void CUsers::Hearing(const CCallsign &my, const CCallsign &rpt1, const CCallsign event["protocol"] = g_GateKeeper.ProtocolName(protocol); g_NNGPublisher.Publish(event); } + +void CUsers::Closing(const CCallsign &my, char module, EProtocol protocol) +{ + // dashboard event + nlohmann::json event; + event["type"] = "closing"; + event["my"] = my.GetCS(); + event["module"] = std::string(1, module); + event["protocol"] = g_GateKeeper.ProtocolName(protocol); + g_NNGPublisher.Publish(event); +} diff --git a/reflector/Users.h b/reflector/Users.h index 0873317..9638ebd 100644 --- a/reflector/Users.h +++ b/reflector/Users.h @@ -22,6 +22,7 @@ #include #include "User.h" +#include "Defines.h" class CUsers { @@ -49,6 +50,7 @@ public: // operation void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol = EProtocol::none); void Hearing(const CCallsign &, const CCallsign &, const CCallsign &, const CCallsign &, EProtocol protocol); + void Closing(const CCallsign &, char module, EProtocol protocol); protected: // data diff --git a/reflector/YSFProtocol.cpp b/reflector/YSFProtocol.cpp index 420aa93..ed6e09a 100644 --- a/reflector/YSFProtocol.cpp +++ b/reflector/YSFProtocol.cpp @@ -42,7 +42,6 @@ bool CYsfProtocol::Initialize(const char *type, const EProtocol ptype, const uin { // config data m_AutolinkModule = g_Configure.GetAutolinkModule(g_Keys.ysf.autolinkmod); - m_EnableDGID = g_Configure.GetBoolean(g_Keys.ysf.enabledgid); m_RegistrationId = g_Configure.GetUnsigned(g_Keys.ysf.ysfreflectordb.id); m_RegistrationName.assign(g_Configure.GetString(g_Keys.ysf.ysfreflectordb.name)); m_RegistrationDesc.assign(g_Configure.GetString(g_Keys.ysf.ysfreflectordb.description)); @@ -131,7 +130,7 @@ void CYsfProtocol::Task(void) if ( g_GateKeeper.MayTransmit(Header->GetMyCallsign(), Ip, EProtocol::ysf, Header->GetRpt2Module()) ) { // handle it - OnDvHeaderPacketIn(Header, Ip, Fich.getSQ()); + OnDvHeaderPacketIn(Header, Ip); //OnDvFramePacketIn(Frames[0], &Ip); //OnDvFramePacketIn(Frames[1], &Ip); } @@ -253,7 +252,7 @@ void CYsfProtocol::Task(void) //////////////////////////////////////////////////////////////////////////////////////// // streams helpers -void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, const CIp &Ip, uint8_t dgid) +void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, const CIp &Ip) { // find the stream auto stream = GetStream(Header->GetStreamId()); @@ -276,16 +275,6 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, { // get client callsign rpt1 = client->GetCallsign(); - - // module selection by DGID - if (m_EnableDGID && dgid >= 10 && dgid <= 35) { - char newModule = 'A' + (dgid - 10); - if (client->GetReflectorModule() != newModule) { - std::cout << "YSF: DGID module switch for " << client->GetCallsign() << " from " << client->GetReflectorModule() << " to " << newModule << std::endl; - client->SetReflectorModule(newModule); - } - } - // get module it's linked to auto m = client->GetReflectorModule(); Header->SetRpt2Module(m); @@ -304,7 +293,7 @@ void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, // update last heard if ( g_Reflector.IsValidModule(rpt2.GetCSModule()) ) { - g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, EProtocol::ysf); + g_Reflector.GetUsers()->Hearing(my, rpt1, rpt2, rpt2, EProtocol::ysf); g_Reflector.ReleaseUsers(); } } @@ -548,7 +537,7 @@ bool CYsfProtocol::IsValidDvFramePacket(const CIp &Ip, const CYSFFICH &Fich, con if ( g_GateKeeper.MayTransmit(header->GetMyCallsign(), Ip, EProtocol::ysf, header->GetRpt2Module()) ) { - OnDvHeaderPacketIn(header, Ip, Fich.getSQ()); + OnDvHeaderPacketIn(header, Ip); } }