diff --git a/bridge_master.py b/bridge_master.py index 262e53a..7efd693 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -336,7 +336,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): def remove_bridge_system(system): _bridgestemp = {} _bridgetemp = {} - for _bridge in BRIDGES: + for _bridge in list(BRIDGES): for _bridgesystem in BRIDGES[_bridge]: if _bridgesystem['SYSTEM'] != system: if _bridge not in _bridgestemp: @@ -351,7 +351,7 @@ def remove_bridge_system(system): def deactivate_all_dynamic_bridges(system_name): """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" - for _bridge in BRIDGES: + for _bridge in list(BRIDGES): if _bridge[0:1] == '#': # Saltar reflectores continue for _sys_entry in BRIDGES[_bridge]: @@ -363,6 +363,15 @@ def deactivate_all_dynamic_bridges(system_name): ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE def rule_timer_loop(): + for _sys_name in list(systems.keys()): + try: + for _slot_id in (1, 2): + _s = systems[_sys_name].STATUS[_slot_id] + if _s['RX_TYPE'] != HBPF_SLT_VTERM or _s['TX_TYPE'] != HBPF_SLT_VTERM: + return + except (KeyError, TypeError, AttributeError): + continue + logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() _remove_bridges = deque() @@ -370,7 +379,7 @@ def rule_timer_loop(): # Mantener registro de bridges dinámicos activos por sistema _active_dynamic_bridges = {} - for _bridge in BRIDGES: + for _bridge in list(BRIDGES): _bridge_used = False ### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic @@ -458,14 +467,14 @@ def rule_timer_loop(): logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) if CONFIG['REPORTS']['REPORT']: - report_server.send_clients(b'bridge updated') + reactor.callFromThread(report_server.send_clients, b'bridge updated') ### END MODIFIED ### def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') _remove_bridges = deque() - for _bridge in BRIDGES: + for _bridge in list(BRIDGES): _bridge_stat = False _in_use = False for _system in BRIDGES[_bridge]: @@ -498,8 +507,8 @@ def bridgeDebug(): bridgeroll = 0 dialroll = 0 activeroll = 0 - for _bridge in BRIDGES: - for enabled_system in BRIDGES[_bridge]: + for _bridge in list(BRIDGES): + for enabled_system in BRIDGES.get(_bridge, []): if enabled_system['SYSTEM'] == system: bridgeroll += 1 if enabled_system['ACTIVE']: @@ -517,8 +526,8 @@ def bridgeDebug(): if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER': logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll) times = {} - for _bridge in BRIDGES: - for enabled_system in BRIDGES[_bridge]: + for _bridge in list(BRIDGES): + for enabled_system in BRIDGES.get(_bridge, []): if enabled_system['ACTIVE'] and _bridge and _bridge[0:1] == '#': times[enabled_system['TIMER']] = _bridge ordered = sorted(times.keys()) @@ -711,13 +720,16 @@ def sendSpeech(self,speech): _nine = bytes_3(9) _source_id = bytes_3(5000) _slot = systems[system].STATUS[2] + _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - #Packet every 60ms - sleep(0.058) + _next_time += 0.058 + _delay = _next_time - time() + if _delay > 0.001: + sleep(_delay) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sendspeech thread ended',self._system) @@ -749,17 +761,20 @@ def disconnectedVoice(system): sleep(1) _slot = systems[system].STATUS[2] + _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - #Packet every 60ms - sleep(0.058) + _next_time += 0.058 + _delay = _next_time - time() + if _delay > 0.001: + sleep(_delay) _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) - logger.debug('(%s) disconnected voice thread end',system) + logger.debug('(%s) disconnected voice thread end',system) def playFileOnRequest(self,fileNumber): system = self._system @@ -881,13 +896,16 @@ def ident(): sleep(1) _slot = systems[system].STATUS[2] + _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - #Packet every 60ms - sleep(0.058) + _next_time += 0.058 + _delay = _next_time - time() + if _delay > 0.001: + sleep(_delay) _stream_id = pkt[16:20] _pkt_time = time() @@ -905,7 +923,53 @@ _voice_cfg_config_file = '' _ann_tasks = {} _tts_tasks = {} -_FRAME_INTERVAL = 0.054 +_FRAME_INTERVAL = 0.058 + +_broadcast_queue = [] +_broadcast_active = False +_BROADCAST_GAP = 1.5 + +def _enqueue_broadcast(_type, _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _num, _label): + global _broadcast_queue, _broadcast_active + _broadcast_queue.append({ + 'type': _type, + 'targets': _targets, + 'pkts_by_ts': _pkts_by_ts, + 'source_id': _source_id, + 'dst_id': _dst_id, + 'tg': _tg, + 'num': _num, + 'label': _label + }) + _pos = len(_broadcast_queue) + if _broadcast_active: + logger.info('(%s) Enqueued broadcast (position %s in queue)', _label, _pos) + else: + _start_next_broadcast() + +def _start_next_broadcast(): + global _broadcast_queue, _broadcast_active + if not _broadcast_queue: + _broadcast_active = False + return + _broadcast_active = True + _item = _broadcast_queue.pop(0) + _type = _item['type'] + _label = _item['label'] + logger.info('(%s) Starting broadcast from queue (%s remaining)', _label, len(_broadcast_queue)) + if _type == 'ann': + reactor.callLater(0.5, _announcementSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) + elif _type == 'tts': + reactor.callLater(0.5, _ttsSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) + +def _broadcast_finished(): + global _broadcast_active + if _broadcast_queue: + logger.info('(QUEUE) Broadcast finished, next in %.1fs (%s queued)', _BROADCAST_GAP, len(_broadcast_queue)) + reactor.callLater(_BROADCAST_GAP, _start_next_broadcast) + else: + _broadcast_active = False + logger.info('(QUEUE) Broadcast finished, queue empty') _RECORDING_MAX_FRAMES = 2750 _recording_state = { @@ -1017,6 +1081,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst pass _announcement_running[_ann_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) + _broadcast_finished() return _now = time() @@ -1172,7 +1237,7 @@ def scheduledAnnouncement(_ann_num=1): logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) _announcement_running[_ann_num] = True - reactor.callLater(1.0, _announcementSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _ann_num) + _enqueue_broadcast('ann', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _ann_num, _label) def _checkVoiceConfigReload(): @@ -1388,7 +1453,7 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label): _sys_names += ', ... +{}'.format(len(_targets) - 8) logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) - reactor.callLater(1.0, _ttsSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _tts_num) + _enqueue_broadcast('tts', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _tts_num, _label) def _ttsConversionError(failure, _tts_num, _label): @@ -1412,6 +1477,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg, pass _tts_running[_tts_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) + _broadcast_finished() return _now = time() @@ -3653,7 +3719,9 @@ if __name__ == '__main__': # logger.info('(API) API not started') # Initialize the rule timer -- this if for user activated stuff - rule_timer_task = task.LoopingCall(rule_timer_loop) + def _rule_timer_in_thread(): + return threads.deferToThread(rule_timer_loop) + rule_timer_task = task.LoopingCall(_rule_timer_in_thread) rule_timer = rule_timer_task.start(52) rule_timer.addErrback(loopingErrHandle) @@ -3686,12 +3754,16 @@ if __name__ == '__main__': #STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: - stat_trimmer_task = task.LoopingCall(statTrimmer) + def _stat_trimmer_in_thread(): + return threads.deferToThread(statTrimmer) + stat_trimmer_task = task.LoopingCall(_stat_trimmer_in_thread) stat_trimmer = stat_trimmer_task.start(303)#3600 stat_trimmer.addErrback(loopingErrHandle) #KA Reporting - ka_task = task.LoopingCall(kaReporting) + def _ka_reporting_in_thread(): + return threads.deferToThread(kaReporting) + ka_task = task.LoopingCall(_ka_reporting_in_thread) ka = ka_task.start(60) ka.addErrback(loopingErrHandle)