diff --git a/bridge_master.py b/bridge_master.py index e595ffb..262e53a 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -336,7 +336,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): def remove_bridge_system(system): _bridgestemp = {} _bridgetemp = {} - for _bridge in list(BRIDGES): + for _bridge in BRIDGES: for _bridgesystem in BRIDGES[_bridge]: if _bridgesystem['SYSTEM'] != system: if _bridge not in _bridgestemp: @@ -351,7 +351,7 @@ def remove_bridge_system(system): def deactivate_all_dynamic_bridges(system_name): """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" - for _bridge in list(BRIDGES): + for _bridge in BRIDGES: if _bridge[0:1] == '#': # Saltar reflectores continue for _sys_entry in BRIDGES[_bridge]: @@ -370,7 +370,7 @@ def rule_timer_loop(): # Mantener registro de bridges dinámicos activos por sistema _active_dynamic_bridges = {} - for _bridge in list(BRIDGES): + for _bridge in BRIDGES: _bridge_used = False ### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic @@ -458,14 +458,14 @@ def rule_timer_loop(): logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) if CONFIG['REPORTS']['REPORT']: - reactor.callFromThread(report_server.send_clients, b'bridge updated') + report_server.send_clients(b'bridge updated') ### END MODIFIED ### def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') _remove_bridges = deque() - for _bridge in list(BRIDGES): + for _bridge in BRIDGES: _bridge_stat = False _in_use = False for _system in BRIDGES[_bridge]: @@ -498,8 +498,8 @@ def bridgeDebug(): bridgeroll = 0 dialroll = 0 activeroll = 0 - for _bridge in list(BRIDGES): - for enabled_system in BRIDGES.get(_bridge, []): + for _bridge in BRIDGES: + for enabled_system in BRIDGES[_bridge]: if enabled_system['SYSTEM'] == system: bridgeroll += 1 if enabled_system['ACTIVE']: @@ -517,8 +517,8 @@ def bridgeDebug(): if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER': logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll) times = {} - for _bridge in list(BRIDGES): - for enabled_system in BRIDGES.get(_bridge, []): + for _bridge in BRIDGES: + for enabled_system in BRIDGES[_bridge]: if enabled_system['ACTIVE'] and _bridge and _bridge[0:1] == '#': times[enabled_system['TIMER']] = _bridge ordered = sorted(times.keys()) @@ -711,16 +711,13 @@ def sendSpeech(self,speech): _nine = bytes_3(9) _source_id = bytes_3(5000) _slot = systems[system].STATUS[2] - _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - _next_time += 0.058 - _delay = _next_time - time() - if _delay > 0.001: - sleep(_delay) + #Packet every 60ms + sleep(0.058) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sendspeech thread ended',self._system) @@ -752,20 +749,17 @@ def disconnectedVoice(system): sleep(1) _slot = systems[system].STATUS[2] - _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - _next_time += 0.058 - _delay = _next_time - time() - if _delay > 0.001: - sleep(_delay) + #Packet every 60ms + sleep(0.058) _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) - logger.debug('(%s) disconnected voice thread end',system) + logger.debug('(%s) disconnected voice thread end',system) def playFileOnRequest(self,fileNumber): system = self._system @@ -887,16 +881,13 @@ def ident(): sleep(1) _slot = systems[system].STATUS[2] - _next_time = time() while True: try: pkt = next(speech) except StopIteration: break - _next_time += 0.058 - _delay = _next_time - time() - if _delay > 0.001: - sleep(_delay) + #Packet every 60ms + sleep(0.058) _stream_id = pkt[16:20] _pkt_time = time() @@ -914,53 +905,7 @@ _voice_cfg_config_file = '' _ann_tasks = {} _tts_tasks = {} -_FRAME_INTERVAL = 0.058 - -_broadcast_queue = [] -_broadcast_active = False -_BROADCAST_GAP = 1.5 - -def _enqueue_broadcast(_type, _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _num, _label): - global _broadcast_queue, _broadcast_active - _broadcast_queue.append({ - 'type': _type, - 'targets': _targets, - 'pkts_by_ts': _pkts_by_ts, - 'source_id': _source_id, - 'dst_id': _dst_id, - 'tg': _tg, - 'num': _num, - 'label': _label - }) - _pos = len(_broadcast_queue) - if _broadcast_active: - logger.info('(%s) Enqueued broadcast (position %s in queue)', _label, _pos) - else: - _start_next_broadcast() - -def _start_next_broadcast(): - global _broadcast_queue, _broadcast_active - if not _broadcast_queue: - _broadcast_active = False - return - _broadcast_active = True - _item = _broadcast_queue.pop(0) - _type = _item['type'] - _label = _item['label'] - logger.info('(%s) Starting broadcast from queue (%s remaining)', _label, len(_broadcast_queue)) - if _type == 'ann': - reactor.callLater(0.5, _announcementSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) - elif _type == 'tts': - reactor.callLater(0.5, _ttsSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) - -def _broadcast_finished(): - global _broadcast_active - if _broadcast_queue: - logger.info('(QUEUE) Broadcast finished, next in %.1fs (%s queued)', _BROADCAST_GAP, len(_broadcast_queue)) - reactor.callLater(_BROADCAST_GAP, _start_next_broadcast) - else: - _broadcast_active = False - logger.info('(QUEUE) Broadcast finished, queue empty') +_FRAME_INTERVAL = 0.054 _RECORDING_MAX_FRAMES = 2750 _recording_state = { @@ -1072,7 +1017,6 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst pass _announcement_running[_ann_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) - _broadcast_finished() return _now = time() @@ -1228,7 +1172,7 @@ def scheduledAnnouncement(_ann_num=1): logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) _announcement_running[_ann_num] = True - _enqueue_broadcast('ann', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _ann_num, _label) + reactor.callLater(1.0, _announcementSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _ann_num) def _checkVoiceConfigReload(): @@ -1444,7 +1388,7 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label): _sys_names += ', ... +{}'.format(len(_targets) - 8) logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) - _enqueue_broadcast('tts', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _tts_num, _label) + reactor.callLater(1.0, _ttsSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _tts_num) def _ttsConversionError(failure, _tts_num, _label): @@ -1468,7 +1412,6 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg, pass _tts_running[_tts_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) - _broadcast_finished() return _now = time() @@ -3710,9 +3653,7 @@ if __name__ == '__main__': # logger.info('(API) API not started') # Initialize the rule timer -- this if for user activated stuff - def _rule_timer_in_thread(): - return threads.deferToThread(rule_timer_loop) - rule_timer_task = task.LoopingCall(_rule_timer_in_thread) + rule_timer_task = task.LoopingCall(rule_timer_loop) rule_timer = rule_timer_task.start(52) rule_timer.addErrback(loopingErrHandle) @@ -3745,16 +3686,12 @@ if __name__ == '__main__': #STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: - def _stat_trimmer_in_thread(): - return threads.deferToThread(statTrimmer) - stat_trimmer_task = task.LoopingCall(_stat_trimmer_in_thread) + stat_trimmer_task = task.LoopingCall(statTrimmer) stat_trimmer = stat_trimmer_task.start(303)#3600 stat_trimmer.addErrback(loopingErrHandle) #KA Reporting - def _ka_reporting_in_thread(): - return threads.deferToThread(kaReporting) - ka_task = task.LoopingCall(_ka_reporting_in_thread) + ka_task = task.LoopingCall(kaReporting) ka = ka_task.start(60) ka.addErrback(loopingErrHandle)