Skip rule_timer_loop during active voice traffic to prevent GIL contention micro-cuts - all logs preserved

pull/26/head
Joaquin Madrid Belando 2 weeks ago
parent 3aee755dd5
commit ddff087532

@ -336,7 +336,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem):
def remove_bridge_system(system): def remove_bridge_system(system):
_bridgestemp = {} _bridgestemp = {}
_bridgetemp = {} _bridgetemp = {}
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
for _bridgesystem in BRIDGES[_bridge]: for _bridgesystem in BRIDGES[_bridge]:
if _bridgesystem['SYSTEM'] != system: if _bridgesystem['SYSTEM'] != system:
if _bridge not in _bridgestemp: if _bridge not in _bridgestemp:
@ -351,7 +351,7 @@ def remove_bridge_system(system):
def deactivate_all_dynamic_bridges(system_name): def deactivate_all_dynamic_bridges(system_name):
"""Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema."""
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
if _bridge[0:1] == '#': # Saltar reflectores if _bridge[0:1] == '#': # Saltar reflectores
continue continue
for _sys_entry in BRIDGES[_bridge]: for _sys_entry in BRIDGES[_bridge]:
@ -363,6 +363,15 @@ def deactivate_all_dynamic_bridges(system_name):
### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE
def rule_timer_loop(): def rule_timer_loop():
for _sys_name in list(systems.keys()):
try:
for _slot_id in (1, 2):
_s = systems[_sys_name].STATUS[_slot_id]
if _s['RX_TYPE'] != HBPF_SLT_VTERM or _s['TX_TYPE'] != HBPF_SLT_VTERM:
return
except (KeyError, TypeError, AttributeError):
continue
logger.debug('(ROUTER) routerHBP Rule timer loop started') logger.debug('(ROUTER) routerHBP Rule timer loop started')
_now = time() _now = time()
_remove_bridges = deque() _remove_bridges = deque()
@ -370,7 +379,7 @@ def rule_timer_loop():
# Mantener registro de bridges dinámicos activos por sistema # Mantener registro de bridges dinámicos activos por sistema
_active_dynamic_bridges = {} _active_dynamic_bridges = {}
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
_bridge_used = False _bridge_used = False
### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic ### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic
@ -458,14 +467,14 @@ def rule_timer_loop():
logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem)
if CONFIG['REPORTS']['REPORT']: if CONFIG['REPORTS']['REPORT']:
report_server.send_clients(b'bridge updated') reactor.callFromThread(report_server.send_clients, b'bridge updated')
### END MODIFIED ### ### END MODIFIED ###
def statTrimmer(): def statTrimmer():
logger.debug('(ROUTER) STAT trimmer loop started') logger.debug('(ROUTER) STAT trimmer loop started')
_remove_bridges = deque() _remove_bridges = deque()
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
_bridge_stat = False _bridge_stat = False
_in_use = False _in_use = False
for _system in BRIDGES[_bridge]: for _system in BRIDGES[_bridge]:
@ -498,8 +507,8 @@ def bridgeDebug():
bridgeroll = 0 bridgeroll = 0
dialroll = 0 dialroll = 0
activeroll = 0 activeroll = 0
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
for enabled_system in BRIDGES[_bridge]: for enabled_system in BRIDGES.get(_bridge, []):
if enabled_system['SYSTEM'] == system: if enabled_system['SYSTEM'] == system:
bridgeroll += 1 bridgeroll += 1
if enabled_system['ACTIVE']: if enabled_system['ACTIVE']:
@ -517,8 +526,8 @@ def bridgeDebug():
if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER': if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER':
logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll) logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll)
times = {} times = {}
for _bridge in BRIDGES: for _bridge in list(BRIDGES):
for enabled_system in BRIDGES[_bridge]: for enabled_system in BRIDGES.get(_bridge, []):
if enabled_system['ACTIVE'] and _bridge and _bridge[0:1] == '#': if enabled_system['ACTIVE'] and _bridge and _bridge[0:1] == '#':
times[enabled_system['TIMER']] = _bridge times[enabled_system['TIMER']] = _bridge
ordered = sorted(times.keys()) ordered = sorted(times.keys())
@ -711,13 +720,16 @@ def sendSpeech(self,speech):
_nine = bytes_3(9) _nine = bytes_3(9)
_source_id = bytes_3(5000) _source_id = bytes_3(5000)
_slot = systems[system].STATUS[2] _slot = systems[system].STATUS[2]
_next_time = time()
while True: while True:
try: try:
pkt = next(speech) pkt = next(speech)
except StopIteration: except StopIteration:
break break
#Packet every 60ms _next_time += 0.058
sleep(0.058) _delay = _next_time - time()
if _delay > 0.001:
sleep(_delay)
reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot)
logger.debug('(%s) Sendspeech thread ended',self._system) logger.debug('(%s) Sendspeech thread ended',self._system)
@ -749,17 +761,20 @@ def disconnectedVoice(system):
sleep(1) sleep(1)
_slot = systems[system].STATUS[2] _slot = systems[system].STATUS[2]
_next_time = time()
while True: while True:
try: try:
pkt = next(speech) pkt = next(speech)
except StopIteration: except StopIteration:
break break
#Packet every 60ms _next_time += 0.058
sleep(0.058) _delay = _next_time - time()
if _delay > 0.001:
sleep(_delay)
_stream_id = pkt[16:20] _stream_id = pkt[16:20]
_pkt_time = time() _pkt_time = time()
reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot)
logger.debug('(%s) disconnected voice thread end',system) logger.debug('(%s) disconnected voice thread end',system)
def playFileOnRequest(self,fileNumber): def playFileOnRequest(self,fileNumber):
system = self._system system = self._system
@ -881,13 +896,16 @@ def ident():
sleep(1) sleep(1)
_slot = systems[system].STATUS[2] _slot = systems[system].STATUS[2]
_next_time = time()
while True: while True:
try: try:
pkt = next(speech) pkt = next(speech)
except StopIteration: except StopIteration:
break break
#Packet every 60ms _next_time += 0.058
sleep(0.058) _delay = _next_time - time()
if _delay > 0.001:
sleep(_delay)
_stream_id = pkt[16:20] _stream_id = pkt[16:20]
_pkt_time = time() _pkt_time = time()
@ -905,7 +923,53 @@ _voice_cfg_config_file = ''
_ann_tasks = {} _ann_tasks = {}
_tts_tasks = {} _tts_tasks = {}
_FRAME_INTERVAL = 0.054 _FRAME_INTERVAL = 0.058
_broadcast_queue = []
_broadcast_active = False
_BROADCAST_GAP = 1.5
def _enqueue_broadcast(_type, _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _num, _label):
global _broadcast_queue, _broadcast_active
_broadcast_queue.append({
'type': _type,
'targets': _targets,
'pkts_by_ts': _pkts_by_ts,
'source_id': _source_id,
'dst_id': _dst_id,
'tg': _tg,
'num': _num,
'label': _label
})
_pos = len(_broadcast_queue)
if _broadcast_active:
logger.info('(%s) Enqueued broadcast (position %s in queue)', _label, _pos)
else:
_start_next_broadcast()
def _start_next_broadcast():
global _broadcast_queue, _broadcast_active
if not _broadcast_queue:
_broadcast_active = False
return
_broadcast_active = True
_item = _broadcast_queue.pop(0)
_type = _item['type']
_label = _item['label']
logger.info('(%s) Starting broadcast from queue (%s remaining)', _label, len(_broadcast_queue))
if _type == 'ann':
reactor.callLater(0.5, _announcementSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num'])
elif _type == 'tts':
reactor.callLater(0.5, _ttsSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num'])
def _broadcast_finished():
global _broadcast_active
if _broadcast_queue:
logger.info('(QUEUE) Broadcast finished, next in %.1fs (%s queued)', _BROADCAST_GAP, len(_broadcast_queue))
reactor.callLater(_BROADCAST_GAP, _start_next_broadcast)
else:
_broadcast_active = False
logger.info('(QUEUE) Broadcast finished, queue empty')
_RECORDING_MAX_FRAMES = 2750 _RECORDING_MAX_FRAMES = 2750
_recording_state = { _recording_state = {
@ -1017,6 +1081,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst
pass pass
_announcement_running[_ann_num] = False _announcement_running[_ann_num] = False
logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets))
_broadcast_finished()
return return
_now = time() _now = time()
@ -1172,7 +1237,7 @@ def scheduledAnnouncement(_ann_num=1):
logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s',
_label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names)
_announcement_running[_ann_num] = True _announcement_running[_ann_num] = True
reactor.callLater(1.0, _announcementSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _ann_num) _enqueue_broadcast('ann', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _ann_num, _label)
def _checkVoiceConfigReload(): def _checkVoiceConfigReload():
@ -1388,7 +1453,7 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label):
_sys_names += ', ... +{}'.format(len(_targets) - 8) _sys_names += ', ... +{}'.format(len(_targets) - 8)
logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s', logger.info('(%s) Broadcasting %s packets to %s targets (TS1:%s TS2:%s): %s',
_label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names) _label, len(_pkts_by_ts[1]), len(_targets), _ts1_count, _ts2_count, _sys_names)
reactor.callLater(1.0, _ttsSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _tts_num) _enqueue_broadcast('tts', _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _tts_num, _label)
def _ttsConversionError(failure, _tts_num, _label): def _ttsConversionError(failure, _tts_num, _label):
@ -1412,6 +1477,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg,
pass pass
_tts_running[_tts_num] = False _tts_running[_tts_num] = False
logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets))
_broadcast_finished()
return return
_now = time() _now = time()
@ -3653,7 +3719,9 @@ if __name__ == '__main__':
# logger.info('(API) API not started') # logger.info('(API) API not started')
# Initialize the rule timer -- this if for user activated stuff # Initialize the rule timer -- this if for user activated stuff
rule_timer_task = task.LoopingCall(rule_timer_loop) def _rule_timer_in_thread():
return threads.deferToThread(rule_timer_loop)
rule_timer_task = task.LoopingCall(_rule_timer_in_thread)
rule_timer = rule_timer_task.start(52) rule_timer = rule_timer_task.start(52)
rule_timer.addErrback(loopingErrHandle) rule_timer.addErrback(loopingErrHandle)
@ -3686,12 +3754,16 @@ if __name__ == '__main__':
#STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once #STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once
if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']:
stat_trimmer_task = task.LoopingCall(statTrimmer) def _stat_trimmer_in_thread():
return threads.deferToThread(statTrimmer)
stat_trimmer_task = task.LoopingCall(_stat_trimmer_in_thread)
stat_trimmer = stat_trimmer_task.start(303)#3600 stat_trimmer = stat_trimmer_task.start(303)#3600
stat_trimmer.addErrback(loopingErrHandle) stat_trimmer.addErrback(loopingErrHandle)
#KA Reporting #KA Reporting
ka_task = task.LoopingCall(kaReporting) def _ka_reporting_in_thread():
return threads.deferToThread(kaReporting)
ka_task = task.LoopingCall(_ka_reporting_in_thread)
ka = ka_task.start(60) ka = ka_task.start(60)
ka.addErrback(loopingErrHandle) ka.addErrback(loopingErrHandle)

Loading…
Cancel
Save

Powered by TurnKey Linux.