refactor CController

main
Tom Early 4 years ago
parent 6c6d327903
commit 1bdf393d55

@ -20,11 +20,14 @@
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
#include <fstream> #include <fstream>
#include <thread>
#include "TranscoderPacket.h" #include "TranscoderPacket.h"
#include "Controller.h" #include "Controller.h"
CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), keep_running(true) #define MAX_DEPTH 3
CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), dmr_depth(0), dstar_depth(0), keep_running(true)
{ {
} }
@ -35,18 +38,26 @@ bool CController::Start()
keep_running = false; keep_running = false;
return true; return true;
} }
reflectorThread = std::async(std::launch::async, &CController::ReadReflector, this); reflectorFuture = std::async(std::launch::async, &CController::ReadReflectorThread, this);
ambeThread = std::async(std::launch::async, &CController::ReadAmbeDevices, this); readambeFuture = std::async(std::launch::async, &CController::ReadAmbesThread, this);
feedambeFuture = std::async(std::launch::async, &CController::FeedAmbesThread, this);
c2Future = std::async(std::launch::async, &CController::ProcessC2Thread, this);
return false; return false;
} }
void CController::Stop() void CController::Stop()
{ {
keep_running = false; keep_running = false;
if (reflectorThread.valid())
reflectorThread.get(); if (reflectorFuture.valid())
if (ambeThread.valid()) reflectorFuture.get();
ambeThread.get(); if (readambeFuture.valid())
readambeFuture.get();
if (feedambeFuture.valid())
feedambeFuture.get();
if (c2Future.valid())
c2Future.get();
reader.Close(); reader.Close();
for (auto &it : dstar_device) for (auto &it : dstar_device)
{ {
@ -142,51 +153,79 @@ void CController::IncrementDStarVocoder()
current_dstar_vocoder = (current_dstar_vocoder + 1) % dstar_vocoder_count; current_dstar_vocoder = (current_dstar_vocoder + 1) % dstar_vocoder_count;
} }
// DMR and YSF use the exact same codec, both AMBE. void CController::ReadReflectorThread()
// Incoming packets from clients have different starting codecs, AMBE or M17.
// The transcoder's task is to fill in the missing data.
// Incoming AMBE codecs go to the DVSI hardware for decoding to audio. The ambeThread will finish up.
// Incoming M17 codecs are decoded in software and the audio data send to the two different AMBE encoders.
// There is no need to transcode between M17 codecs.
void CController::ReadReflector()
{ {
while (keep_running) while (keep_running)
{ {
STCPacket tcpack; STCPacket tcpack;
// wait up to 40 ms to read something on the unix port // wait up to 100 ms to read something on the unix port
if (reader.Receive(&tcpack, 40)) if (reader.Receive(&tcpack, 100))
{ {
// create a shared pointer to a new packet // create a shared pointer to a new packet
// there is only one CTranscoderPacket created for each new STCPacket received from the reflector // there is only one CTranscoderPacket created for each new STCPacket received from the reflector
auto packet = std::make_shared<CTranscoderPacket>(tcpack); auto packet = std::make_shared<CTranscoderPacket>(tcpack);
#ifdef DEBUG
Dump(packet, "Incoming TC Packet:");
#endif
unsigned int devnum, vocnum;
switch (packet->GetCodecIn()) switch (packet->GetCodecIn())
{ {
case ECodecType::dstar: case ECodecType::dstar:
devnum = current_dstar_vocoder / 3; dstar_queue.push(packet);
vocnum = current_dstar_vocoder % 3;
//send it to the next available dstar vocoder
dstar_device[devnum]->SendData(vocnum, packet->GetDStarData());
//push the packet onto that vocoder's queue
dstar_device[devnum]->packet_queue[vocnum].push(packet);
//increment the dstar vocoder index
IncrementDStarVocoder();
break; break;
case ECodecType::dmr: case ECodecType::dmr:
devnum = current_dmr_vocoder / 3; dmr_queue.push(packet);
vocnum = current_dmr_vocoder % 3;
//send it to the next avaiable dmr vocoder
dmr_device[devnum]->SendData(vocnum, packet->GetDMRData());
//push the packet onto that vocoder's queue
dmr_device[devnum]->packet_queue[vocnum].push(packet);
//increment the dmr vocoder index
IncrementDMRVocoder();
break; break;
case ECodecType::c2_1600: case ECodecType::c2_1600:
case ECodecType::c2_3200: case ECodecType::c2_3200:
c2_mux.lock();
codec2_queue.push(packet);
c2_mux.unlock();
break;
default:
Dump(packet, "ERROR: Got a reflector packet with unknown Codec:");
break;
}
}
}
}
void CController::AudiotoCodec2(std::shared_ptr<CTranscoderPacket> packet)
{
uint8_t m17data[8];
c2_32.codec2_encode(m17data, packet->GetAudio());
if (packet->IsSecond())
{
//move the c2_3200 data to the second half of the M17 packet
packet->SetM17Data(m17data, EAudioSection::secondhalf);
}
else /* the packet is first */
{
// move it into the packet
packet->SetM17Data(m17data, EAudioSection::firsthalf);
if (packet->IsLast())
{
// we have an odd number of packets, so we have to finish up the m17 packet
const uint8_t silence[] = {0x00, 0x01, 0x43, 0x09, 0xe4, 0x9c, 0x08, 0x21 };
//put codec silence in the second half of the codec
packet->SetM17Data(silence, EAudioSection::secondhalf);
}
}
// we've received the audio and we've calculated the m17 data, now we just need to
// calculate the other ambe data
if (packet->GetCodecIn() == ECodecType::dmr)
{
dstar_mux.lock();
dstar_queue.push(packet);
dstar_mux.unlock();
}
else /* the dmr/dstar type is dstar */
{
dmr_mux.lock();
dmr_queue.push(packet);
dmr_mux.unlock();
}
}
void CController::Codec2toAudio(std::shared_ptr<CTranscoderPacket> packet)
{
if (packet->IsSecond()) if (packet->IsSecond())
{ {
if (packet->GetCodecIn() == ECodecType::c2_1600) if (packet->GetCodecIn() == ECodecType::c2_1600)
@ -219,34 +258,89 @@ void CController::ReadReflector()
c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data()); c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data());
} }
} }
// the only thing left is to encode the two ambe
dstar_mux.lock();
dstar_queue.push(packet);
dstar_mux.unlock();
dmr_mux.lock();
dmr_queue.push(packet);
dmr_mux.unlock();
}
void CController::ProcessC2Thread()
{
while (keep_running)
{
c2_mux.lock();
auto c2_queue_is_empty = codec2_queue.empty();
c2_mux.unlock();
if (c2_queue_is_empty)
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
else
{
c2_mux.lock();
auto packet = codec2_queue.pop();
c2_mux.unlock();
switch (packet->GetCodecIn())
{
case ECodecType::c2_1600:
case ECodecType::c2_3200:
Codec2toAudio(packet);
break;
case ECodecType::dstar:
case ECodecType::dmr:
AudiotoCodec2(packet);
break;
}
}
}
}
void CController::FeedAmbesThread()
{
while (keep_running)
{
bool did_nothing = true;
dstar_mux.lock();
if ((! dstar_queue.empty()) && (dstar_depth < MAX_DEPTH))
{
// encode the audio to dstar // encode the audio to dstar
devnum = current_dstar_vocoder / 3; auto packet = dstar_queue.pop();
vocnum = current_dstar_vocoder % 3; auto devnum = current_dstar_vocoder / 3;
// send the audio to the current dstar vocoder auto vocnum = current_dstar_vocoder % 3;
dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio());
//push the packet onto the vocoder's queue //push the packet onto the vocoder's queue
dstar_device[devnum]->packet_queue[vocnum].push(packet); dstar_device[devnum]->packet_queue[vocnum].push(packet);
// send the audio to the current dstar vocoder
dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio());
dstar_depth++;
// increment the dstar vocoder index // increment the dstar vocoder index
IncrementDStarVocoder(); IncrementDStarVocoder();
did_nothing = false;
}
dstar_mux.unlock();
dmr_mux.lock();
if ((! dmr_queue.empty()) && (dmr_depth < MAX_DEPTH))
{
// encode the audio to dmr // encode the audio to dmr
devnum = current_dmr_vocoder / 3; auto packet = dmr_queue.pop();
vocnum = current_dmr_vocoder % 3; auto devnum = current_dmr_vocoder / 3;
// send the audio to the corrent dmr vocoder auto vocnum = current_dmr_vocoder % 3;
dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio());
// push the packet onto the dmr vocoder's queue // push the packet onto the dmr vocoder's queue
dmr_device[devnum]->packet_queue[vocnum].push(packet); dmr_device[devnum]->packet_queue[vocnum].push(packet);
// send the audio to the corrent dmr vocoder
dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio());
dmr_depth++;
// increment the dmr vocoder index // increment the dmr vocoder index
IncrementDMRVocoder(); IncrementDMRVocoder();
break; did_nothing = false;
case ECodecType::none:
default:
std::cerr << "ERROR: Got a reflector packet with unknown Codec" << std::endl;
#ifdef DEBUG
Dump(packet, "This is what's in it:");
#endif
break;
}
} }
dmr_mux.unlock();
if (did_nothing)
std::this_thread::sleep_for(std::chrono::milliseconds(5));
} }
} }
@ -258,7 +352,7 @@ void CController::AddFDSet(int &max, int newfd, fd_set *set) const
} }
// read transcoded (AMBE or audio) data from DVSI hardware // read transcoded (AMBE or audio) data from DVSI hardware
void CController::ReadAmbeDevices() void CController::ReadAmbesThread()
{ {
while (keep_running) while (keep_running)
{ {
@ -338,85 +432,56 @@ void CController::ReadDevice(std::shared_ptr<CDV3003> device, EAmbeType type)
} }
//get the packet from either the dstar or dmr vocoder's queue //get the packet from either the dstar or dmr vocoder's queue
auto spPacket = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); std::shared_ptr<CTranscoderPacket> packet;
if (EAmbeType::dstar == type)
{
dstar_mux.lock();
packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop();
dstar_depth--;
dstar_mux.unlock();
}
else
{
dmr_mux.lock();
packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop();
dmr_depth--;
dmr_mux.unlock();
}
if (is_audio) if (is_audio)
{ {
//move the audio to the CTranscoderPacket //move the audio to the CTranscoderPacket
for (unsigned int i=0; i<160; i++) for (unsigned int i=0; i<160; i++)
spPacket->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]); packet->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]);
// we need to encode the m17 // we need to encode the m17
// encode the audio to c2_3200 (all ambe input vocodes to ECodecType::c2_3200) // encode the audio to c2_3200 (all ambe input vocodes to ECodecType::c2_3200)
uint8_t m17data[8]; AudiotoCodec2(packet);
c2_32.codec2_encode(m17data, spPacket->GetAudio());
if (spPacket->IsSecond())
{
//move the c2_3200 data to the second half of the M17 packet
spPacket->SetM17Data(m17data, EAudioSection::secondhalf);
}
else /* the packet is first */
{
// move it into the packet
spPacket->SetM17Data(m17data, EAudioSection::firsthalf);
if (spPacket->IsLast())
{
// we have an odd number of packets, so we have to finish up the m17 packet
const uint8_t silence[] = {0x00, 0x01, 0x43, 0x09, 0xe4, 0x9c, 0x08, 0x21 };
//put codec silence in the second half of the codec
spPacket->SetM17Data(silence, EAudioSection::secondhalf);
}
}
// we've received the audio and we've calculated the m17 data, now we just need to
// calculate the other ambe data
if (type == EAmbeType::dmr)
{
const unsigned int devnum = current_dstar_vocoder / 3;
const unsigned int vocnum = current_dstar_vocoder % 3;
//send the audio packet to the next available dstar vocoder
dstar_device[devnum]->SendAudio(vocnum, spPacket->GetAudio());
//push the packet onto the dstar vocoder's queue
dstar_device[devnum]->packet_queue[vocnum].push(spPacket);
//increment the dmr vocoder index
IncrementDStarVocoder();
}
else /* the dmr/dstar type is dstar */
{
const unsigned int devnum = current_dmr_vocoder / 3;
const unsigned int vocnum = current_dmr_vocoder % 3;
//send the audio packet to the next available dmr vocoder
dmr_device[devnum]->SendAudio(vocnum, spPacket->GetAudio());
//push the packet onto the dmr vocoder's queue
dmr_device[devnum]->packet_queue[vocnum].push(spPacket);
//increment the dmr vocoder index
IncrementDMRVocoder();
}
} }
else /* the response is ambe data */ else /* the response is ambe data */
{ {
// put the AMBE data in the packet // put the AMBE data in the packet
if (type == EAmbeType::dmr) if (type == EAmbeType::dmr)
{ {
spPacket->SetDMRData(devpacket.payload.ambe.data); packet->SetDMRData(devpacket.payload.ambe.data);
} }
else else
{ {
spPacket->SetDStarData(devpacket.payload.ambe.data); packet->SetDStarData(devpacket.payload.ambe.data);
} }
// send it off, if it's done // send it off, if it's done
if (spPacket->AllCodecsAreSet()) if (packet->AllCodecsAreSet())
{ {
// open a socket to the reflector channel // open a socket to the reflector channel
CUnixDgramWriter socket; CUnixDgramWriter socket;
std::string name(TC2REF); std::string name(TC2REF);
name.append(1, spPacket->GetModule()); name.append(1, packet->GetModule());
socket.SetUp(name.c_str()); socket.SetUp(name.c_str());
// send the packet over the socket // send the packet over the socket
socket.Send(spPacket->GetTCPacket()); socket.Send(packet->GetTCPacket());
// the socket will automatically close after sending // the socket will automatically close after sending
#ifdef DEBUG #ifdef DEBUG
Dump(spPacket, "Completed Transcoder packet"); AppendWave(packet);
AppendWave(spPacket);
#endif #endif
} }
} }

