a blocking CPacketQueue

main
Tom Early 4 years ago
parent e5a4011507
commit 4336de4723

@ -378,46 +378,41 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet)
void CDV3003::FeedDevice() void CDV3003::FeedDevice()
{ {
const std::string modules(TRANSCODED_MODULES); const std::string modules(TRANSCODED_MODULES);
const std::size_t devcount = modules.size(); const auto n = modules.size();
static std::size_t current = 0; // make sure we cycle through all the input queues
while (keep_running) while (keep_running)
{ {
std::shared_ptr<CTranscoderPacket> packet; auto packet = input_queue.pop(); // blocks until there is something to pop
// try each input queue until we get a packet
for (std::size_t tries=0; tries<devcount; tries++) const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet();
while (keep_running) // wait until there is room
{ {
packet = inq[current++].pop(); if (needs_audio)
if (current > devcount) {
current = 0; // we need to decode ambe to audio
if (packet) if (ch_depth < 2)
break; break;
}
else
{
// we need to encode audio to ambe
if (sp_depth < 2)
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
} }
if (packet)
{
const bool needs_audio = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet();
while (keep_running) // wait until there is room if (keep_running)
{
auto index = modules.find(packet->GetModule());
// save the packet in the vocoder's queue while the vocoder does its magic
if (std::string::npos == index)
{ {
if (needs_audio) std::cerr << "Module '" << packet->GetModule() << "' is not configured on " << devicepath << std::endl;
{
// we need to decode ambe to audio
if (ch_depth < 2)
break;
}
else
{
// we need to encode audio to ambe
if (sp_depth < 2)
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
} }
else
if (keep_running)
{ {
auto index = modules.find(packet->GetModule()); waiting_packet[index] = packet;
// save the packet in the vocoder's queue while the vocoder does its magic
vocq[index].push(packet);
if (needs_audio) if (needs_audio)
{ {
@ -431,10 +426,6 @@ void CDV3003::FeedDevice()
} }
} }
} }
else // no packet is in the input queue
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
} }
} }
@ -464,10 +455,10 @@ void CDV3003::ReadDevice()
if (! GetResponse(p)) if (! GetResponse(p))
{ {
unsigned int channel = p.field_id - PKT_CHANNEL0; unsigned int channel = p.field_id - PKT_CHANNEL0;
auto packet = vocq[channel].pop(); auto packet = waiting_packet[channel];
if (PKT_CHANNEL == p.header.packet_type) if (PKT_CHANNEL == p.header.packet_type)
{ {
if (12!=ntohs(p.header.payload_length) || 1!=p.payload.ambe.chand || 72!=p.payload.ambe.num_bits) if (12!=ntohs(p.header.payload_length) || PKT_CHAND!=p.payload.ambe.chand || 72!=p.payload.ambe.num_bits)
dump("Improper ambe packet:", &p, packet_size(p)); dump("Improper ambe packet:", &p, packet_size(p));
sp_depth--; sp_depth--;
if (Encoding::dstar == type) if (Encoding::dstar == type)
@ -478,7 +469,7 @@ void CDV3003::ReadDevice()
} }
else if (PKT_SPEECH == p.header.packet_type) else if (PKT_SPEECH == p.header.packet_type)
{ {
if (323!=ntohs(p.header.payload_length) || 0!=p.payload.audio.speechd || 160!=p.payload.audio.num_samples) if (323!=ntohs(p.header.payload_length) || PKT_SPEECHD!=p.payload.audio.speechd || 160!=p.payload.audio.num_samples)
dump("Improper audio packet:", &p, packet_size(p)); dump("Improper audio packet:", &p, packet_size(p));
ch_depth--; ch_depth--;
packet->SetAudioSamples(p.payload.audio.samples, true); packet->SetAudioSamples(p.payload.audio.samples, true);
@ -506,23 +497,7 @@ void CDV3003::ReadDevice()
void CDV3003::AddPacket(const std::shared_ptr<CTranscoderPacket> packet) void CDV3003::AddPacket(const std::shared_ptr<CTranscoderPacket> packet)
{ {
static const std::string modules(TRANSCODED_MODULES); input_queue.push(packet);
static const unsigned int devcount = modules.size();
auto pos = modules.find(packet->GetModule());
if (pos < devcount)
inq[pos].push(packet);
else
std::cerr << "Incoming packet module, '" << packet->GetModule() << "', is not configured for this device!" << std::endl;
#ifdef DEBUG
static unsigned int maxsize[3] = { 0, 0, 0 };
unsigned int s = inq[pos].size();
if (s > maxsize[pos])
{
std::cout << "input queue #" << pos << " size for " << ((type==Encoding::dstar) ? "dstar" : "dmr") << " is " << s << std::endl;
maxsize[pos] = s;
}
#endif
} }
bool CDV3003::SendAudio(const uint8_t channel, const int16_t *audio) const bool CDV3003::SendAudio(const uint8_t channel, const int16_t *audio) const

@ -116,8 +116,8 @@ private:
int fd; int fd;
std::atomic<unsigned int> ch_depth, sp_depth; std::atomic<unsigned int> ch_depth, sp_depth;
std::atomic<bool> keep_running; std::atomic<bool> keep_running;
CPacketQueue vocq[3]; // we need a queue for each vocoder std::shared_ptr<CTranscoderPacket> waiting_packet[3]; // the packet currently being processed in each vocoder
CPacketQueue inq[3]; // and input queues CPacketQueue input_queue;
std::future<void> feedFuture, readFuture; std::future<void> feedFuture, readFuture;
std::string devicepath, productid, version; std::string devicepath, productid, version;

@ -27,7 +27,7 @@ int main()
if (Controller.Start()) if (Controller.Start())
return EXIT_FAILURE; return EXIT_FAILURE;
std::cout << "Hybrid Transcoder version #211231 successfully started" << std::endl; std::cout << "Hybrid Transcoder version 1.0.0 successfully started" << std::endl;
pause(); pause();

@ -20,6 +20,7 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
#include <condition_variable>
#include <memory> #include <memory>
#include "TranscoderPacket.h" #include "TranscoderPacket.h"
@ -29,15 +30,15 @@
class CPacketQueue class CPacketQueue
{ {
public: public:
// pass thru // blocks until there's something to pop
std::shared_ptr<CTranscoderPacket> pop() std::shared_ptr<CTranscoderPacket> pop()
{ {
std::lock_guard<std::mutex> lock(m); std::unique_lock<std::mutex> lock(m);
std::shared_ptr<CTranscoderPacket> pack; while (q.empty()) {
if (! q.empty()) { c.wait(lock);
pack = q.front();
q.pop();
} }
auto pack = q.front();
q.pop();
return pack; return pack;
} }
@ -45,15 +46,11 @@ public:
{ {
std::lock_guard<std::mutex> lock(m); std::lock_guard<std::mutex> lock(m);
q.push(packet); q.push(packet);
} c.notify_one();
std::size_t size()
{
std::lock_guard<std::mutex> lock(m);
return q.size();
} }
private: private:
std::queue<std::shared_ptr<CTranscoderPacket>> q; std::queue<std::shared_ptr<CTranscoderPacket>> q;
std::mutex m; mutable std::mutex m;
std::condition_variable c;
}; };

Loading…
Cancel
Save

Powered by TurnKey Linux.