massive changes in workflow

main
Tom Early 4 years ago
parent aeec1687e2
commit 88b966310a

@ -25,9 +25,7 @@
#include "TranscoderPacket.h"
#include "Controller.h"
#define MAX_DEPTH 2
CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), dmr_depth(0), dstar_depth(0), keep_running(true)
CController::CController() : keep_running(true)
{
}
@ -39,8 +37,6 @@ bool CController::Start()
return true;
}
reflectorFuture = std::async(std::launch::async, &CController::ReadReflectorThread, this);
readambeFuture = std::async(std::launch::async, &CController::ReadAmbesThread, this);
feedambeFuture = std::async(std::launch::async, &CController::FeedAmbesThread, this);
c2Future = std::async(std::launch::async, &CController::ProcessC2Thread, this);
return false;
}
@ -51,31 +47,17 @@ void CController::Stop()
if (reflectorFuture.valid())
reflectorFuture.get();
if (readambeFuture.valid())
readambeFuture.get();
if (feedambeFuture.valid())
feedambeFuture.get();
if (c2Future.valid())
c2Future.get();
reader.Close();
for (auto &it : dstar_device)
{
it->CloseDevice();
it.reset();
}
dstar_device.clear();
for (auto &it : dmr_device)
{
it->CloseDevice();
it.reset();
}
dmr_device.clear();
dstar_device.CloseDevice();
dmr_device.CloseDevice();
}
bool CController::InitDevices()
{
std::set<std::string> deviceset;
std::vector<std::string> deviceset;
std::string device;
for (int i=0; i<32; i++) {
@ -85,7 +67,7 @@ bool CController::InitDevices()
if (access(device.c_str(), R_OK | W_OK))
break;
else
deviceset.insert(device);
deviceset.push_back(device);
}
if (deviceset.empty()) {
@ -93,66 +75,28 @@ bool CController::InitDevices()
return true;
}
if (2 > deviceset.size())
if (2 != deviceset.size())
{
std::cerr << "You need at least two DVSI 3003 devices" << std::endl;
std::cerr << "You need exactly two DVSI 3003 devices" << std::endl;
return true;
}
// now initialize each device
// the first one will be a dstar device
Encoding type = Encoding::dstar;
for (const auto devpath : deviceset)
{
// instantiate it
auto a3003 = std::make_shared<CDV3003>(type);
// open it
if (a3003->OpenDevice(devpath, 921600))
return true;
// initialize it
a3003->InitDV3003();
// set each of the 3 vocoders to the current type
for (uint8_t channel=PKT_CHANNEL0; channel<=PKT_CHANNEL2; channel++)
{
if (a3003->ConfigureVocoder(channel, type))
return true;
}
if (dstar_device.OpenDevice(deviceset[0], 921600) || dmr_device.OpenDevice(deviceset[1], 921600))
return true;
// add it to the list, according to type
if (Encoding::dstar == type)
{
dstar_device.push_back(a3003);
dstar_vocoder_count += 3;
}
else
{
dmr_device.push_back(a3003);
dmr_vocoder_count += 3;
}
if (dstar_device.InitDV3003() || dmr_device.InitDV3003())
return true;
// finally, toggle the type for the next device
type = (type == Encoding::dstar) ? Encoding::dmr : Encoding::dstar;
for (uint8_t ch=PKT_CHANNEL0; ch<=PKT_CHANNEL2; ch++)
{
if (dstar_device.ConfigureVocoder(ch, Encoding::dstar) || dmr_device.ConfigureVocoder(ch, Encoding::dmr))
return true;
}
std::cout << "Device count: DStar=" << dstar_device.size() << " DMR=" << dmr_device.size() << std::endl;
return false;
}
void CController::IncrementDMRVocoder()
{
current_dmr_vocoder = (current_dmr_vocoder + 1) % dmr_vocoder_count;
}
void CController::IncrementDStarVocoder()
{
current_dstar_vocoder = (current_dstar_vocoder + 1) % dstar_vocoder_count;
}
// Encapsulate the incoming STCPacket into a CTranscoderPacket and push it into the appropriate queue
// based on packet's codec_in.
void CController::ReadReflectorThread()
@ -170,10 +114,10 @@ void CController::ReadReflectorThread()
switch (packet->GetCodecIn())
{
case ECodecType::dstar:
dstar_queue.push(packet);
dstar_device.AddPacket(packet);
break;
case ECodecType::dmr:
dmr_queue.push(packet);
dmr_device.AddPacket(packet);
break;
case ECodecType::c2_1600:
case ECodecType::c2_3200:
@ -265,12 +209,8 @@ void CController::Codec2toAudio(std::shared_ptr<CTranscoderPacket> packet)
}
}
// the only thing left is to encode the two ambe, so push the packet onto both AMBE queues
dstar_mux.lock();
dstar_queue.push(packet);
dstar_mux.unlock();
dmr_mux.lock();
dmr_queue.push(packet);
dmr_mux.unlock();
dstar_device.AddPacket(packet);
dmr_device.AddPacket(packet);
}
void CController::ProcessC2Thread()
@ -302,213 +242,6 @@ void CController::ProcessC2Thread()
}
}
void CController::FeedAmbesThread()
{
while (keep_running)
{
bool did_nothing = true;
// If available, pop a packet from the dstar queue and send it for vocoding
dstar_mux.lock();
if ((! dstar_queue.empty()) && (dstar_depth < MAX_DEPTH))
{
// encode the audio to dstar
auto packet = dstar_queue.pop();
auto devnum = current_dstar_vocoder / 3;
auto vocnum = current_dstar_vocoder % 3;
//push the packet onto the vocoder's queue
dstar_device[devnum]->packet_queue[vocnum].push(packet);
// send the correct thing to the current dstar vocoder
if (ECodecType::dstar == packet->GetCodecIn())
dstar_device[devnum]->SendData(vocnum, packet->GetDStarData());
else
dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio());
dstar_depth++;
// increment the dstar vocoder index
IncrementDStarVocoder();
did_nothing = false;
}
dstar_mux.unlock();
// If available, pop a packet from the dmr queue and send it for vocoding
dmr_mux.lock();
if ((! dmr_queue.empty()) && (dmr_depth < MAX_DEPTH))
{
// encode the audio to dmr
auto packet = dmr_queue.pop();
auto devnum = current_dmr_vocoder / 3;
auto vocnum = current_dmr_vocoder % 3;
// push the packet onto the dmr vocoder's queue
dmr_device[devnum]->packet_queue[vocnum].push(packet);
// send the correct thing to the corrent dmr vocoder
if (ECodecType::dmr == packet->GetCodecIn())
dmr_device[devnum]->SendData(vocnum, packet->GetDMRData());
else
dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio());
dmr_depth++;
// increment the dmr vocoder index
IncrementDMRVocoder();
did_nothing = false;
}
dmr_mux.unlock();
// both the dmr and dstar queue were empty, so sleep for a little while
if (did_nothing)
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void CController::AddFDSet(int &max, int newfd, fd_set *set) const
{
if (newfd > max)
max = newfd;
FD_SET(newfd, set);
}
// read vocoded (AMBE or audio) data from DVSI hardware
void CController::ReadAmbesThread()
{
while (keep_running)
{
int maxfd = -1;
fd_set FdSet;
FD_ZERO(&FdSet);
for (const auto &it : dstar_device)
{
AddFDSet(maxfd, it->GetFd(), &FdSet);
}
for (const auto &it : dmr_device)
{
AddFDSet(maxfd, it->GetFd(), &FdSet);
}
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 40000;
//wait for up to 40 ms to read something from any devices
auto rval = select(maxfd+1, &FdSet, nullptr, nullptr, &tv);
if (rval < 0)
{
std::cerr << "select() ERROR reading AMBE devices: " << strerror(errno) << std::endl;
}
if (rval > 0) {
// from the device file descriptor, we'll know if it's dstar or dmr
for (unsigned int i=0 ; i<dstar_device.size(); i++)
{
if (FD_ISSET(dstar_device[i]->GetFd(), &FdSet))
{
ReadDevice(dstar_device[i], EAmbeType::dstar);
FD_CLR(dstar_device[i]->GetFd(), &FdSet);
}
}
for (unsigned int i=0 ; i<dmr_device.size(); i++)
{
if (FD_ISSET(dmr_device[i]->GetFd(), &FdSet))
{
ReadDevice(dmr_device[i], EAmbeType::dmr);
FD_CLR(dmr_device[i]->GetFd(), &FdSet);
}
}
}
}
}
// Any audio packet recevied from the DVSI vocoders means that the original codec was AMBE (DStar or DMR).
// A) These audio packets need to be encoded, by the complimentary AMBE vocoder _and_ M17.
// Since code_in was AMBE, the audio will also be encoded to c2_3200, and copied to the packet.
// B) If we have read AMBE data from the vocoder, it needs to be put back into the packet.
// Finally if the packet is complete, it can be sent back to the reflector.
void CController::ReadDevice(std::shared_ptr<CDV3003> device, EAmbeType type)
{
// save the dmr/dstar type
const char *device_type = (EAmbeType::dstar==type) ? "D-Star" : "DMR";
// read the response from the vocoder
SDV3003_Packet devpacket;
if (device->GetResponse(devpacket))
{
std::cerr << "ERROR: could not get response from " << device_type << " device at " << device->GetDevicePath() << std::endl;
return;
}
// get the response type
bool is_audio;
if (PKT_SPEECH == devpacket.header.packet_type)
is_audio = true;
else if (PKT_CHANNEL == devpacket.header.packet_type)
is_audio = false;
else
{
std::string title("Unexpected ");
title.append(device_type);
title.append(" response packet");
device->dump(title.c_str(), &devpacket, packet_size(devpacket));
return;
}
//get the packet from either the dstar or dmr vocoder's queue
std::shared_ptr<CTranscoderPacket> packet;
if (EAmbeType::dstar == type)
{
dstar_mux.lock();
packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop();
dstar_mux.unlock();
dstar_depth--;
}
else
{
dmr_mux.lock();
packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop();
dmr_mux.unlock();
dmr_depth--;
}
if (is_audio)
{
//move the audio to the CTranscoderPacket
for (unsigned int i=0; i<160; i++)
packet->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]);
#ifdef DEBUG
if (EAmbeType::dstar == type)
AppendWave(packet);
#endif
// we need to encode the m17
// encode the audio to c2_3200 (all ambe input vocodes to ECodecType::c2_3200)
c2_mux.lock();
codec2_queue.push(packet);
c2_mux.unlock();
// ... AND we need to encode the audio to the OTHER ambe codec
if (type == EAmbeType::dstar)
{
dmr_mux.lock();
dmr_queue.push(packet);
dmr_mux.unlock();
}
else
{
dstar_mux.lock();
dstar_queue.push(packet);
dstar_mux.unlock();
}
}
else /* the response is ambe data */
{
// put the AMBE data in the packet
if (type == EAmbeType::dmr)
{
packet->SetDMRData(devpacket.payload.ambe.data);
}
else
{
packet->SetDStarData(devpacket.payload.ambe.data);
}
// send it off, if it's done
if (packet->AllCodecsAreSet())
{
SendToReflector(packet);
}
}
}
void CController::SendToReflector(std::shared_ptr<CTranscoderPacket> packet)
{
// open a socket to the reflector channel
@ -524,6 +257,30 @@ void CController::SendToReflector(std::shared_ptr<CTranscoderPacket> packet)
#endif
}
void CController::RouteDstPacket(std::shared_ptr<CTranscoderPacket> packet)
{
if (ECodecType::dstar == packet->GetCodecIn())
dmr_device.AddPacket(packet);
else if (packet->AllCodecsAreSet())
{
send_mux.lock();
SendToReflector(packet);
send_mux.unlock();
}
}
void CController::RouteDmrPacket(std::shared_ptr<CTranscoderPacket> packet)
{
if (ECodecType::dmr == packet->GetCodecIn())
dstar_device.AddPacket(packet);
else if (packet->AllCodecsAreSet())
{
send_mux.lock();
SendToReflector(packet);
send_mux.unlock();
}
}
#ifdef DEBUG
void CController::AppendWave(const std::shared_ptr<CTranscoderPacket> packet) const
{

@ -16,9 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <vector>
#include <map>
#include <set>
#include <memory>
#include <atomic>
#include <future>
@ -38,34 +36,31 @@ public:
CController();
bool Start();
void Stop();
void RouteDstPacket(std::shared_ptr<CTranscoderPacket> packet);
void RouteDmrPacket(std::shared_ptr<CTranscoderPacket> packet);
protected:
unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder, dmr_depth, dstar_depth;
std::atomic<bool> keep_running;
std::future<void> reflectorFuture, readambeFuture, feedambeFuture, c2Future;
std::vector<std::shared_ptr<CDV3003>> dmr_device, dstar_device;
std::future<void> reflectorFuture, c2Future;
std::map<char, int16_t[160]> audio_store;
std::map<char, uint8_t[8]> data_store;
CUnixDgramReader reader;
CUnixDgramWriter writer;
CCodec2 c2_16{false};
CCodec2 c2_32{true};
CPacketQueue codec2_queue, dmr_queue, dstar_queue;
std::mutex dstar_mux, dmr_mux, c2_mux;
CDV3003 dstar_device{Encoding::dstar};
CDV3003 dmr_device{Encoding::dmr};
CPacketQueue codec2_queue;
std::mutex c2_mux, send_mux;
bool InitDevices();
void IncrementDMRVocoder(void);
void IncrementDStarVocoder(void);
// processing threads
void ReadReflectorThread();
void ReadAmbesThread();
void FeedAmbesThread();
void ProcessC2Thread();
void SendToReflector(std::shared_ptr<CTranscoderPacket> packet);
void Codec2toAudio(std::shared_ptr<CTranscoderPacket> packet);
void AudiotoCodec2(std::shared_ptr<CTranscoderPacket> packet);
void ReadDevice(std::shared_ptr<CDV3003> dv3003, EAmbeType type);
void AddFDSet(int &max, int newfd, fd_set *set) const;
void SendToReflector(std::shared_ptr<CTranscoderPacket> packet);
#ifdef DEBUG
void AppendWave(const std::shared_ptr<CTranscoderPacket> packet) const;
void AppendM17(const std::shared_ptr<CTranscoderPacket> packet) const;

@ -30,11 +30,15 @@
#include <iostream>
#include <iomanip>
#include <cerrno>
#include <thread>
#include "DV3003.h"
#include "configure.h"
#include "Controller.h"
CDV3003::CDV3003(Encoding t) : type(t), fd(-1)
extern CController Controller;
CDV3003::CDV3003(Encoding t) : type(t), fd(-1), ch_depth(0), sp_depth(0), current_vocoder(0)
{
}
@ -43,6 +47,19 @@ CDV3003::~CDV3003()
CloseDevice();
}
void CDV3003::CloseDevice()
{
keep_running = false;
if (fd >= 0) {
close(fd);
fd = -1;
}
if (feedFuture.valid())
feedFuture.get();
if (readFuture.valid())
readFuture.get();
}
bool CDV3003::checkResponse(SDV3003_Packet &p, uint8_t response) const
{
if(p.start_byte != PKT_HEADER || p.header.packet_type != PKT_CONTROL || p.field_id != response)
@ -51,11 +68,6 @@ bool CDV3003::checkResponse(SDV3003_Packet &p, uint8_t response) const
return false;
}
bool CDV3003::IsOpen() const
{
return fd >= 0;
}
std::string CDV3003::GetDevicePath() const
{
return devicepath;
@ -231,6 +243,9 @@ bool CDV3003::InitDV3003()
}
version.assign(responsePacket.payload.ctrl.data.version);
std::cout << "Found " << productid << " version " << version << " at " << devicepath << std::endl;
feedFuture = std::async(std::launch::async, &CDV3003::FeedDevice, this);
readFuture = std::async(std::launch::async, &CDV3003::ReadDevice, this);
return false;
}
@ -292,14 +307,6 @@ bool CDV3003::ConfigureVocoder(uint8_t pkt_ch, Encoding type)
return false;
}
void CDV3003::CloseDevice()
{
if (fd >= 0) {
close(fd);
fd = -1;
}
}
bool CDV3003::GetResponse(SDV3003_Packet &packet)
{
ssize_t bytesRead;
@ -355,6 +362,109 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet)
return false;
}
void CDV3003::FeedDevice()
{
keep_running = true;
while (keep_running)
{
in_mux.lock();
auto packet = inq.pop();
in_mux.unlock();
if (packet)
{
bool device_is_full = true;
bool has_ambe = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet();
while (keep_running && device_is_full) // wait until there is room
{
if (has_ambe)
{
// we need to decode ambe to audio
if (ch_depth < 2)
device_is_full = false;
}
else
{
// we need to encode audio to ambe
if (sp_depth < 2)
device_is_full = false;
}
if (device_is_full)
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
if (keep_running)
{
voc_mux[current_vocoder].lock();
vocq->push(packet);
voc_mux[current_vocoder].unlock();
if (has_ambe)
{
SendAudio(current_vocoder, packet->GetAudio());
sp_depth++;
}
else
{
SendData(current_vocoder, (Encoding::dstar==type) ? packet->GetDStarData() : packet->GetDMRData());
ch_depth++;
}
if(++current_vocoder > 2)
current_vocoder = 0;
}
}
else // no packet is in the input queue
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
}
void CDV3003::ReadDevice()
{
while (keep_running)
{
dv3003_packet p;
if (GetResponse(p))
{
unsigned int channel = p.field_id - PKT_CHANNEL0;
voc_mux[channel].lock();
auto packet = vocq[channel].pop();
voc_mux[channel].unlock();
if (PKT_CHANNEL == p.header.packet_type)
{
sp_depth--;
if (Encoding::dstar == type)
packet->SetDStarData(p.payload.ambe.data);
else
packet->SetDMRData(p.payload.ambe.data);
}
else if (PKT_SPEECH == p.header.packet_type)
{
ch_depth--;
auto pPCM = packet->GetAudio();
for (unsigned int i=0; i<160; i++)
pPCM[i] = ntohs(p.payload.audio.samples[i]);
}
else
{
dump("ReadDevice() ERROR: Read an unexpected device packet:", &p, packet_size(p));
continue;
}
if (Encoding::dstar == type)
Controller.RouteDstPacket(packet);
else
Controller.RouteDmrPacket(packet);
}
}
}
void CDV3003::AddPacket(const std::shared_ptr<CTranscoderPacket> packet)
{
in_mux.lock();
inq.push(packet);
in_mux.unlock();
}
bool CDV3003::SendAudio(const uint8_t channel, const int16_t *audio) const
{
// Create Audio packet based on input int8_ts

@ -19,6 +19,9 @@
#include <netinet/in.h>
#include <string>
#include <sstream>
#include <future>
#include <mutex>
#include <atomic>
#include "PacketQueue.h"
@ -104,23 +107,31 @@ public:
bool OpenDevice(const std::string &device, int baudrate);
bool InitDV3003();
bool ConfigureVocoder(uint8_t pkt_ch, Encoding type);
bool SendAudio(const uint8_t channel, const int16_t *audio) const;
bool SendData(const uint8_t channel, const uint8_t *data) const;
bool GetResponse(SDV3003_Packet &packet);
int GetFd() const { return fd; }
void CloseDevice();
bool IsOpen() const;
void dump(const char *title, void *data, int length) const;
std::string GetDevicePath() const;
std::string GetProductID() const;
std::string GetVersion() const;
void AddPacket(const std::shared_ptr<CTranscoderPacket> packet);
CPacketQueue packet_queue[3]; // we need a queue for each vocoder
CPacketQueue device_queue; // and a queue for input
private:
const Encoding type;
int fd;
std::atomic<unsigned int> ch_depth, sp_depth;
uint8_t current_vocoder;
std::atomic<bool> keep_running;
CPacketQueue vocq[3]; // we need a queue for each vocoder
std::mutex voc_mux[3];
CPacketQueue inq; // and input queue
std::mutex in_mux;
std::future<void> feedFuture, readFuture;
std::string devicepath, productid, version;
void FeedDevice();
void ReadDevice();
bool SetBaudRate(int baudrate);
bool checkResponse(SDV3003_Packet &responsePacket, uint8_t response) const;
bool SendAudio(const uint8_t channel, const int16_t *audio) const;
bool SendData(const uint8_t channel, const uint8_t *data) const;
bool GetResponse(SDV3003_Packet &packet);
void dump(const char *title, void *data, int length) const;
};

@ -19,17 +19,19 @@
#include "Controller.h"
// the global controller object
CController Controller;
int main()
{
CController controller;
if (controller.Start())
if (Controller.Start())
return EXIT_FAILURE;
std::cout << "Hybrid Transcoder Version #211220 Successfully started" << std::endl;
std::cout << "Hybrid Transcoder Version #211221 Successfully started" << std::endl;
pause();
controller.Stop();
Controller.Stop();
return EXIT_SUCCESS;
}

Loading…
Cancel
Save

Powered by TurnKey Linux.