wide open module configurations, [A-Z]+

unstable
Tom Early 4 years ago
parent 197fb2c166
commit af66a282fd

@ -39,18 +39,14 @@ CCallsignListItem::CCallsignListItem(const CCallsign &callsign, const CIp &ip, c
::memset(m_Modules, 0, sizeof(m_Modules));
if ( modules[0] == '*' )
{
for ( char i = 0; i < NB_OF_MODULES; i++ )
{
m_Modules[i] = 'A' + i;
}
::memcpy(m_Modules, ACTIVE_MODULES, sizeof(ACTIVE_MODULES));
}
else
{
int n = MIN(::strlen(modules), sizeof(m_Modules)-1);
int j = 0;
for ( int i = 0; i < n; i++ )
for (int i=0, j=0; i<n; i++)
{
if ( (modules[i] - 'A') < NB_OF_MODULES )
if (strchr(ACTIVE_MODULES, modules[i]))
{
m_Modules[j++] = modules[i];
}
@ -69,18 +65,14 @@ CCallsignListItem::CCallsignListItem(const CCallsign &callsign, const char *url,
::memset(m_Modules, 0, sizeof(m_Modules));
if ( modules[0] == '*' )
{
for ( char i = 0; i < NB_OF_MODULES; i++ )
{
m_Modules[i] = 'A' + i;
}
::memcpy(m_Modules, ACTIVE_MODULES, sizeof(ACTIVE_MODULES));
}
else
{
int n = MIN(::strlen(modules), sizeof(m_Modules)-1);
int j = 0;
for ( int i = 0; i < n; i++ )
for (int i=0, j=0; i<n; i++)
{
if ( (modules[i] - 'A') < NB_OF_MODULES )
if (strchr(ACTIVE_MODULES, modules[i]))
{
m_Modules[j++] = modules[i];
}

@ -187,7 +187,7 @@ void CDcsProtocol::Task(void)
void CDcsProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open
@ -235,14 +235,15 @@ void CDcsProtocol::HandleQueue(void)
auto packet = m_Queue.pop();
// get our sender's id
int iModId = g_Reflector.GetModuleIndex(packet->GetModuleId());
const auto module = packet->GetModule();
// check if it's header and update cache
if ( packet->IsDvHeader() )
{
// this relies on queue feeder setting valid module id
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet);
m_StreamsCache[iModId].m_iSeqCounter = 0;
// m_StreamsCache[module] will be created if it doesn't exist
m_StreamsCache[module].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet.get());
m_StreamsCache[module].m_iSeqCounter = 0;
}
else
{
@ -251,17 +252,17 @@ void CDcsProtocol::HandleQueue(void)
if ( packet->IsLastPacket() )
{
EncodeDvLastPacket(
m_StreamsCache[iModId].m_dvHeader,
(const CDvFramePacket &)*packet,
m_StreamsCache[iModId].m_iSeqCounter++,
m_StreamsCache[module].m_dvHeader,
(const CDvFramePacket &)*packet.get(),
m_StreamsCache[module].m_iSeqCounter++,
&buffer);
}
else if ( packet->IsDvFrame() )
{
EncodeDvPacket(
m_StreamsCache[iModId].m_dvHeader,
(const CDvFramePacket &)*packet,
m_StreamsCache[iModId].m_iSeqCounter++,
m_StreamsCache[module].m_dvHeader,
(const CDvFramePacket &)*packet.get(),
m_StreamsCache[module].m_iSeqCounter++,
&buffer);
}
@ -275,7 +276,7 @@ void CDcsProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::dcs, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == module) )
{
// no, send the packet
Send(buffer, client->GetIp());

@ -33,8 +33,7 @@
class CDcsStreamCacheItem
{
public:
CDcsStreamCacheItem() { m_iSeqCounter = 0; }
~CDcsStreamCacheItem() {}
CDcsStreamCacheItem() : m_iSeqCounter(0) {}
CDvHeaderPacket m_dvHeader;
uint32_t m_iSeqCounter;
@ -80,5 +79,5 @@ protected:
CTimer m_LastKeepaliveTime;
// for queue header caches
std::array<CDcsStreamCacheItem, NB_OF_MODULES> m_StreamsCache;
std::unordered_map<char, CDcsStreamCacheItem> m_StreamsCache;
};

@ -236,7 +236,7 @@ void CDextraProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::dextra, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// no, send the packet
int n = packet->IsDvHeader() ? 5 : 1;
@ -391,7 +391,7 @@ void CDextraProtocol::HandlePeerLinks(void)
void CDextraProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open

@ -259,7 +259,7 @@ void CDmrmmdvmProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Hea
bool lastheard = false;
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open
@ -354,7 +354,7 @@ void CDmrmmdvmProtocol::HandleQueue(void)
auto packet = m_Queue.pop();
// get our sender's id
int iModId = g_Reflector.GetModuleIndex(packet->GetModuleId());
const auto mod = packet->GetModule();
// encode
CBuffer buffer;
@ -364,19 +364,19 @@ void CDmrmmdvmProtocol::HandleQueue(void)
{
// update local stream cache
// this relies on queue feeder setting valid module id
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet);
m_StreamsCache[iModId].m_uiSeqId = 0;
m_StreamsCache[mod].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet.get());
m_StreamsCache[mod].m_uiSeqId = 0;
// encode it
EncodeDvHeaderPacket((CDvHeaderPacket &)*packet, m_StreamsCache[iModId].m_uiSeqId, &buffer);
m_StreamsCache[iModId].m_uiSeqId = 1;
EncodeDvHeaderPacket((CDvHeaderPacket &)*packet.get(), m_StreamsCache[mod].m_uiSeqId, &buffer);
m_StreamsCache[mod].m_uiSeqId = 1;
}
// check if it's a last frame
else if ( packet->IsLastPacket() )
{
// encode it
EncodeDvLastPacket(m_StreamsCache[iModId].m_dvHeader, m_StreamsCache[iModId].m_uiSeqId, &buffer);
m_StreamsCache[iModId].m_uiSeqId = (m_StreamsCache[iModId].m_uiSeqId + 1) & 0xFF;
EncodeDvLastPacket(m_StreamsCache[mod].m_dvHeader, m_StreamsCache[mod].m_uiSeqId, &buffer);
m_StreamsCache[mod].m_uiSeqId = (m_StreamsCache[mod].m_uiSeqId + 1) & 0xFF;
}
// otherwise, just a regular DV frame
else
@ -385,20 +385,20 @@ void CDmrmmdvmProtocol::HandleQueue(void)
switch ( packet->GetDmrPacketSubid() )
{
case 1:
m_StreamsCache[iModId].m_dvFrame0 = CDvFramePacket((const CDvFramePacket &)*packet);
m_StreamsCache[mod].m_dvFrame0 = CDvFramePacket((const CDvFramePacket &)*packet.get());
break;
case 2:
m_StreamsCache[iModId].m_dvFrame1 = CDvFramePacket((const CDvFramePacket &)*packet);
m_StreamsCache[mod].m_dvFrame1 = CDvFramePacket((const CDvFramePacket &)*packet.get());
break;
case 3:
EncodeDvPacket(
m_StreamsCache[iModId].m_dvHeader,
m_StreamsCache[iModId].m_dvFrame0,
m_StreamsCache[iModId].m_dvFrame1,
(const CDvFramePacket &)*packet,
m_StreamsCache[iModId].m_uiSeqId,
m_StreamsCache[mod].m_dvHeader,
m_StreamsCache[mod].m_dvFrame0,
m_StreamsCache[mod].m_dvFrame1,
(const CDvFramePacket &)*packet.get(),
m_StreamsCache[mod].m_uiSeqId,
&buffer);
m_StreamsCache[iModId].m_uiSeqId = (m_StreamsCache[iModId].m_uiSeqId + 1) & 0xFF;
m_StreamsCache[mod].m_uiSeqId = (m_StreamsCache[mod].m_uiSeqId + 1) & 0xFF;
break;
default:
break;
@ -415,7 +415,7 @@ void CDmrmmdvmProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::dmrmmdvm, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// no, send the packet
Send(buffer, client->GetIp());
@ -972,9 +972,13 @@ void CDmrmmdvmProtocol::EncodeDvLastPacket(const CDvHeaderPacket &Packet, uint8_
char CDmrmmdvmProtocol::DmrDstIdToModule(uint32_t tg) const
{
// is it a 4xxx ?
if ( (tg >= 4001) && (tg <= (4000 + NB_OF_MODULES)) )
if (tg > 4000 && tg < 4027)
{
return ((char)(tg - 4001) + 'A');
const char mod = 'A' + (tg - 4001U);
if (strchr(ACTIVE_MODULES, mod))
{
return mod;
}
}
return ' ';
}

@ -45,9 +45,6 @@
class CDmrmmdvmStreamCacheItem
{
public:
CDmrmmdvmStreamCacheItem() {}
~CDmrmmdvmStreamCacheItem() {}
CDvHeaderPacket m_dvHeader;
CDvFramePacket m_dvFrame0;
CDvFramePacket m_dvFrame1;
@ -117,7 +114,7 @@ protected:
uint16_t m_uiStreamId;
// for queue header caches
std::array<CDmrmmdvmStreamCacheItem, NB_OF_MODULES> m_StreamsCache;
std::unordered_map<char, CDmrmmdvmStreamCacheItem> m_StreamsCache;
// for authentication
uint32_t m_uiAuthSeed;

@ -181,7 +181,7 @@ void CDmrplusProtocol::Task(void)
void CDmrplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open
@ -227,7 +227,7 @@ void CDmrplusProtocol::HandleQueue(void)
auto packet = m_Queue.pop();
// get our sender's id
int iModId = g_Reflector.GetModuleIndex(packet->GetModuleId());
const auto mod = packet->GetModule();
// encode
CBuffer buffer;
@ -237,11 +237,11 @@ void CDmrplusProtocol::HandleQueue(void)
{
// update local stream cache
// this relies on queue feeder setting valid module id
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet);
m_StreamsCache[iModId].m_uiSeqId = 4;
m_StreamsCache[mod].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet.get());
m_StreamsCache[mod].m_uiSeqId = 4;
// encode it
EncodeDvHeaderPacket((const CDvHeaderPacket &)*packet, &buffer);
EncodeDvHeaderPacket((const CDvHeaderPacket &)*packet.get(), &buffer);
}
else
{
@ -249,20 +249,20 @@ void CDmrplusProtocol::HandleQueue(void)
switch ( packet->GetDmrPacketSubid() )
{
case 1:
m_StreamsCache[iModId].m_dvFrame0 = CDvFramePacket((const CDvFramePacket &)*packet);
m_StreamsCache[mod].m_dvFrame0 = CDvFramePacket((const CDvFramePacket &)*packet.get());
break;
case 2:
m_StreamsCache[iModId].m_dvFrame1 = CDvFramePacket((const CDvFramePacket &)*packet);
m_StreamsCache[mod].m_dvFrame1 = CDvFramePacket((const CDvFramePacket &)*packet.get());
break;
case 3:
EncodeDvPacket(
m_StreamsCache[iModId].m_dvHeader,
m_StreamsCache[iModId].m_dvFrame0,
m_StreamsCache[iModId].m_dvFrame1,
(const CDvFramePacket &)*packet,
m_StreamsCache[iModId].m_uiSeqId,
m_StreamsCache[mod].m_dvHeader,
m_StreamsCache[mod].m_dvFrame0,
m_StreamsCache[mod].m_dvFrame1,
(const CDvFramePacket &)*packet.get(),
m_StreamsCache[mod].m_uiSeqId,
&buffer);
m_StreamsCache[iModId].m_uiSeqId = GetNextSeqId(m_StreamsCache[iModId].m_uiSeqId);
m_StreamsCache[mod].m_uiSeqId = GetNextSeqId(m_StreamsCache[mod].m_uiSeqId);
break;
default:
break;
@ -280,7 +280,7 @@ void CDmrplusProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::dmrplus, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// no, send the packet
Send(buffer, client->GetIp());
@ -651,9 +651,12 @@ uint8_t CDmrplusProtocol::GetNextSeqId(uint8_t uiSeqId) const
char CDmrplusProtocol::DmrDstIdToModule(uint32_t tg) const
{
// is it a 4xxx ?
if ( (tg >= 4001) && (tg <= (4000 + NB_OF_MODULES)) )
{
return ((char)(tg - 4001) + 'A');
if (tg > 4000 && tg < 4027) {
char mod = 'A' + (tg - 4001U);
if (strchr(ACTIVE_MODULES, mod))
{
return mod;
}
}
return ' ';
}

@ -100,5 +100,5 @@ protected:
CTimer m_LastKeepaliveTime;
// for queue header caches
std::array<CDmrplusStreamCacheItem, NB_OF_MODULES> m_StreamsCache;
std::unordered_map<char, CDmrplusStreamCacheItem> m_StreamsCache;
};

@ -175,7 +175,7 @@ void CDplusProtocol::Task(void)
void CDplusProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open
@ -241,14 +241,14 @@ void CDplusProtocol::HandleQueue(void)
auto packet = m_Queue.pop();
// get our sender's id
int iModId = g_Reflector.GetModuleIndex(packet->GetModuleId());
const auto mod = packet->GetModule();
// check if it's header and update cache
if ( packet->IsDvHeader() )
{
// this relies on queue feeder setting valid module id
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet);
m_StreamsCache[iModId].m_iSeqCounter = 0;
m_StreamsCache[mod].m_dvHeader = CDvHeaderPacket((const CDvHeaderPacket &)*packet.get());
m_StreamsCache[mod].m_iSeqCounter = 0;
}
// encode it
@ -280,10 +280,10 @@ void CDplusProtocol::HandleQueue(void)
Send(buffer, client->GetIp());
// is it time to insert a DVheader copy ?
if ( (m_StreamsCache[iModId].m_iSeqCounter++ % 21) == 20 )
if ( (m_StreamsCache[mod].m_iSeqCounter++ % 21) == 20 )
{
// yes, clone it
CDvHeaderPacket packet2(m_StreamsCache[iModId].m_dvHeader);
CDvHeaderPacket packet2(m_StreamsCache[mod].m_dvHeader);
// and send it
SendDvHeader(&packet2, (CDplusClient *)client.get());
}

@ -29,10 +29,10 @@ class CDplusClient;
class CDPlusStreamCacheItem
{
public:
CDPlusStreamCacheItem() { m_iSeqCounter = 0; }
CDPlusStreamCacheItem() : m_iSeqCounter(0) {}
CDvHeaderPacket m_dvHeader;
uint8_t m_iSeqCounter;
uint8_t m_iSeqCounter;
};
class CDplusProtocol : public CProtocol
@ -79,5 +79,5 @@ protected:
CTimer m_LastKeepaliveTime;
// for queue header caches
std::array<CDPlusStreamCacheItem, NB_OF_MODULES> m_StreamsCache;
std::unordered_map<char, CDPlusStreamCacheItem> m_StreamsCache;
};

@ -465,7 +465,7 @@ void CG3Protocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::g3, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// not busy, send the packet
int n = packet->IsDvHeader() ? 5 : 1;
@ -516,7 +516,7 @@ void CG3Protocol::HandleKeepalives(void)
void CG3Protocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId(), &Ip);
auto stream = GetStream(Header->GetStreamId(), &Ip);
if ( stream )
{

@ -47,9 +47,6 @@ int main()
// splash
std::cout << "Starting " << cs << " " << VERSION_MAJOR << "." << VERSION_MINOR << "." << VERSION_REVISION << std::endl << std::endl;
// initialize reflector
g_Reflector.SetCallsign(cs.c_str());
#ifdef TRANSCODER_IP
g_Reflector.SetTranscoderIp(TRANSCODER_IP, INET6_ADDRSTRLEN);
#endif

@ -23,6 +23,7 @@
#include <array>
#include <list>
#include <map>
#include <unordered_map>
#include <queue>
#include <chrono>
#include <future>

@ -28,7 +28,7 @@ CPacket::CPacket()
m_uiYsfPacketId = 0;
m_uiYsfPacketSubId = 0;
m_uiYsfPacketFrameId = 0;
m_uiModuleId = ' ';
m_cModule = ' ';
m_uiOriginId = ORIGIN_LOCAL;
};
@ -43,7 +43,7 @@ CPacket::CPacket(uint16_t sid, uint8_t dstarpid)
m_uiYsfPacketId = 0xFF;
m_uiYsfPacketSubId = 0xFF;
m_uiYsfPacketFrameId = 0xFF;
m_uiModuleId = ' ';
m_cModule = ' ';
m_uiOriginId = ORIGIN_LOCAL;
};
@ -58,7 +58,7 @@ CPacket::CPacket(uint16_t sid, uint8_t dmrpid, uint8_t dmrspid)
m_uiYsfPacketId = 0xFF;
m_uiYsfPacketSubId = 0xFF;
m_uiYsfPacketFrameId = 0xFF;
m_uiModuleId = ' ';
m_cModule = ' ';
m_uiOriginId = ORIGIN_LOCAL;
};
@ -73,7 +73,7 @@ CPacket::CPacket(uint16_t sid, uint8_t ysfpid, uint8_t ysfsubpid, uint8_t ysffri
m_uiDstarPacketId = 0xFF;
m_uiDmrPacketId = 0xFF;
m_uiDmrPacketSubid = 0xFF;
m_uiModuleId = ' ';
m_cModule = ' ';
m_uiOriginId = ORIGIN_LOCAL;
}
@ -88,7 +88,7 @@ CPacket::CPacket(uint16_t sid, uint8_t dstarpid, uint8_t dmrpid, uint8_t dmrsubp
m_uiYsfPacketId = ysfpid;
m_uiYsfPacketSubId = ysfsubpid;
m_uiYsfPacketFrameId = ysffrid;
m_uiModuleId = ' ';
m_cModule = ' ';
m_uiOriginId = ORIGIN_LOCAL;
}

