diff --git a/Controller.cpp b/Controller.cpp index 05d23ed..e6d792b 100644 --- a/Controller.cpp +++ b/Controller.cpp @@ -20,11 +20,14 @@ #include #include #include +#include #include "TranscoderPacket.h" #include "Controller.h" -CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), keep_running(true) +#define MAX_DEPTH 3 + +CController::CController() : dmr_vocoder_count(0), current_dmr_vocoder(0), dstar_vocoder_count(0), current_dstar_vocoder(0), dmr_depth(0), dstar_depth(0), keep_running(true) { } @@ -35,18 +38,26 @@ bool CController::Start() keep_running = false; return true; } - reflectorThread = std::async(std::launch::async, &CController::ReadReflector, this); - ambeThread = std::async(std::launch::async, &CController::ReadAmbeDevices, this); + reflectorFuture = std::async(std::launch::async, &CController::ReadReflectorThread, this); + readambeFuture = std::async(std::launch::async, &CController::ReadAmbesThread, this); + feedambeFuture = std::async(std::launch::async, &CController::FeedAmbesThread, this); + c2Future = std::async(std::launch::async, &CController::ProcessC2Thread, this); return false; } void CController::Stop() { keep_running = false; - if (reflectorThread.valid()) - reflectorThread.get(); - if (ambeThread.valid()) - ambeThread.get(); + + if (reflectorFuture.valid()) + reflectorFuture.get(); + if (readambeFuture.valid()) + readambeFuture.get(); + if (feedambeFuture.valid()) + feedambeFuture.get(); + if (c2Future.valid()) + c2Future.get(); + reader.Close(); for (auto &it : dstar_device) { @@ -142,114 +153,197 @@ void CController::IncrementDStarVocoder() current_dstar_vocoder = (current_dstar_vocoder + 1) % dstar_vocoder_count; } -// DMR and YSF use the exact same codec, both AMBE. -// Incoming packets from clients have different starting codecs, AMBE or M17. -// The transcoder's task is to fill in the missing data. -// Incoming AMBE codecs go to the DVSI hardware for decoding to audio. The ambeThread will finish up. -// Incoming M17 codecs are decoded in software and the audio data send to the two different AMBE encoders. -// There is no need to transcode between M17 codecs. -void CController::ReadReflector() +void CController::ReadReflectorThread() { while (keep_running) { STCPacket tcpack; - // wait up to 40 ms to read something on the unix port - if (reader.Receive(&tcpack, 40)) + // wait up to 100 ms to read something on the unix port + if (reader.Receive(&tcpack, 100)) { // create a shared pointer to a new packet // there is only one CTranscoderPacket created for each new STCPacket received from the reflector auto packet = std::make_shared(tcpack); -#ifdef DEBUG - Dump(packet, "Incoming TC Packet:"); -#endif - unsigned int devnum, vocnum; + switch (packet->GetCodecIn()) { case ECodecType::dstar: - devnum = current_dstar_vocoder / 3; - vocnum = current_dstar_vocoder % 3; - //send it to the next available dstar vocoder - dstar_device[devnum]->SendData(vocnum, packet->GetDStarData()); - //push the packet onto that vocoder's queue - dstar_device[devnum]->packet_queue[vocnum].push(packet); - //increment the dstar vocoder index - IncrementDStarVocoder(); + dstar_queue.push(packet); break; case ECodecType::dmr: - devnum = current_dmr_vocoder / 3; - vocnum = current_dmr_vocoder % 3; - //send it to the next avaiable dmr vocoder - dmr_device[devnum]->SendData(vocnum, packet->GetDMRData()); - //push the packet onto that vocoder's queue - dmr_device[devnum]->packet_queue[vocnum].push(packet); - //increment the dmr vocoder index - IncrementDMRVocoder(); + dmr_queue.push(packet); break; case ECodecType::c2_1600: case ECodecType::c2_3200: - if (packet->IsSecond()) - { - if (packet->GetCodecIn() == ECodecType::c2_1600) - { - //copy the audio from local storage - memcpy(packet->GetAudio(), audio_store[packet->GetModule()], 320); - } - else /* codec_in is ECodecType::c2_3200 */ - { - //decode the second 8 data bytes - //move the 160 audio samples to the packet - c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data()+8); - } - } - else /* it's a "first packet" */ - { - if (packet->GetCodecIn() == ECodecType::c2_1600) - { - //c2_1600 encodes 40 ms of audio, 320 points, so... - //we need some temprary audio storage: - int16_t tmp[320]; - //decode it - c2_16.codec2_decode(tmp, packet->GetM17Data()); // 8 bytes input produces 320 audio points - //move the first and second half - memcpy(packet->GetAudio(), tmp, 320); - memcpy(audio_store[packet->GetModule()], tmp+160, 320); - } - else /* codec_in is ECodecType::c2_3200 */ - { - c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data()); - } - } - // encode the audio to dstar - devnum = current_dstar_vocoder / 3; - vocnum = current_dstar_vocoder % 3; - // send the audio to the current dstar vocoder - dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio()); - //push the packet onto the vocoder's queue - dstar_device[devnum]->packet_queue[vocnum].push(packet); - // increment the dstar vocoder index - IncrementDStarVocoder(); - // encode the audio to dmr - devnum = current_dmr_vocoder / 3; - vocnum = current_dmr_vocoder % 3; - // send the audio to the corrent dmr vocoder - dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio()); - // push the packet onto the dmr vocoder's queue - dmr_device[devnum]->packet_queue[vocnum].push(packet); - // increment the dmr vocoder index - IncrementDMRVocoder(); + c2_mux.lock(); + codec2_queue.push(packet); + c2_mux.unlock(); break; - case ECodecType::none: default: - std::cerr << "ERROR: Got a reflector packet with unknown Codec" << std::endl; -#ifdef DEBUG - Dump(packet, "This is what's in it:"); -#endif + Dump(packet, "ERROR: Got a reflector packet with unknown Codec:"); break; } } } } +void CController::AudiotoCodec2(std::shared_ptr packet) +{ + uint8_t m17data[8]; + c2_32.codec2_encode(m17data, packet->GetAudio()); + if (packet->IsSecond()) + { + //move the c2_3200 data to the second half of the M17 packet + packet->SetM17Data(m17data, EAudioSection::secondhalf); + } + else /* the packet is first */ + { + // move it into the packet + packet->SetM17Data(m17data, EAudioSection::firsthalf); + if (packet->IsLast()) + { + // we have an odd number of packets, so we have to finish up the m17 packet + const uint8_t silence[] = {0x00, 0x01, 0x43, 0x09, 0xe4, 0x9c, 0x08, 0x21 }; + //put codec silence in the second half of the codec + packet->SetM17Data(silence, EAudioSection::secondhalf); + } + } + // we've received the audio and we've calculated the m17 data, now we just need to + // calculate the other ambe data + if (packet->GetCodecIn() == ECodecType::dmr) + { + dstar_mux.lock(); + dstar_queue.push(packet); + dstar_mux.unlock(); + } + else /* the dmr/dstar type is dstar */ + { + dmr_mux.lock(); + dmr_queue.push(packet); + dmr_mux.unlock(); + } +} + +void CController::Codec2toAudio(std::shared_ptr packet) +{ + if (packet->IsSecond()) + { + if (packet->GetCodecIn() == ECodecType::c2_1600) + { + //copy the audio from local storage + memcpy(packet->GetAudio(), audio_store[packet->GetModule()], 320); + } + else /* codec_in is ECodecType::c2_3200 */ + { + //decode the second 8 data bytes + //move the 160 audio samples to the packet + c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data()+8); + } + } + else /* it's a "first packet" */ + { + if (packet->GetCodecIn() == ECodecType::c2_1600) + { + //c2_1600 encodes 40 ms of audio, 320 points, so... + //we need some temprary audio storage: + int16_t tmp[320]; + //decode it + c2_16.codec2_decode(tmp, packet->GetM17Data()); // 8 bytes input produces 320 audio points + //move the first and second half + memcpy(packet->GetAudio(), tmp, 320); + memcpy(audio_store[packet->GetModule()], tmp+160, 320); + } + else /* codec_in is ECodecType::c2_3200 */ + { + c2_32.codec2_decode(packet->GetAudio(), packet->GetM17Data()); + } + } + // the only thing left is to encode the two ambe + dstar_mux.lock(); + dstar_queue.push(packet); + dstar_mux.unlock(); + dmr_mux.lock(); + dmr_queue.push(packet); + dmr_mux.unlock(); +} + +void CController::ProcessC2Thread() +{ + while (keep_running) + { + c2_mux.lock(); + auto c2_queue_is_empty = codec2_queue.empty(); + c2_mux.unlock(); + if (c2_queue_is_empty) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + else + { + c2_mux.lock(); + auto packet = codec2_queue.pop(); + c2_mux.unlock(); + switch (packet->GetCodecIn()) + { + case ECodecType::c2_1600: + case ECodecType::c2_3200: + Codec2toAudio(packet); + break; + case ECodecType::dstar: + case ECodecType::dmr: + AudiotoCodec2(packet); + break; + } + } + } +} + +void CController::FeedAmbesThread() +{ + while (keep_running) + { + bool did_nothing = true; + dstar_mux.lock(); + if ((! dstar_queue.empty()) && (dstar_depth < MAX_DEPTH)) + { + // encode the audio to dstar + auto packet = dstar_queue.pop(); + auto devnum = current_dstar_vocoder / 3; + auto vocnum = current_dstar_vocoder % 3; + //push the packet onto the vocoder's queue + dstar_device[devnum]->packet_queue[vocnum].push(packet); + // send the audio to the current dstar vocoder + dstar_device[devnum]->SendAudio(vocnum, packet->GetAudio()); + dstar_depth++; + // increment the dstar vocoder index + IncrementDStarVocoder(); + did_nothing = false; + } + dstar_mux.unlock(); + + dmr_mux.lock(); + if ((! dmr_queue.empty()) && (dmr_depth < MAX_DEPTH)) + { + // encode the audio to dmr + auto packet = dmr_queue.pop(); + auto devnum = current_dmr_vocoder / 3; + auto vocnum = current_dmr_vocoder % 3; + // push the packet onto the dmr vocoder's queue + dmr_device[devnum]->packet_queue[vocnum].push(packet); + // send the audio to the corrent dmr vocoder + dmr_device[devnum]->SendAudio(vocnum, packet->GetAudio()); + dmr_depth++; + // increment the dmr vocoder index + IncrementDMRVocoder(); + did_nothing = false; + } + dmr_mux.unlock(); + + if (did_nothing) + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } +} + void CController::AddFDSet(int &max, int newfd, fd_set *set) const { if (newfd > max) @@ -258,7 +352,7 @@ void CController::AddFDSet(int &max, int newfd, fd_set *set) const } // read transcoded (AMBE or audio) data from DVSI hardware -void CController::ReadAmbeDevices() +void CController::ReadAmbesThread() { while (keep_running) { @@ -338,85 +432,56 @@ void CController::ReadDevice(std::shared_ptr device, EAmbeType type) } //get the packet from either the dstar or dmr vocoder's queue - auto spPacket = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); + std::shared_ptr packet; + if (EAmbeType::dstar == type) + { + dstar_mux.lock(); + packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); + dstar_depth--; + dstar_mux.unlock(); + } + else + { + dmr_mux.lock(); + packet = device->packet_queue[devpacket.field_id-PKT_CHANNEL0].pop(); + dmr_depth--; + dmr_mux.unlock(); + } if (is_audio) { //move the audio to the CTranscoderPacket for (unsigned int i=0; i<160; i++) - spPacket->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]); + packet->GetAudio()[i] = ntohs(devpacket.payload.audio.samples[i]); // we need to encode the m17 // encode the audio to c2_3200 (all ambe input vocodes to ECodecType::c2_3200) - uint8_t m17data[8]; - c2_32.codec2_encode(m17data, spPacket->GetAudio()); - if (spPacket->IsSecond()) - { - //move the c2_3200 data to the second half of the M17 packet - spPacket->SetM17Data(m17data, EAudioSection::secondhalf); - } - else /* the packet is first */ - { - // move it into the packet - spPacket->SetM17Data(m17data, EAudioSection::firsthalf); - if (spPacket->IsLast()) - { - // we have an odd number of packets, so we have to finish up the m17 packet - const uint8_t silence[] = {0x00, 0x01, 0x43, 0x09, 0xe4, 0x9c, 0x08, 0x21 }; - //put codec silence in the second half of the codec - spPacket->SetM17Data(silence, EAudioSection::secondhalf); - } - } - // we've received the audio and we've calculated the m17 data, now we just need to - // calculate the other ambe data - if (type == EAmbeType::dmr) - { - const unsigned int devnum = current_dstar_vocoder / 3; - const unsigned int vocnum = current_dstar_vocoder % 3; - //send the audio packet to the next available dstar vocoder - dstar_device[devnum]->SendAudio(vocnum, spPacket->GetAudio()); - //push the packet onto the dstar vocoder's queue - dstar_device[devnum]->packet_queue[vocnum].push(spPacket); - //increment the dmr vocoder index - IncrementDStarVocoder(); - } - else /* the dmr/dstar type is dstar */ - { - const unsigned int devnum = current_dmr_vocoder / 3; - const unsigned int vocnum = current_dmr_vocoder % 3; - //send the audio packet to the next available dmr vocoder - dmr_device[devnum]->SendAudio(vocnum, spPacket->GetAudio()); - //push the packet onto the dmr vocoder's queue - dmr_device[devnum]->packet_queue[vocnum].push(spPacket); - //increment the dmr vocoder index - IncrementDMRVocoder(); - } + AudiotoCodec2(packet); } else /* the response is ambe data */ { // put the AMBE data in the packet if (type == EAmbeType::dmr) { - spPacket->SetDMRData(devpacket.payload.ambe.data); + packet->SetDMRData(devpacket.payload.ambe.data); } else { - spPacket->SetDStarData(devpacket.payload.ambe.data); + packet->SetDStarData(devpacket.payload.ambe.data); } // send it off, if it's done - if (spPacket->AllCodecsAreSet()) + if (packet->AllCodecsAreSet()) { // open a socket to the reflector channel CUnixDgramWriter socket; std::string name(TC2REF); - name.append(1, spPacket->GetModule()); + name.append(1, packet->GetModule()); socket.SetUp(name.c_str()); // send the packet over the socket - socket.Send(spPacket->GetTCPacket()); + socket.Send(packet->GetTCPacket()); // the socket will automatically close after sending #ifdef DEBUG - Dump(spPacket, "Completed Transcoder packet"); - AppendWave(spPacket); + AppendWave(packet); #endif } } diff --git a/Controller.h b/Controller.h index 91f32fb..76107c4 100644 --- a/Controller.h +++ b/Controller.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "codec2.h" @@ -39,22 +40,28 @@ public: void Stop(); protected: - unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder; + unsigned int dmr_vocoder_count, current_dmr_vocoder, dstar_vocoder_count, current_dstar_vocoder, dmr_depth, dstar_depth; std::atomic keep_running; - std::future reflectorThread, ambeThread; + std::future reflectorFuture, readambeFuture, feedambeFuture, c2Future; std::vector> dmr_device, dstar_device; std::map audio_store; CUnixDgramReader reader; CUnixDgramWriter writer; CCodec2 c2_16{false}; CCodec2 c2_32{true}; + CPacketQueue codec2_queue, dmr_queue, dstar_queue; + std::mutex dstar_mux, dmr_mux, c2_mux; bool InitDevices(); void IncrementDMRVocoder(void); void IncrementDStarVocoder(void); // processing threads - void ReadReflector(); - void ReadAmbeDevices(); + void ReadReflectorThread(); + void ReadAmbesThread(); + void FeedAmbesThread(); + void ProcessC2Thread(); + void Codec2toAudio(std::shared_ptr pack); + void AudiotoCodec2(std::shared_ptr pack); void ReadDevice(std::shared_ptr dv3003, EAmbeType type); void AddFDSet(int &max, int newfd, fd_set *set) const; #ifdef DEBUG diff --git a/DV3003.h b/DV3003.h index fa22287..fe7954b 100644 --- a/DV3003.h +++ b/DV3003.h @@ -116,6 +116,7 @@ public: std::string GetVersion() const; CPacketQueue packet_queue[3]; // we need a queue for each vocoder + CPacketQueue device_queue; // and a queue for input private: const Encoding type; int fd; diff --git a/PacketQueue.h b/PacketQueue.h index 25c0577..6ee361c 100644 --- a/PacketQueue.h +++ b/PacketQueue.h @@ -18,7 +18,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -#include #include #include @@ -32,39 +31,28 @@ public: std::shared_ptr pop() { std::shared_ptr pack; - mutex.lock(); if (! queue.empty()) { pack = queue.front(); queue.pop(); } - mutex.unlock(); - return std::move(pack); + return pack; } bool empty() { - mutex.lock(); - bool rval = queue.empty(); - mutex.unlock(); - return rval; + return queue.empty(); } void push(std::shared_ptr packet) { - mutex.lock(); queue.push(packet); - mutex.unlock(); } std::size_t size() { - mutex.lock(); - auto s = queue.size(); - mutex.unlock(); - return s; + return queue.size(); } protected: - std::mutex mutex; std::queue> queue; };