From 9ba2c8fefd0398e3400053dec9ae75158edb0786 Mon Sep 17 00:00:00 2001 From: Tom Early Date: Wed, 22 May 2024 05:22:58 -0700 Subject: [PATCH] move TC server Accept to the reflector maintenance thread --- reflector/CodecStream.cpp | 18 ----------- reflector/Reflector.cpp | 64 ++++++++++++++++++++++----------------- reflector/Reflector.h | 4 +-- reflector/TCSocket.cpp | 24 ++++++++++----- reflector/TCSocket.h | 2 +- 5 files changed, 56 insertions(+), 56 deletions(-) diff --git a/reflector/CodecStream.cpp b/reflector/CodecStream.cpp index 27ae286..30bc9fa 100644 --- a/reflector/CodecStream.cpp +++ b/reflector/CodecStream.cpp @@ -108,24 +108,6 @@ void CCodecStream::Thread() void CCodecStream::Task(void) { - int fd = g_TCServer.GetFD(m_CSModule); - - // if the fd is not good we need to reestablish it - if (fd < 0) // log the situation - std::cout << "Lost connection to transcoder, module '" << m_CSModule << "', waiting for new connection..." << std::endl; - - while (fd < 0) - { - if (g_TCServer.Accept()) // try to get a connection - { - std::cerr << "Unrecoverable ERROR! Quiting..." << std::endl; - abort(); - } - // Either Accept timed out, or it's possile that other Transcoder ports were instead reopened - // So we'll check to see if this module is now open - fd = g_TCServer.GetFD(m_CSModule); - } - STCPacket pack; if (g_TCServer.Receive(m_CSModule, &pack, 8)) { diff --git a/reflector/Reflector.cpp b/reflector/Reflector.cpp index 1cfc3fc..433ab15 100644 --- a/reflector/Reflector.cpp +++ b/reflector/Reflector.cpp @@ -31,9 +31,9 @@ CReflector::CReflector() CReflector::~CReflector() { keep_running = false; - if ( m_XmlReportFuture.valid() ) + if ( m_MaintenanceFuture.valid() ) { - m_XmlReportFuture.get(); + m_MaintenanceFuture.get(); } for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++) @@ -131,7 +131,7 @@ bool CReflector::Start(void) // start the reporting thread try { - m_XmlReportFuture = std::async(std::launch::async, &CReflector::StateReportThread, this); + m_MaintenanceFuture = std::async(std::launch::async, &CReflector::MaintenanceThread, this); } catch(const std::exception& e) { @@ -155,9 +155,9 @@ void CReflector::Stop(void) g_TCServer.Close(); // stop & delete report threads - if ( m_XmlReportFuture.valid() ) + if ( m_MaintenanceFuture.valid() ) { - m_XmlReportFuture.get(); + m_MaintenanceFuture.get(); } // stop & delete all router thread @@ -333,12 +333,11 @@ void CReflector::RouterThread(const char ThisModule) } } -//////////////////////////////////////////////////////////////////////////////////////// -// report threads - +// Maintenance thread hands xml and/or json update, +// and also keeps the transcoder TCP port(s) connected #define XML_UPDATE_PERIOD 10 -void CReflector::StateReportThread() +void CReflector::MaintenanceThread() { std::string xmlpath, jsonpath; #ifndef NO_DHT @@ -348,6 +347,7 @@ void CReflector::StateReportThread() xmlpath.assign(g_Configure.GetString(g_Keys.files.xml)); if (g_Configure.Contains(g_Keys.files.json)) jsonpath.assign(g_Configure.GetString(g_Keys.files.json)); + auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port); if (xmlpath.empty() && jsonpath.empty()) return; // nothing to do @@ -387,33 +387,41 @@ void CReflector::StateReportThread() } } +#ifndef NO_DHT + // update the dht data, if needed + if (peers_changed) + { + PutDHTPeers(); + peers_changed = false; + } + if (clients_changed) + { + PutDHTClients(); + clients_changed = false; + } + if (users_changed) + { + PutDHTUsers(); + users_changed = false; + } +#endif + // and wait a bit and do something useful at the same time - for (int i=0; i< XML_UPDATE_PERIOD && keep_running; i++) + for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++) { -#ifndef NO_DHT - // update the dht data, if needed - if (peers_changed) - { - PutDHTPeers(); - peers_changed = false; - } - if (clients_changed) - { - PutDHTClients(); - clients_changed = false; - } - if (users_changed) + if (tcport && g_TCServer.AnyAreClosed()) { - PutDHTUsers(); - users_changed = false; + if (g_TCServer.Accept()) + { + std::cerr << "Unrecoverable error, aborting..." << std::endl; + abort(); + } } -#endif - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } -//////////////////////////////////////////////////////////////////////////////////////// // notifications void CReflector::OnPeersChanged(void) diff --git a/reflector/Reflector.h b/reflector/Reflector.h index 5d8fb52..9b71b1e 100644 --- a/reflector/Reflector.h +++ b/reflector/Reflector.h @@ -94,7 +94,7 @@ protected: // threads void RouterThread(const char); - void StateReportThread(void); + void MaintenanceThread(void); // streams std::shared_ptr GetStream(char); @@ -121,7 +121,7 @@ protected: // threads std::atomic keep_running; std::unordered_map> m_RouterFuture; - std::future m_XmlReportFuture; + std::future m_MaintenanceFuture; #ifndef NO_DHT // Distributed Hash Table diff --git a/reflector/TCSocket.cpp b/reflector/TCSocket.cpp index 24c7658..9d55e7a 100644 --- a/reflector/TCSocket.cpp +++ b/reflector/TCSocket.cpp @@ -63,13 +63,20 @@ void CTCSocket::Close(int fd) { if (fd == p.fd) { - if (close(p.fd)) + if (shutdown(p.fd, SHUT_RDWR)) { - std::cerr << "Error while closing " << fd << ": "; - perror("close"); + perror("shutdown"); } else - p.fd = -1; + { + if (close(p.fd)) + { + std::cerr << "Error while closing " << fd << ": "; + perror("close"); + } + else + p.fd = -1; + } return; } } @@ -96,7 +103,7 @@ char CTCSocket::GetMod(int fd) const return '?'; } -bool CTCServer::any_are_closed() +bool CTCServer::AnyAreClosed() const { for (auto &fds : m_Pfd) { @@ -171,7 +178,6 @@ bool CTCServer::Receive(char module, STCPacket *packet, int ms) auto pfds = &m_Pfd[pos]; if (pfds->fd < 0) { - std::cerr << "Can't receive on module '" << module << "' because it's closed" << std::endl; return rv; } @@ -265,10 +271,14 @@ bool CTCServer::Accept() return true; } - while (any_are_closed()) + while (AnyAreClosed()) { if (acceptone(fd)) + { + close(fd); + Close(); return true; + } } close(fd); diff --git a/reflector/TCSocket.h b/reflector/TCSocket.h index 8a2de8a..381e3ae 100644 --- a/reflector/TCSocket.h +++ b/reflector/TCSocket.h @@ -58,11 +58,11 @@ public: bool Open(const std::string &address, const std::string &modules, uint16_t port); // Returns true if there is data bool Receive(char module, STCPacket *packet, int ms); + bool AnyAreClosed() const; bool Accept(); private: CIp m_Ip; - bool any_are_closed(); bool acceptone(int fd); };