From 3c319ec93d3a7e51a985646932af079b63714c6f Mon Sep 17 00:00:00 2001 From: Dave Behnke <916775+dbehnke@users.noreply.github.com> Date: Fri, 26 Dec 2025 23:56:07 -0500 Subject: [PATCH] Implement NNG support for transcoder link - Replace TCP sockets with NNG pair protocol - Support IPC connections via file paths - Add thread-safe packet queues - Fix YSFProtocol header signature mismatch --- reflector/TCSocket.cpp | 598 ++++++++++---------------------------- reflector/TCSocket.h | 95 +++--- reflector/YSFProtocol.cpp | 2 +- 3 files changed, 210 insertions(+), 485 deletions(-) diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 35b1d67..1ea498b 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -1,537 +1,249 @@ -// urfd -- The universal reflector -// Copyright © 2024 Thomas A. Early N7TAE -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - #include #include #include #include -#include -#include -#include -#include +#include +#include #include "TCSocket.h" -void CTCSocket::Close() +CTCSocket::CTCSocket() : m_Running(false), m_Connected(false) { - for (auto &item : m_Pfd) - { - if (item.fd >= 0) - { - Close(item.fd); - } - } - m_Pfd.clear(); + m_Sock.id = 0; } -void CTCSocket::Close(char mod) +CTCSocket::~CTCSocket() { - auto pos = m_Modules.find(mod); - if (std::string::npos == pos) - { - std::cerr << "Could not find module '" << mod << "'" << std::endl; - return; - } - if (m_Pfd[pos].fd < 0) - { - std::cerr << "Close(" << mod << ") is already closed" << std::endl; - return; - } - Close(m_Pfd[pos].fd); - m_Pfd[pos].fd = -1; + Close(); } -void CTCSocket::Close(int fd) +void CTCSocket::Close() { - if (fd < 0) - { - return; - } - for (auto &p : m_Pfd) + m_Running = false; + if (m_Thread.joinable()) + m_Thread.join(); + + if (m_Sock.id != 0) { - if (fd == p.fd) - { - if (shutdown(p.fd, SHUT_RDWR)) - { - perror("shutdown"); - } - else - { - if (close(p.fd)) - { - std::cerr << "Error while closing " << fd << ": "; - perror("close"); - } - else - p.fd = -1; - } - return; - } + nng_close(m_Sock); + m_Sock.id = 0; } - std::cerr << "Could not find a file descriptor with a value of " << fd << std::endl; + m_Connected = false; } -int CTCSocket::GetFD(char module) const +void CTCSocket::Close(char module) { - auto pos = m_Modules.find(module); - if (std::string::npos == pos) - return -1; - return m_Pfd[pos].fd; + // In multiplexed mode, we cannot close a single module's connection independently + // without closing the whole pipe. So this is a no-op or full close. + // For now, no-op to allow other modules to survive transient errors. + // std::cerr << "Close(" << module << ") ignored in NNG mode" << std::endl; } -char CTCSocket::GetMod(int fd) const +bool CTCSocket::Send(const STCPacket *packet) { - for (unsigned i=0; i fds.fd) - return true; - } - return false; + return m_Connected; } -bool CTCSocket::Send(const STCPacket *packet) +int CTCSocket::GetFD(char module) const { - auto pos = m_Modules.find(packet->module); - if (pos == std::string::npos) + // Legacy helper for checking connection state + // CodecStream expects < 0 on failure + return m_Connected ? 1 : -1; +} + +void CTCSocket::Dispatcher() +{ + while (m_Running) { - if(packet->codec_in == ECodecType::ping) - { - pos = 0; // There is at least one transcoding module, use it to send the ping - } - else - { - std::cerr << "Can't Send() this packet to unconfigured module '" << packet->module << "'" << std::endl; - return true; - } - } - unsigned count = 0; - auto data = (const unsigned char *)packet; - do { - auto n = send(m_Pfd[pos].fd, data+count, sizeof(STCPacket)-count, 0); - if (n <= 0) + STCPacket *buf = nullptr; + size_t sz = 0; + // 100ms timeout to check m_Running + int rv = nng_recv(m_Sock, &buf, &sz, NNG_FLAG_ALLOC); + + if (rv == 0) { - if (0 == n) + if (sz == sizeof(STCPacket)) { - std::cerr << "CTCSocket::Send: socket on module '" << packet->module << "' has been closed!" << std::endl; + STCPacket pkt; + memcpy(&pkt, buf, sizeof(STCPacket)); + nng_free(buf, sz); + + if (m_ClientQueue) + { + // Client mode: everything goes to one queue + m_ClientQueue->Push(pkt); + } + else + { + // Server mode: route by module + auto it = m_Queues.find(pkt.module); + if (it != m_Queues.end()) + { + it->second->Push(pkt); + } + else + { + // Unknown module or not configured? + // In urfd, we might want to auto-create logic or drop? + // For now drop, as configured modules are set in Open + } + } } else { - perror("CTCSocket::Send"); + nng_free(buf, sz); + std::cerr << "Received packet of incorrect size: " << sz << std::endl; } - Close(packet->module); - return true; } - count += n; - } while (count < sizeof(STCPacket)); - return false; -} - -bool CTCSocket::receive(int fd, STCPacket *packet) -{ - auto n = recv(fd, packet, sizeof(STCPacket), MSG_WAITALL); - if (n < 0) - { - perror("Receive recv"); - Close(fd); - return true; - } - - if (0 == n) - { - return true; + else if (rv != NNG_ETIMEDOUT) + { + // Fatal error? + // std::cerr << "NNG Recv Error: " << nng_strerror(rv) << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } } - - if (n != sizeof(STCPacket)) - std::cout << "receive() only read " << n << " bytes of the transcoder packet from module '" << GetMod(fd) << "'" << std::endl; - return false; } -// returns true if there is data to return -bool CTCServer::Receive(char module, STCPacket *packet, int ms) -{ - bool rv = false; - const auto pos = m_Modules.find(module); - if (pos == std::string::npos) - { - std::cerr << "Can't receive on unconfigured module '" << module << "'" << std::endl; - return rv; - } +// ---------------- SERVER ---------------- - auto pfds = &m_Pfd[pos]; - if (pfds->fd < 0) - { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - return rv; - } - - auto n = poll(pfds, 1, ms); - if (n < 0) - { - perror("Recieve poll"); - Close(pfds->fd); - return rv; - } - - if (0 == n) - return rv; // timeout - - if (pfds->revents & POLLIN) - { - rv = receive(pfds->fd, packet); - } - - // It's possible that even if we read the data, the socket can have an error after the read... - // So we'll check... - if (pfds->revents & POLLERR || pfds->revents & POLLHUP) - { - if (pfds->revents & POLLERR) - std::cerr << "POLLERR received on module '" << module << "', closing socket" << std::endl; - if (pfds->revents & POLLHUP) - std::cerr << "POLLHUP received on module '" << module << "', closing socket" << std::endl; - Close(pfds->fd); - } - if (pfds->revents & POLLNVAL) - { - std::cerr << "POLLNVAL received on module " << module << "'" << std::endl; - } - - if (rv) - Close(pfds->fd); - - if(packet->codec_in == ECodecType::ping) - return false; - else - return !rv; -} +// ---------------- SERVER ---------------- bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) { - m_Modules.assign(modules); - - m_Ip = CIp(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); - - m_Pfd.resize(m_Modules.size()); - for (auto &pf : m_Pfd) + m_Modules = modules; + // Initialize queues for configured modules + for (char c : m_Modules) { - pf.fd = -1; - pf.events = POLLIN; - pf.revents = 0; + m_Queues[c] = std::make_shared(); } - return Accept(); -} - -bool CTCServer::Accept() -{ - auto fd = socket(m_Ip.GetFamily(), SOCK_STREAM, 0); - if (fd < 0) + int rv; + if ((rv = nng_pair1_open(&m_Sock)) != 0) { - perror("Open socket"); + std::cerr << "nng_pair1_open failed: " << nng_strerror(rv) << std::endl; return true; } - int yes = 1; - auto rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - if (rv < 0) - { - close(fd); - perror("Open setsockopt"); - return true; - } + // Set receive timeout to 100ms for dispatcher loop + nng_duration timeout = 100; + nng_socket_set_ms(m_Sock, NNG_OPT_RECVTIMEO, timeout); - rv = bind(fd, m_Ip.GetCPointer(), m_Ip.GetSize()); - if (rv < 0) - { - close(fd); - perror("Open bind"); - return true; + std::stringstream url; + if (address.find("ipc://") == 0) { + url << address; + } else if (address.find("/") == 0 || address.find("./") == 0 || address.find("../") == 0) { + url << "ipc://" << address; + } else { + url << "tcp://" << address << ":" << port; } - rv = listen(fd, 3); - if (rv < 0) + if ((rv = nng_listen(m_Sock, url.str().c_str(), nullptr, 0)) != 0) { - perror("Open listen"); - close(fd); - Close(); + std::cerr << "nng_listen failed: " << nng_strerror(rv) << " URL: " << url.str() << std::endl; return true; } - std::string wmod; - for (const char c : m_Modules) - { - if (GetFD(c) < 0) - wmod.append(1, c); - } - - std::cout << "Checking " << m_Ip << " for transcoder connection"; - if (wmod.size() > 1) - { - std::cout << "s for modules "; - } - else - { - std::cout << " for module "; - } - std::cout << wmod << "..." << std::endl; - - struct pollfd pfd; - pfd.fd = fd; - pfd.events = POLLIN; - - while (AnyAreClosed()) - { - auto p = poll(&pfd, 1, 100); // 100ms timeout - if (p < 0) - { - perror("Accept poll"); - close(fd); - Close(); - return true; - } - if (0 == p) - break; // No more pending connections for now - - if (acceptone(fd)) - { - close(fd); - Close(); - return true; - } - } - - close(fd); + m_Running = true; + m_Connected = true; + m_Thread = std::thread([this] { Dispatcher(); }); return false; } -bool CTCServer::acceptone(int fd) +bool CTCServer::Receive(char module, STCPacket *packet, int ms) { - CIp their_addr; // connector's address information + auto it = m_Queues.find(module); + if (it == m_Queues.end()) return false; - socklen_t sin_size = sizeof(struct sockaddr_storage); - - auto newfd = accept(fd, their_addr.GetPointer(), &sin_size); - if (newfd < 0) - { - perror("Accept accept"); - return true; - } - - char mod; - int rv = recv(newfd, &mod, 1, MSG_WAITALL); // block to get the identification byte - if (rv != 1) - { - if (rv < 0) - perror("Accept recv"); - else - std::cerr << "recv got no identification byte!" << std::endl; - close(newfd); - return true; - } - - const auto pos = m_Modules.find(mod); - if (std::string::npos == pos) - { - std::cerr << "New connection for module '" << mod << "', but it's not configured!" << std::endl; - std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl; - close(newfd); - return true; - } - - std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl; - - m_Pfd[pos].fd = newfd; - - return false; + return it->second->Pop(*packet, ms); } -bool CTCClient::Open(const std::string &address, const std::string &modules, uint16_t port) +bool CTCServer::AnyAreClosed() const { - m_Address.assign(address); - m_Modules.assign(modules); - m_Port = port; - - m_Pfd.resize(m_Modules.size()); - for (auto &pf : m_Pfd) - { - pf.fd = -1; - pf.events = POLLIN; - } - - std::cout << "Connecting to the TCP server..." << std::endl; + // If the dispatcher is running, we assume open. + // NNG handles reconnections. + return !m_Running; +} - for (char c : modules) - { - if (Connect(c)) - { - return true; - } - } +bool CTCServer::Accept() +{ + // No manual accept needed with NNG return false; } -bool CTCClient::Connect(char module) + +// ---------------- CLIENT ---------------- + +bool CTCClient::Open(const std::string &address, const std::string &modules, uint16_t port) { - const auto pos = m_Modules.find(module); - if (pos == std::string::npos) - { - std::cerr << "CTCClient::Connect: could not find module '" << module << "' in configured modules!" << std::endl; - return true; - } - CIp ip(m_Address.c_str(), AF_UNSPEC, SOCK_STREAM, m_Port); + m_Modules = modules; + m_ClientQueue = std::make_shared(); - auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0); - if (fd < 0) + int rv; + if ((rv = nng_pair1_open(&m_Sock)) != 0) { - std::cerr << "Could not open socket for module '" << module << "'" << std::endl; - perror("TC client socket"); + std::cerr << "nng_pair1_open failed: " << nng_strerror(rv) << std::endl; return true; } - int yes = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))) - { - std::cerr << "Moudule " << module << " error:"; - perror("setsockopt"); - close(fd); - return true; - } + // Set receive timeout for dispatcher + nng_duration timeout = 100; + nng_socket_set_ms(m_Sock, NNG_OPT_RECVTIMEO, timeout); - unsigned count = 0; - while (connect(fd, ip.GetCPointer(), ip.GetSize())) - { - if (ECONNREFUSED == errno) - { - if (0 == ++count % 100) std::cout << "Connection refused! Restart the reflector." << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - else - { - std::cerr << "Module " << module << " error: "; - perror("connect"); - close(fd); - return true; - } + std::stringstream url; + if (address.find("ipc://") == 0) { + url << address; + } else if (address.find("/") == 0 || address.find("./") == 0 || address.find("../") == 0) { + url << "ipc://" << address; + } else { + url << "tcp://" << address << ":" << port; } - int sent = send(fd, &module, 1, 0); // send the identification byte - if (sent < 0) - { - std::cerr << "Error sending ID byte to module '" << module << "':" << std::endl; - perror("send"); - close(fd); - return true; - } - else if (0 == sent) + // Client dials asynchronously so it can retry in background + if ((rv = nng_dial(m_Sock, url.str().c_str(), nullptr, NNG_FLAG_NONBLOCK)) != 0) { - std::cerr << "Could not set ID byte to module '" << module << "'" << std::endl; - close(fd); + std::cerr << "nng_dial failed: " << nng_strerror(rv) << " URL: " << url.str() << std::endl; return true; } - std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << module << "'" << std::endl; - - m_Pfd[pos].fd = fd; + m_Running = true; + m_Connected = true; + m_Thread = std::thread([this] { Dispatcher(); }); + // Give it a moment to connect? Not strictly necessary. + return false; } -void CTCClient::ReConnect() // and sometimes ping +void CTCClient::Receive(std::queue> &queue, int ms) { - static std::chrono::system_clock::time_point start = std::chrono::system_clock::now(); - auto now = std::chrono::system_clock::now(); - std::chrono::duration secs = now - start; - - for (char m : m_Modules) - { - if (0 > GetFD(m)) - { - std::cout << "Reconnecting module " << m << "..." << std::endl; - if (Connect(m)) - { - raise(SIGINT); - } - } - } - - if(secs.count() > 5.0) - { - STCPacket ping; - ping.codec_in = ECodecType::ping; - Send(&ping); - start = now; - } + // Wait up to ms for the first packet + STCPacket p; + if (m_ClientQueue->Pop(p, ms)) + { + queue.push(std::make_unique(p)); + // Drain the rest without waiting + while (m_ClientQueue->Pop(p, 0)) + { + queue.push(std::make_unique(p)); + } + } } -void CTCClient::Receive(std::queue> &queue, int ms) +void CTCClient::ReConnect() { - for (auto &pfd : m_Pfd) - pfd.revents = 0; - - auto rv = poll(m_Pfd.data(), m_Pfd.size(), ms); - - if (rv < 0) - { - perror("Receive poll"); - return; - } - - if (0 == rv) - return; - - for (auto &pfd : m_Pfd) - { - if (pfd.fd < 0) - continue; - - if (pfd.revents & POLLIN) - { - auto p_tcpack = std::make_unique(); - if (receive(pfd.fd, p_tcpack.get())) - { - p_tcpack.reset(); - Close(pfd.fd); - } - else - { - queue.push(std::move(p_tcpack)); - } - } - - if (pfd.revents & POLLERR || pfd.revents & POLLHUP) - { - std::cerr << "IO ERROR on Receive module " << GetMod(pfd.fd) << std::endl; - Close(pfd.fd); - } - if (pfd.revents & POLLNVAL) - { - std::cerr << "POLLNVAL received on fd " << pfd.fd << ", resetting to -1" << std::endl; - pfd.fd = -1; - } - } + // NNG handles reconnection automatically } diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index a22dc5b..68c7b71 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -1,19 +1,3 @@ -// urfd -- The universal reflector -// Copyright © 2024 Thomas A. Early N7TAE -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - #pragma once #include @@ -22,32 +6,71 @@ #include #include #include -#include +#include +#include +#include +#include +#include +#include -#include "IP.h" #include "TCPacketDef.h" +// Specialized thread-safe queue for STCPacket by value, avoiding template conflict +class CTCPacketQueue { + std::queue q; + std::mutex m; + std::condition_variable cv; +public: + void Push(const STCPacket& p) { + std::lock_guard l(m); + q.push(p); + cv.notify_one(); + } + bool Pop(STCPacket& p, int ms) { + std::unique_lock l(m); + // Wait up to ms if queue is empty + if (q.empty()) { + if (ms <= 0) return false; + // wait_for returns false if timeout, true if predicate is true + if (!cv.wait_for(l, std::chrono::milliseconds(ms), [this]{ return !q.empty(); })) { + return false; // timeout + } + } + + p = q.front(); + q.pop(); + return true; + } +}; + class CTCSocket { public: - CTCSocket() {} - virtual ~CTCSocket() { Close(); } + CTCSocket(); + virtual ~CTCSocket(); virtual bool Open(const std::string &address, const std::string &modules, uint16_t port) = 0; - void Close(); // close all open sockets - void Close(char module); // close a specific module - void Close(int fd); // close a specific file descriptor + void Close(); + void Close(char module); - // All bool functions, except Server Receive, return true if there was an error bool Send(const STCPacket *packet); - int GetFD(char module) const; // can return -1! - char GetMod(int fd) const; + bool IsConnected(char module) const; + int GetFD(char module) const; // Legacy compat: returns 1 if connected, -1 if not protected: - bool receive(int fd, STCPacket *packet); - std::vector m_Pfd; + nng_socket m_Sock; + std::thread m_Thread; + std::atomic m_Running; + std::atomic m_Connected; std::string m_Modules; + + // Per-module input queues + std::map> m_Queues; + // Client queue (receives all) + std::shared_ptr m_ClientQueue; + + void Dispatcher(); }; class CTCServer : public CTCSocket @@ -56,27 +79,17 @@ public: CTCServer() : CTCSocket() {} ~CTCServer() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); - // Returns true if there is data bool Receive(char module, STCPacket *packet, int ms); bool AnyAreClosed() const; - bool Accept(); - -private: - CIp m_Ip; - bool acceptone(int fd); + bool Accept(); // Checks NNG state }; class CTCClient : public CTCSocket { public: - CTCClient() : CTCSocket(), m_Port(0) {} + CTCClient() : CTCSocket() {} ~CTCClient() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); void Receive(std::queue> &queue, int ms); - void ReConnect(); - -private: - std::string m_Address; - uint16_t m_Port; - bool Connect(char module); + void ReConnect(); // No-op in NNG }; diff --git a/reflector/YSFProtocol.cpp b/reflector/YSFProtocol.cpp index b6dcceb..094c958 100644 --- a/reflector/YSFProtocol.cpp +++ b/reflector/YSFProtocol.cpp @@ -252,7 +252,7 @@ void CYsfProtocol::Task(void) //////////////////////////////////////////////////////////////////////////////////////// // streams helpers -void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, const CIp &Ip) +void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr &Header, const CIp &Ip, uint8_t) { // find the stream auto stream = GetStream(Header->GetStreamId());