more tweaks for TC socket

pull/3/head
Tom Early 2 years ago
parent e715f57eff
commit 45cf750bdd

@ -22,7 +22,6 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/select.h> #include <sys/select.h>
#include "IP.h"
#include "TCSocket.h" #include "TCSocket.h"
void CTCSocket::Close() void CTCSocket::Close()
@ -90,10 +89,9 @@ char CTCSocket::GetMod(int fd) const
bool CTCServer::any_are_closed() bool CTCServer::any_are_closed()
{ {
const auto n = m_Modules.size(); for (auto &fds : m_Pfd)
for (unsigned i=0; i<n; i++)
{ {
if (0 > m_Pfd[i].fd) if (0 > fds.fd)
return true; return true;
} }
return false; return false;
@ -138,8 +136,16 @@ bool CTCSocket::receive(int fd, STCPacket &packet)
Close(fd); Close(fd);
return true; return true;
} }
if (0 == n)
{
std::cerr << "recv() returned zero bytes from mdule '" << GetMod(fd) << "'";
Close(fd);
return true;
}
if (n != sizeof(STCPacket)) if (n != sizeof(STCPacket))
std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet!" << std::endl; std::cout << "WARNING: Receive only read " << n << " bytes of the transcoder packet from module '" << GetMod(fd) << "'" << std::endl;
return false; return false;
} }
@ -149,31 +155,37 @@ bool CTCServer::Receive(char module, STCPacket &packet, int ms)
const auto pos = m_Modules.find(module); const auto pos = m_Modules.find(module);
if (pos == std::string::npos) if (pos == std::string::npos)
{ {
std::cerr << "Can't Recevieve on unconfigured module '" << module << "'" << std::endl; std::cerr << "Can't receive on unconfigured module '" << module << "'" << std::endl;
return true;
}
auto pfds = &m_Pfd[pos];
if (pfds->fd < 0)
{
std::cerr << "Can't receive on module '" << module << "' because it's closed" << std::endl;
return true; return true;
} }
auto fds = &m_Pfd[pos]; auto rv = poll(pfds, 1, ms);
auto rv = poll(fds, 1, ms);
if (rv < 0) if (rv < 0)
{ {
perror("Recieve poll"); perror("Recieve poll");
Close(fds->fd); Close(pfds->fd);
return true; return true;
} }
if (0 == rv) if (0 == rv)
return false; // timeout return false; // timeout
if (fds->revents & POLLIN) if (pfds->revents & POLLIN)
{ {
return receive(fds->fd, packet); return receive(pfds->fd, packet);
} }
if (fds->revents & POLLERR || fds->revents & POLLHUP) if (pfds->revents & POLLERR || pfds->revents & POLLHUP)
{ {
std::cerr << ((fds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on fd " << fds->fd << ", closing socket" << std::endl; std::cerr << ((pfds->revents & POLLERR) ? "POLLERR" : "POLLHUP") << " received on module " << module << "', closing socket" << std::endl;
Close(fds->fd); Close(pfds->fd);
return true; return true;
} }
@ -183,16 +195,25 @@ bool CTCServer::Receive(char module, STCPacket &packet, int ms)
bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port)
{ {
m_Modules.assign(modules); m_Modules.assign(modules);
m_Pfd.resize(m_Modules.size()+1);
m_Ip = CIp(address.c_str(), AF_UNSPEC, SOCK_STREAM, port);
m_Pfd.resize(m_Modules.size());
for (auto &pf : m_Pfd) for (auto &pf : m_Pfd)
{ {
pf.fd = -1; pf.fd = -1;
pf.events = POLLIN; pf.events = POLLIN;
pf.revents = 0;
} }
CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on " << m_Ip << "..." << std::endl;
int fd = socket(ip.GetFamily(), SOCK_STREAM, 0); return Accept();
}
bool CTCServer::Accept()
{
auto fd = socket(m_Ip.GetFamily(), SOCK_STREAM, 0);
if (fd < 0) if (fd < 0)
{ {
perror("Open socket"); perror("Open socket");
@ -200,7 +221,7 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin
} }
int yes = 1; int yes = 1;
int rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); auto rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if (rv < 0) if (rv < 0)
{ {
close(fd); close(fd);
@ -208,7 +229,7 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin
return true; return true;
} }
rv = bind(fd, ip.GetCPointer(), ip.GetSize()); rv = bind(fd, m_Ip.GetCPointer(), m_Ip.GetSize());
if (rv < 0) if (rv < 0)
{ {
close(fd); close(fd);
@ -225,37 +246,19 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin
return true; return true;
} }
m_Pfd.back().fd = fd;
std::cout << "Waiting for " << m_Modules.size() << " transcoder connection(s) on " << ip << "..." << std::endl;
return Accept();
}
bool CTCServer::Accept()
{
while (any_are_closed()) while (any_are_closed())
{ {
if (acceptone()) if (acceptone())
return true; return true;
} }
close(fd);
return false; return false;
} }
bool CTCServer::acceptone() bool CTCServer::acceptone()
{ {
auto rv = poll(&m_Pfd.back(), 1, 10);
if (rv < 0)
{
perror("Accept poll");
return true;
}
if (0 == rv)
return false; // timeout
// rv has to be 1, so, here comes a connect request
CIp their_addr; // connector's address information CIp their_addr; // connector's address information
socklen_t sin_size = sizeof(struct sockaddr_storage); socklen_t sin_size = sizeof(struct sockaddr_storage);
@ -268,7 +271,7 @@ bool CTCServer::acceptone()
} }
char mod; char mod;
rv = recv(newfd, &mod, 1, 0); // block to get the identification byte int rv = recv(newfd, &mod, 1, MSG_WAITALL); // block to get the identification byte
if (rv != 1) if (rv != 1)
{ {
if (rv < 0) if (rv < 0)
@ -403,8 +406,10 @@ bool CTCClient::ReConnect()
bool CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms) bool CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
{ {
const auto n = m_Pfd.size(); for (auto &pfd : m_Pfd)
auto rv = poll(m_Pfd.data(), n, ms); pfd.revents = 0;
auto rv = poll(m_Pfd.data(), m_Pfd.size(), ms);
if (rv < 0) if (rv < 0)
{ {
@ -418,6 +423,9 @@ bool CTCClient::Receive(std::queue<std::unique_ptr<STCPacket>> &queue, int ms)
bool some_closed = false; bool some_closed = false;
for (auto &pfd : m_Pfd) for (auto &pfd : m_Pfd)
{ {
if (pfd.fd < 0)
continue;
if (pfd.revents & POLLIN) if (pfd.revents & POLLIN)
{ {
auto p_tcpack = std::make_unique<STCPacket>(); auto p_tcpack = std::make_unique<STCPacket>();

@ -24,6 +24,7 @@
#include <memory> #include <memory>
#include <poll.h> #include <poll.h>
#include "IP.h"
#include "TCPacketDef.h" #include "TCPacketDef.h"
class CTCSocket class CTCSocket
@ -59,6 +60,7 @@ public:
bool Accept(); bool Accept();
private: private:
CIp m_Ip;
bool any_are_closed(); bool any_are_closed();
bool acceptone(); bool acceptone();
}; };
@ -73,7 +75,7 @@ public:
bool ReConnect(); bool ReConnect();
private: private:
bool Connect(char module);
std::string m_Address; std::string m_Address;
uint16_t m_Port; uint16_t m_Port;
bool Connect(char module);
}; };

Loading…
Cancel
Save

Powered by TurnKey Linux.