You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ADN-DMR-Peer-Server/tts_engine.py

421 lines
15 KiB

#!/usr/bin/env python
#
###############################################################################
# Copyright (C) 2026 Joaquin Madrid Belando, EA5GVK <ea5gvk@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
###############################################################################
'''
TTS Engine for ADN-DMR-Peer-Server
Converts text files (.txt) to AMBE audio files (.ambe) for DMR transmission.
Pipeline: .txt -> gTTS -> .mp3 -> ffmpeg -> .wav (8kHz mono 16-bit) -> vocoder -> .ambe
Encoding options (in priority order):
1. AMBEServer (remote DV3000 over UDP) - TTS_AMBESERVER_HOST/PORT in voice.cfg
2. External vocoder command - TTS_VOCODER_CMD in voice.cfg
3. Pre-converted .ambe file (bypass pipeline)
Requires:
- gTTS (pip install gTTS)
- ffmpeg (system package)
- One of: AMBEServer, external vocoder, or pre-converted .ambe files
'''
import os
import socket
import struct
import subprocess
import wave
import logging
logger = logging.getLogger(__name__)

# Announcement locale -> language code handed to gTTS. Locales that are not
# listed here are reduced to their first two characters by _get_tts_lang().
_LANG_MAP = {
    'es_ES': 'es',
    'en_GB': 'en',
    'en_US': 'en',
    'fr_FR': 'fr',
    'de_DE': 'de',
    'it_IT': 'it',
    'pt_PT': 'pt',
    'pt_BR': 'pt',
    'pl_PL': 'pl',
    'nl_NL': 'nl',
    'da_DK': 'da',
    'sv_SE': 'sv',
    'no_NO': 'no',
    'el_GR': 'el',
    'th_TH': 'th',
    'cy_GB': 'cy',
    'ca_ES': 'ca',
    'gl_ES': 'gl',
    'eu_ES': 'eu',
}

# Packet framing values used when talking to the AMBEServer: every packet
# begins with the start byte, then a 2-byte length and a packet-type byte.
DV3K_START_BYTE = 0x61
DV3K_TYPE_CONTROL = 0x00
DV3K_TYPE_AMBE = 0x01
DV3K_TYPE_AUDIO = 0x02

# Field identifiers that open the payload of AMBE and audio packets.
DV3K_AMBE_FIELD_ID = 0x01
DV3K_AUDIO_FIELD_ID = 0x00

# Number of PCM samples sent to the server in each audio packet.
DV3K_SAMPLES_PER_FRAME = 160

# Control packet that selects the DMR rate table (last byte 0x21 == 33).
DV3K_RATET_DMR = bytes.fromhex('61 00 02 00 09 21')

# Control packet used as the initial probe of the server
# (presumably a product-id request, judging by the name).
DV3K_PRODID_REQ = bytes.fromhex('61 00 01 00 30')
def _get_tts_lang(announcement_language):
    '''Return the gTTS language code for an announcement locale.

    Locales present in ``_LANG_MAP`` use their mapped code; any other value
    falls back to its first two characters (e.g. ``'xx_YY'`` -> ``'xx'``).
    '''
    fallback = announcement_language[:2]
    return _LANG_MAP.get(announcement_language, fallback)
def _generate_tts_audio(text, lang, mp3_path):
    '''Synthesize *text* with gTTS and save the result as an MP3.

    Args:
        text: Text to speak.
        lang: Language code passed to gTTS.
        mp3_path: Destination path of the MP3 file.

    Returns:
        True when the MP3 was saved; False when gTTS cannot be imported or
        when synthesis/saving raises (the error is logged).
    '''
    # gTTS is imported lazily so the module still loads without it.
    try:
        from gtts import gTTS
    except ImportError:
        logger.error('(TTS) gTTS no esta instalado. Ejecuta: pip install gTTS')
        return False

    try:
        gTTS(text=text, lang=lang, slow=False).save(mp3_path)
        logger.info('(TTS) Audio TTS generado: %s', mp3_path)
    except Exception as exc:
        logger.error('(TTS) Error generando audio TTS: %s', exc)
        return False
    return True
