diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 2a436e2..4760b0f 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -18,7 +18,6 @@ #include -#include #include "Global.h" #include "DVFramePacket.h" @@ -114,6 +113,7 @@ void CCodecStream::Task(void) // if the fd is not good we need to reestablish it if (fd < 0) // log the situation std::cout << "Lost connection to transcoder, module '" << m_CSModule << "', waiting for new connection..." << std::endl; + while (fd < 0) { if (g_TCServer.Accept()) // try to get a connection @@ -127,20 +127,8 @@ void CCodecStream::Task(void) } STCPacket pack; - struct timeval tv; - fd_set readfds; - - tv.tv_sec = 0; - tv.tv_usec = 7000; - - FD_ZERO(&readfds); - FD_SET(fd, &readfds); - - // don't care about writefds and exceptfds: - if (select(fd+1, &readfds, NULL, NULL, &tv)) + if (g_TCServer.Receive(m_CSModule, &pack, 8)) { - if (g_TCServer.Receive(fd, &pack)) - return; // update statistics double rt = pack.rt_timer.time(); // the round-trip time if (0 == m_RTCount) @@ -219,7 +207,7 @@ void CCodecStream::Task(void) return; } - if (g_TCServer.Send(fd, Frame->GetCodecPacket())) + if (g_TCServer.Send(Frame->GetCodecPacket())) { // ditto, we'll try to fix this on the next pass return; diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 03cab1a..61fd109 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -27,34 +27,36 @@ void CTCSocket::Close() { - std::lock_guard lck(m_Mutex); - for (auto item : m_FD) - close(item.second); - m_FD.clear(); + for (auto &item : m_Pfd) + { + if (item.fd >= 0) + { + close(item.fd); + } + } + m_Pfd.clear(); } void CTCSocket::Close(char mod) { - std::lock_guard lck(m_Mutex); - auto item = m_FD.find(mod); - if (m_FD.end() == item) + auto pos = m_Modules.find(mod); + if (std::string::npos == pos) { - std::cerr << "Could not find a file descriptor for module '" << mod << "'" << std::endl; + std::cerr << "Could not find module '" << mod << "'" << std::endl; return; } - close(item->second); - m_FD.erase(item); + close(m_Pfd[pos].fd); + m_Pfd[pos].fd = -1; } void CTCSocket::Close(int fd) { - std::lock_guard lck(m_Mutex); - for (auto &p : m_FD) + for (auto &p : m_Pfd) { - if (fd == p.second) + if (fd == p.fd) { - close(fd); - m_FD.erase(p.first); + close(p.fd); + p.fd = -1; return; } } @@ -63,32 +65,47 @@ void CTCSocket::Close(int fd) int CTCSocket::GetFD(char module) const { - std::lock_guard lck(m_Mutex); - const auto item = m_FD.find(module); - if (m_FD.cend() == item) - { + auto pos = m_Modules.find(module); + if (std::string::npos == pos) return -1; - } - return item->second; + return m_Pfd[pos].fd; } char CTCSocket::GetMod(int fd) const { - std::lock_guard lck(m_Mutex); - for (const auto &p : m_FD) + for (unsigned i=0; i m_Pfd[i].fd) + return true; + } + return false; +} + +bool CTCSocket::Send(const STCPacket *packet) { + const auto pos = m_Modules.find(packet->module); + if (pos == std::string::npos) + { + std::cerr << "Can't Send() this packet to unconfigured module '" << packet->module << "'" << std::endl; + return true; + } unsigned count = 0; auto data = (const unsigned char *)packet; do { - auto n = send(fd, data+count, sizeof(STCPacket)-count, 0); + auto n = send(m_Pfd[pos].fd, data+count, sizeof(STCPacket)-count, 0); if (n <= 0) { if (0 == n) @@ -107,22 +124,47 @@ bool CTCSocket::Send(int fd, const STCPacket *packet) return false; } -bool CTCSocket::Receive(int fd, STCPacket *packet) +// returns true if successful +bool CTCServer::Receive(char module, STCPacket *packet, int ms) { + const auto pos = m_Modules.find(module); + if (pos == std::string::npos) + { + std::cerr << "Can't Recevieve on unconfigured module '" << module << "'" << std::endl; + return false; + } + + auto rv = poll(&m_Pfd[pos], 1, ms); + if (rv < 0) + { + perror("Recieve poll"); + Close(m_Pfd[pos].fd); + return false; + } + auto data = (unsigned char *)packet; - auto n = recv(fd, data, sizeof(STCPacket), MSG_WAITALL); + auto n = recv(m_Pfd[pos].fd, data, sizeof(STCPacket), MSG_WAITALL); if (n < 0) { - perror("CTCSocket::Receive"); - Close(fd); - return true; + perror("Receive recv"); + Close(m_Pfd[pos].fd); + return false; } - return n == sizeof(STCPacket); + if (n != sizeof(STCPacket)) + std::cout << "Warning: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; + + return true; } bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) { m_Modules.assign(modules); + m_Pfd.resize(m_Modules.size()+1); + for (auto &pf : m_Pfd) + { + pf.fd = -1; + pf.events = POLLIN; + } CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); @@ -162,51 +204,38 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin auto n = m_Modules.size(); std::cout << "Waiting for " << n << " transcoder connection(s)..." << std::endl; - while (m_FD.size() < n) + while (any_are_closed()) { if (Accept()) return true; } - m_listenSock = fd; + m_Pfd.back().fd = fd; return false; } bool CTCServer::Accept() { - struct timeval tv; - fd_set readfds; - - // 10 milliseconds - tv.tv_sec = 0; - tv.tv_usec = 10000; - - FD_ZERO(&readfds); - FD_SET(m_listenSock, &readfds); - - // don't care about writefds and exceptfds: - int rv = select(m_listenSock+1, &readfds, NULL, NULL, &tv); + auto rv = poll(&m_Pfd.back(), 1, 10); if (rv < 0) { - perror("Accept select"); + perror("Accept poll"); return true; } if (0 == rv) // we timed out waiting for something return false; - // here comes a connect + // rv has to be 1, so, here comes a connect request CIp their_addr; // connector's address information socklen_t sin_size = sizeof(struct sockaddr_storage); - auto newfd = accept(m_listenSock, their_addr.GetPointer(), &sin_size); + auto newfd = accept(m_Pfd.back().fd, their_addr.GetPointer(), &sin_size); if (newfd < 0) { - if (EAGAIN == errno || EWOULDBLOCK == errno) - return false; perror("Accept accept"); return true; } @@ -223,7 +252,8 @@ bool CTCServer::Accept() return true; } - if (std::string::npos == m_Modules.find(mod)) + const auto pos = m_Modules.find(mod); + if (std::string::npos == pos) { std::cerr << "New connection for module '" << mod << "', but it's not configured!" << std::endl; std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl; @@ -233,8 +263,7 @@ bool CTCServer::Accept() std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl; - std::lock_guard lck(m_Mutex); - m_FD[mod] = newfd; + m_Pfd[pos].fd = newfd; return false; } @@ -259,6 +288,12 @@ bool CTCClient::Initialize(const std::string &address, const std::string &module bool CTCClient::Connect(char module) { + const auto pos = m_Modules.find(module); + if (pos == std::string::npos) + { + std::cerr << "CTCClient::Connect: could not find module '" << module << "' in configured modules!" << std::endl; + return true; + } CIp ip(m_Address.c_str(), AF_UNSPEC, SOCK_STREAM, m_Port); auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0); @@ -311,8 +346,7 @@ bool CTCClient::Connect(char module) std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << module << "'" << std::endl; - std::lock_guard lck(m_Mutex); - m_FD[module] = fd; + m_Pfd[pos].fd = fd; return false; } diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index 574528d..8b6b484 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -19,7 +19,8 @@ #include #include #include -#include +#include +#include #include "TCPacketDef.h" @@ -33,16 +34,16 @@ public: void Close(char module); // close a specific module void Close(int fd); // close a specific file descriptor - // bool functions return true on failure - bool Send(int fd, const STCPacket *packet); - bool Receive(int fd, STCPacket *packet); + // most bool functions return true on failure + bool Send(const STCPacket *packet); int GetFD(char module) const; // can return -1! char GetMod(int fd) const; protected: - std::unordered_map m_FD; - mutable std::mutex m_Mutex; + bool any_are_closed(); + std::vector m_Pfd; + std::string m_Modules; }; class CTCServer : public CTCSocket @@ -52,9 +53,7 @@ public: ~CTCServer() {} bool Open(const std::string &address, const std::string &modules, uint16_t port); bool Accept(); -private: - int m_listenSock; - std::string m_Modules; + bool Receive(char module, STCPacket *packet, int ms); }; class CTCClient : public CTCSocket @@ -65,6 +64,6 @@ public: bool Initialize(const std::string &address, const std::string &modules, uint16_t port); bool Connect(char module); private: - std::string m_Address, m_Modules; + std::string m_Address; uint16_t m_Port; };