From 45cf750bdd1f8682f5bd10841943c5ca9d0ebf9b Mon Sep 17 00:00:00 2001 From: Tom Early Date: Fri, 17 May 2024 13:30:16 -0700 Subject: [PATCH] more tweaks for TC socket --- reflector/TCSocket.cpp | 92 +++++++++++++++++++++++------------------- reflector/TCSocket.h | 4 +- 2 files changed, 53 insertions(+), 43 deletions(-) diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 3297fdd..3699dae 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -22,7 +22,6 @@ #include #include -#include "IP.h" #include "TCSocket.h" void CTCSocket::Close() @@ -90,10 +89,9 @@ char CTCSocket::GetMod(int fd) const bool CTCServer::any_are_closed() { - const auto n = m_Modules.size(); - for (unsigned i=0; i m_Pfd[i].fd) + if (0 > fds.fd) return true; } return false; @@ -138,8 +136,16 @@ bool CTCSocket::receive(int fd, STCPacket &packet) Close(fd); return true; } + + if (0 == n) + { + std::cerr << "recv() returned zero bytes from mdule '" << GetMod(fd) << "'"; + Close(fd); + return true; + } + if (n != sizeof(STCPacket)) - std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; + std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet from module '" << GetMod(fd) << "'" << std::endl; return false; } @@ -149,31 +155,37 @@ bool CTCServer::Receive(char module, STCPacket &packet, int ms) const auto pos = m_Modules.find(module); if (pos == std::string::npos) { - std::cerr << "Can't Recevieve on unconfigured module '" << module << "'" << std::endl; + std::cerr << "Can't receive on unconfigured module '" << module << "'" << std::endl; + return true; + } + + auto pfds = &m_Pfd[pos]; + if (pfds->fd < 0) + { + std::cerr << "Can't receive on module '" << module << "' because it's closed" << std::endl; return true; } - auto fds = &m_Pfd[pos]; - auto rv = poll(fds, 1, ms); + auto rv = poll(pfds, 1, ms); if (rv < 0) { perror("Recieve poll"); - Close(fds->fd); + Close(pfds->fd); return true; } if (0 == rv) return false; // timeout - if (fds->revents & POLLIN) + if (pfds->revents & POLLIN) { - return receive(fds->fd, packet); + return receive(pfds->fd, packet); } - if (fds->revents & POLLERR || fds->revents & POLLHUP) + if (pfds->revents & POLLERR || pfds->revents & POLLHUP) { - std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl; - Close(fds->fd); + std::cerr << ((pfds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on module " << module << "', closing socket" << std::endl; + Close(pfds->fd); return true; } @@ -183,16 +195,25 @@ bool CTCServer::Receive(char module, STCPacket &packet, int ms) bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) { m_Modules.assign(modules); - m_Pfd.resize(m_Modules.size()+1); + + m_Ip = CIp(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); + + m_Pfd.resize(m_Modules.size()); for (auto &pf : m_Pfd) { pf.fd = -1; pf.events = POLLIN; + pf.revents = 0; } - CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); + std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on " << m_Ip << "..." << std::endl; - int fd = socket(ip.GetFamily(), SOCK_STREAM, 0); + return Accept(); +} + +bool CTCServer::Accept() +{ + auto fd = socket(m_Ip.GetFamily(), SOCK_STREAM, 0); if (fd < 0) { perror("Open socket"); @@ -200,7 +221,7 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin } int yes = 1; - int rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + auto rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if (rv < 0) { close(fd); @@ -208,7 +229,7 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin return true; } - rv = bind(fd, ip.GetCPointer(), ip.GetSize()); + rv = bind(fd, m_Ip.GetCPointer(), m_Ip.GetSize()); if (rv < 0) { close(fd); @@ -225,37 +246,19 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin return true; } - m_Pfd.back().fd = fd; - - std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on " << ip << "..." << std::endl; - return Accept(); -} - -bool CTCServer::Accept() -{ while (any_are_closed()) { if (acceptone()) return true; } + close(fd); + return false; } bool CTCServer::acceptone() { - auto rv = poll(&m_Pfd.back(), 1, 10); - if (rv < 0) - { - perror("Accept poll"); - return true; - } - - if (0 == rv) - return false; // timeout - - // rv has to be 1, so, here comes a connect request - CIp their_addr; // connector's address information socklen_t sin_size = sizeof(struct sockaddr_storage); @@ -268,7 +271,7 @@ bool CTCServer::acceptone() } char mod; - rv = recv(newfd, &mod, 1, 0); // block to get the identification byte + int rv = recv(newfd, &mod, 1, MSG_WAITALL); // block to get the identification byte if (rv != 1) { if (rv < 0) @@ -403,8 +406,10 @@ bool CTCClient::ReConnect() bool CTCClient::Receive(std::queue> &queue, int ms) { - const auto n = m_Pfd.size(); - auto rv = poll(m_Pfd.data(), n, ms); + for (auto &pfd : m_Pfd) + pfd.revents = 0; + + auto rv = poll(m_Pfd.data(), m_Pfd.size(), ms); if (rv < 0) { @@ -418,6 +423,9 @@ bool CTCClient::Receive(std::queue> &queue, int ms) bool some_closed = false; for (auto &pfd : m_Pfd) { + if (pfd.fd < 0) + continue; + if (pfd.revents & POLLIN) { auto p_tcpack = std::make_unique(); diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index cf1f269..cdbd9d1 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -24,6 +24,7 @@ #include #include +#include "IP.h" #include "TCPacketDef.h" class CTCSocket @@ -59,6 +60,7 @@ public: bool Accept(); private: + CIp m_Ip; bool any_are_closed(); bool acceptone(); }; @@ -73,7 +75,7 @@ public: bool ReConnect(); private: - bool Connect(char module); std::string m_Address; uint16_t m_Port; + bool Connect(char module); };