@ -22,6 +22,7 @@
#include <memory> #include <memory>
#include <atomic> #include <atomic>
#include <future> #include <future>
#include <mutex>
#include <sys/select.h> #include <sys/select.h>
#include "codec2.h" #include "codec2.h"
@ -39,22 +40,28 @@ public:
void Stop(); void Stop();
protected: protected:
unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder; unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder, dmr_depth, dstar_depth;
std::atomic<bool> keep_running; std::atomic<bool> keep_running;
std::future<void> reflectorThread, ambeThread; std::future<void> reflectorFuture, readambeFuture, feedambeFuture, c2Future;
std::vector<std::shared_ptr<CDV3003>> dmr_device, dstar_device; std::vector<std::shared_ptr<CDV3003>> dmr_device, dstar_device;
std::map<char, int16_t[160]> audio_store; std::map<char, int16_t[160]> audio_store;
CUnixDgramReader reader; CUnixDgramReader reader;
CUnixDgramWriter writer; CUnixDgramWriter writer;
CCodec2 c2_16{false}; CCodec2 c2_16{false};
CCodec2 c2_32{true}; CCodec2 c2_32{true};
CPacketQueue codec2_queue, dmr_queue, dstar_queue;
std::mutex dstar_mux, dmr_mux, c2_mux;
bool InitDevices(); bool InitDevices();
void IncrementDMRVocoder(void); void IncrementDMRVocoder(void);
void IncrementDStarVocoder(void); void IncrementDStarVocoder(void);
// processing threads // processing threads
void ReadReflector(); void ReadReflectorThread();
void ReadAmbeDevices(); void ReadAmbesThread();
void FeedAmbesThread();
void ProcessC2Thread();
void Codec2toAudio(std::shared_ptr<CTranscoderPacket> pack);
void AudiotoCodec2(std::shared_ptr<CTranscoderPacket> pack);
void ReadDevice(std::shared_ptr<CDV3003> dv3003, EAmbeType type); void ReadDevice(std::shared_ptr<CDV3003> dv3003, EAmbeType type);
void AddFDSet(int &max, int newfd, fd_set *set) const; void AddFDSet(int &max, int newfd, fd_set *set) const;
#ifdef DEBUG #ifdef DEBUG