@ -52,20 +52,20 @@ public:
// get
virtual bool IsValid(void) const { return true; }
uint16_t GetStreamId(void) const { return m_uiStreamId; }
uint8_t GetPacketId(void) const { return m_uiDstarPacketId; }
uint8_t GetDstarPacketId(void) const { return m_uiDstarPacketId; }
uint8_t GetDmrPacketId(void) const { return m_uiDmrPacketId; }
uint8_t GetDmrPacketSubid(void) const { return m_uiDmrPacketSubid; }
uint8_t GetYsfPacketId(void) const { return m_uiYsfPacketId; }
uint8_t GetYsfPacketSubId(void) const { return m_uiYsfPacketSubId; }
uint8_t GetYsfPacketFrameId(void) const { return m_uiYsfPacketFrameId; }
uint8_t GetModuleId(void) const { return m_uiModuleId; }
bool IsLocalOrigin(void) const { return (m_uiOriginId == ORIGIN_LOCAL); }
uint16_t GetStreamId(void) const { return m_uiStreamId; }
uint8_t GetPacketId(void) const { return m_uiDstarPacketId; }
uint8_t GetDstarPacketId(void) const { return m_uiDstarPacketId; }
uint8_t GetDmrPacketId(void) const { return m_uiDmrPacketId; }
uint8_t GetDmrPacketSubid(void) const { return m_uiDmrPacketSubid; }
uint8_t GetYsfPacketId(void) const { return m_uiYsfPacketId; }
uint8_t GetYsfPacketSubId(void) const { return m_uiYsfPacketSubId; }
uint8_t GetYsfPacketFrameId(void) const { return m_uiYsfPacketFrameId; }
char GetModule(void) const { return m_cModule; }
bool IsLocalOrigin(void) const { return (m_uiOriginId == ORIGIN_LOCAL); }
// set
void UpdatePids(uint32_t);
void SetModuleId(uint8_t uiId) { m_uiModuleId = uiId; }
void SetModule(char c) { m_cModule = c; }
void SetLocalOrigin(void) { m_uiOriginId = ORIGIN_LOCAL; }
void SetRemotePeerOrigin(void) { m_uiOriginId = ORIGIN_PEER; }
@ -78,6 +78,6 @@ protected:
uint8_t m_uiYsfPacketId;
uint8_t m_uiYsfPacketSubId;
uint8_t m_uiYsfPacketFrameId;
uint8_t m_uiModuleId;
char m_cModule;
uint8_t m_uiOriginId;
};

