From e1219ef345b32d10734cd62b5927aafcec98cdaf Mon Sep 17 00:00:00 2001 From: Tom Early Date: Fri, 31 Dec 2021 06:36:19 -0700 Subject: [PATCH] 3 input queues to make sure a single module doesn't dominate the device --- Controller.cpp | 20 -------------------- Controller.h | 2 +- DV3003.cpp | 32 +++++++++++++++++++++++++------- DV3003.h | 3 +-- Main.cpp | 2 +- PacketQueue.h | 25 +++++++++++++------------ 6 files changed, 41 insertions(+), 43 deletions(-) diff --git a/Controller.cpp b/Controller.cpp index 4a28d6d..cbc6f91 100644 --- a/Controller.cpp +++ b/Controller.cpp @@ -144,20 +144,14 @@ void CController::ReadReflectorThread() switch (packet->GetCodecIn()) { case ECodecType::dstar: - add_dst_mux.lock(); dstar_device.AddPacket(packet); - add_dst_mux.unlock(); break; case ECodecType::dmr: - add_dmr_mux.lock(); dmr_device.AddPacket(packet); - add_dmr_mux.unlock(); break; case ECodecType::c2_1600: case ECodecType::c2_3200: - c2_mux.lock(); codec2_queue.push(packet); - c2_mux.unlock(); break; default: Dump(packet, "ERROR: Received a reflector packet with unknown Codec:"); @@ -246,21 +240,15 @@ void CController::Codec2toAudio(std::shared_ptr packet) } } // the only thing left is to encode the two ambe, so push the packet onto both AMBE queues - add_dst_mux.lock(); dstar_device.AddPacket(packet); - add_dst_mux.unlock(); - add_dmr_mux.lock(); dmr_device.AddPacket(packet); - add_dmr_mux.unlock(); } void CController::ProcessC2Thread() { while (keep_running) { - c2_mux.lock(); auto packet = codec2_queue.pop(); - c2_mux.unlock(); if (packet) { switch (packet->GetCodecIn()) @@ -300,12 +288,8 @@ void CController::RouteDstPacket(std::shared_ptr packet) if (ECodecType::dstar == packet->GetCodecIn()) { // codec_in is dstar, the audio has just completed, so now calc the M17 and DMR - c2_mux.lock(); codec2_queue.push(packet); - c2_mux.unlock(); - add_dmr_mux.lock(); dmr_device.AddPacket(packet); - add_dmr_mux.unlock(); } else if (packet->AllCodecsAreSet()) { @@ -319,12 +303,8 @@ void CController::RouteDmrPacket(std::shared_ptr packet) { if (ECodecType::dmr == packet->GetCodecIn()) { - c2_mux.lock(); codec2_queue.push(packet); - c2_mux.unlock(); - add_dst_mux.lock(); dstar_device.AddPacket(packet); - add_dst_mux.unlock(); } else if (packet->AllCodecsAreSet()) { diff --git a/Controller.h b/Controller.h index 85ac74a..6638a49 100644 --- a/Controller.h +++ b/Controller.h @@ -53,7 +53,7 @@ protected: CDV3003 dmr_device{Encoding::dmr}; CPacketQueue codec2_queue; - std::mutex add_dst_mux, add_dmr_mux, c2_mux, send_mux; + std::mutex send_mux; bool InitDevices(); // processing threads diff --git a/DV3003.cpp b/DV3003.cpp index ad69655..d02be78 100644 --- a/DV3003.cpp +++ b/DV3003.cpp @@ -378,9 +378,20 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet) void CDV3003::FeedDevice() { const std::string modules(TRANSCODED_MODULES); + const std::size_t devcount = modules.size(); + static std::size_t current = 0; // make sure we cycle through all the input queues while (keep_running) { - auto packet = inq.pop(); + std::shared_ptr packet; + // try each input queue until we get a packet + for (std::size_t tries=0; tries devcount) + current = 0; + if (packet) + break; + } if (packet) { const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet(); @@ -495,14 +506,21 @@ void CDV3003::ReadDevice() void CDV3003::AddPacket(const std::shared_ptr packet) { - inq.push(packet); + static const std::string modules(TRANSCODED_MODULES); + static const unsigned int devcount = modules.size(); + auto pos = modules.find(packet->GetModule()); + if (pos < devcount) + inq[pos].push(packet); + else + std::cerr << "Incoming packet module, '" << packet->GetModule() << "', is not configured for this device!" << std::endl; + #ifdef DEBUG - static unsigned int maxsize = 0; - unsigned int s = inq.size(); - if (s > maxsize) + static unsigned int maxsize[3] = { 0, 0, 0 }; + unsigned int s = inq[pos].size(); + if (s > maxsize[pos]) { - std::cout << "input queue size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl; - maxsize = s; + std::cout << "input queue #" << pos << " size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl; + maxsize[pos] = s; } #endif } diff --git a/DV3003.h b/DV3003.h index 4b2c8d2..f076a4b 100644 --- a/DV3003.h +++ b/DV3003.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include "PacketQueue.h" @@ -118,7 +117,7 @@ private: std::atomic ch_depth, sp_depth; std::atomic keep_running; CPacketQueue vocq[3]; // we need a queue for each vocoder - CPacketQueue inq; // and input queue + CPacketQueue inq[3]; // and input queues std::future feedFuture, readFuture; std::string devicepath, productid, version; diff --git a/Main.cpp b/Main.cpp index 2c0910a..c43520f 100644 --- a/Main.cpp +++ b/Main.cpp @@ -27,7 +27,7 @@ int main() if (Controller.Start()) return EXIT_FAILURE; - std::cout << "Hybrid Transcoder version #211228 successfully started" << std::endl; + std::cout << "Hybrid Transcoder version #211231 successfully started" << std::endl; pause(); diff --git a/PacketQueue.h b/PacketQueue.h index 6ee361c..906cc51 100644 --- a/PacketQueue.h +++ b/PacketQueue.h @@ -19,40 +19,41 @@ // along with this program. If not, see . #include +#include #include #include "TranscoderPacket.h" // for holding CTranscoder packets while the vocoders are working their magic +// thread safe class CPacketQueue { public: // pass thru std::shared_ptr pop() { + std::lock_guard lock(m); std::shared_ptr pack; - if (! queue.empty()) { - pack = queue.front(); - queue.pop(); + if (! q.empty()) { + pack = q.front(); + q.pop(); } return pack; } - bool empty() - { - return queue.empty(); - } - void push(std::shared_ptr packet) { - queue.push(packet); + std::lock_guard lock(m); + q.push(packet); } std::size_t size() { - return queue.size(); + std::lock_guard lock(m); + return q.size(); } -protected: - std::queue> queue; +private: + std::queue> q; + std::mutex m; };