diff --git a/Controller.cpp b/Controller.cpp index 605bfa3..a5cd7f3 100644 --- a/Controller.cpp +++ b/Controller.cpp @@ -25,9 +25,7 @@ #include "TranscoderPacket.h" #include "Controller.h" -#define MAX_DEPTH 2 - -CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), dmr_depth(0), dstar_depth(0), keep_running(true) +CController::CController() : keep_running(true) { } @@ -39,8 +37,6 @@ bool CController::Start() return true; } reflectorFuture = std::async(std::launch::async, &CController::ReadReflectorThread, this); - readambeFuture = std::async(std::launch::async, &CController::ReadAmbesThread, this); - feedambeFuture = std::async(std::launch::async, &CController::FeedAmbesThread, this); c2Future = std::async(std::launch::async, &CController::ProcessC2Thread, this); return false; } @@ -51,31 +47,17 @@ void CController::Stop() if (reflectorFuture.valid()) reflectorFuture.get(); - if (readambeFuture.valid()) - readambeFuture.get(); - if (feedambeFuture.valid()) - feedambeFuture.get(); if (c2Future.valid()) c2Future.get(); reader.Close(); - for (auto &it : dstar_device) - { - it->CloseDevice(); - it.reset(); - } - dstar_device.clear(); - for (auto &it : dmr_device) - { - it->CloseDevice(); - it.reset(); - } - dmr_device.clear(); + dstar_device.CloseDevice(); + dmr_device.CloseDevice(); } bool CController::InitDevices() { - std::set deviceset; + std::vector deviceset; std::string device; for (int i=0; i<32; i++) { @@ -85,7 +67,7 @@ bool CController::InitDevices() if (access(device.c_str(), R_OK | W_OK)) break; else - deviceset.insert(device); + deviceset.push_back(device); } if (deviceset.empty()) { @@ -93,66 +75,28 @@ bool CController::InitDevices() return true; } - if (2 > deviceset.size()) + if (2 != deviceset.size()) { - std::cerr << "You need at least two DVSI 3003 devices" << std::endl; + std::cerr << "You need exactly two DVSI 3003 devices" << std::endl; return true; } // now initialize each device - // the first one will be a dstar device - Encoding type = Encoding::dstar; - for (const auto devpath : deviceset) - { - // instantiate it - auto a3003 = std::make_shared(type); - - // open it - if (a3003->OpenDevice(devpath, 921600)) - return true; - - // initialize it - a3003->InitDV3003(); - - // set each of the 3 vocoders to the current type - for (uint8_t channel=PKT_CHANNEL0; channel<=PKT_CHANNEL2; channel++) - { - if (a3003->ConfigureVocoder(channel, type)) - return true; - } + if (dstar_device.OpenDevice(deviceset[0], 921600) || dmr_device.OpenDevice(deviceset[1], 921600)) + return true; - // add it to the list, according to type - if (Encoding::dstar == type) - { - dstar_device.push_back(a3003); - dstar_vocoder_count += 3; - } - else - { - dmr_device.push_back(a3003); - dmr_vocoder_count += 3; - } + if (dstar_device.InitDV3003() || dmr_device.InitDV3003()) + return true; - // finally, toggle the type for the next device - type = (type == Encoding::dstar) ? Encoding::dmr : Encoding::dstar; + for (uint8_t ch=PKT_CHANNEL0; ch<=PKT_CHANNEL2; ch++) + { + if (dstar_device.ConfigureVocoder(ch, Encoding::dstar) || dmr_device.ConfigureVocoder(ch, Encoding::dmr)) + return true; } - - std::cout << "Device count: DStar=" << dstar_device.size() << " DMR=" << dmr_device.size() << std::endl; - return false; } -void CController::IncrementDMRVocoder() -{ - current_dmr_vocoder = (current_dmr_vocoder + 1) % dmr_vocoder_count; -} - -void CController::IncrementDStarVocoder() -{ - current_dstar_vocoder = (current_dstar_vocoder + 1) % dstar_vocoder_count; -} - // Encapsulate the incoming STCPacket into a CTranscoderPacket and push it into the appropriate queue // based on packet's codec_in. void CController::ReadReflectorThread() @@ -170,10 +114,10 @@ void CController::ReadReflectorThread() switch (packet->GetCodecIn()) { case ECodecType::dstar: - dstar_queue.push(packet); + dstar_device.AddPacket(packet); break; case ECodecType::dmr: - dmr_queue.push(packet); + dmr_device.AddPacket(packet); break; case ECodecType::c2_1600: case ECodecType::c2_3200: @@ -265,12 +209,8 @@ void CController::Codec2toAudio(std::shared_ptr packet) } } // the only thing left is to encode the two ambe, so push the packet onto both AMBE queues - dstar_mux.lock(); - dstar_queue.push(packet); - dstar_mux.unlock(); - dmr_mux.lock(); - dmr_queue.push(packet); - dmr_mux.unlock(); + dstar_device.AddPacket(packet); + dmr_device.AddPacket(packet); } void CController::ProcessC2Thread() @@ -302,213 +242,6 @@ void CController::ProcessC2Thread() } } -void CController::FeedAmbesThread() -{ - while (keep_running) - { - bool did_nothing = true; - - // If available, pop a packet from the dstar queue and send it for vocoding - dstar_mux.lock(); - if ((! dstar_queue.empty()) && (dstar_depth < MAX_DEPTH)) - { - // encode the audio to dstar - auto packet = dstar_queue.pop(); - auto devnum = current_dstar_vocoder / 3; - auto vocnum = current_dstar_vocoder % 3; - //push the packet onto the vocoder's queue - dstar_device[devnum]->packet_queue[vocnum].push(packet); - // send the correct thing to the current dstar vocoder - if (ECodecType::dstar == packet->GetCodecIn()) - dstar_device[devnum]->SendData(vocnum, packet->GetDStarData()); - else - dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio()); - dstar_depth++; - // increment the dstar vocoder index - IncrementDStarVocoder(); - did_nothing = false; - } - dstar_mux.unlock(); - - // If available, pop a packet from the dmr queue and send it for vocoding - dmr_mux.lock(); - if ((! dmr_queue.empty()) && (dmr_depth < MAX_DEPTH)) - { - // encode the audio to dmr - auto packet = dmr_queue.pop(); - auto devnum = current_dmr_vocoder / 3; - auto vocnum = current_dmr_vocoder % 3; - // push the packet onto the dmr vocoder's queue - dmr_device[devnum]->packet_queue[vocnum].push(packet); - // send the correct thing to the corrent dmr vocoder - if (ECodecType::dmr == packet->GetCodecIn()) - dmr_device[devnum]->SendData(vocnum, packet->GetDMRData()); - else - dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio()); - dmr_depth++; - // increment the dmr vocoder index - IncrementDMRVocoder(); - did_nothing = false; - } - dmr_mux.unlock(); - - // both the dmr and dstar queue were empty, so sleep for a little while - if (did_nothing) - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } -} - -void CController::AddFDSet(int &max, int newfd, fd_set *set) const -{ - if (newfd > max) - max = newfd; - FD_SET(newfd, set); -} - -// read vocoded (AMBE or audio) data from DVSI hardware -void CController::ReadAmbesThread() -{ - while (keep_running) - { - int maxfd = -1; - fd_set FdSet; - FD_ZERO(&FdSet); - for (const auto &it : dstar_device) - { - AddFDSet(maxfd, it->GetFd(), &FdSet); - } - for (const auto &it : dmr_device) - { - AddFDSet(maxfd, it->GetFd(), &FdSet); - } - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 40000; - //wait for up to 40 ms to read something from any devices - auto rval = select(maxfd+1, &FdSet, nullptr, nullptr, &tv); - if (rval < 0) - { - std::cerr << "select() ERROR reading AMBE devices: " << strerror(errno) << std::endl; - } - if (rval > 0) { - // from the device file descriptor, we'll know if it's dstar or dmr - for (unsigned int i=0 ; iGetFd(), &FdSet)) - { - ReadDevice(dstar_device[i], EAmbeType::dstar); - FD_CLR(dstar_device[i]->GetFd(), &FdSet); - } - } - for (unsigned int i=0 ; iGetFd(), &FdSet)) - { - ReadDevice(dmr_device[i], EAmbeType::dmr); - FD_CLR(dmr_device[i]->GetFd(), &FdSet); - } - } - } - } -} - -// Any audio packet recevied from the DVSI vocoders means that the original codec was AMBE (DStar or DMR). -// A) These audio packets need to be encoded, by the complimentary AMBE vocoder _and_ M17. -// Since code_in was AMBE, the audio will also be encoded to c2_3200, and copied to the packet. -// B) If we have read AMBE data from the vocoder, it needs to be put back into the packet. -// Finally if the packet is complete, it can be sent back to the reflector. -void CController::ReadDevice(std::shared_ptr device, EAmbeType type) -{ - // save the dmr/dstar type - const char *device_type = (EAmbeType::dstar==type) ? "D-Star" : "DMR"; - - // read the response from the vocoder - SDV3003_Packet devpacket; - if (device->GetResponse(devpacket)) - { - std::cerr << "ERROR: could not get response from " << device_type << " device at " << device->GetDevicePath() << std::endl; - return; - } - - // get the response type - bool is_audio; - if (PKT_SPEECH == devpacket.header.packet_type) - is_audio = true; - else if (PKT_CHANNEL == devpacket.header.packet_type) - is_audio = false; - else - { - std::string title("Unexpected "); - title.append(device_type); - title.append(" response packet"); - device->dump(title.c_str(), &devpacket, packet_size(devpacket)); - return; - } - - //get the packet from either the dstar or dmr vocoder's queue - std::shared_ptr packet; - if (EAmbeType::dstar == type) - { - dstar_mux.lock(); - packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); - dstar_mux.unlock(); - dstar_depth--; - } - else - { - dmr_mux.lock(); - packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); - dmr_mux.unlock(); - dmr_depth--; - } - - if (is_audio) - { - //move the audio to the CTranscoderPacket - for (unsigned int i=0; i<160; i++) - packet->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]); -#ifdef DEBUG - if (EAmbeType::dstar == type) - AppendWave(packet); -#endif - // we need to encode the m17 - // encode the audio to c2_3200 (all ambe input vocodes to ECodecType::c2_3200) - c2_mux.lock(); - codec2_queue.push(packet); - c2_mux.unlock(); - // ... AND we need to encode the audio to the OTHER ambe codec - if (type == EAmbeType::dstar) - { - dmr_mux.lock(); - dmr_queue.push(packet); - dmr_mux.unlock(); - } - else - { - dstar_mux.lock(); - dstar_queue.push(packet); - dstar_mux.unlock(); - } - } - else /* the response is ambe data */ - { - // put the AMBE data in the packet - if (type == EAmbeType::dmr) - { - packet->SetDMRData(devpacket.payload.ambe.data); - } - else - { - packet->SetDStarData(devpacket.payload.ambe.data); - } - // send it off, if it's done - if (packet->AllCodecsAreSet()) - { - SendToReflector(packet); - } - } -} - void CController::SendToReflector(std::shared_ptr packet) { // open a socket to the reflector channel @@ -524,6 +257,30 @@ void CController::SendToReflector(std::shared_ptr packet) #endif } +void CController::RouteDstPacket(std::shared_ptr packet) +{ + if (ECodecType::dstar == packet->GetCodecIn()) + dmr_device.AddPacket(packet); + else if (packet->AllCodecsAreSet()) + { + send_mux.lock(); + SendToReflector(packet); + send_mux.unlock(); + } +} + +void CController::RouteDmrPacket(std::shared_ptr packet) +{ + if (ECodecType::dmr == packet->GetCodecIn()) + dstar_device.AddPacket(packet); + else if (packet->AllCodecsAreSet()) + { + send_mux.lock(); + SendToReflector(packet); + send_mux.unlock(); + } +} + #ifdef DEBUG void CController::AppendWave(const std::shared_ptr packet) const { diff --git a/Controller.h b/Controller.h index a1cfa04..698cc07 100644 --- a/Controller.h +++ b/Controller.h @@ -16,9 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -#include #include -#include #include #include #include @@ -38,34 +36,31 @@ public: CController(); bool Start(); void Stop(); + void RouteDstPacket(std::shared_ptr packet); + void RouteDmrPacket(std::shared_ptr packet); protected: - unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder, dmr_depth, dstar_depth; std::atomic keep_running; - std::future reflectorFuture, readambeFuture, feedambeFuture, c2Future; - std::vector> dmr_device, dstar_device; + std::future reflectorFuture, c2Future; std::map audio_store; std::map data_store; CUnixDgramReader reader; CUnixDgramWriter writer; CCodec2 c2_16{false}; CCodec2 c2_32{true}; - CPacketQueue codec2_queue, dmr_queue, dstar_queue; - std::mutex dstar_mux, dmr_mux, c2_mux; + CDV3003 dstar_device{Encoding::dstar}; + CDV3003 dmr_device{Encoding::dmr}; + + CPacketQueue codec2_queue; + std::mutex c2_mux, send_mux; bool InitDevices(); - void IncrementDMRVocoder(void); - void IncrementDStarVocoder(void); // processing threads void ReadReflectorThread(); - void ReadAmbesThread(); - void FeedAmbesThread(); void ProcessC2Thread(); - void SendToReflector(std::shared_ptr packet); void Codec2toAudio(std::shared_ptr packet); void AudiotoCodec2(std::shared_ptr packet); - void ReadDevice(std::shared_ptr dv3003, EAmbeType type); - void AddFDSet(int &max, int newfd, fd_set *set) const; + void SendToReflector(std::shared_ptr packet); #ifdef DEBUG void AppendWave(const std::shared_ptr packet) const; void AppendM17(const std::shared_ptr packet) const; diff --git a/DV3003.cpp b/DV3003.cpp index efb7790..b2f7c09 100644 --- a/DV3003.cpp +++ b/DV3003.cpp @@ -30,11 +30,15 @@ #include #include #include +#include #include "DV3003.h" #include "configure.h" +#include "Controller.h" -CDV3003::CDV3003(Encoding t) : type(t), fd(-1) +extern CController Controller; + +CDV3003::CDV3003(Encoding t) : type(t), fd(-1), ch_depth(0), sp_depth(0), current_vocoder(0) { } @@ -43,6 +47,19 @@ CDV3003::~CDV3003() CloseDevice(); } +void CDV3003::CloseDevice() +{ + keep_running = false; + if (fd >= 0) { + close(fd); + fd = -1; + } + if (feedFuture.valid()) + feedFuture.get(); + if (readFuture.valid()) + readFuture.get(); +} + bool CDV3003::checkResponse(SDV3003_Packet &p, uint8_t response) const { if(p.start_byte != PKT_HEADER || p.header.packet_type != PKT_CONTROL || p.field_id != response) @@ -51,11 +68,6 @@ bool CDV3003::checkResponse(SDV3003_Packet &p, uint8_t response) const return false; } -bool CDV3003::IsOpen() const -{ - return fd >= 0; -} - std::string CDV3003::GetDevicePath() const { return devicepath; @@ -231,6 +243,9 @@ bool CDV3003::InitDV3003() } version.assign(responsePacket.payload.ctrl.data.version); std::cout << "Found " << productid << " version " << version << " at " << devicepath << std::endl; + + feedFuture = std::async(std::launch::async, &CDV3003::FeedDevice, this); + readFuture = std::async(std::launch::async, &CDV3003::ReadDevice, this); return false; } @@ -292,14 +307,6 @@ bool CDV3003::ConfigureVocoder(uint8_t pkt_ch, Encoding type) return false; } -void CDV3003::CloseDevice() -{ - if (fd >= 0) { - close(fd); - fd = -1; - } -} - bool CDV3003::GetResponse(SDV3003_Packet &packet) { ssize_t bytesRead; @@ -355,6 +362,109 @@ bool CDV3003::GetResponse(SDV3003_Packet &packet) return false; } +void CDV3003::FeedDevice() +{ + keep_running = true; + while (keep_running) + { + in_mux.lock(); + auto packet = inq.pop(); + in_mux.unlock(); + if (packet) + { + bool device_is_full = true; + bool has_ambe = (Encoding::dstar==type) ? packet->DStarIsSet() : packet->DMRIsSet(); + + while (keep_running && device_is_full) // wait until there is room + { + if (has_ambe) + { + // we need to decode ambe to audio + if (ch_depth < 2) + device_is_full = false; + } + else + { + // we need to encode audio to ambe + if (sp_depth < 2) + device_is_full = false; + } + if (device_is_full) + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + + if (keep_running) + { + voc_mux[current_vocoder].lock(); + vocq->push(packet); + voc_mux[current_vocoder].unlock(); + if (has_ambe) + { + SendAudio(current_vocoder, packet->GetAudio()); + sp_depth++; + } + else + { + SendData(current_vocoder, (Encoding::dstar==type) ? packet->GetDStarData() : packet->GetDMRData()); + ch_depth++; + } + if(++current_vocoder > 2) + current_vocoder = 0; + } + } + else // no packet is in the input queue + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } +} + +void CDV3003::ReadDevice() +{ + while (keep_running) + { + dv3003_packet p; + if (GetResponse(p)) + { + unsigned int channel = p.field_id - PKT_CHANNEL0; + voc_mux[channel].lock(); + auto packet = vocq[channel].pop(); + voc_mux[channel].unlock(); + if (PKT_CHANNEL == p.header.packet_type) + { + sp_depth--; + if (Encoding::dstar == type) + packet->SetDStarData(p.payload.ambe.data); + else + packet->SetDMRData(p.payload.ambe.data); + } + else if (PKT_SPEECH == p.header.packet_type) + { + ch_depth--; + auto pPCM = packet->GetAudio(); + for (unsigned int i=0; i<160; i++) + pPCM[i] = ntohs(p.payload.audio.samples[i]); + } + else + { + dump("ReadDevice() ERROR: Read an unexpected device packet:", &p, packet_size(p)); + continue; + } + if (Encoding::dstar == type) + Controller.RouteDstPacket(packet); + else + Controller.RouteDmrPacket(packet); + } + } +} + +void CDV3003::AddPacket(const std::shared_ptr packet) +{ + in_mux.lock(); + inq.push(packet); + in_mux.unlock(); +} + bool CDV3003::SendAudio(const uint8_t channel, const int16_t *audio) const { // Create Audio packet based on input int8_ts diff --git a/DV3003.h b/DV3003.h index fe7954b..8f39568 100644 --- a/DV3003.h +++ b/DV3003.h @@ -19,6 +19,9 @@ #include #include #include +#include +#include +#include #include "PacketQueue.h" @@ -104,23 +107,31 @@ public: bool OpenDevice(const std::string &device, int baudrate); bool InitDV3003(); bool ConfigureVocoder(uint8_t pkt_ch, Encoding type); - bool SendAudio(const uint8_t channel, const int16_t *audio) const; - bool SendData(const uint8_t channel, const uint8_t *data) const; - bool GetResponse(SDV3003_Packet &packet); - int GetFd() const { return fd; } void CloseDevice(); - bool IsOpen() const; - void dump(const char *title, void *data, int length) const; std::string GetDevicePath() const; std::string GetProductID() const; std::string GetVersion() const; + void AddPacket(const std::shared_ptr packet); - CPacketQueue packet_queue[3]; // we need a queue for each vocoder - CPacketQueue device_queue; // and a queue for input private: const Encoding type; int fd; + std::atomic ch_depth, sp_depth; + uint8_t current_vocoder; + std::atomic keep_running; + CPacketQueue vocq[3]; // we need a queue for each vocoder + std::mutex voc_mux[3]; + CPacketQueue inq; // and input queue + std::mutex in_mux; + std::future feedFuture, readFuture; std::string devicepath, productid, version; + + void FeedDevice(); + void ReadDevice(); bool SetBaudRate(int baudrate); bool checkResponse(SDV3003_Packet &responsePacket, uint8_t response) const; + bool SendAudio(const uint8_t channel, const int16_t *audio) const; + bool SendData(const uint8_t channel, const uint8_t *data) const; + bool GetResponse(SDV3003_Packet &packet); + void dump(const char *title, void *data, int length) const; }; diff --git a/Main.cpp b/Main.cpp index e82528c..1f37f10 100644 --- a/Main.cpp +++ b/Main.cpp @@ -19,17 +19,19 @@ #include "Controller.h" +// the global controller object +CController Controller; + int main() { - CController controller; - if (controller.Start()) + if (Controller.Start()) return EXIT_FAILURE; - std::cout << "Hybrid Transcoder Version #211220 Successfully started" << std::endl; + std::cout << "Hybrid Transcoder Version #211221 Successfully started" << std::endl; pause(); - controller.Stop(); + Controller.Stop(); return EXIT_SUCCESS; }