a new TCP socket

pull/3/head
Tom Early 2 years ago
parent 381268fb0b
commit a5495deb85

@ -33,12 +33,9 @@ IPv4Binding = 0.0.0.0
# define if you want to override what urfd finds using ipv6.icanhazip.com
# IPv6External = f:e:d:c:b:a:9:0
Transcoder = local # SORRY, but only local TC's are supported right now!
[Modules]
# Modules = ABCDEFGHIJKLMNOPQRSTUVWXYZ
Modules = ADMSZ
Transcoded = A # comment out if you don't have transcoding hardware
# Create Descriptions as needed...
DescriptionA = Transcoded
DescriptionD = DMR Chat
@ -46,6 +43,12 @@ DescriptionM = M17 Chat
DescriptionS = DStar Chat
DescriptionZ = Temp Meeting
[Transcoder]
Port = 10100 # TCP listening port for connection(s), set to 0 if there is no transcoder, then other two values will be ignored
BindingAddress = 127.0.0.1 # or ::1, the IPv4 or IPv6 "loop-back" address for a local transcoder
# For a connection to a remote transcoder, usually use the "any" address: 0.0.0.0 or ::
Modules = A # Transcoded modules one or three modules, depending on the hardware
# Protocols
[Brandmeister]
Enable = false # Set to true if you've configured BM connections in your urfd.interlink file.

