diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 4760b0f..0df5425 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -122,7 +122,7 @@ void CCodecStream::Task(void) exit(1); } // Either Accept timed out, or it's possile that other Transcoder ports were instead reopened - // So we'll check to see if the one for this module is okay now + // So we'll check to see if this module is now open fd = g_TCServer.GetFD(m_CSModule); } diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 61fd109..b4c3995 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -31,7 +31,7 @@ void CTCSocket::Close() { if (item.fd >= 0) { - close(item.fd); + Close(item.fd); } } m_Pfd.clear(); @@ -45,7 +45,7 @@ void CTCSocket::Close(char mod) std::cerr << "Could not find module '" << mod << "'" << std::endl; return; } - close(m_Pfd[pos].fd); + Close(m_Pfd[pos].fd); m_Pfd[pos].fd = -1; } @@ -55,8 +55,13 @@ void CTCSocket::Close(int fd) { if (fd == p.fd) { - close(p.fd); - p.fd = -1; + if (close(p.fd)) + { + std::cerr << "Error while closing " << fd << ": "; + perror("close"); + } + else + p.fd = -1; return; } } @@ -124,7 +129,7 @@ bool CTCSocket::Send(const STCPacket *packet) return false; } -// returns true if successful +// returns true if data is returned bool CTCServer::Receive(char module, STCPacket *packet, int ms) { const auto pos = m_Modules.find(module); @@ -134,24 +139,38 @@ bool CTCServer::Receive(char module, STCPacket *packet, int ms) return false; } - auto rv = poll(&m_Pfd[pos], 1, ms); + auto fds = &m_Pfd[pos]; + auto rv = poll(fds, 1, ms); if (rv < 0) { - perror("Recieve poll"); - Close(m_Pfd[pos].fd); + perror("CTCServer::Recieve poll"); + Close(fds->fd); return false; } - auto data = (unsigned char *)packet; - auto n = recv(m_Pfd[pos].fd, data, sizeof(STCPacket), MSG_WAITALL); - if (n < 0) + if (0 == rv) + return false; + + if (fds->revents & POLLIN) { - perror("Receive recv"); - Close(m_Pfd[pos].fd); + auto data = (unsigned char *)packet; + auto n = recv(fds->fd, data, sizeof(STCPacket), MSG_WAITALL); + if (n < 0) + { + perror("Receive recv"); + Close(fds->fd); + return false; + } + if (n != sizeof(STCPacket)) + std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; + } + + if (fds->revents & POLLERR || fds->revents & POLLHUP) + { + std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl; + Close(fds->fd); return false; } - if (n != sizeof(STCPacket)) - std::cout << "Warning: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; return true; } @@ -317,7 +336,7 @@ bool CTCClient::Connect(char module) { if (ECONNREFUSED == errno) { - if (0 == ++count % 100) std::cout << "Connection refused! Restart the server." << std::endl; + if (0 == ++count % 100) std::cout << "Connection refused! Restart the system." << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } else @@ -350,3 +369,65 @@ bool CTCClient::Connect(char module) return false; } + +bool CTCClient::CheckConnections() +{ + for (char m : m_Modules) + { + if (-1 == GetFD(m)) + { + if (Connect(m)) + { + return true; + } + } + } + return false; +} + +// returns true if data is returned. +// queue should be initially empty! +bool CTCClient::Receive(std::queue> &queue, int ms) +{ + auto n = m_Pfd.size(); + auto rv = poll(m_Pfd.data(), n, ms); + if (0 == rv) + return false; + + if (rv < 0) + { + perror("CTCClient::Receive poll"); + return false; + } + + for (auto &pfd : m_Pfd) + { + if (pfd.revents & POLLIN) + { + auto packet = std::make_unique(); + auto data = (unsigned char *)packet.get(); + auto n = recv(pfd.fd, data, sizeof(STCPacket), MSG_WAITALL); + + if (n < 0) + { + perror("Receive recv"); + Close(pfd.fd); + packet.reset(); + } + else if (n > 0) + { + if (n != sizeof(STCPacket)) + std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; + + queue.push(std::move(packet)); + } + } + + if (pfd.revents & POLLERR || pfd.revents & POLLHUP) + { + std::cerr << ((pfd.revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << pfd.fd << ", closing socket" << std::endl; + Close(pfd.fd); + } + } + return ! queue.empty(); +} diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index 8b6b484..57d784c 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include "TCPacketDef.h" @@ -63,6 +65,8 @@ public: ~CTCClient() {} bool Initialize(const std::string &address, const std::string &modules, uint16_t port); bool Connect(char module); + bool CheckConnections(); + bool Receive(std::queue> &queue, int ms); private: std::string m_Address; uint16_t m_Port;