diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 0df5425..c3693d1 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -127,7 +127,7 @@ void CCodecStream::Task(void) } STCPacket pack; - if (g_TCServer.Receive(m_CSModule, &pack, 8)) + if (! g_TCServer.Receive(m_CSModule, pack, 8)) { // update statistics double rt = pack.rt_timer.time(); // the round-trip time diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index d968a90..ef62f91 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -88,7 +88,7 @@ char CTCSocket::GetMod(int fd) const return '?'; } -bool CTCSocket::any_are_closed() +bool CTCServer::any_are_closed() { const auto n = m_Modules.size(); for (unsigned i=0; ifd); - return false; + return true; } if (0 == rv) - return false; + return false; // timeout if (fds->revents & POLLIN) { - auto data = (unsigned char *)packet; - auto n = recv(fds->fd, data, sizeof(STCPacket), MSG_WAITALL); - if (n < 0) - { - perror("Receive recv"); - Close(fds->fd); - return false; - } - if (n != sizeof(STCPacket)) - std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; + return receive(fds->fd, packet); } if (fds->revents & POLLERR || fds->revents & POLLHUP) { std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl; Close(fds->fd); - return false; + return true; } - return true; + return false; } bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) @@ -220,21 +225,24 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin return true; } - auto n = m_Modules.size(); - std::cout << "Waiting for " << n << " transcoder connection(s)..." << std::endl; m_Pfd.back().fd = fd; + std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on fd " << fd << "..." << std::endl; + return Accept(); +} + +bool CTCServer::Accept() +{ while (any_are_closed()) { - if (Accept()) + if (AcceptOne()) return true; } - return false; } -bool CTCServer::Accept() +bool CTCServer::AcceptOne() { auto rv = poll(&m_Pfd.back(), 1, 10); if (rv < 0) @@ -243,8 +251,8 @@ bool CTCServer::Accept() return true; } - if (0 == rv) // we timed out waiting for something - return false; + if (0 == rv) + return false; // timeout // rv has to be 1, so, here comes a connect request @@ -260,7 +268,7 @@ bool CTCServer::Accept() } char mod; - rv = recv(newfd, &mod, 1, 0); // retrieve the identification byte + rv = recv(newfd, &mod, 1, 0); // block to get the identification byte if (rv != 1) { if (rv < 0) @@ -377,62 +385,54 @@ bool CTCClient::Connect(char module) return false; } -bool CTCClient::CheckConnections() +bool CTCClient::ReConnect() { + bool rv = false; for (char m : m_Modules) { if (-1 == GetFD(m)) { if (Connect(m)) { - return true; + rv = true; } } } - return false; + return rv; } -// returns true if data is returned. -// queue should be initially empty! bool CTCClient::Receive(std::queue> &queue, int ms) { - auto n = m_Pfd.size(); + const auto n = m_Pfd.size(); auto rv = poll(m_Pfd.data(), n, ms); - if (0 == rv) - return false; if (rv < 0) { - perror("CTCClient::Receive poll"); + perror("Receive poll"); return false; } + if (0 == rv) + return false; + for (auto &pfd : m_Pfd) { if (pfd.revents & POLLIN) { - auto packet = std::make_unique(); - auto data = (unsigned char *)packet.get(); - auto n = recv(pfd.fd, data, sizeof(STCPacket), MSG_WAITALL); - - if (n < 0) + auto p_tcpack = std::make_unique(); + if (receive(pfd.fd, *p_tcpack)) { - perror("Receive recv"); - Close(pfd.fd); - packet.reset(); + p_tcpack.reset(); } - else if (n > 0) + else { - if (n != sizeof(STCPacket)) - std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; - - queue.push(std::move(packet)); + queue.push(std::move(p_tcpack)); } } if (pfd.revents & POLLERR || pfd.revents & POLLHUP) { - std::cerr << ((pfd.revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << pfd.fd << ", closing socket" << std::endl; + std::cerr << "IO ERROR on Receive module " << GetMod(pfd.fd) << std::endl; Close(pfd.fd); } } diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index 57d784c..47b98bd 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -35,15 +35,15 @@ public: void Close(); // close all open sockets void Close(char module); // close a specific module void Close(int fd); // close a specific file descriptor + bool receive(int fd, STCPacket &packet); - // most bool functions return true on failure + // All bool functions return true if there was an error bool Send(const STCPacket *packet); int GetFD(char module) const; // can return -1! char GetMod(int fd) const; protected: - bool any_are_closed(); std::vector m_Pfd; std::string m_Modules; }; @@ -54,8 +54,12 @@ public: CTCServer() : CTCSocket() {} ~CTCServer() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); + bool Receive(char module, STCPacket &packet, int ms); bool Accept(); - bool Receive(char module, STCPacket *packet, int ms); + +private: + bool any_are_closed(); + bool AcceptOne(); }; class CTCClient : public CTCSocket @@ -64,10 +68,11 @@ public: CTCClient() : CTCSocket(), m_Port(0) {} ~CTCClient() {} bool Initialize(const std::string &address, const std::string &modules, uint16_t port); - bool Connect(char module); - bool CheckConnections(); bool Receive(std::queue> &queue, int ms); + bool ReConnect(); + private: + bool Connect(char module); std::string m_Address; uint16_t m_Port; };