@ -18,9 +18,14 @@
#include <string.h>
#include <sys/select.h>
#include "DVFramePacket.h"
#include "PacketStream.h"
#include "CodecStream.h"
#include "Reflector.h"
extern CReflector g_Reflector;
////////////////////////////////////////////////////////////////////////////////////////
// constructor
@ -42,7 +47,6 @@ CCodecStream::~CCodecStream()
m_Future.get();
}
// and close the socket
m_TCReader.Close();
}
void CCodecStream::ResetStats(uint16_t streamid, ECodecType type)
@ -80,12 +84,6 @@ void CCodecStream::ReportStats()
bool CCodecStream::InitCodecStream()
{
m_TCWriter.SetUp(REF2TC);
std::string name(TC2REF);
name.append(1, m_CSModule);
if (m_TCReader.Open(name.c_str()))
return true;
std::cout << "Initialized CodecStream receive socket " << name << std::endl;
keep_running = true;
try
{
@ -94,7 +92,6 @@ bool CCodecStream::InitCodecStream()
catch(const std::exception& e)
{
std::cerr << "Could not start Codec processing on module '" << m_CSModule << "': " << e.what() << std::endl;
m_TCReader.Close();
return true;
}
return false;
@ -114,10 +111,21 @@ void CCodecStream::Thread()
void CCodecStream::Task(void)
{
STCPacket pack;
struct timeval tv;
fd_set readfds;
tv.tv_sec = 0;
tv.tv_usec = 7000;
int fd = g_Reflector.tcServer.GetFD(m_CSModule);
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
// any packet from transcoder
if (m_TCReader.Receive(&pack, 5))
// don't care about writefds and exceptfds:
if (select(fd+1, &readfds, NULL, NULL, &tv))
{
g_Reflector.tcServer.Receive(fd, &pack);
// update statistics
double rt = pack.rt_timer.time(); // the round-trip time
if (0 == m_RTCount)
@ -191,7 +199,7 @@ void CCodecStream::Task(void)
Frame->SetTCParams(m_uiTotalPackets++);
// now send to transcoder
m_TCWriter.Send(Frame->GetCodecPacket());
g_Reflector.tcServer.Send(Frame->GetCodecPacket());
// push to our local queue where it can wait for the transcoder
m_LocalQueue.Push(std::move(Packet));

@ -22,7 +22,6 @@
#include <future>
#include "DVFramePacket.h"
#include "UnixDgramSocket.h"
#include "SafePacketQueue.h"
////////////////////////////////////////////////////////////////////////////////////////
@ -64,10 +63,6 @@ protected:
uint8_t m_uiPid;
ECodecType m_eCodecIn;
// sockets
CUnixDgramReader m_TCReader;
CUnixDgramWriter m_TCWriter;
// associated packet stream
CPacketStream *m_PacketStream;

@ -32,6 +32,7 @@
// ini file keywords
#define JAUTOLINKMODULE "AutoLinkModule"
#define JBINDINGADDRESS "BindingAddress"
#define JBLACKLISTPATH "BlacklistPath"
#define JBOOTSTRAP "Bootstrap"
#define JBRANDMEISTER "Brandmeister"
@ -79,7 +80,6 @@
#define JRXPORT "RxPort"
#define JSPONSOR "Sponsor"
#define JSYSOPEMAIL "SysopEmail"
#define JTRANSCODED "Transcoded"
#define JTRANSCODER "Transcoder"
#define JTXPORT "TxPort"
#define JURF "URF"
@ -131,6 +131,7 @@ bool CConfigure::ReadData(const std::string &path)
ESection section = ESection::none;
counter = 0;
SJsonKeys::DB *pdb;
unsigned tcport = 0;
//data.ysfalmodule = 0;
//data.DPlusPort = data.DCSPort = data.DExtraPort = data.BMPort = data.DMRPlusPort = 0;
@ -180,6 +181,8 @@ bool CConfigure::ReadData(const std::string &path)
section = ESection::names;
else if (0 == hname.compare(JIPADDRESSES))
section = ESection::ip;
else if (0 == hname.compare(JTRANSCODER))
section = ESection::tc;
else if (0 == hname.compare(JMODULES))
section = ESection::modules;
else if (0 == hname.compare(JDPLUS))
@ -282,17 +285,26 @@ bool CConfigure::ReadData(const std::string &path)
{
data[g_Keys.ip.ipv6address] = value;
}
else if (0 == key.compare(JTRANSCODER))
else
badParam(key);
break;
case ESection::tc:
if (0 == key.compare(JPORT))
data[g_Keys.tc.port] = getUnsigned(value, "Transcoder Port", 0, 40000, 10100);
else if (key.compare(JBINDINGADDRESS))
data[g_Keys.tc.bind] = value;
else if (key.compare(JMODULES))
{
if (value.compare("local"))
std::string m(value);
if (checkModules(m))
{
std::cout << "WARNING: Line #" << counter << ": malformed transcoder address, '" << value << "', resetting..." << std::endl;
std::cerr << "ERROR: line #" << counter << ": no letters found in transcoder Modules: '" << m << "'" << std::endl;
rval = true;
}
data[g_Keys.ip.transcoder] = "local";
data[g_Keys.tc.modules] = m;
}
else
badParam(key);
break;
case ESection::modules:
if (0 == key.compare(JMODULES))
{
@ -304,16 +316,6 @@ bool CConfigure::ReadData(const std::string &path)
} else
data[g_Keys.modules.modules] = m;
}
else if (0 == key.compare(JTRANSCODED))
{
std::string m(value);
if (checkModules(m))
{
std::cerr << "ERROR: line #" << counter << ": no letters found in Transcoded: '" << m << "'" << std::endl;
rval = true;
} else
data[g_Keys.modules.tcmodules] = m;
}
else if (0 == key.compare(0, 11, "Description"))
{
if (12 == key.size() && isupper(key[11]))
@ -606,27 +608,6 @@ bool CConfigure::ReadData(const std::string &path)
if (isDefined(ErrorLevel::fatal, JMODULES, JMODULES, g_Keys.modules.modules, rval))
{
const auto mods(data[g_Keys.modules.modules].get<std::string>());
if (data.contains(g_Keys.modules.tcmodules))
{
const auto tcmods(data[g_Keys.modules.tcmodules].get<std::string>());
// how many transcoded modules
auto size = tcmods.size();
if (3 != size && 1 != size)
std::cout << "WARNING: [" << JMODULES << ']' << JTRANSCODED << " doesn't define one (or three) modules" << std::endl;
// make sure each transcoded module is configured
for (auto c : tcmods)
{
if (std::string::npos == mods.find(c))
{
std::cerr << "ERROR: transcoded module '" << c << "' not found in defined modules" << std::endl;
rval = true;
}
}
}
else
data[g_Keys.modules.tcmodules] = nullptr;
// finally, check the module descriptions
for (unsigned i=0; i<26; i++)
@ -652,6 +633,44 @@ bool CConfigure::ReadData(const std::string &path)
}
}
// Transcoder section
if (isDefined(ErrorLevel::fatal, JTRANSCODER, JPORT, g_Keys.tc.port, rval))
{
tcport = GetUnsigned(g_Keys.tc.port);
if (tcport)
{
if (isDefined(ErrorLevel::fatal, JTRANSCODER, JBINDINGADDRESS, g_Keys.tc.bind, rval))
{
const auto bind(data[g_Keys.tc.bind].get<std::string>());
if (!std::regex_match(bind, IPv4RegEx) && !std::regex_match(bind, IPv6RegEx))
{
std::cerr << "ERROR: Transcoder bind address [" << bind << "] is malformed" << std::endl;
rval = true;
}
}
if (isDefined(ErrorLevel::fatal, JTRANSCODER, JMODULES, g_Keys.tc.modules, rval))
{
const auto tcmods(data[g_Keys.tc.modules].get<std::string>());
// how many transcoded modules
auto size = tcmods.size();
if (3 != size && 1 != size)
std::cout << "WARNING: [" << JTRANSCODER << ']' << JMODULES << " doesn't define one (or three) modules" << std::endl;
// make sure each transcoded module is configured
const std::string mods(GetString(g_Keys.modules.modules));
for (auto c : tcmods)
{
if (std::string::npos == mods.find(c))
{
std::cerr << "ERROR: transcoded module '" << c << "' not found in defined modules" << std::endl;
rval = true;
}
}
}
}
}
// "simple" protocols with only a Port
isDefined(ErrorLevel::fatal, JDCS, JPORT, g_Keys.dcs.port, rval);
isDefined(ErrorLevel::fatal, JDEXTRA, JPORT, g_Keys.dextra.port, rval);
@ -691,11 +710,11 @@ bool CConfigure::ReadData(const std::string &path)
{
if (GetBoolean(g_Keys.usrp.enable))
{
if (IsString(g_Keys.modules.tcmodules))
if (tcport)
{
if (isDefined(ErrorLevel::fatal, JUSRP, JMODULE, g_Keys.usrp.module, rval))
{
if (std::string::npos == GetString(g_Keys.modules.tcmodules).find(GetString(g_Keys.usrp.module).at(0)))
if (std::string::npos == GetString(g_Keys.tc.modules).find(GetString(g_Keys.usrp.module).at(0)))
{
std::cerr << "ERROR: [" << JUSRP << ']' << JMODULE << " is not a transcoded module" << std::endl;
rval = true;

@ -25,7 +25,7 @@
enum class ErrorLevel { fatal, mild };
enum class ERefreshType { file, http, both };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files };
enum class ESection { none, names, ip, modules, urf, dplus, dextra, dcs, g3, dmrplus, mmdvm, nxdn, bm, ysf, p25, m17, usrp, dmrid, nxdnid, ysffreq, files, tc };
#define IS_TRUE(a) ((a)=='t' || (a)=='T' || (a)=='1')

@ -42,11 +42,14 @@ struct SJsonKeys {
struct NAMES { const std::string callsign, bootstrap, url, email, country, sponsor; }
names { "Callsign", "bootstrap", "DashboardUrl", "SysopEmail", "Country", "Sponsor" };
struct IP { const std::string ipv4bind, ipv4address, ipv6bind, ipv6address, transcoder; }
ip { "ipv4bind", "IPv4Address", "ipv6bind", "IPv6Address", "tcaddress" };
struct IP { const std::string ipv4bind, ipv4address, ipv6bind, ipv6address; }
ip { "ipv4bind", "IPv4Address", "ipv6bind", "IPv6Address" };
struct MODULES { const std::string modules, tcmodules, descriptor[26]; }
modules { "Modules", "TranscodedModules",
struct TC { const std::string port, bind, modules; }
tc { "tcport", "tcbind", "TranscodedModules" };
struct MODULES { const std::string modules, descriptor[26]; }
modules { "Modules",
"DescriptionA", "DescriptionB", "DescriptionC", "DescriptionD", "DescriptionE", "DescriptionF", "DescriptionG", "DescriptionH", "DescriptionI", "DescriptionJ", "DescriptionK", "DescriptionL", "DescriptionM", "DescriptionN", "DescriptionO", "DescriptionP", "DescriptionQ", "DescriptionR", "DescriptionS", "DescriptionT", "DescriptionU", "DescriptionV", "DescriptionW", "DescriptionX", "DescriptionY", "DescriptionZ" };
struct USRP { const std::string enable, ip, txport, rxport, module, callsign, filepath; }

@ -55,7 +55,8 @@ bool CReflector::Start(void)
const auto cs(g_Configure.GetString(g_Keys.names.callsign));
m_Callsign.SetCallsign(cs, false);
m_Modules.assign(g_Configure.GetString(g_Keys.modules.modules));
std::string tcmods(g_Configure.GetString(g_Keys.modules.tcmodules));
const auto tcmods(g_Configure.GetString(g_Keys.tc.modules));
const auto port = g_Configure.GetUnsigned(g_Keys.tc.port);
#ifndef NO_DHT
// start the dht instance
@ -67,6 +68,10 @@ bool CReflector::Start(void)
// let's go!
keep_running = true;
// init transcoder comms
if (port)
tcServer.Open(g_Configure.GetString(g_Keys.tc.bind), tcmods, port);
// init gate keeper. It can only return true!
g_GateKeeper.Init();
@ -93,10 +98,13 @@ bool CReflector::Start(void)
if (stream)
{
// if it's a transcoded module, then we need to initialize the codec stream
if (std::string::npos != tcmods.find(c))
if (port)
{
if (stream->InitCodecStream())
return true;
if (std::string::npos != tcmods.find(c))
{
if (stream->InitCodecStream())
return true;
}
}
m_Stream[c] = stream;
}
@ -139,6 +147,10 @@ void CReflector::Stop(void)
// stop & delete all threads
keep_running = false;
// stop transcoder comms
// if it was never opened, then there is nothing to close;
tcServer.Close();
// stop & delete report threads
if ( m_XmlReportFuture.valid() )
{
@ -637,7 +649,8 @@ void CReflector::PutDHTConfig()
cfg.ipv4addr.assign(g_Configure.GetString(g_Keys.ip.ipv4address));
cfg.ipv6addr.assign(g_Configure.GetString(g_Keys.ip.ipv6address));
cfg.modules.assign(g_Configure.GetString(g_Keys.modules.modules));
cfg.transcodedmods.assign(g_Configure.GetString(g_Keys.modules.tcmodules));
if (g_Configure.GetUnsigned(g_Keys.tc.port))
cfg.transcodedmods.assign(g_Configure.GetString(g_Keys.tc.modules));
cfg.url.assign(g_Configure.GetString(g_Keys.names.url));
cfg.email.assign(g_Configure.GetString(g_Keys.names.email));
cfg.country.assign(g_Configure.GetString(g_Keys.names.country));

@ -25,6 +25,7 @@
#include "Peers.h"
#include "Protocols.h"
#include "PacketStream.h"
#include "TCTCPSocket.h"
#ifndef NO_DHT
#include "dht-values.h"
@ -83,6 +84,9 @@ public:
void GetDHTConfig(const std::string &cs);
#endif
// transcoder communication
CTCTCPServer tcServer;
protected:
#ifndef NO_DHT
// Publish DHT

@ -0,0 +1,223 @@
// urfd -- The universal reflector
// Copyright © 2024 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <iostream>
#include <unistd.h>
#include <thread>
#include <chrono>
#include <sys/types.h>
#include <sys/socket.h>
#include "IP.h"
#include "TCTCPSocket.h"
void CTCTCPSocket::Close()
{
for (auto item : m_FD)
close(item.second);
m_FD.clear();
}
void CTCTCPSocket::Close(char mod)
{
auto item = m_FD.find(mod);
if (m_FD.end() == item)
return;
close(item->second);
m_FD.erase(item);
}
bool CTCTCPSocket::Send(const STCPacket *packet)
{
int fd = GetFD(packet->module);
if (fd < 0)
{
return true;
}
long count = 0;
auto data = (const unsigned char *)packet;
do {
auto n = send(fd, data+count, sizeof(STCPacket)-count, 0);
if (n <= 0)
{
if (0 == n)
{
std::cerr << "CTCTCPSocket::Send: socket on module '" << packet->module << "' has been closed!" << std::endl;
}
else
{
perror("CTCTCPSocket::Send");
}
Close(packet->module);
return true;
}
count += n;
} while (count < sizeof(STCPacket));
return false;
}
bool CTCTCPSocket::Receive(int fd, STCPacket *packet)
{
auto data = (unsigned char *)packet;
auto n = recv(fd, data, sizeof(STCPacket), MSG_WAITALL);
if (n < 0)
{
perror("CTCTCPSocket::Receive");
return true;
}
return n == sizeof(STCPacket);
}
int CTCTCPSocket::GetFD(char module) const
{
const auto item = m_FD.find(module);
if (m_FD.cend() == item)
{
return -1;
}
return item->second;
}
bool CTCTCPServer::Open(const std::string &address, const std::string &tcmodules, uint16_t port)
{
int fd;
CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port);
fd = socket(ip.GetFamily(), SOCK_STREAM, 0);
if (fd < 0)
{
perror("Open socket");
return true;
}
int yes = 1;
int rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if (rv < 0)
{
close(fd);
perror("Open setsockopt");
return true;
}
rv = bind(fd, ip.GetCPointer(), ip.GetSize());
if (rv < 0)
{
close(fd);
perror("Open bind");
return true;
}
rv = listen(fd, 3);
if (rv < 0)
{
perror("Open listen");
close(fd);
Close();
return true;
}
std::cout << "Waiting for " << tcmodules.size() << " transcoder connection(s)..." << std::endl;
for (unsigned x=0; x<tcmodules.size(); x++)
{
CIp their_addr; // connector's address information
socklen_t sin_size = sizeof(struct sockaddr_storage);
auto newfd = accept(fd, their_addr.GetPointer(), &sin_size);
if (newfd < 0)
{
perror("Open accept");
Close();
return true;
}
char mod;
rv = recv(newfd, &mod, 1, 0); // retrieve the identification byte
if (rv != 1)
{
if (rv < 0) perror("Open recv");
else std::cerr << "recv got no identification byte!" << std::endl;
close(newfd);
Close();
return true;
}
if (std::string::npos == tcmodules.find(mod))
{
std::cerr << "New connection for module '" << mod << "', but it's not config'ed!" << std::endl;
std::cerr << "The transcoded modules need to be configured identically for both urfd and tcd." << std::endl;
close(newfd);
Close();
return true;
}
std::cout << "File descriptor " << newfd << " opened TCP port for module '" << mod << "' on " << their_addr << std::endl;
m_FD[mod] = newfd;
}
close(fd); // we don't need this anymore.
return false;
}
bool CTCTCPClient::Open(const std::string &address, const std::string &tcmodules, uint16_t port)
{
std::cout << "Connecting to the TCP server..." << std::endl;
for (auto c : tcmodules)
{
CIp ip(address.c_str(), AF_UNSPEC, SOCK_STREAM, port);
auto fd = socket(ip.GetFamily(), SOCK_STREAM, 0);
if (fd < 0)
{
std::cout << "errno=" << errno << std::endl;
perror("Open socket");
Close();
return true;
}
int yes = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)))
{
perror("Open setsockopt");
Close();
close(fd);
return true;
}
unsigned count = 0;
while (connect(fd, ip.GetCPointer(), ip.GetSize()))
{
if (ECONNREFUSED == errno)
{
if (0 == count++ % 15) std::cout << "Connection refused! Restart the server." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
else
{
perror("Open connect");
Close();
close(fd);
return true;
}
}
std::cout << "File descriptor " << fd << " on " << ip << " opened for module '" << c << "'" << std::endl;
send(fd, &c, 1, 0); // send the identification byte
m_FD[c] = fd;
}
return false;
}

@ -0,0 +1,60 @@
// urfd -- The universal reflector
// Copyright © 2024 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <string>
#include <cstdint>
#include <unordered_map>
#include "TCPacketDef.h"
class CTCTCPSocket
{
public:
CTCTCPSocket() {}
virtual ~CTCTCPSocket() { Close(); }
void Close(); // close all open sockets
// bool functions return true on failure
virtual bool Open(const std::string &address, const std::string &modules, uint16_t port) = 0;
bool Send(const STCPacket *packet);
bool Receive(int fd, STCPacket *packet);
int GetFD(char module) const; // can return -1!
protected:
void Close(char); // close a specific module
std::unordered_map<char, int> m_FD;
};
class CTCTCPServer : public CTCTCPSocket
{
public:
CTCTCPServer() : CTCTCPSocket() {}
~CTCTCPServer() {}
bool Open(const std::string &address, const std::string &modules, uint16_t port);
};
class CTCTCPClient : public CTCTCPSocket
{
public:
CTCTCPClient() : CTCTCPSocket() {}
~CTCTCPClient() {}
bool Open(const std::string &address, const std::string &modules, uint16_t port);
};

@ -1,167 +0,0 @@
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <cstring>
#include <thread>
#include <chrono>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "UnixDgramSocket.h"
CUnixDgramReader::CUnixDgramReader() : fd(-1) {}
CUnixDgramReader::~CUnixDgramReader()
{
Close();
}
bool CUnixDgramReader::Open(const char *path) // returns true on failure
{
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0)
{
std::cerr << "socket() failed for " << path << ": " << strerror(errno) << std::endl;
return true;
}
//fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2);
// We know path is a string, so we skip the first null, get the string length and add 1 for the begining Null
int path_len = sizeof(addr.sun_family) + strlen(addr.sun_path + 1) + 1;
int rval = bind(fd, (struct sockaddr *)&addr, path_len);
if (rval < 0)
{
std::cerr << "bind() failed for " << path << ": " << strerror(errno) << std::endl;
close(fd);
fd = -1;
return true;
}
return false;
}
bool CUnixDgramReader::Receive(STCPacket *pack, unsigned timeout) const
{
// socket valid ?
if ( 0 > fd )
return false;
// control socket
fd_set FdSet;
FD_ZERO(&FdSet);
FD_SET(fd, &FdSet);
struct timeval tv;
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
auto rval = select(fd + 1, &FdSet, 0, 0, &tv);
if (rval <= 0) {
if (rval < 0) {
std::cerr << "select() error on transcoder socket: " << strerror(errno) << std::endl;
}
return false;
}
return Read(pack);
}
bool CUnixDgramReader::Read(STCPacket *pack) const
{
auto len = read(fd, pack, sizeof(STCPacket));
if (len != sizeof(STCPacket)) {
std::cerr << "Received transcoder packet is wrong size: " << len << " but should be " << sizeof(STCPacket) << std::endl;
return false;
}
return true;
}
void CUnixDgramReader::Close()
{
if (fd >= 0)
close(fd);
fd = -1;
}
int CUnixDgramReader::GetFD() const
{
return fd;
}
CUnixDgramWriter::CUnixDgramWriter() {}
CUnixDgramWriter::~CUnixDgramWriter() {}
void CUnixDgramWriter::SetUp(const char *path) // returns true on failure
{
// setup the socket address
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path+1, path, sizeof(addr.sun_path)-2);
path_len = sizeof(addr.sun_family) + strlen(addr.sun_path + 1) + 1;
}
bool CUnixDgramWriter::Send(const STCPacket *pack) const
{
auto len = Write(pack, sizeof(STCPacket));
if (len != sizeof(STCPacket))
return true;
return false;
}
ssize_t CUnixDgramWriter::Write(const void *buf, ssize_t size) const
{
// open the socket
int fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0)
{
std::cerr << "socket() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl;
return -1;
}
// connect to the receiver
// We know path is a string, so we skip the first null, get the string length and add 1 for the begining Null
int rval = connect(fd, (struct sockaddr *)&addr, path_len);
if (rval < 0)
{
std::cerr << "connect() failed for " << addr.sun_path+1 << ": " << strerror(errno) << std::endl;
close(fd);
return -1;
}
auto written = write(fd, buf, size);
if (written != size) {
std::cerr << "write on " << addr.sun_path+1;
if (written < 0)
std::cerr << " returned error: " << strerror(errno) << std::endl;
else if (written == 0)
std::cerr << " returned zero" << std::endl;
else
std::cerr << " only wrote " << written << " bytes, should be " << size << std::endl;
}
close(fd);
return written;
}

@ -1,51 +0,0 @@
#pragma once
// Copyright © 2021 Thomas A. Early N7TAE
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#pragma once
#include <stdlib.h>
#include <sys/un.h>
#include "TCPacketDef.h"
class CUnixDgramReader
{
public:
CUnixDgramReader();
~CUnixDgramReader();
bool Open(const char *path);
bool Read(STCPacket *pack) const;
bool Receive(STCPacket *pack, unsigned timeout) const;
void Close();
int GetFD() const;
private:
int fd;
};
class CUnixDgramWriter
{
public:
CUnixDgramWriter();
~CUnixDgramWriter();
void SetUp(const char *path);
bool Send(const STCPacket *pack) const;
private:
ssize_t Write(const void *buf, ssize_t size) const;
struct sockaddr_un addr;
int path_len;
};
Loading…
Cancel
Save

Powered by TurnKey Linux.