def _convert_to_wav(mp3_path, wav_path, volume_db=0, speed=1.0):
    '''Convert an MP3 file to an 8 kHz, mono, 16-bit WAV file with ffmpeg.

    Args:
        mp3_path: Input MP3 file.
        wav_path: Output WAV file (overwritten if present, ffmpeg ``-y``).
        volume_db: Gain in dB applied with the ``volume`` filter; 0 adds no
            volume filter.
        speed: Tempo multiplier applied with the ``atempo`` filter. It is
            clamped to the range 0.5-2.0; 1.0 adds no tempo filter.

    Returns:
        True when ffmpeg exits with status 0; False when ffmpeg is missing,
        times out (60 s), fails, or any other error occurs (all logged).
    '''
    speed = max(0.5, min(2.0, speed))
    filters = []
    if speed != 1.0:
        filters.append('atempo={:.2f}'.format(speed))
        logger.info('(TTS) Aplicando velocidad: x%.2f', speed)
    if volume_db != 0:
        filters.append('volume={}dB'.format(volume_db))
        # %s rather than %d: the filter above receives the value unmodified,
        # so the log must not truncate fractional gains such as -2.5.
        logger.info('(TTS) Aplicando ajuste de volumen: %sdB', volume_db)

    cmd = ['ffmpeg', '-y', '-i', mp3_path,
           '-ar', '8000', '-ac', '1', '-sample_fmt', 's16']
    if filters:
        cmd += ['-af', ','.join(filters)]
    cmd += ['-f', 'wav', wav_path]

    try:
        result = subprocess.run(cmd, capture_output=True, timeout=60)
    except FileNotFoundError:
        logger.error('(TTS) ffmpeg no encontrado. Instala ffmpeg en el sistema')
        return False
    except subprocess.TimeoutExpired:
        logger.error('(TTS) Timeout en conversion ffmpeg')
        return False
    except Exception as e:
        logger.error('(TTS) Error en conversion de audio: %s', e)
        return False

    if result.returncode != 0:
        # Only the first 500 characters of stderr are logged.
        logger.error('(TTS) Error ffmpeg: %s', result.stderr.decode('utf-8', errors='ignore')[:500])
        return False
    logger.info('(TTS) Audio convertido a WAV 8kHz mono: %s', wav_path)
    return True
def _encode_ambe_vocoder(wav_path, ambe_path, vocoder_cmd):
    '''Encode a WAV file to AMBE by running an external vocoder command.

    ``vocoder_cmd`` is a command template in which ``{wav}`` and ``{ambe}``
    are replaced by the input and output paths before it is run through
    the shell with a 120 s timeout.

    Returns:
        True when the command exits with status 0 and ``ambe_path`` exists
        afterwards; False otherwise, including when ``vocoder_cmd`` is empty.
    '''
    if not vocoder_cmd:
        return False

    cmd = vocoder_cmd.replace('{wav}', wav_path)
    cmd = cmd.replace('{ambe}', ambe_path)

    # NOTE(review): shell=True with paths substituted into the string means
    # shell metacharacters in the paths are interpreted by the shell.
    try:
        proc = subprocess.run(cmd, shell=True, capture_output=True, timeout=120)
        if proc.returncode != 0:
            stderr_text = proc.stderr.decode('utf-8', errors='ignore')
            logger.error('(TTS) Error del vocoder: %s', stderr_text[:500])
            return False
        if os.path.isfile(ambe_path):
            logger.info('(TTS) Audio codificado a AMBE via vocoder externo: %s', ambe_path)
            return True
        logger.error('(TTS) El vocoder no genero el archivo AMBE: %s', ambe_path)
        return False
    except FileNotFoundError:
        logger.error('(TTS) Comando vocoder no encontrado: %s', cmd.split()[0] if cmd else '(vacio)')
        return False
    except subprocess.TimeoutExpired:
        logger.error('(TTS) Timeout en codificacion AMBE')
        return False
    except Exception as exc:
        logger.error('(TTS) Error ejecutando vocoder: %s', exc)
        return False
