3 input queues to make sure a single module doesn't dominate the device

main
Tom Early 4 years ago
parent 91707340bb
commit e1219ef345

@ -144,20 +144,14 @@ void CController::ReadReflectorThread()
switch (packet->GetCodecIn()) switch (packet->GetCodecIn())
{ {
case ECodecType::dstar: case ECodecType::dstar:
add_dst_mux.lock();
dstar_device.AddPacket(packet); dstar_device.AddPacket(packet);
add_dst_mux.unlock();
break; break;
case ECodecType::dmr: case ECodecType::dmr:
add_dmr_mux.lock();
dmr_device.AddPacket(packet); dmr_device.AddPacket(packet);
add_dmr_mux.unlock();
break; break;
case ECodecType::c2_1600: case ECodecType::c2_1600:
case ECodecType::c2_3200: case ECodecType::c2_3200:
c2_mux.lock();
codec2_queue.push(packet); codec2_queue.push(packet);
c2_mux.unlock();
break; break;
default: default:
Dump(packet, "ERROR: Received a reflector packet with unknown Codec:"); Dump(packet, "ERROR: Received a reflector packet with unknown Codec:");
@ -246,21 +240,15 @@ void CController::Codec2toAudio(std::shared_ptr<CTranscoderPacket> packet)
} }
} }
// the only thing left is to encode the two ambe, so push the packet onto both AMBE queues // the only thing left is to encode the two ambe, so push the packet onto both AMBE queues
add_dst_mux.lock();
dstar_device.AddPacket(packet); dstar_device.AddPacket(packet);
add_dst_mux.unlock();
add_dmr_mux.lock();
dmr_device.AddPacket(packet); dmr_device.AddPacket(packet);
add_dmr_mux.unlock();
} }
void CController::ProcessC2Thread() void CController::ProcessC2Thread()
{ {
while (keep_running) while (keep_running)
{ {
c2_mux.lock();
auto packet = codec2_queue.pop(); auto packet = codec2_queue.pop();
c2_mux.unlock();
if (packet) if (packet)
{ {
switch (packet->GetCodecIn()) switch (packet->GetCodecIn())
@ -300,12 +288,8 @@ void CController::RouteDstPacket(std::shared_ptr<CTranscoderPacket> packet)
if (ECodecType::dstar == packet->GetCodecIn()) if (ECodecType::dstar == packet->GetCodecIn())
{ {
// codec_in is dstar, the audio has just completed, so now calc the M17 and DMR // codec_in is dstar, the audio has just completed, so now calc the M17 and DMR
c2_mux.lock();
codec2_queue.push(packet); codec2_queue.push(packet);
c2_mux.unlock();
add_dmr_mux.lock();
dmr_device.AddPacket(packet); dmr_device.AddPacket(packet);
add_dmr_mux.unlock();
} }
else if (packet->AllCodecsAreSet()) else if (packet->AllCodecsAreSet())
{ {
@ -319,12 +303,8 @@ void CController::RouteDmrPacket(std::shared_ptr<CTranscoderPacket> packet)
{ {
if (ECodecType::dmr == packet->GetCodecIn()) if (ECodecType::dmr == packet->GetCodecIn())
{ {
c2_mux.lock();
codec2_queue.push(packet); codec2_queue.push(packet);
c2_mux.unlock();
add_dst_mux.lock();
dstar_device.AddPacket(packet); dstar_device.AddPacket(packet);
add_dst_mux.unlock();
} }
else if (packet->AllCodecsAreSet()) else if (packet->AllCodecsAreSet())
{ {

@ -53,7 +53,7 @@ protected:
CDV3003 dmr_device{Encoding::dmr}; CDV3003 dmr_device{Encoding::dmr};
CPacketQueue codec2_queue; CPacketQueue codec2_queue;
std::mutex add_dst_mux, add_dmr_mux, c2_mux, send_mux; std::mutex send_mux;
bool InitDevices(); bool InitDevices();
// processing threads // processing threads

@ -378,9 +378,20 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet)
void CDV3003::FeedDevice() void CDV3003::FeedDevice()
{ {
const std::string modules(TRANSCODED_MODULES); const std::string modules(TRANSCODED_MODULES);
const std::size_t devcount = modules.size();
static std::size_t current = 0; // make sure we cycle through all the input queues
while (keep_running) while (keep_running)
{ {
auto packet = inq.pop(); std::shared_ptr<CTranscoderPacket> packet;
// try each input queue until we get a packet
for (std::size_t tries=0; tries<devcount; tries++)
{
packet = inq[current++].pop();
if (current > devcount)
current = 0;
if (packet)
break;
}
if (packet) if (packet)
{ {
const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet(); const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet();
@ -495,14 +506,21 @@ void CDV3003::ReadDevice()
void CDV3003::AddPacket(const std::shared_ptr<CTranscoderPacket> packet) void CDV3003::AddPacket(const std::shared_ptr<CTranscoderPacket> packet)
{ {
inq.push(packet); static const std::string modules(TRANSCODED_MODULES);
static const unsigned int devcount = modules.size();
auto pos = modules.find(packet->GetModule());
if (pos < devcount)
inq[pos].push(packet);
else
std::cerr << "Incoming packet module, '" << packet->GetModule() << "', is not configured for this device!" << std::endl;
#ifdef DEBUG #ifdef DEBUG
static unsigned int maxsize = 0; static unsigned int maxsize[3] = { 0, 0, 0 };
unsigned int s = inq.size(); unsigned int s = inq[pos].size();
if (s > maxsize) if (s > maxsize[pos])
{ {
std::cout << "input queue size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl; std::cout << "input queue #" << pos << " size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl;
maxsize = s; maxsize[pos] = s;
} }
#endif #endif
} }

@ -20,7 +20,6 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <future> #include <future>
#include <mutex>
#include <atomic> #include <atomic>
#include "PacketQueue.h" #include "PacketQueue.h"
@ -118,7 +117,7 @@ private:
std::atomic<unsigned int> ch_depth, sp_depth; std::atomic<unsigned int> ch_depth, sp_depth;
std::atomic<bool> keep_running; std::atomic<bool> keep_running;
CPacketQueue vocq[3]; // we need a queue for each vocoder CPacketQueue vocq[3]; // we need a queue for each vocoder
CPacketQueue inq; // and input queue CPacketQueue inq[3]; // and input queues
std::future<void> feedFuture, readFuture; std::future<void> feedFuture, readFuture;
std::string devicepath, productid, version; std::string devicepath, productid, version;

@ -27,7 +27,7 @@ int main()
if (Controller.Start()) if (Controller.Start())
return EXIT_FAILURE; return EXIT_FAILURE;
std::cout << "Hybrid Transcoder version #211228 successfully started" << std::endl; std::cout << "Hybrid Transcoder version #211231 successfully started" << std::endl;
pause(); pause();

@ -19,40 +19,41 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <queue> #include <queue>
#include <mutex>
#include <memory> #include <memory>
#include "TranscoderPacket.h" #include "TranscoderPacket.h"
// for holding CTranscoder packets while the vocoders are working their magic // for holding CTranscoder packets while the vocoders are working their magic
// thread safe
class CPacketQueue class CPacketQueue
{ {
public: public:
// pass thru // pass thru
std::shared_ptr<CTranscoderPacket> pop() std::shared_ptr<CTranscoderPacket> pop()
{ {
std::lock_guard<std::mutex> lock(m);
std::shared_ptr<CTranscoderPacket> pack; std::shared_ptr<CTranscoderPacket> pack;
if (! queue.empty()) { if (! q.empty()) {
pack = queue.front(); pack = q.front();
queue.pop(); q.pop();
} }
return pack; return pack;
} }
bool empty()
{
return queue.empty();
}
void push(std::shared_ptr<CTranscoderPacket> packet) void push(std::shared_ptr<CTranscoderPacket> packet)
{ {
queue.push(packet); std::lock_guard<std::mutex> lock(m);
q.push(packet);
} }
std::size_t size() std::size_t size()
{ {
return queue.size(); std::lock_guard<std::mutex> lock(m);
return q.size();
} }
protected: private:
std::queue<std::shared_ptr<CTranscoderPacket>> queue; std::queue<std::shared_ptr<CTranscoderPacket>> q;
std::mutex m;
}; };

Loading…
Cancel
Save

Powered by TurnKey Linux.