Run TTS conversion (gTTS+ffmpeg+AMBEServer) in separate thread via deferToThread to avoid blocking reactor

pull/26/head
Joaquin Madrid Belando 3 weeks ago
parent 2ce47b1fc9
commit 50b5e2ef3a

@ -53,7 +53,7 @@ from hashlib import blake2b
# Twisted is pretty important, so I keep it separate # Twisted is pretty important, so I keep it separate
from twisted.internet.protocol import Factory, Protocol from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.basic import NetstringReceiver from twisted.protocols.basic import NetstringReceiver
from twisted.internet import reactor, task from twisted.internet import reactor, task, threads
from twisted.web.server import Site from twisted.web.server import Site
#from spyne import Application #from spyne import Application
@ -1251,19 +1251,30 @@ def scheduledTTSAnnouncement(_tts_num=1):
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
_timeslot = CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)] _timeslot = CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)]
_lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)] _lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)]
_slot_index = 2 if _timeslot == 2 else 1
_dst_id = bytes_3(_tg)
_source_id = bytes_3(5000)
_peer_id = CONFIG['GLOBAL']['SERVER_ID']
_ambe_path = ensure_tts_ambe(CONFIG, _tts_num) _tts_running[_tts_num] = True
logger.info('(%s) Iniciando conversion TTS en hilo separado para %s', _label, _file)
d = threads.deferToThread(ensure_tts_ambe, CONFIG, _tts_num)
d.addCallback(_ttsConversionDone, _tts_num, _file, _tg, _timeslot, _lang, _mode, _label)
d.addErrback(_ttsConversionError, _tts_num, _label)
def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _timeslot, _lang, _mode, _label):
global _tts_running
if not _ambe_path: if not _ambe_path:
_tts_running[_tts_num] = False
logger.warning('(%s) No AMBE file available for TTS announcement %s', _label, _file) logger.warning('(%s) No AMBE file available for TTS announcement %s', _label, _file)
return return
logger.info('(%s) Playing TTS file: %s to TG %s TS%s (mode: %s, lang: %s)', _label, _file, _tg, _timeslot, _mode, _lang) logger.info('(%s) Playing TTS file: %s to TG %s TS%s (mode: %s, lang: %s)', _label, _file, _tg, _timeslot, _mode, _lang)
_slot_index = 2 if _timeslot == 2 else 1
_dst_id = bytes_3(_tg)
_source_id = bytes_3(5000)
_peer_id = CONFIG['GLOBAL']['SERVER_ID']
_say = [] _say = []
try: try:
_relative_path = _ambe_path _relative_path = _ambe_path
@ -1273,9 +1284,11 @@ def scheduledTTSAnnouncement(_tts_num=1):
_relative_path = '/' + _relative_path[len('Audio'):] _relative_path = '/' + _relative_path[len('Audio'):]
_say.append(AMBEobj.readSingleFile(_relative_path)) _say.append(AMBEobj.readSingleFile(_relative_path))
except IOError: except IOError:
_tts_running[_tts_num] = False
logger.warning('(%s) Cannot read AMBE file: %s', _label, _ambe_path) logger.warning('(%s) Cannot read AMBE file: %s', _label, _ambe_path)
return return
except Exception as e: except Exception as e:
_tts_running[_tts_num] = False
logger.error('(%s) Error reading AMBE file: %s', _label, e) logger.error('(%s) Error reading AMBE file: %s', _label, e)
return return
@ -1321,6 +1334,7 @@ def scheduledTTSAnnouncement(_tts_num=1):
}) })
if not _targets: if not _targets:
_tts_running[_tts_num] = False
logger.info('(%s) No systems with connected peers to send to', _label) logger.info('(%s) No systems with connected peers to send to', _label)
return return
@ -1329,9 +1343,14 @@ def scheduledTTSAnnouncement(_tts_num=1):
if len(_targets) > 5: if len(_targets) > 5:
_sys_names += ', ... +{}'.format(len(_targets) - 5) _sys_names += ', ... +{}'.format(len(_targets) - 5)
logger.info('(%s) Broadcasting %s packets to %s systems simultaneously: %s', _label, len(_pkts), len(_targets), _sys_names) logger.info('(%s) Broadcasting %s packets to %s systems simultaneously: %s', _label, len(_pkts), len(_targets), _sys_names)
_tts_running[_tts_num] = True
reactor.callLater(1.0, _ttsSendBroadcast, _targets, _pkts, 0, _source_id, _dst_id, _tg, _timeslot, _tts_num) reactor.callLater(1.0, _ttsSendBroadcast, _targets, _pkts, 0, _source_id, _dst_id, _tg, _timeslot, _tts_num)
def _ttsConversionError(failure, _tts_num, _label):
global _tts_running
_tts_running[_tts_num] = False
logger.error('(%s) Error en conversion TTS: %s', _label, failure.getErrorMessage())
def _ttsSendBroadcast(_targets, _pkts, _pkt_idx, _source_id, _dst_id, _tg, _ts, _tts_num=1, _next_time=None): def _ttsSendBroadcast(_targets, _pkts, _pkt_idx, _source_id, _dst_id, _tg, _ts, _tts_num=1, _next_time=None):
global _tts_running global _tts_running

Loading…
Cancel
Save

Powered by TurnKey Linux.