def _build_audio_packet(pcm_samples):
    '''Build an audio packet carrying *pcm_samples* for the AMBEServer.

    Layout: start byte, 2-byte big-endian payload length, packet type, then
    the payload (audio field id, one-byte sample count, and each sample as
    a big-endian signed 16-bit integer).
    '''
    count = len(pcm_samples)
    body = struct.pack('>BB%dh' % count, DV3K_AUDIO_FIELD_ID, count, *pcm_samples)
    prefix = struct.pack('>BHB', DV3K_START_BYTE, len(body), DV3K_TYPE_AUDIO)
    return prefix + body
def _parse_ambe_response(data):
    '''Extract the AMBE payload from a packet received from the AMBEServer.

    A packet is: start byte, 2-byte big-endian length, packet type. For an
    AMBE packet the payload starts with a field id and a bit count, followed
    by ``ceil(bits / 8)`` bytes of AMBE data.

    Args:
        data: Raw datagram bytes.

    Returns:
        The AMBE data bytes, or None when the packet is too short, does not
        start with the start byte, is not an AMBE packet with the expected
        field id, or carries fewer data bytes than its bit count announces.
    '''
    if len(data) < 4 or data[0] != DV3K_START_BYTE:
        return None

    pkt_type = data[3]
    if pkt_type == DV3K_TYPE_AMBE:
        # The field id and bit count live at offsets 4 and 5; a shorter
        # packet cannot be parsed and must not raise IndexError.
        if len(data) < 6 or data[4] != DV3K_AMBE_FIELD_ID:
            return None
        num_bytes = (data[5] + 7) // 8
        ambe_data = data[6:6 + num_bytes]
        if len(ambe_data) < num_bytes:
            # A short frame would misalign every later frame in the output.
            logger.debug('(TTS-AMBESERVER) Truncated AMBE packet (%d of %d bytes)',
                         len(ambe_data), num_bytes)
            return None
        return ambe_data

    if pkt_type == DV3K_TYPE_CONTROL:
        logger.debug('(TTS-AMBESERVER) Control response received (field_id: 0x%02X)', data[4] if len(data) > 4 else 0)
    return None
