some redesign of TC socket

pull/3/head
Tom Early 2 years ago
parent 4d8582c5c2
commit 8698c70ab8

@ -127,7 +127,7 @@ void CCodecStream::Task(void)
}
STCPacket pack;
if (g_TCServer.Receive(m_CSModule, &pack, 8))
if (! g_TCServer.Receive(m_CSModule, pack, 8))
{
// update statistics
double rt = pack.rt_timer.time(); // the round-trip time

@ -88,7 +88,7 @@ char CTCSocket::GetMod(int fd) const
return '?';
}
bool CTCSocket::any_are_closed()
bool CTCServer::any_are_closed()
{
const auto n = m_Modules.size();
for (unsigned i=0; i<n; i++)
@ -129,50 +129,55 @@ bool CTCSocket::Send(const STCPacket *packet)
return false;
}
// returns true if data is returned
bool CTCServer::Receive(char module, STCPacket *packet, int ms)
bool CTCSocket::receive(int fd, STCPacket &packet)
{
auto n = recv(fd, &packet, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
{
perror("Receive recv");
Close(fd);
return true;
}
if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
return false;
}
// returns true on error
bool CTCServer::Receive(char module, STCPacket &packet, int ms)
{
const auto pos = m_Modules.find(module);
if (pos == std::string::npos)
{
std::cerr << "Can't Recevieve on unconfigured module '" << module << "'" << std::endl;
return false;
return true;
}
auto fds = &m_Pfd[pos];
auto rv = poll(fds, 1, ms);
if (rv < 0)
{
perror("CTCServer::Recieve poll");
perror("Recieve poll");
Close(fds->fd);
return false;
return true;
}
if (0 == rv)
return false;
return false; // timeout
if (fds->revents & POLLIN)
{
auto data = (unsigned char *)packet;
auto n = recv(fds->fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
{
perror("Receive recv");
Close(fds->fd);
return false;
}
if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
return receive(fds->fd, packet);
}
if (fds->revents & POLLERR || fds->revents & POLLHUP)
{
std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl;
Close(fds->fd);
return false;
return true;
}
return true;
return false;
}
bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port)
@ -220,21 +225,24 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin
return true;
}
auto n = m_Modules.size();
std::cout << "Waiting for " << n << " transcoder connection(s)..." << std::endl;
m_Pfd.back().fd = fd;
std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on fd " << fd << "..." << std::endl;
return Accept();
}
bool CTCServer::Accept()
{
while (any_are_closed())
{
if (Accept())
if (AcceptOne())
return true;
}
return false;
}
bool CTCServer::Accept()
bool CTCServer::AcceptOne()
{
auto rv = poll(&m_Pfd.back(), 1, 10);
if (rv < 0)
@ -243,8 +251,8 @@ bool CTCServer::Accept()
return true;
}
if (0 == rv) // we timed out waiting for something
return false;
if (0 == rv)
return false; // timeout
// rv has to be 1, so, here comes a connect request
@ -260,7 +268,7 @@ bool CTCServer::Accept()
}
char mod;
rv = recv(newfd, &mod, 1, 0); // retrieve the identification byte
rv = recv(newfd, &mod, 1, 0); // block to get the identification byte
if (rv != 1)
{
if (rv < 0)
@ -377,62 +385,54 @@ bool CTCClient::Connect(char module)
return false;
}
bool CTCClient::CheckConnections()
bool CTCClient::ReConnect()
{
bool rv = false;
for (char m : m_Modules)
{
if (-1 == GetFD(m))
{
if (Connect(m))
{
return true;
rv = true;
}
}
}
return false;
return rv;
}
// returns true if data is returned.
// queue should be initially empty!
bool CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
{
auto n = m_Pfd.size();
const auto n = m_Pfd.size();
auto rv = poll(m_Pfd.data(), n, ms);
if (0 == rv)
return false;
if (rv < 0)
{
perror("CTCClient::Receive poll");
perror("Receive poll");
return false;
}
if (0 == rv)
return false;
for (auto &pfd : m_Pfd)
{
if (pfd.revents & POLLIN)
{
auto packet = std::make_unique<STCPacket>();
auto data = (unsigned char *)packet.get();
auto n = recv(pfd.fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
auto p_tcpack = std::make_unique<STCPacket>();
if (receive(pfd.fd, *p_tcpack))
{
perror("Receive recv");
Close(pfd.fd);
packet.reset();
p_tcpack.reset();
}
else if (n > 0)
else
{
if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
queue.push(std::move(packet));
queue.push(std::move(p_tcpack));
}
}
if (pfd.revents & POLLERR || pfd.revents & POLLHUP)
{
std::cerr << ((pfd.revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << pfd.fd << ", closing socket" << std::endl;
std::cerr << "IO ERROR on Receive module " << GetMod(pfd.fd) << std::endl;
Close(pfd.fd);
}
}

@ -35,15 +35,15 @@ public:
void Close(); // close all open sockets
void Close(char module); // close a specific module
void Close(int fd); // close a specific file descriptor
bool receive(int fd, STCPacket &packet);
// most bool functions return true on failure
// All bool functions return true if there was an error
bool Send(const STCPacket *packet);
int GetFD(char module) const; // can return -1!
char GetMod(int fd) const;
protected:
bool any_are_closed();
std::vector<struct pollfd> m_Pfd;
std::string m_Modules;
};
@ -54,8 +54,12 @@ public:
CTCServer() : CTCSocket() {}
~CTCServer() {}
bool Open(const std::string &address, const std::string &modules, uint16_t port);
bool Receive(char module, STCPacket &packet, int ms);
bool Accept();
bool Receive(char module, STCPacket *packet, int ms);
private:
bool any_are_closed();
bool AcceptOne();
};
class CTCClient : public CTCSocket
@ -64,10 +68,11 @@ public:
CTCClient() : CTCSocket(), m_Port(0) {}
~CTCClient() {}
bool Initialize(const std::string &address, const std::string &modules, uint16_t port);
bool Connect(char module);
bool CheckConnections();
bool Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms);
bool ReConnect();
private:
bool Connect(char module);
std::string m_Address;
uint16_t m_Port;
};

Loading…
Cancel
Save

Powered by TurnKey Linux.