@ -149,7 +149,7 @@ bool CProtocol::EncodeDvPacket(const CPacket &packet, CBuffer *buffer) const
void CProtocol::OnDvFramePacketIn(std::unique_ptr<CDvFramePacket> &Frame, const CIp *Ip)
{
// find the stream
CPacketStream *stream = GetStream(Frame->GetStreamId(), Ip);
auto stream = GetStream(Frame->GetStreamId(), Ip);
if ( stream )
{
//std::cout << "DV frame" << "from " << *Ip << std::endl;
@ -167,7 +167,7 @@ void CProtocol::OnDvFramePacketIn(std::unique_ptr<CDvFramePacket> &Frame, const
void CProtocol::OnDvLastFramePacketIn(std::unique_ptr<CDvLastFramePacket> &Frame, const CIp *Ip)
{
// find the stream
CPacketStream *stream = GetStream(Frame->GetStreamId(), Ip);
auto stream = GetStream(Frame->GetStreamId(), Ip);
if ( stream )
{
// push
@ -191,7 +191,7 @@ void CProtocol::OnDvLastFramePacketIn(std::unique_ptr<CDvLastFramePacket> &Frame
////////////////////////////////////////////////////////////////////////////////////////
// stream handle helpers
CPacketStream *CProtocol::GetStream(uint16_t uiStreamId, const CIp *Ip)
std::shared_ptr<CPacketStream> CProtocol::GetStream(uint16_t uiStreamId, const CIp *Ip)
{
for ( auto it=m_Streams.begin(); it!=m_Streams.end(); it++ )
{

@ -96,7 +96,7 @@ protected:
virtual void OnDvLastFramePacketIn(std::unique_ptr<CDvLastFramePacket> &, const CIp * = nullptr);
// stream handle helpers
CPacketStream *GetStream(uint16_t, const CIp * = nullptr);
std::shared_ptr<CPacketStream> GetStream(uint16_t, const CIp * = nullptr);
void CheckStreamsTimeout(void);
// queue helper
@ -128,7 +128,7 @@ protected:
CUdpSocket m_Socket6;
// streams
std::list<CPacketStream *> m_Streams;
std::list<std::shared_ptr<CPacketStream>> m_Streams;
// queue
CPacketQueue m_Queue;

@ -29,22 +29,10 @@
////////////////////////////////////////////////////////////////////////////////////////
// constructor
CReflector::CReflector()
CReflector::CReflector() : m_Callsign(CALLSIGN), m_Modules(ACTIVE_MODULES), keep_running(true)
{
keep_running = true;
#ifdef DEBUG_DUMPFILE
m_DebugFile.open("/Users/jeanluc/Desktop/xlxdebug.txt");
#endif
}
CReflector::CReflector(const CCallsign &callsign)
{
#ifdef DEBUG_DUMPFILE
m_DebugFile.close();
#endif
keep_running = true;
m_Callsign = callsign;
}
////////////////////////////////////////////////////////////////////////////////////////
// destructor
@ -62,13 +50,13 @@ CReflector::~CReflector()
m_JsonReportFuture.get();
}
#endif
for ( int i = 0; i < NB_OF_MODULES; i++ )
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
{
if ( m_RouterFuture[i].valid() )
{
m_RouterFuture[i].get();
}
if (m_RouterFuture[*it].valid())
m_RouterFuture[*it].get();
}
m_RouterFuture.clear();
m_Stream.clear();
}
@ -103,9 +91,10 @@ bool CReflector::Start(void)
}
// start one thread per reflector module
for ( int i = 0; i < NB_OF_MODULES; i++ )
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
{
m_RouterFuture[i] = std::async(std::launch::async, &CReflector::RouterThread, this, &(m_Stream[i]));
m_Stream[*it] = std::make_shared<CPacketStream>();
m_RouterFuture[*it] = std::async(std::launch::async, &CReflector::RouterThread, this, m_Stream[*it]);
}
// start the reporting threads
@ -135,12 +124,10 @@ void CReflector::Stop(void)
#endif
// stop & delete all router thread
for ( int i = 0; i < NB_OF_MODULES; i++ )
for (auto it=m_Modules.cbegin(); it!=m_Modules.cend(); it++)
{
if ( m_RouterFuture[i].valid() )
{
m_RouterFuture[i].get();
}
if (m_RouterFuture[*it].valid())
m_RouterFuture[*it].get();
}
// close protocols
@ -168,7 +155,7 @@ bool CReflector::IsStreaming(char module)
}
// clients MUST have bee locked by the caller so we can freely access it within the fuction
CPacketStream *CReflector::OpenStream(std::unique_ptr<CDvHeaderPacket> &DvHeader, std::shared_ptr<CClient>client)
std::shared_ptr<CPacketStream> CReflector::OpenStream(std::unique_ptr<CDvHeaderPacket> &DvHeader, std::shared_ptr<CClient>client)
{
// check sid is not zero
if ( 0U == DvHeader->GetStreamId() )
@ -188,7 +175,7 @@ CPacketStream *CReflector::OpenStream(std::unique_ptr<CDvHeaderPacket> &DvHeader
// get the module's queue
char module = DvHeader->GetRpt2Module();
CPacketStream *stream = GetStream(module);
auto stream = GetStream(module);
if ( stream == nullptr )
return nullptr;
@ -217,7 +204,7 @@ CPacketStream *CReflector::OpenStream(std::unique_ptr<CDvHeaderPacket> &DvHeader
return stream;
}
void CReflector::CloseStream(CPacketStream *stream)
void CReflector::CloseStream(std::shared_ptr<CPacketStream> stream)
{
if ( stream != nullptr )
{
@ -268,10 +255,10 @@ void CReflector::CloseStream(CPacketStream *stream)
////////////////////////////////////////////////////////////////////////////////////////
// router threads
void CReflector::RouterThread(CPacketStream *streamIn)
void CReflector::RouterThread(std::shared_ptr<CPacketStream> streamIn)
{
// get our module
uint8_t uiModuleId = GetStreamModule(streamIn);
const auto module = GetStreamModule(streamIn);
// get on input queue
std::unique_ptr<CPacket> packet;
@ -295,7 +282,7 @@ void CReflector::RouterThread(CPacketStream *streamIn)
if ( packet != nullptr )
{
// set origin
packet->SetModuleId(uiModuleId);
packet->SetModule(module);
// iterate on all protocols
m_Protocols.Lock();
@ -503,42 +490,31 @@ void CReflector::OnStreamClose(const CCallsign &callsign)
////////////////////////////////////////////////////////////////////////////////////////
// modules & queues
int CReflector::GetModuleIndex(char module) const
std::shared_ptr<CPacketStream> CReflector::GetStream(char module)
{
int i = (int)module - (int)'A';
if ( (i < 0) || (i >= NB_OF_MODULES) )
{
i = -1;
}
return i;
}
auto it=m_Stream.find(module);
if (it!=m_Stream.end())
return it->second;
CPacketStream *CReflector::GetStream(char module)
{
int i = GetModuleIndex(module);
if ( i >= 0 )
{
return &(m_Stream[i]);
}
return nullptr;
}
bool CReflector::IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &DvHeader)
{
for ( unsigned i = 0; i < m_Stream.size(); i++ )
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
{
if ( (m_Stream[i].GetStreamId() == DvHeader->GetStreamId()) && (m_Stream[i].IsOpen()) )
if ( (it->second->GetStreamId() == DvHeader->GetStreamId()) && (it->second->IsOpen()) )
return true;
}
return false;
}
char CReflector::GetStreamModule(CPacketStream *stream)
char CReflector::GetStreamModule(std::shared_ptr<CPacketStream> stream)
{
for ( unsigned i = 0; i < m_Stream.size(); i++ )
for (auto it=m_Stream.begin(); it!=m_Stream.end(); it++)
{
if ( &(m_Stream[i]) == stream )
return GetModuleLetter(i);
if ( it->second == stream )
return it->first;
}
return ' ';
}

@ -42,13 +42,11 @@ class CReflector
public:
// constructor
CReflector();
CReflector(const CCallsign &);
// destructor
virtual ~CReflector();
// settings
void SetCallsign(const CCallsign &callsign) { m_Callsign = callsign; }
//
const CCallsign &GetCallsign(void) const { return m_Callsign; }
#ifdef TRANSCODER_IP
@ -61,7 +59,7 @@ public:
void Stop(void);
// clients
CClients *GetClients(void) { m_Clients.Lock(); return &m_Clients; }
CClients *GetClients(void) { m_Clients.Lock(); return &m_Clients; }
void ReleaseClients(void) { m_Clients.Unlock(); }
// peers
@ -69,18 +67,16 @@ public:
void ReleasePeers(void) { m_Peers.Unlock(); }
// stream opening & closing
CPacketStream *OpenStream(std::unique_ptr<CDvHeaderPacket> &, std::shared_ptr<CClient>);
std::shared_ptr<CPacketStream> OpenStream(std::unique_ptr<CDvHeaderPacket> &, std::shared_ptr<CClient>);
bool IsStreaming(char);
void CloseStream(CPacketStream *);
void CloseStream(std::shared_ptr<CPacketStream>);
// users
CUsers *GetUsers(void) { m_Users.Lock(); return &m_Users; }
void ReleaseUsers(void) { m_Users.Unlock(); }
// get
bool IsValidModule(char c) const { return (GetModuleIndex(c) >= 0); }
int GetModuleIndex(char) const;
char GetModuleLetter(int i) const { return 'A' + (char)i; }
// check
bool IsValidModule(char c) const { return m_Modules.npos!=m_Modules.find(c); }
// notifications
void OnPeersChanged(void);
@ -91,16 +87,16 @@ public:
protected:
// threads
void RouterThread(CPacketStream *);
void RouterThread(std::shared_ptr<CPacketStream>);
void XmlReportThread(void);
#ifdef JSON_MONITOR
void JsonReportThread(void);
#endif
// streams
CPacketStream *GetStream(char);
bool IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &);
char GetStreamModule(CPacketStream *);
std::shared_ptr<CPacketStream> GetStream(char);
bool IsStreamOpen(const std::unique_ptr<CDvHeaderPacket> &);
char GetStreamModule(std::shared_ptr<CPacketStream>);
// xml helpers
void WriteXmlFile(std::ofstream &);
@ -116,24 +112,25 @@ protected:
protected:
// identity
CCallsign m_Callsign;
const CCallsign m_Callsign;
const std::string m_Modules;
#ifdef TRANSCODER_IP
char m_AmbedIp[INET6_ADDRSTRLEN];
char m_AmbedIp[INET6_ADDRSTRLEN];
#endif
// objects
CUsers m_Users; // sorted list of lastheard stations
CClients m_Clients; // list of linked repeaters/nodes/peers's modules
CPeers m_Peers; // list of linked peers
CProtocols m_Protocols; // list of supported protocol handlers
CUsers m_Users; // sorted list of lastheard stations
CClients m_Clients; // list of linked repeaters/nodes/peers's modules
CPeers m_Peers; // list of linked peers
CProtocols m_Protocols; // list of supported protocol handlers
// queues
std::array<CPacketStream, NB_OF_MODULES> m_Stream;
std::unordered_map<char, std::shared_ptr<CPacketStream>> m_Stream;
// threads
std::atomic<bool> keep_running;
std::array<std::future<void>, NB_OF_MODULES> m_RouterFuture;
std::future<void> m_XmlReportFuture, m_JsonReportFuture;
std::unordered_map<char, std::future<void>> m_RouterFuture;
std::future<void> m_XmlReportFuture /*, m_JsonReportFuture*/;
// notifications
CNotificationQueue m_Notifications;

@ -249,7 +249,7 @@ void CUlxProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::ulx, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// no, send the packet
// this is protocol revision dependent
@ -389,7 +389,7 @@ void CUlxProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header,
Header->SetRemotePeerOrigin();
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open

@ -141,6 +141,7 @@ void CWiresxCmdHandler::Task(void)
// handle it
if ( bCmd )
{
const char *modules = ACTIVE_MODULES;
// fill our info object
Info = m_ReflectorWiresxInfo;
g_YsfNodeDir.FindFrequencies(Cmd.GetCallsign(), &uiNodeTxFreq, &uiNodeRxFreq);
@ -171,9 +172,9 @@ void CWiresxCmdHandler::Task(void)
ReplyToWiresxAllReqPacket(Cmd.GetIp(), Info, Cmd.GetArg());
break;
case WIRESX_CMD_CONN_REQ:
if ( (Cmd.GetArg() >= 1) && (Cmd.GetArg() <= NB_OF_MODULES) )
cModule = 'A' + (char)(Cmd.GetArg() - 1);
if (::strchr(modules, cModule))
{
cModule = 'A' + (char)(Cmd.GetArg() - 1);
std::cout << "Wires-X CONN_REQ command to link on module " << cModule << " from " << Cmd.GetCallsign() << " at " << Cmd.GetIp() << std::endl;
// acknowledge
ReplyToWiresxConnReqPacket(Cmd.GetIp(), Info, cModule);
@ -336,6 +337,8 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
::memcpy(data + 12U, WiresxInfo.GetNode(), 10U);
// number of entries
const char *modules = ACTIVE_MODULES;
uint NB_OF_MODULES = ::strlen(modules);
uint total = NB_OF_MODULES;
uint n = NB_OF_MODULES - Start;
if (n > 20U)
@ -349,7 +352,8 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
{
char item[16U];
// module A == 0
int RoomId = i + Start;
char RoomMod = modules[i + Start];
int RoomId = RoomMod - 'A';
// prepare
::memset(data + offset, ' ', 50U);
@ -361,7 +365,7 @@ bool CWiresxCmdHandler::ReplyToWiresxAllReqPacket(const CIp &Ip, const CWiresxIn
// refl->name
::memset(item, ' ', 16U);
::memcpy(item, "MODULE", 6U); // K2IE fix for U/C only radios
item[7] = 'A' + RoomId;
item[7] = RoomMod;
::memcpy(data + offset + 6U, item, 16U);
// refl->count
::sprintf(item, "%03d", RoomId + 1);

@ -225,7 +225,7 @@ void CYsfProtocol::Task(void)
void CYsfProtocol::OnDvHeaderPacketIn(std::unique_ptr<CDvHeaderPacket> &Header, const CIp &Ip)
{
// find the stream
CPacketStream *stream = GetStream(Header->GetStreamId());
auto stream = GetStream(Header->GetStreamId());
if ( stream )
{
// stream already open
@ -282,7 +282,7 @@ void CYsfProtocol::HandleQueue(void)
auto packet = m_Queue.pop();
// get our sender's id
int iModId = g_Reflector.GetModuleIndex(packet->GetModuleId());
const auto mod = packet->GetModule();
// encode
CBuffer buffer;
@ -292,7 +292,7 @@ void CYsfProtocol::HandleQueue(void)
{
// update local stream cache
// this relies on queue feeder setting valid module id
m_StreamsCache[iModId].m_dvHeader = CDvHeaderPacket((CDvHeaderPacket &)*packet);
m_StreamsCache[mod].m_dvHeader = CDvHeaderPacket((CDvHeaderPacket &)*packet.get());
// encode it
EncodeDvHeaderPacket((CDvHeaderPacket &)*packet.get(), &buffer);
@ -301,7 +301,7 @@ void CYsfProtocol::HandleQueue(void)
else if ( packet->IsLastPacket() )
{
// encode it
EncodeDvLastPacket(m_StreamsCache[iModId].m_dvHeader, &buffer);
EncodeDvLastPacket(m_StreamsCache[mod].m_dvHeader, &buffer);
}
// otherwise, just a regular DV frame
else
@ -311,11 +311,11 @@ void CYsfProtocol::HandleQueue(void)
if (sid <= 4)
{
//std::cout << (int)sid;
m_StreamsCache[iModId].m_dvFrames[sid] = CDvFramePacket((CDvFramePacket &)*packet);
m_StreamsCache[mod].m_dvFrames[sid] = CDvFramePacket((CDvFramePacket &)*packet.get());
if ( sid == 4 )
{
EncodeDvPacket(m_StreamsCache[iModId].m_dvHeader, m_StreamsCache[iModId].m_dvFrames, &buffer);
EncodeDvPacket(m_StreamsCache[mod].m_dvHeader, m_StreamsCache[mod].m_dvFrames, &buffer);
}
}
}
@ -330,7 +330,7 @@ void CYsfProtocol::HandleQueue(void)
while ( (client = clients->FindNextClient(EProtocol::ysf, it)) != nullptr )
{
// is this client busy ?
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModuleId()) )
if ( !client->IsAMaster() && (client->GetReflectorModule() == packet->GetModule()) )
{
// no, send the packet
Send(buffer, client->GetIp());

@ -121,7 +121,7 @@ protected:
CTimer m_LastKeepaliveTime;
// for queue header caches
std::array<CYsfStreamCacheItem, NB_OF_MODULES> m_StreamsCache;
std::unordered_map<char, CYsfStreamCacheItem> m_StreamsCache;
// for wires-x
CWiresxCmdHandler m_WiresxCmdHandler;

Loading…
Cancel
Save

Powered by TurnKey Linux.