def _encode_ambe_ambeserver(wav_path, ambe_path, host, port):
    '''Encode a mono 16-bit WAV file to AMBE through an AMBEServer over UDP.

    The server is probed with DV3K_PRODID_REQ and configured with
    DV3K_RATET_DMR; the audio is then sent in chunks of
    DV3K_SAMPLES_PER_FRAME samples (the last chunk is zero-padded). Each AMBE
    payload returned is collected and finally written to ``ambe_path``.
    Chunks that time out, fail, or get a non-AMBE reply are counted as
    errors and skipped.

    Args:
        wav_path: Input WAV file (must be mono, 16-bit PCM).
        ambe_path: Output file receiving the concatenated AMBE frames.
        host: Server host name or address; surrounding whitespace and quotes
            are stripped.
        port: Server UDP port.

    Returns:
        True when at least one AMBE frame was received and the output file
        was written; False on any failure (all failures are logged).
    '''
    host = host.strip().strip('"').strip("'")
    logger.info('(TTS-AMBESERVER) Conectando a AMBEServer %s:%d', host, port)
    try:
        resolved_ip = socket.gethostbyname(host)
    except (OSError, UnicodeError) as e:
        # socket.gaierror is an OSError; malformed names raise UnicodeError.
        logger.error('(TTS-AMBESERVER) No se puede resolver el host "%s": %s', host, e)
        return False
    if resolved_ip != host:
        logger.info('(TTS-AMBESERVER) Host %s resuelto a %s', host, resolved_ip)
        host = resolved_ip
    server = (host, port)

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    except OSError as e:
        logger.error('(TTS-AMBESERVER) Error creando socket UDP: %s', e)
        return False

    ambe_frames = []
    frames_error = 0
    # The with-block closes the socket on every exit path, including
    # unexpected exceptions.
    with sock:
        sock.settimeout(5.0)
        if not _ambeserver_handshake(sock, server):
            return False
        samples = _read_wav_pcm16(wav_path)
        if samples is None:
            return False

        starts = range(0, len(samples), DV3K_SAMPLES_PER_FRAME)
        for frame_no, start in enumerate(starts):
            chunk = samples[start:start + DV3K_SAMPLES_PER_FRAME]
            if len(chunk) < DV3K_SAMPLES_PER_FRAME:
                # Pad the final partial chunk with silence.
                chunk += (0,) * (DV3K_SAMPLES_PER_FRAME - len(chunk))
            audio_pkt = _build_audio_packet(chunk)
            try:
                sock.sendto(audio_pkt, server)
                data, _addr = sock.recvfrom(1024)
                ambe_data = _parse_ambe_response(data)
            except socket.timeout:
                frames_error += 1
                logger.warning('(TTS-AMBESERVER) Timeout en frame %d', frame_no)
                continue
            except Exception as e:
                frames_error += 1
                logger.error('(TTS-AMBESERVER) Error en frame %d: %s', frame_no, e)
                continue
            if ambe_data is None:
                frames_error += 1
                logger.debug('(TTS-AMBESERVER) Frame %d: respuesta no AMBE (tipo: 0x%02X)',
                             frame_no, data[3] if len(data) > 3 else 0)
            else:
                ambe_frames.append(ambe_data)

    if not ambe_frames:
        logger.error('(TTS-AMBESERVER) No se recibieron frames AMBE')
        return False
    try:
        with open(ambe_path, 'wb') as f:
            f.write(b''.join(ambe_frames))
    except OSError as e:
        logger.error('(TTS-AMBESERVER) Error escribiendo archivo AMBE: %s', e)
        return False
    logger.info('(TTS-AMBESERVER) Codificacion completada: %d frames AMBE (%d errores), guardado en %s',
                len(ambe_frames), frames_error, ambe_path)
    return True


def _ambeserver_handshake(sock, server):
    '''Probe the AMBEServer and select the DMR rate table; True on success.'''
    try:
        sock.sendto(DV3K_PRODID_REQ, server)
        data, _addr = sock.recvfrom(1024)
    except socket.timeout:
        logger.error('(TTS-AMBESERVER) Timeout conectando a AMBEServer %s:%d - Verifica que el servidor esta activo',
                     server[0], server[1])
        return False
    except Exception as e:
        logger.error('(TTS-AMBESERVER) Error conectando a AMBEServer: %s', e)
        return False
    # An empty datagram is as invalid as one with the wrong start byte.
    if not data or data[0] != DV3K_START_BYTE:
        logger.error('(TTS-AMBESERVER) Respuesta invalida del AMBEServer')
        return False
    logger.info('(TTS-AMBESERVER) AMBEServer conectado correctamente')

    try:
        sock.sendto(DV3K_RATET_DMR, server)
        data, _addr = sock.recvfrom(1024)
    except socket.timeout:
        logger.error('(TTS-AMBESERVER) Timeout configurando RATET')
        return False
    except Exception as e:
        # Handled like the probe step so non-timeout errors do not escape.
        logger.error('(TTS-AMBESERVER) Error configurando RATET DMR: %s', e)
        return False
    if not data or data[0] != DV3K_START_BYTE:
        logger.error('(TTS-AMBESERVER) Error configurando RATET DMR')
        return False
    logger.info('(TTS-AMBESERVER) RATET DMR (AMBE+2 tabla 33) configurado')
    return True


def _read_wav_pcm16(wav_path):
    '''Return the samples of a mono 16-bit WAV as a tuple, or None on error.'''
    try:
        with wave.open(wav_path, 'rb') as wf:
            if wf.getsampwidth() != 2 or wf.getnchannels() != 1:
                logger.error('(TTS-AMBESERVER) WAV debe ser mono 16-bit PCM')
                return None
            sample_rate = wf.getframerate()
            raw = wf.readframes(wf.getnframes())
    except (OSError, EOFError, wave.Error) as e:
        logger.error('(TTS-AMBESERVER) Error abriendo WAV: %s', e)
        return None

    # Count samples from the bytes actually read: a header whose frame count
    # disagrees with the data would otherwise make struct.unpack raise.
    count = len(raw) // 2
    duration = count / sample_rate if sample_rate else 0.0
    logger.info('(TTS-AMBESERVER) WAV: %d muestras, %d Hz, duracion: %.1f s',
                count, sample_rate, duration)
    return struct.unpack('<%dh' % count, raw[:count * 2])
