From 3a37220e147599e334a9319af925ae322f105a41 Mon Sep 17 00:00:00 2001 From: Joaquin Madrid Belando Date: Wed, 18 Mar 2026 19:29:46 +0100 Subject: [PATCH] Allow parallel broadcasts on different TGs - serialize only same-TG broadcasts, mark slots busy/free for TS conflict prevention --- bridge_master.py | 116 +++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index d399fbd..3d77bdf 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -922,50 +922,84 @@ _tts_tasks = {} _FRAME_INTERVAL = 0.058 _broadcast_queue = [] -_broadcast_active = False +_broadcast_active_tgs = set() _BROADCAST_GAP = 1.5 def _enqueue_broadcast(_type, _targets, _pkts_by_ts, _source_id, _dst_id, _tg, _num, _label): - global _broadcast_queue, _broadcast_active - _broadcast_queue.append({ - 'type': _type, - 'targets': _targets, - 'pkts_by_ts': _pkts_by_ts, - 'source_id': _source_id, - 'dst_id': _dst_id, - 'tg': _tg, - 'num': _num, - 'label': _label - }) - _pos = len(_broadcast_queue) - if _broadcast_active: - logger.info('(%s) Enqueued broadcast (position %s in queue)', _label, _pos) + global _broadcast_queue, _broadcast_active_tgs + _tg_key = str(_tg) + if _tg_key in _broadcast_active_tgs: + _broadcast_queue.append({ + 'type': _type, + 'targets': _targets, + 'pkts_by_ts': _pkts_by_ts, + 'source_id': _source_id, + 'dst_id': _dst_id, + 'tg': _tg, + 'num': _num, + 'label': _label + }) + _pos = len(_broadcast_queue) + logger.info('(%s) Enqueued broadcast for same TG %s (position %s in queue)', _label, _tg, _pos) else: - _start_next_broadcast() + _broadcast_active_tgs.add(_tg_key) + _mark_slots_busy(_targets) + logger.info('(%s) Starting broadcast immediately for TG %s (active TGs: %s)', _label, _tg, len(_broadcast_active_tgs)) + if _type == 'ann': + reactor.callLater(0.5, _announcementSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _num) + elif _type == 'tts': + reactor.callLater(0.5, _ttsSendBroadcast, _targets, _pkts_by_ts, 0, _source_id, _dst_id, _tg, 0, _num) def _start_next_broadcast(): - global _broadcast_queue, _broadcast_active + global _broadcast_queue, _broadcast_active_tgs if not _broadcast_queue: - _broadcast_active = False return - _broadcast_active = True - _item = _broadcast_queue.pop(0) - _type = _item['type'] - _label = _item['label'] - logger.info('(%s) Starting broadcast from queue (%s remaining)', _label, len(_broadcast_queue)) + _next = None + for i, _item in enumerate(_broadcast_queue): + _tg_key = str(_item['tg']) + if _tg_key not in _broadcast_active_tgs: + _next = _broadcast_queue.pop(i) + break + if not _next: + return + _type = _next['type'] + _label = _next['label'] + _tg_key = str(_next['tg']) + _broadcast_active_tgs.add(_tg_key) + _mark_slots_busy(_next['targets']) + logger.info('(%s) Starting broadcast from queue for TG %s (%s remaining, active TGs: %s)', _label, _next['tg'], len(_broadcast_queue), len(_broadcast_active_tgs)) if _type == 'ann': - reactor.callLater(0.5, _announcementSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) + reactor.callLater(0.5, _announcementSendBroadcast, _next['targets'], _next['pkts_by_ts'], 0, _next['source_id'], _next['dst_id'], _next['tg'], 0, _next['num']) elif _type == 'tts': - reactor.callLater(0.5, _ttsSendBroadcast, _item['targets'], _item['pkts_by_ts'], 0, _item['source_id'], _item['dst_id'], _item['tg'], 0, _item['num']) + reactor.callLater(0.5, _ttsSendBroadcast, _next['targets'], _next['pkts_by_ts'], 0, _next['source_id'], _next['dst_id'], _next['tg'], 0, _next['num']) -def _broadcast_finished(): - global _broadcast_active +def _broadcast_finished(_tg=None): + global _broadcast_active_tgs + if _tg is not None: + _tg_key = str(_tg) + _broadcast_active_tgs.discard(_tg_key) if _broadcast_queue: - logger.info('(QUEUE) Broadcast finished, next in %.1fs (%s queued)', _BROADCAST_GAP, len(_broadcast_queue)) + logger.info('(QUEUE) Broadcast finished for TG %s, checking queue (%s queued, active TGs: %s)', _tg, len(_broadcast_queue), len(_broadcast_active_tgs)) reactor.callLater(_BROADCAST_GAP, _start_next_broadcast) else: - _broadcast_active = False - logger.info('(QUEUE) Broadcast finished, queue empty') + if not _broadcast_active_tgs: + logger.info('(QUEUE) All broadcasts finished, queue empty') + else: + logger.info('(QUEUE) Broadcast finished for TG %s, %s TGs still active', _tg, len(_broadcast_active_tgs)) + +def _mark_slots_busy(_targets): + for _t in _targets: + try: + _t['slot']['TX_TYPE'] = HBPF_SLT_VHEAD + except (KeyError, TypeError): + pass + +def _mark_slots_free(_targets): + for _t in _targets: + try: + _t['slot']['TX_TYPE'] = HBPF_SLT_VTERM + except (KeyError, TypeError): + pass _RECORDING_MAX_FRAMES = 2750 _recording_state = { @@ -1068,6 +1102,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst _total_pkts = len(_pkts_by_ts[1]) if _pkt_idx >= _total_pkts: + _mark_slots_free(_targets) for _t in _targets: try: for _sid in list(_t['sys_obj'].STATUS.keys()): @@ -1077,7 +1112,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst pass _announcement_running[_ann_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) - _broadcast_finished() + _broadcast_finished(_tg) return _now = time() @@ -1143,14 +1178,14 @@ def scheduledAnnouncement(_ann_num=1, _retry=0): return _announcement_last_hour[_ann_num] = _now.hour - if _broadcast_active and _retry < 60: + _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] + if str(_tg) in _broadcast_active_tgs and _retry < 60: if _retry == 0: - logger.debug('(%s) Broadcast queue busy, deferring prep', _label) + logger.debug('(%s) Same TG %s already broadcasting, deferring prep', _label, _tg) reactor.callLater(3.0 + _ann_num * 0.5, scheduledAnnouncement, _ann_num, _retry + 1) return _file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)] - _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] _lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)] _dst_id = bytes_3(_tg) _source_id = bytes_3(5000) @@ -1341,14 +1376,14 @@ def scheduledTTSAnnouncement(_tts_num=1, _retry=0): return _tts_last_hour[_tts_num] = _now.hour - if _broadcast_active and _retry < 60: + _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] + if str(_tg) in _broadcast_active_tgs and _retry < 60: if _retry == 0: - logger.debug('(%s) Broadcast queue busy, deferring TTS prep', _label) + logger.debug('(%s) Same TG %s already broadcasting, deferring TTS prep', _label, _tg) reactor.callLater(3.0 + _tts_num * 0.5, scheduledTTSAnnouncement, _tts_num, _retry + 1) return _file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)] - _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] _lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)] _tts_running[_tts_num] = True @@ -1367,9 +1402,9 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label, _ logger.warning('(%s) No AMBE file available for TTS announcement %s', _label, _file) return - if _broadcast_active and _retry < 60: + if str(_tg) in _broadcast_active_tgs and _retry < 60: if _retry == 0: - logger.debug('(%s) Broadcast queue busy, deferring TTS packet prep', 'TTS-{}'.format(_tts_num)) + logger.debug('(%s) Same TG %s already broadcasting, deferring TTS packet prep', 'TTS-{}'.format(_tts_num), _tg) reactor.callLater(3.0 + _tts_num * 0.5, _ttsConversionDone, _ambe_path, _tts_num, _file, _tg, _lang, _mode, _label, _retry + 1) return @@ -1484,6 +1519,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg, _total_pkts = len(_pkts_by_ts[1]) if _pkt_idx >= _total_pkts: + _mark_slots_free(_targets) for _t in _targets: try: for _sid in list(_t['sys_obj'].STATUS.keys()): @@ -1493,7 +1529,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg, pass _tts_running[_tts_num] = False logger.info('(%s) Broadcast complete: %s packets sent to %s targets', _label, _total_pkts, len(_targets)) - _broadcast_finished() + _broadcast_finished(_tg) return _now = time()