@ -116,6 +116,7 @@ public:
std::string GetVersion() const; std::string GetVersion() const;
CPacketQueue packet_queue[3]; // we need a queue for each vocoder CPacketQueue packet_queue[3]; // we need a queue for each vocoder
CPacketQueue device_queue; // and a queue for input
private: private:
const Encoding type; const Encoding type;
int fd; int fd;

@ -18,7 +18,6 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <mutex>
#include <queue> #include <queue>
#include <memory> #include <memory>
@ -32,39 +31,28 @@ public:
std::shared_ptr<CTranscoderPacket> pop() std::shared_ptr<CTranscoderPacket> pop()
{ {
std::shared_ptr<CTranscoderPacket> pack; std::shared_ptr<CTranscoderPacket> pack;
mutex.lock();
if (! queue.empty()) { if (! queue.empty()) {
pack = queue.front(); pack = queue.front();
queue.pop(); queue.pop();
} }
mutex.unlock(); return pack;
return std::move(pack);
} }
bool empty() bool empty()
{ {
mutex.lock(); return queue.empty();
bool rval = queue.empty();
mutex.unlock();
return rval;
} }
void push(std::shared_ptr<CTranscoderPacket> packet) void push(std::shared_ptr<CTranscoderPacket> packet)
{ {
mutex.lock();
queue.push(packet); queue.push(packet);
mutex.unlock();
} }
std::size_t size() std::size_t size()
{ {
mutex.lock(); return queue.size();
auto s = queue.size();
mutex.unlock();
return s;
} }
protected: protected:
std::mutex mutex;
std::queue<std::shared_ptr<CTranscoderPacket>> queue; std::queue<std::shared_ptr<CTranscoderPacket>> queue;
}; };

Loading…
Cancel
Save

Powered by TurnKey Linux.