def text_to_ambe(txt_path, ambe_path, language, vocoder_cmd, ambeserver_host='', ambeserver_port=2460, volume_db=0, speed=1.0):
    '''Convert a UTF-8 text file into an AMBE file.

    Pipeline: text -> MP3 (gTTS) -> WAV (ffmpeg) -> AMBE. The AMBE step uses
    the AMBEServer when ``ambeserver_host`` is set and falls back to the
    external vocoder command when that fails or is not configured.
    Intermediate ``.mp3``/``.wav`` files are created next to ``ambe_path``
    and removed on success; when only the AMBE step fails they are kept so
    they can be encoded manually.

    Args:
        txt_path: Source text file.
        ambe_path: Destination AMBE file.
        language: Announcement locale, mapped to a TTS language code.
        vocoder_cmd: External vocoder command template (may be empty).
        ambeserver_host: AMBEServer host (empty disables it).
        ambeserver_port: AMBEServer UDP port.
        volume_db: Gain in dB passed to the WAV conversion.
        speed: Tempo multiplier passed to the WAV conversion.

    Returns:
        True when ``ambe_path`` is up to date (already newer than the text
        file, or freshly generated); False otherwise.
    '''
    if not os.path.isfile(txt_path):
        logger.warning('(TTS) Archivo de texto no encontrado: %s', txt_path)
        return False
    if os.path.isfile(ambe_path):
        txt_mtime = os.path.getmtime(txt_path)
        ambe_mtime = os.path.getmtime(ambe_path)
        if ambe_mtime > txt_mtime:
            logger.info('(TTS) Usando AMBE cacheado (mas reciente que .txt): %s', ambe_path)
            return True

    try:
        # utf-8-sig drops a leading BOM (if any) so it is neither spoken nor
        # mistaken for content; BOM-less files are read exactly as before.
        with open(txt_path, 'r', encoding='utf-8-sig') as f:
            text = f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        # Keep the boolean contract instead of letting read errors escape.
        logger.error('(TTS) Error leyendo archivo de texto %s: %s', txt_path, e)
        return False
    if not text:
        logger.warning('(TTS) Archivo de texto vacio: %s', txt_path)
        return False

    logger.info('(TTS) Convirtiendo texto a AMBE: %s (%d caracteres, idioma: %s)', txt_path, len(text), language)
    _dir = os.path.dirname(ambe_path)
    if _dir:
        try:
            os.makedirs(_dir, exist_ok=True)
        except OSError as e:
            logger.error('(TTS) No se pudo crear el directorio %s: %s', _dir, e)
            return False

    _base = os.path.splitext(ambe_path)[0]
    _mp3_path = _base + '.mp3'
    _wav_path = _base + '.wav'
    _tts_lang = _get_tts_lang(language)

    if not _generate_tts_audio(text, _tts_lang, _mp3_path):
        # A failed synthesis may leave a partial MP3 behind.
        _cleanup([_mp3_path])
        return False
    if not _convert_to_wav(_mp3_path, _wav_path, volume_db, speed):
        _cleanup([_mp3_path, _wav_path])
        return False

    _encoded = False
    if ambeserver_host:
        logger.info('(TTS) Usando AMBEServer %s:%d para codificacion AMBE', ambeserver_host, ambeserver_port)
        _encoded = _encode_ambe_ambeserver(_wav_path, ambe_path, ambeserver_host, ambeserver_port)
        if not _encoded:
            logger.warning('(TTS) AMBEServer fallo, intentando vocoder externo...')
    if not _encoded and vocoder_cmd:
        logger.info('(TTS) Usando vocoder externo para codificacion AMBE')
        _encoded = _encode_ambe_vocoder(_wav_path, ambe_path, vocoder_cmd)

    if not _encoded:
        # Intermediate files are kept on purpose so the user can encode them.
        logger.warning('(TTS) No se pudo codificar a AMBE. Archivos intermedios disponibles:')
        logger.warning('(TTS) MP3: %s', _mp3_path)
        logger.warning('(TTS) WAV: %s', _wav_path)
        logger.warning('(TTS) Opciones para codificar:')
        logger.warning('(TTS) 1. Configura TTS_AMBESERVER_HOST en voice.cfg (DV3000 remoto)')
        logger.warning('(TTS) 2. Configura TTS_VOCODER_CMD en voice.cfg (vocoder local)')
        logger.warning('(TTS) 3. Convierte manualmente el WAV a AMBE y guardalo como: %s', ambe_path)
        return False

    _cleanup([_mp3_path, _wav_path])
    logger.info('(TTS) Conversion completada: %s -> %s', txt_path, ambe_path)
    return True
