From 4336de47233e37dab8e19fee99adbf792d1d9669 Mon Sep 17 00:00:00 2001 From: Tom Early Date: Mon, 3 Jan 2022 10:07:03 -0700 Subject: [PATCH] a blocking CPacketQueue --- DV3003.cpp | 87 ++++++++++++++++++--------------------------------- DV3003.h | 4 +-- Main.cpp | 2 +- PacketQueue.h | 23 ++++++-------- 4 files changed, 44 insertions(+), 72 deletions(-) diff --git a/DV3003.cpp b/DV3003.cpp index d02be78..ac4e922 100644 --- a/DV3003.cpp +++ b/DV3003.cpp @@ -378,46 +378,41 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet) void CDV3003::FeedDevice() { const std::string modules(TRANSCODED_MODULES); - const std::size_t devcount = modules.size(); - static std::size_t current = 0; // make sure we cycle through all the input queues + const auto n = modules.size(); while (keep_running) { - std::shared_ptr packet; - // try each input queue until we get a packet - for (std::size_t tries=0; triesDStarIsSet() : packet->DMRIsSet(); + + while (keep_running) // wait until there is room { - packet = inq[current++].pop(); - if (current > devcount) - current = 0; - if (packet) - break; + if (needs_audio) + { + // we need to decode ambe to audio + if (ch_depth < 2) + break; + } + else + { + // we need to encode audio to ambe + if (sp_depth < 2) + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); } - if (packet) - { - const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet(); - while (keep_running) // wait until there is room + if (keep_running) + { + auto index = modules.find(packet->GetModule()); + // save the packet in the vocoder's queue while the vocoder does its magic + if (std::string::npos == index) { - if (needs_audio) - { - // we need to decode ambe to audio - if (ch_depth < 2) - break; - } - else - { - // we need to encode audio to ambe - if (sp_depth < 2) - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(5)); + std::cerr << "Module '" << packet->GetModule() << "' is not configured on " << devicepath << std::endl; } - - if (keep_running) + else { - auto index = modules.find(packet->GetModule()); - // save the packet in the vocoder's queue while the vocoder does its magic - vocq[index].push(packet); + waiting_packet[index] = packet; if (needs_audio) { @@ -431,10 +426,6 @@ void CDV3003::FeedDevice() } } } - else // no packet is in the input queue - { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } } } @@ -464,10 +455,10 @@ void CDV3003::ReadDevice() if (! GetResponse(p)) { unsigned int channel = p.field_id - PKT_CHANNEL0; - auto packet = vocq[channel].pop(); + auto packet = waiting_packet[channel]; if (PKT_CHANNEL == p.header.packet_type) { - if (12!=ntohs(p.header.payload_length) || 1!=p.payload.ambe.chand || 72!=p.payload.ambe.num_bits) + if (12!=ntohs(p.header.payload_length) || PKT_CHAND!=p.payload.ambe.chand || 72!=p.payload.ambe.num_bits) dump("Improper ambe packet:", &p, packet_size(p)); sp_depth--; if (Encoding::dstar == type) @@ -478,7 +469,7 @@ void CDV3003::ReadDevice() } else if (PKT_SPEECH == p.header.packet_type) { - if (323!=ntohs(p.header.payload_length) || 0!=p.payload.audio.speechd || 160!=p.payload.audio.num_samples) + if (323!=ntohs(p.header.payload_length) || PKT_SPEECHD!=p.payload.audio.speechd || 160!=p.payload.audio.num_samples) dump("Improper audio packet:", &p, packet_size(p)); ch_depth--; packet->SetAudioSamples(p.payload.audio.samples, true); @@ -506,23 +497,7 @@ void CDV3003::ReadDevice() void CDV3003::AddPacket(const std::shared_ptr packet) { - static const std::string modules(TRANSCODED_MODULES); - static const unsigned int devcount = modules.size(); - auto pos = modules.find(packet->GetModule()); - if (pos < devcount) - inq[pos].push(packet); - else - std::cerr << "Incoming packet module, '" << packet->GetModule() << "', is not configured for this device!" << std::endl; - -#ifdef DEBUG - static unsigned int maxsize[3] = { 0, 0, 0 }; - unsigned int s = inq[pos].size(); - if (s > maxsize[pos]) - { - std::cout << "input queue #" << pos << " size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl; - maxsize[pos] = s; - } -#endif + input_queue.push(packet); } bool CDV3003::SendAudio(const uint8_t channel, const int16_t *audio) const diff --git a/DV3003.h b/DV3003.h index f076a4b..e6a8443 100644 --- a/DV3003.h +++ b/DV3003.h @@ -116,8 +116,8 @@ private: int fd; std::atomic ch_depth, sp_depth; std::atomic keep_running; - CPacketQueue vocq[3]; // we need a queue for each vocoder - CPacketQueue inq[3]; // and input queues + std::shared_ptr waiting_packet[3]; // the packet currently being processed in each vocoder + CPacketQueue input_queue; std::future feedFuture, readFuture; std::string devicepath, productid, version; diff --git a/Main.cpp b/Main.cpp index c43520f..4a1b9e4 100644 --- a/Main.cpp +++ b/Main.cpp @@ -27,7 +27,7 @@ int main() if (Controller.Start()) return EXIT_FAILURE; - std::cout << "Hybrid Transcoder version #211231 successfully started" << std::endl; + std::cout << "Hybrid Transcoder version 1.0.0 successfully started" << std::endl; pause(); diff --git a/PacketQueue.h b/PacketQueue.h index 906cc51..8da8aba 100644 --- a/PacketQueue.h +++ b/PacketQueue.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "TranscoderPacket.h" @@ -29,15 +30,15 @@ class CPacketQueue { public: - // pass thru + // blocks until there's something to pop std::shared_ptr pop() { - std::lock_guard lock(m); - std::shared_ptr pack; - if (! q.empty()) { - pack = q.front(); - q.pop(); + std::unique_lock lock(m); + while (q.empty()) { + c.wait(lock); } + auto pack = q.front(); + q.pop(); return pack; } @@ -45,15 +46,11 @@ public: { std::lock_guard lock(m); q.push(packet); - } - - std::size_t size() - { - std::lock_guard lock(m); - return q.size(); + c.notify_one(); } private: std::queue> q; - std::mutex m; + mutable std::mutex m; + std::condition_variable c; };