move TC server Accept to the reflector maintenance thread

pull/3/head
Tom Early 2 years ago
parent f9e67ce7e6
commit 9ba2c8fefd

@ -108,24 +108,6 @@ void CCodecStream::Thread()
void CCodecStream::Task(void)
{
int fd = g_TCServer.GetFD(m_CSModule);
// if the fd is not good we need to reestablish it
if (fd < 0) // log the situation
std::cout << "Lost connection to transcoder, module '" << m_CSModule << "', waiting for new connection..." << std::endl;
while (fd < 0)
{
if (g_TCServer.Accept()) // try to get a connection
{
std::cerr << "Unrecoverable ERROR! Quiting..." << std::endl;
abort();
}
// Either Accept timed out, or it's possile that other Transcoder ports were instead reopened
// So we'll check to see if this module is now open
fd = g_TCServer.GetFD(m_CSModule);
}
STCPacket pack;
if (g_TCServer.Receive(m_CSModule, &pack, 8))
{

@ -31,9 +31,9 @@ CReflector::CReflector()
CReflector::~CReflector()
{
keep_running = false;
if ( m_XmlReportFuture.valid() )
if ( m_MaintenanceFuture.valid() )
{
m_XmlReportFuture.get();
m_MaintenanceFuture.get();
}
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
@ -131,7 +131,7 @@ bool CReflector::Start(void)
// start the reporting thread
try
{
m_XmlReportFuture = std::async(std::launch::async, &CReflector::StateReportThread, this);
m_MaintenanceFuture = std::async(std::launch::async, &CReflector::MaintenanceThread, this);
}
catch(const std::exception& e)
{
@ -155,9 +155,9 @@ void CReflector::Stop(void)
g_TCServer.Close();
// stop & delete report threads
if ( m_XmlReportFuture.valid() )
if ( m_MaintenanceFuture.valid() )
{
m_XmlReportFuture.get();
m_MaintenanceFuture.get();
}
// stop & delete all router thread
@ -333,12 +333,11 @@ void CReflector::RouterThread(const char ThisModule)
}
}
////////////////////////////////////////////////////////////////////////////////////////
// report threads
// Maintenance thread hands xml and/or json update,
// and also keeps the transcoder TCP port(s) connected
#define XML_UPDATE_PERIOD 10
void CReflector::StateReportThread()
void CReflector::MaintenanceThread()
{
std::string xmlpath, jsonpath;
#ifndef NO_DHT
@ -348,6 +347,7 @@ void CReflector::StateReportThread()
xmlpath.assign(g_Configure.GetString(g_Keys.files.xml));
if (g_Configure.Contains(g_Keys.files.json))
jsonpath.assign(g_Configure.GetString(g_Keys.files.json));
auto tcport = g_Configure.GetUnsigned(g_Keys.tc.port);
if (xmlpath.empty() && jsonpath.empty())
return; // nothing to do
@ -387,33 +387,41 @@ void CReflector::StateReportThread()
}
}
#ifndef NO_DHT
// update the dht data, if needed
if (peers_changed)
{
PutDHTPeers();
peers_changed = false;
}
if (clients_changed)
{
PutDHTClients();
clients_changed = false;
}
if (users_changed)
{
PutDHTUsers();
users_changed = false;
}
#endif
// and wait a bit and do something useful at the same time
for (int i=0; i< XML_UPDATE_PERIOD && keep_running; i++)
for (int i=0; i< XML_UPDATE_PERIOD*10 && keep_running; i++)
{
#ifndef NO_DHT
// update the dht data, if needed
if (peers_changed)
{
PutDHTPeers();
peers_changed = false;
}
if (clients_changed)
{
PutDHTClients();
clients_changed = false;
}
if (users_changed)
if (tcport && g_TCServer.AnyAreClosed())
{
PutDHTUsers();
users_changed = false;
if (g_TCServer.Accept())
{
std::cerr << "Unrecoverable error, aborting..." << std::endl;
abort();
}
}
#endif
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
////////////////////////////////////////////////////////////////////////////////////////
// notifications
void CReflector::OnPeersChanged(void)

@ -94,7 +94,7 @@ protected:
// threads
void RouterThread(const char);
void StateReportThread(void);
void MaintenanceThread(void);
// streams
std::shared_ptr<CPacketStream> GetStream(char);
@ -121,7 +121,7 @@ protected:
// threads
std::atomic<bool> keep_running;
std::unordered_map<char, std::future<void>> m_RouterFuture;
std::future<void> m_XmlReportFuture;
std::future<void> m_MaintenanceFuture;
#ifndef NO_DHT
// Distributed Hash Table

@ -63,13 +63,20 @@ void CTCSocket::Close(int fd)
{
if (fd == p.fd)
{
if (close(p.fd))
if (shutdown(p.fd, SHUT_RDWR))
{
std::cerr << "Error while closing " << fd << ": ";
perror("close");
perror("shutdown");
}
else
p.fd = -1;
{
if (close(p.fd))
{
std::cerr << "Error while closing " << fd << ": ";
perror("close");
}
else
p.fd = -1;
}
return;
}
}
@ -96,7 +103,7 @@ char CTCSocket::GetMod(int fd) const
return '?';
}
bool CTCServer::any_are_closed()
bool CTCServer::AnyAreClosed() const
{
for (auto &fds : m_Pfd)
{
@ -171,7 +178,6 @@ bool CTCServer::Receive(char module, STCPacket *packet, int ms)
auto pfds = &m_Pfd[pos];
if (pfds->fd < 0)
{
std::cerr << "Can't receive on module '" << module << "' because it's closed" << std::endl;
return rv;
}
@ -265,10 +271,14 @@ bool CTCServer::Accept()
return true;
}
while (any_are_closed())
while (AnyAreClosed())
{
if (acceptone(fd))
{
close(fd);
Close();
return true;
}
}
close(fd);

@ -58,11 +58,11 @@ public:
bool Open(const std::string &address, const std::string &modules, uint16_t port);
// Returns true if there is data
bool Receive(char module, STCPacket *packet, int ms);
bool AnyAreClosed() const;
bool Accept();
private:
CIp m_Ip;
bool any_are_closed();
bool acceptone(int fd);
};

Loading…
Cancel
Save

Powered by TurnKey Linux.