def _cleanup(files):
    '''Remove each regular file in *files*, ignoring any failure.

    Paths that are not existing regular files are skipped. Errors are
    deliberately swallowed: this is best-effort removal of intermediates.
    '''
    for path in files:
        try:
            if not os.path.isfile(path):
                continue
            os.remove(path)
        except Exception:
            continue
def ensure_tts_ambe(config, tts_num):
    '''Return the AMBE file path for TTS announcement *tts_num*, or None.

    Settings are read from ``config['GLOBAL']`` using the key prefix
    ``TTS_ANNOUNCEMENT<tts_num>``. Files live in
    ``./Audio/<language>/ondemand/<file>.txt`` and ``.ambe``.

    Resolution order:
      1. Announcement not enabled -> None.
      2. AMBE exists and there is no text file -> the AMBE path.
      3. AMBE exists and is newer than the text file -> the AMBE path.
      4. No text file -> None.
      5. Otherwise convert the text; on failure fall back to an existing
         (older) AMBE file if there is one, else None.
    '''
    settings = config['GLOBAL']
    key = 'TTS_ANNOUNCEMENT{}'.format(tts_num)
    if not settings.get(key + '_ENABLED', False):
        return None

    file_stem = settings[key + '_FILE']
    lang = settings[key + '_LANGUAGE']
    vocoder_cmd = settings.get('TTS_VOCODER_CMD', '')
    server_host = settings.get('TTS_AMBESERVER_HOST', '')
    server_port = settings.get('TTS_AMBESERVER_PORT', 2460)
    volume_db = settings.get('TTS_VOLUME', -3)
    speed = settings.get('TTS_SPEED', 1.0)

    stem = './Audio/{}/ondemand/{}'.format(lang, file_stem)
    txt_path = stem + '.txt'
    ambe_path = stem + '.ambe'

    ambe_exists = os.path.isfile(ambe_path)
    txt_exists = os.path.isfile(txt_path)

    if ambe_exists and not txt_exists:
        logger.info('(TTS-%d) Usando archivo AMBE existente (sin .txt): %s', tts_num, ambe_path)
        return ambe_path
    if ambe_exists:
        txt_mtime = os.path.getmtime(txt_path)
        if os.path.getmtime(ambe_path) > txt_mtime:
            logger.debug('(TTS-%d) Usando AMBE cacheado: %s', tts_num, ambe_path)
            return ambe_path
    if not txt_exists:
        logger.warning('(TTS-%d) Archivo de texto no encontrado: %s', tts_num, txt_path)
        return None

    if text_to_ambe(txt_path, ambe_path, lang, vocoder_cmd, server_host, server_port, volume_db, speed):
        return ambe_path
    if not os.path.isfile(ambe_path):
        return None
    # Conversion failed but a previous AMBE file is still usable.
    logger.warning('(TTS-%d) Usando AMBE anterior (conversion fallo): %s', tts_num, ambe_path)
    return ambe_path

Powered by TurnKey Linux.