use poll in TCSocket

pull/3/head
Tom Early 2 years ago
parent 38984be6bb
commit e0f3797d69

@ -18,7 +18,6 @@
#include <string.h> #include <string.h>
#include <sys/select.h>
#include "Global.h" #include "Global.h"
#include "DVFramePacket.h" #include "DVFramePacket.h"
@ -114,6 +113,7 @@ void CCodecStream::Task(void)
// if the fd is not good we need to reestablish it // if the fd is not good we need to reestablish it
if (fd < 0) // log the situation if (fd < 0) // log the situation
std::cout << "Lost connection to transcoder, module '" << m_CSModule << "', waiting for new connection..." << std::endl; std::cout << "Lost connection to transcoder, module '" << m_CSModule << "', waiting for new connection..." << std::endl;
while (fd < 0) while (fd < 0)
{ {
if (g_TCServer.Accept()) // try to get a connection if (g_TCServer.Accept()) // try to get a connection
@ -127,20 +127,8 @@ void CCodecStream::Task(void)
} }
STCPacket pack; STCPacket pack;
struct timeval tv; if (g_TCServer.Receive(m_CSModule, &pack, 8))
fd_set readfds;
tv.tv_sec = 0;
tv.tv_usec = 7000;
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
// don't care about writefds and exceptfds:
if (select(fd+1, &readfds, NULL, NULL, &tv))
{ {
if (g_TCServer.Receive(fd, &pack))
return;
// update statistics // update statistics
double rt = pack.rt_timer.time(); // the round-trip time double rt = pack.rt_timer.time(); // the round-trip time
if (0 == m_RTCount) if (0 == m_RTCount)
@ -219,7 +207,7 @@ void CCodecStream::Task(void)
return; return;
} }
if (g_TCServer.Send(fd, Frame->GetCodecPacket())) if (g_TCServer.Send(Frame->GetCodecPacket()))
{ {
// ditto, we'll try to fix this on the next pass // ditto, we'll try to fix this on the next pass
return; return;

@ -27,34 +27,36 @@
void CTCSocket::Close() void CTCSocket::Close()
{ {
std::lock_guard<std::mutex> lck(m_Mutex); for (auto &item : m_Pfd)
for (auto item : m_FD) {
close(item.second); if (item.fd >= 0)
m_FD.clear(); {
close(item.fd);
}
}
m_Pfd.clear();
} }
void CTCSocket::Close(char mod) void CTCSocket::Close(char mod)
{ {
std::lock_guard<std::mutex> lck(m_Mutex); auto pos = m_Modules.find(mod);
auto item = m_FD.find(mod); if (std::string::npos == pos)
if (m_FD.end() == item)
{ {
std::cerr << "Could not find a file descriptor for module '" << mod << "'" << std::endl; std::cerr << "Could not find module '" << mod << "'" << std::endl;
return; return;
} }
close(item->second); close(m_Pfd[pos].fd);
m_FD.erase(item); m_Pfd[pos].fd = -1;
} }
void CTCSocket::Close(int fd) void CTCSocket::Close(int fd)
{ {
std::lock_guard<std::mutex> lck(m_Mutex); for (auto &p : m_Pfd)
for (auto &p : m_FD)
{ {
if (fd == p.second) if (fd == p.fd)
{ {
close(fd); close(p.fd);
m_FD.erase(p.first); p.fd = -1;
return; return;
} }
} }
@ -63,32 +65,47 @@ void CTCSocket::Close(int fd)
int CTCSocket::GetFD(char module) const int CTCSocket::GetFD(char module) const
{ {
std::lock_guard<std::mutex> lck(m_Mutex); auto pos = m_Modules.find(module);
const auto item = m_FD.find(module); if (std::string::npos == pos)
if (m_FD.cend() == item)
{
return -1; return -1;
} return m_Pfd[pos].fd;
return item->second;
} }
char CTCSocket::GetMod(int fd) const char CTCSocket::GetMod(int fd) const
{ {
std::lock_guard<std::mutex> lck(m_Mutex); for (unsigned i=0; i<m_Pfd.size(); i++)
for (const auto &p : m_FD)
{ {
if (fd == p.second) if (fd == m_Pfd[i].fd)
return p.first; {
return m_Modules[i];
}
} }
return '?'; return '?';
} }
bool CTCSocket::Send(int fd, const STCPacket *packet) bool CTCSocket::any_are_closed()
{
const auto n = m_Modules.size();
for (unsigned i=0; i<n; i++)
{
if (0 > m_Pfd[i].fd)
return true;
}
return false;
}
bool CTCSocket::Send(const STCPacket *packet)
{ {
const auto pos = m_Modules.find(packet->module);
if (pos == std::string::npos)
{
std::cerr << "Can't Send() this packet to unconfigured module '" << packet->module << "'" << std::endl;
return true;
}
unsigned count = 0; unsigned count = 0;
auto data = (const unsigned char *)packet; auto data = (const unsigned char *)packet;
do { do {
auto n = send(fd, data+count, sizeof(STCPacket)-count, 0); auto n = send(m_Pfd[pos].fd, data+count, sizeof(STCPacket)-count, 0);
if (n <= 0) if (n <= 0)
{ {
if (0 == n) if (0 == n)
@ -107,22 +124,47 @@ bool CTCSocket::Send(int fd, const STCPacket *packet)
return false; return false;
} }
bool CTCSocket::Receive(int fd, STCPacket *packet) // returns true if successful
bool CTCServer::Receive(char module, STCPacket *packet, int ms)
{ {
const auto pos = m_Modules.find(module);
if (pos == std::string::npos)
{
std::cerr << "Can't Recevieve on unconfigured module '" << module << "'" << std::endl;
return false;
}
auto rv = poll(&m_Pfd[pos], 1, ms);
if (rv < 0)
{
perror("Recieve poll");
Close(m_Pfd[pos].fd);
return false;
}
auto data = (unsigned char *)packet; auto data = (unsigned char *)packet;
auto n = recv(fd, data, sizeof(STCPacket), MSG_WAITALL); auto n = recv(m_Pfd[pos].fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0) if (n < 0)
{ {
perror("CTCSocket::Receive"); perror("Receive recv");
Close(fd); Close(m_Pfd[pos].fd);
return true; return false;
} }
return n == sizeof(STCPacket); if (n != sizeof(STCPacket))
std::cout << "Warning: Receive only read " << n << " bytes of the transcoder packet!" << std::endl;
return true;
} }
bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port) bool CTCServer::Open(const std::string &address, const std::string &modules, uint16_t port)
{ {
m_Modules.assign(modules); m_Modules.assign(modules);
m_Pfd.resize(m_Modules.size()+1);
for (auto &pf : m_Pfd)
{
pf.fd = -1;
pf.events = POLLIN;
}
CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port); CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port);
@ -162,51 +204,38 @@ bool CTCServer::Open(const std::string &address, const std::string &modules, uin
auto n = m_Modules.size(); auto n = m_Modules.size();
std::cout << "Waiting for " << n << " transcoder connection(s)..." << std::endl; std::cout << "Waiting for " << n << " transcoder connection(s)..." << std::endl;
while (m_FD.size() < n) while (any_are_closed())
{ {
if (Accept()) if (Accept())
return true; return true;
} }
m_listenSock = fd; m_Pfd.back().fd = fd;
return false; return false;
} }
bool CTCServer::Accept() bool CTCServer::Accept()
{ {
struct timeval tv; auto rv = poll(&m_Pfd.back(), 1, 10);
fd_set readfds;
// 10 milliseconds
tv.tv_sec = 0;
tv.tv_usec = 10000;
FD_ZERO(&readfds);
FD_SET(m_listenSock, &readfds);
// don't care about writefds and exceptfds:
int rv = select(m_listenSock+1, &readfds, NULL, NULL, &tv);
if (rv < 0) if (rv < 0)
{ {
perror("Accept select"); perror("Accept poll");
return true; return true;
} }
if (0 == rv) // we timed out waiting for something if (0 == rv) // we timed out waiting for something
return false; return false;
// here comes a connect // rv has to be 1, so, here comes a connect request
CIp their_addr; // connector's address information CIp their_addr; // connector's address information
socklen_t sin_size = sizeof(struct sockaddr_storage); socklen_t sin_size = sizeof(struct sockaddr_storage);
auto newfd = accept(m_listenSock, their_addr.GetPointer(), &sin_size); auto newfd = accept(m_Pfd.back().fd, their_addr.GetPointer(), &sin_size);
if (newfd < 0) if (newfd < 0)
{ {
if (EAGAIN == errno || EWOULDBLOCK == errno)
return false;
perror("Accept accept"); perror("Accept accept");
return true; return true;
} }
@ -223,7 +252,8 @@ bool CTCServer::Accept()
return true; return true;
} }
if (std::string::npos == m_Modules.find(mod)) const auto pos = m_Modules.find(mod);
if (std::string::npos == pos)
{ {
std::cerr << "New connection for module '" << mod << "', but it's not configured!" << std::endl; std::cerr << "New connection for module '" << mod << "', but it's not configured!" << std::endl;
std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl; std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl;
@ -233,8 +263,7 @@ bool CTCServer::Accept()
std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl; std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl;
std::lock_guard<std::mutex> lck(m_Mutex); m_Pfd[pos].fd = newfd;
m_FD[mod] = newfd;
return false; return false;
} }
@ -259,6 +288,12 @@ bool CTCClient::Initialize(const std::string &address, const std::string &module
bool CTCClient::Connect(char module) bool CTCClient::Connect(char module)
{ {
const auto pos = m_Modules.find(module);
if (pos == std::string::npos)
{
std::cerr << "CTCClient::Connect: could not find module '" << module << "' in configured modules!" << std::endl;
return true;
}
CIp ip(m_Address.c_str(), AF_UNSPEC, SOCK_STREAM, m_Port); CIp ip(m_Address.c_str(), AF_UNSPEC, SOCK_STREAM, m_Port);
auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0); auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0);
@ -311,8 +346,7 @@ bool CTCClient::Connect(char module)
std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << module << "'" << std::endl; std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << module << "'" << std::endl;
std::lock_guard<std::mutex> lck(m_Mutex); m_Pfd[pos].fd = fd;
m_FD[module] = fd;
return false; return false;
} }

@ -19,7 +19,8 @@
#include <string> #include <string>
#include <cstdint> #include <cstdint>
#include <mutex> #include <mutex>
#include <unordered_map> #include <vector>
#include <poll.h>
#include "TCPacketDef.h" #include "TCPacketDef.h"
@ -33,16 +34,16 @@ public:
void Close(char module); // close a specific module void Close(char module); // close a specific module
void Close(int fd); // close a specific file descriptor void Close(int fd); // close a specific file descriptor
// bool functions return true on failure // most bool functions return true on failure
bool Send(int fd, const STCPacket *packet); bool Send(const STCPacket *packet);
bool Receive(int fd, STCPacket *packet);
int GetFD(char module) const; // can return -1! int GetFD(char module) const; // can return -1!
char GetMod(int fd) const; char GetMod(int fd) const;
protected: protected:
std::unordered_map<char, int> m_FD; bool any_are_closed();
mutable std::mutex m_Mutex; std::vector<struct pollfd> m_Pfd;
std::string m_Modules;
}; };
class CTCServer : public CTCSocket class CTCServer : public CTCSocket
@ -52,9 +53,7 @@ public:
~CTCServer() {} ~CTCServer() {}
bool Open(const std::string &address, const std::string &modules, uint16_t port); bool Open(const std::string &address, const std::string &modules, uint16_t port);
bool Accept(); bool Accept();
private: bool Receive(char module, STCPacket *packet, int ms);
int m_listenSock;
std::string m_Modules;
}; };
class CTCClient : public CTCSocket class CTCClient : public CTCSocket
@ -65,6 +64,6 @@ public:
bool Initialize(const std::string &address, const std::string &modules, uint16_t port); bool Initialize(const std::string &address, const std::string &modules, uint16_t port);
bool Connect(char module); bool Connect(char module);
private: private:
std::string m_Address, m_Modules; std::string m_Address;
uint16_t m_Port; uint16_t m_Port;
}; };

Loading…
Cancel
Save

Powered by TurnKey Linux.