Merge pull request #31 from ea5gvk/pruebas

Allow parallel broadcasts on different TGs - serialize only same-TG b…
develop
Joaquin Madrid Belando 1 week ago committed by GitHub
commit f43ba8efa9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -922,50 +922,84 @@ _tts_tasks = {}
_FRAME_INTERVAL = 0.058
_broadcast_queue = []
_broadcast_active = False
_broadcast_active_tgs = set()
_BROADCAST_GAP = 1.5
def _enqueue_broadcast(_type, _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _num, _label):
global _broadcast_queue, _broadcast_active
_broadcast_queue.append({
'type': _type,
'targets': _targets,
'pkts_by_ts': _pkts_by_ts,
'source_id': _source_id,
'dst_id': _dst_id,
'tg': _tg,
'num': _num,
'label': _label
})
_pos = len(_broadcast_queue)
if _broadcast_active:
logger.info('(%s) Enqueued broadcast (position %s in queue)', _label, _pos)
global _broadcast_queue, _broadcast_active_tgs
_tg_key = str(_tg)
if _tg_key in _broadcast_active_tgs:
_broadcast_queue.append({
'type': _type,
'targets': _targets,
'pkts_by_ts': _pkts_by_ts,
'source_id': _source_id,
'dst_id': _dst_id,
'tg': _tg,
'num': _num,
'label': _label
})
_pos = len(_broadcast_queue)
logger.info('(%s) Enqueued broadcast for same TG %s (position %s in queue)', _label, _tg, _pos)
else:
_start_next_broadcast()
_broadcast_active_tgs.add(_tg_key)
_mark_slots_busy(_targets)
logger.info('(%s) Starting broadcast immediately for TG %s (active TGs: %s)', _label, _tg, len(_broadcast_active_tgs))
if _type == 'ann':
reactor.callLater(0.5, _announcementSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _num)
elif _type == 'tts':
reactor.callLater(0.5, _ttsSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _num)
def _start_next_broadcast():
global _broadcast_queue, _broadcast_active
global _broadcast_queue, _broadcast_active_tgs
if not _broadcast_queue:
_broadcast_active = False
return
_broadcast_active = True
_item = _broadcast_queue.pop(0)
_type = _item['type']
_label = _item['label']
logger.info('(%s) Starting broadcast from queue (%s remaining)', _label, len(_broadcast_queue))
_next = None
for i, _item in enumerate(_broadcast_queue):
_tg_key = str(_item['tg'])
if _tg_key not in _broadcast_active_tgs:
_next = _broadcast_queue.pop(i)
break
if not _next:
return
_type = _next['type']
_label = _next['label']
_tg_key = str(_next['tg'])
_broadcast_active_tgs.add(_tg_key)
_mark_slots_busy(_next['targets'])
logger.info('(%s) Starting broadcast from queue for TG %s (%s remaining, active TGs: %s)', _label, _next['tg'], len(_broadcast_queue), len(_broadcast_active_tgs))
if _type == 'ann':
reactor.callLater(0.5, _announcementSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num'])
reactor.callLater(0.5, _announcementSendBroadcast, _next['targets'], _next['pkts_by_ts'], 0, _next['source_id'], _next['dst_id'], _next['tg'], 0, _next['num'])
elif _type == 'tts':
reactor.callLater(0.5, _ttsSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num'])
reactor.callLater(0.5, _ttsSendBroadcast, _next['targets'], _next['pkts_by_ts'], 0, _next['source_id'], _next['dst_id'], _next['tg'], 0, _next['num'])
def _broadcast_finished():
global _broadcast_active
def _broadcast_finished(_tg=None):
global _broadcast_active_tgs
if _tg is not None:
_tg_key = str(_tg)
_broadcast_active_tgs.discard(_tg_key)
if _broadcast_queue:
logger.info('(QUEUE) Broadcast finished, next in %.1fs (%s queued)', _BROADCAST_GAP, len(_broadcast_queue))
logger.info('(QUEUE) Broadcast finished for TG %s, checking queue (%s queued, active TGs: %s)', _tg, len(_broadcast_queue), len(_broadcast_active_tgs))
reactor.callLater(_BROADCAST_GAP, _start_next_broadcast)
else:
_broadcast_active = False
logger.info('(QUEUE) Broadcast finished, queue empty')
if not _broadcast_active_tgs:
logger.info('(QUEUE) All broadcasts finished, queue empty')
else:
logger.info('(QUEUE) Broadcast finished for TG %s, %s TGs still active', _tg, len(_broadcast_active_tgs))
def _mark_slots_busy(_targets):
for _t in _targets:
try:
_t['slot']['TX_TYPE'] = HBPF_SLT_VHEAD
except (KeyError, TypeError):
pass
def _mark_slots_free(_targets):
for _t in _targets:
try:
_t['slot']['TX_TYPE'] = HBPF_SLT_VTERM
except (KeyError, TypeError):
pass
_RECORDING_MAX_FRAMES = 2750
_recording_state = {
@ -1068,6 +1102,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst
_total_pkts = len(_pkts_by_ts[1])
if _pkt_idx >= _total_pkts:
_mark_slots_free(_targets)
for _t in _targets:
try:
for _sid in list(_t['sys_obj'].STATUS.keys()):
@ -1077,7 +1112,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst
pass
_announcement_running[_ann_num] = False
logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets))
_broadcast_finished()
_broadcast_finished(_tg)
return
_now = time()
@ -1143,14 +1178,14 @@ def scheduledAnnouncement(_ann_num=1, _retry=0):
return
_announcement_last_hour[_ann_num] = _now.hour
if _broadcast_active and _retry < 60:
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
if str(_tg) in _broadcast_active_tgs and _retry < 60:
if _retry == 0:
logger.debug('(%s) Broadcast queue busy, deferring prep', _label)
logger.debug('(%s) Same TG %s already broadcasting, deferring prep', _label, _tg)
reactor.callLater(3.0 + _ann_num * 0.5, scheduledAnnouncement, _ann_num, _retry + 1)
return
_file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)]
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
_lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)]
_dst_id = bytes_3(_tg)
_source_id = bytes_3(5000)
@ -1341,14 +1376,14 @@ def scheduledTTSAnnouncement(_tts_num=1, _retry=0):
return
_tts_last_hour[_tts_num] = _now.hour
if _broadcast_active and _retry < 60:
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
if str(_tg) in _broadcast_active_tgs and _retry < 60:
if _retry == 0:
logger.debug('(%s) Broadcast queue busy, deferring TTS prep', _label)
logger.debug('(%s) Same TG %s already broadcasting, deferring TTS prep', _label, _tg)
reactor.callLater(3.0 + _tts_num * 0.5, scheduledTTSAnnouncement, _tts_num, _retry + 1)
return
_file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)]
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
_lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)]
_tts_running[_tts_num] = True
@ -1367,9 +1402,9 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label, _
logger.warning('(%s) No AMBE file available for TTS announcement %s', _label, _file)
return
if _broadcast_active and _retry < 60:
if str(_tg) in _broadcast_active_tgs and _retry < 60:
if _retry == 0:
logger.debug('(%s) Broadcast queue busy, deferring TTS packet prep', 'TTS-{}'.format(_tts_num))
logger.debug('(%s) Same TG %s already broadcasting, deferring TTS packet prep', 'TTS-{}'.format(_tts_num), _tg)
reactor.callLater(3.0 + _tts_num * 0.5, _ttsConversionDone, _ambe_path, _tts_num, _file, _tg, _lang, _mode, _label, _retry + 1)
return
@ -1484,6 +1519,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg,
_total_pkts = len(_pkts_by_ts[1])
if _pkt_idx >= _total_pkts:
_mark_slots_free(_targets)
for _t in _targets:
try:
for _sid in list(_t['sys_obj'].STATUS.keys()):
@ -1493,7 +1529,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg,
pass
_tts_running[_tts_num] = False
logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets))
_broadcast_finished()
_broadcast_finished(_tg)
return
_now = time()

Loading…
Cancel
Save

Powered by TurnKey Linux.