Changes for CTCPClient

pull/3/head
Tom Early 2 years ago
parent e0f3797d69
commit 37d47c1ac6

@ -122,7 +122,7 @@ void CCodecStream::Task(void)
exit(1); exit(1);
} }
// Either Accept timed out, or it's possile that other Transcoder ports were instead reopened // Either Accept timed out, or it's possile that other Transcoder ports were instead reopened
// So we'll check to see if the one for this module is okay now // So we'll check to see if this module is now open
fd = g_TCServer.GetFD(m_CSModule); fd = g_TCServer.GetFD(m_CSModule);
} }

@ -31,7 +31,7 @@ void CTCSocket::Close()
{ {
if (item.fd >= 0) if (item.fd >= 0)
{ {
close(item.fd); Close(item.fd);
} }
} }
m_Pfd.clear(); m_Pfd.clear();
@ -45,7 +45,7 @@ void CTCSocket::Close(char mod)
std::cerr << "Could not find module '" << mod << "'" << std::endl; std::cerr << "Could not find module '" << mod << "'" << std::endl;
return; return;
} }
close(m_Pfd[pos].fd); Close(m_Pfd[pos].fd);
m_Pfd[pos].fd = -1; m_Pfd[pos].fd = -1;
} }
@ -55,8 +55,13 @@ void CTCSocket::Close(int fd)
{ {
if (fd == p.fd) if (fd == p.fd)
{ {
close(p.fd); if (close(p.fd))
p.fd = -1; {
std::cerr << "Error while closing " << fd << ": ";
perror("close");
}
else
p.fd = -1;
return; return;
} }
} }
@ -124,7 +129,7 @@ bool CTCSocket::Send(const STCPacket *packet)
return false; return false;
} }
// returns true if successful // returns true if data is returned
bool CTCServer::Receive(char module, STCPacket *packet, int ms) bool CTCServer::Receive(char module, STCPacket *packet, int ms)
{ {
const auto pos = m_Modules.find(module); const auto pos = m_Modules.find(module);
@ -134,24 +139,38 @@ bool CTCServer::Receive(char module, STCPacket *packet, int ms)
return false; return false;
} }
auto rv = poll(&m_Pfd[pos], 1, ms); auto fds = &m_Pfd[pos];
auto rv = poll(fds, 1, ms);
if (rv < 0) if (rv < 0)
{ {
perror("Recieve poll"); perror("CTCServer::Recieve poll");
Close(m_Pfd[pos].fd); Close(fds->fd);
return false; return false;
} }
auto data = (unsigned char *)packet; if (0 == rv)
auto n = recv(m_Pfd[pos].fd, data, sizeof(STCPacket), MSG_WAITALL); return false;
if (n < 0)
if (fds->revents & POLLIN)
{ {
perror("Receive recv"); auto data = (unsigned char *)packet;
Close(m_Pfd[pos].fd); auto n = recv(fds->fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
{
perror("Receive recv");
Close(fds->fd);
return false;
}
if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
}
if (fds->revents & POLLERR || fds->revents & POLLHUP)
{
std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl;
Close(fds->fd);
return false; return false;
} }
if (n != sizeof(STCPacket))
std::cout << "Warning: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
return true; return true;
} }
@ -317,7 +336,7 @@ bool CTCClient::Connect(char module)
{ {
if (ECONNREFUSED == errno) if (ECONNREFUSED == errno)
{ {
if (0 == ++count % 100) std::cout << "Connection refused! Restart the server." << std::endl; if (0 == ++count % 100) std::cout << "Connection refused! Restart the system." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
else else
@ -350,3 +369,65 @@ bool CTCClient::Connect(char module)
return false; return false;
} }
bool CTCClient::CheckConnections()
{
for (char m : m_Modules)
{
if (-1 == GetFD(m))
{
if (Connect(m))
{
return true;
}
}
}
return false;
}
// returns true if data is returned.
// queue should be initially empty!
bool CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
{
auto n = m_Pfd.size();
auto rv = poll(m_Pfd.data(), n, ms);
if (0 == rv)
return false;
if (rv < 0)
{
perror("CTCClient::Receive poll");
return false;
}
for (auto &pfd : m_Pfd)
{
if (pfd.revents & POLLIN)
{
auto packet = std::make_unique<STCPacket>();
auto data = (unsigned char *)packet.get();
auto n = recv(pfd.fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
{
perror("Receive recv");
Close(pfd.fd);
packet.reset();
}
else if (n > 0)
{
if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
queue.push(std::move(packet));
}
}
if (pfd.revents & POLLERR || pfd.revents & POLLHUP)
{
std::cerr << ((pfd.revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << pfd.fd << ", closing socket" << std::endl;
Close(pfd.fd);
}
}
return ! queue.empty();
}

@ -20,6 +20,8 @@
#include <cstdint> #include <cstdint>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include <queue>
#include <memory>
#include <poll.h> #include <poll.h>
#include "TCPacketDef.h" #include "TCPacketDef.h"
@ -63,6 +65,8 @@ public:
~CTCClient() {} ~CTCClient() {}
bool Initialize(const std::string &address, const std::string &modules, uint16_t port); bool Initialize(const std::string &address, const std::string &modules, uint16_t port);
bool Connect(char module); bool Connect(char module);
bool CheckConnections();
bool Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms);
private: private:
std::string m_Address; std::string m_Address;
uint16_t m_Port; uint16_t m_Port;

Loading…
Cancel
Save

Powered by TurnKey Linux.