From 962e7e3374d707e7f644bd0c10d9460f4f6e8e27 Mon Sep 17 00:00:00 2001 From: Joaquin Madrid Belando Date: Tue, 3 Mar 2026 21:14:38 +0100 Subject: [PATCH] Update bridge_master.py --- bridge_master.py | 264 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 261 insertions(+), 3 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index a74cd4b..370a52f 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -36,6 +36,7 @@ This program currently only works with group voice calls. # Python modules we need import sys +import os from bitarray import bitarray from time import time,sleep,perf_counter import importlib.util @@ -79,7 +80,7 @@ from i8n_voice_map import voiceMap # Stuff for socket reporting import pickle -# REMOVE LATER from datetime import datetime +from datetime import datetime # The module needs logging, but handlers, etc. are controlled by the parent import logging logger = logging.getLogger(__name__) @@ -99,8 +100,8 @@ __author__ = 'Cortney T. Buffington, N0MJS, Forked by Simon Adlem - G7RZU, F __copyright__ = 'Copyright (c) 2016-2019 Cortney T. Buffington, N0MJS and the K0USY Group, Simon Adlem G7RZU 2020-2023, Esteban Mackay, HP3ICC 2024-2026' __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4IRR; Jonathan Naylor, G4KLX; Hans Barthen, DL5DI; Torsten Shultze, DG1HT; Jon Lee, G4TSN; Norman Williams, M6NBP, Eric Craw KF7EEL, Simon Adlem - G7RZU, Bruno Farias CS8ABG, Esteban Mackay HP3ICC, Joaquin Madrid Belando EA5GVK' __license__ = 'GNU GPLv3' -__maintainer__ = 'Esteban Mackay, HP3ICC' -__email__ = 'setcom40@gmail.com' +__maintainer__ = 'Esteban Mackay, HP3ICC - Joaquin Madrid, EA5GVK' +__email__ = 'setcom40@gmail.com - ea5gvk@gmail.com' #Set header bits #used for slot rewrite and type rewrite @@ -873,6 +874,238 @@ def ident(): _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot) +_announcement_last_hour = {1: -1, 2: -1, 3: -1, 4: -1} +_announcement_running = {1: False, 2: False, 3: False, 4: False} + +_FRAME_INTERVAL = 0.054 + +_RECORDING_MAX_FRAMES = 2750 +_recording_state = { + 'active': False, + 'stream_id': None, + 'bursts': bitarray(endian='big'), + 'start_time': 0, + 'frames': 0, + 'rf_src': None +} + +def _handleRecording(dmrpkt, _frame_type, _dtype_vseq, _stream_id, pkt_time, _rf_src, _int_dst_id, _slot): + global _recording_state + + if not CONFIG['GLOBAL']['RECORDING_ENABLED']: + return + if _int_dst_id != CONFIG['GLOBAL']['RECORDING_TG'] or _slot != CONFIG['GLOBAL']['RECORDING_TIMESLOT']: + return + + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: + if _recording_state['active'] and _recording_state['stream_id'] != _stream_id: + logger.info('(GRABACION) Nueva transmision detectada, guardando grabacion anterior (%d frames)', _recording_state['frames']) + _saveRecording() + _recording_state['active'] = True + _recording_state['stream_id'] = _stream_id + _recording_state['bursts'] = bitarray(endian='big') + _recording_state['start_time'] = pkt_time + _recording_state['frames'] = 0 + _recording_state['rf_src'] = _rf_src + logger.info('(GRABACION) Grabacion iniciada - SUB: %s, TG: %s, TS: %s', int_id(_rf_src), _int_dst_id, _slot) + return + + if not _recording_state['active'] or _recording_state['stream_id'] != _stream_id: + return + + if _frame_type in (HBPF_VOICE, HBPF_VOICE_SYNC): + _bits_data = bitarray(endian='big') + _bits_data.frombytes(dmrpkt) + _recording_state['bursts'].extend(_bits_data[:108]) + _recording_state['bursts'].extend(_bits_data[156:264]) + _recording_state['frames'] += 1 + + if _recording_state['frames'] >= _RECORDING_MAX_FRAMES: + logger.info('(GRABACION) Duracion maxima alcanzada (%d frames), guardando', _recording_state['frames']) + _saveRecording() + return + + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: + _saveRecording() + return + +def _saveRecording(): + global _recording_state + if not _recording_state['active'] or _recording_state['frames'] == 0: + _recording_state['active'] = False + _recording_state['stream_id'] = None + logger.warning('(GRABACION) No hay frames grabados, descartando') + return + + _lang = CONFIG['GLOBAL']['RECORDING_LANGUAGE'] + _file = CONFIG['GLOBAL']['RECORDING_FILE'] + _dir = './Audio/{}/ondemand/'.format(_lang) + _path = _dir + _file + '.ambe' + + os.makedirs(_dir, exist_ok=True) + + with open(_path, 'wb') as f: + f.write(_recording_state['bursts'].tobytes()) + + _duration = time() - _recording_state['start_time'] + logger.info('(GRABACION) Grabacion guardada: %s (%d frames, %.1f segundos, SUB: %s)', _path, _recording_state['frames'], _duration, int_id(_recording_state['rf_src'])) + + _recording_state['active'] = False + _recording_state['stream_id'] = None + _recording_state['bursts'] = bitarray(endian='big') + _recording_state['frames'] = 0 + _recording_state['rf_src'] = None + +def _announcementSendBroadcast(_targets, _pkts, _pkt_idx, _source_id, _dst_id, _tg, _ts, _ann_num=1, _next_time=None): + global _announcement_running + + _label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num) + + if _pkt_idx >= len(_pkts): + for _t in _targets: + try: + for _sid in list(_t['sys_obj'].STATUS.keys()): + if _sid not in (1, 2): + del _t['sys_obj'].STATUS[_sid] + except: + pass + _announcement_running[_ann_num] = False + logger.info('(%s) Broadcast complete: %s packets sent to %s systems', _label, len(_pkts), len(_targets)) + return + + _now = time() + pkt = _pkts[_pkt_idx] + _stream_id = pkt[16:20] + + for _t in _targets: + try: + _sys_obj = _t['sys_obj'] + _slot = _t['slot'] + if _stream_id not in _sys_obj.STATUS: + _sys_obj.STATUS[_stream_id] = { + 'START': _now, + 'CONTENTION':False, + 'RFS': _source_id, + 'TGID': _dst_id, + 'LAST': _now + } + _slot['TX_TGID'] = _dst_id + else: + _sys_obj.STATUS[_stream_id]['LAST'] = _now + _slot['TX_TIME'] = _now + _sys_obj.send_system(pkt) + except Exception as e: + logger.error('(%s) Error sending packet %s to %s: %s', _label, _pkt_idx, _t['name'], e) + + _elapsed = time() - _now + if _next_time is None: + _next_time = _now + _FRAME_INTERVAL + else: + _next_time = _next_time + _FRAME_INTERVAL + _delay = max(0.001, _next_time - time()) + + if _pkt_idx < 3: + logger.debug('(%s) Packet %s/%s broadcast to %s systems (proc: %.1fms, delay: %.1fms)', _label, _pkt_idx + 1, len(_pkts), len(_targets), _elapsed * 1000, _delay * 1000) + + reactor.callLater(_delay, _announcementSendBroadcast, _targets, _pkts, _pkt_idx + 1, _source_id, _dst_id, _tg, _ts, _ann_num, _next_time) + +def scheduledAnnouncement(_ann_num=1): + global _announcement_last_hour, _announcement_running + + _prefix = 'ANNOUNCEMENT' if _ann_num == 1 else 'ANNOUNCEMENT{}'.format(_ann_num) + _label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num) + + if not CONFIG['GLOBAL']['{}_ENABLED'.format(_prefix)]: + return + + if _announcement_running[_ann_num]: + logger.debug('(%s) Previous announcement still running, skipping', _label) + return + + _mode = CONFIG['GLOBAL']['{}_MODE'.format(_prefix)] + + if _mode == 'hourly': + _now = datetime.now() + if _now.minute != 0: + return + if _now.hour == _announcement_last_hour[_ann_num]: + return + _announcement_last_hour[_ann_num] = _now.hour + + _file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)] + _tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)] + _timeslot = CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)] + _lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)] + _slot_index = 2 if _timeslot == 2 else 1 + _dst_id = bytes_3(_tg) + _source_id = bytes_3(5000) + _peer_id = CONFIG['GLOBAL']['SERVER_ID'] + + logger.info('(%s) Playing file: %s to TG %s TS%s (mode: %s, lang: %s)', _label, _file, _tg, _timeslot, _mode, _lang) + + _say = [] + try: + _say.append(AMBEobj.readSingleFile(''.join(['/', _lang, '/ondemand/', str(_file), '.ambe']))) + except IOError: + logger.warning('(%s) Cannot read AMBE file: Audio/%s/ondemand/%s.ambe', _label, _lang, _file) + return + except Exception as e: + logger.error('(%s) Error reading AMBE file: %s', _label, e) + return + + logger.debug('(%s) AMBE file loaded, %s words', _label, len(_say)) + + _excluded = ['ECHO', 'D-APRS'] + _targets = [] + + for _sn in list(systems.keys()): + if _sn in _excluded or any(_sn.startswith(ex + '-') for ex in _excluded): + continue + if _sn not in CONFIG['SYSTEMS']: + continue + if CONFIG['SYSTEMS'][_sn]['MODE'] != 'MASTER': + continue + if 'PEERS' not in CONFIG['SYSTEMS'][_sn]: + continue + + _has_peers = False + try: + for _pid in CONFIG['SYSTEMS'][_sn]['PEERS']: + if CONFIG['SYSTEMS'][_sn]['PEERS'][_pid]['CALLSIGN']: + _has_peers = True + break + except (KeyError, TypeError, RuntimeError): + continue + + if not _has_peers: + continue + + if _sn not in systems: + continue + + _slot = systems[_sn].STATUS[_slot_index] + if (_slot['RX_TYPE'] != HBPF_SLT_VTERM) or (_slot['TX_TYPE'] != HBPF_SLT_VTERM): + logger.debug('(%s) System %s busy, skipping', _label, _sn) + continue + + _targets.append({ + 'sys_obj': systems[_sn], + 'name': _sn, + 'slot': _slot + }) + + if not _targets: + logger.info('(%s) No systems with connected peers to send to', _label) + return + + _pkts = list(pkt_gen(_source_id, _dst_id, _peer_id, _slot_index - 1, _say)) + _sys_names = ', '.join([t['name'] for t in _targets[:5]]) + if len(_targets) > 5: + _sys_names += ', ... +{}'.format(len(_targets) - 5) + logger.info('(%s) Broadcasting %s packets to %s systems simultaneously: %s', _label, len(_pkts), len(_targets), _sys_names) + _announcement_running[_ann_num] = True + reactor.callLater(1.0, _announcementSendBroadcast, _targets, _pkts, 0, _source_id, _dst_id, _tg, _timeslot, _ann_num) + def bridge_reset(): logger.debug('(BRIDGERESET) Running bridge resetter') for _system in CONFIG['SYSTEMS']: @@ -2609,6 +2842,8 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['lastSeq'] = _seq #Save this packet self.STATUS[_slot]['lastData'] = _data + + _handleRecording(dmrpkt, _frame_type, _dtype_vseq, _stream_id, pkt_time, _rf_src, _int_dst_id, _slot) ### MODIFIED: Prioritize routing for the TGID that just created a bridge _sysIgnore = deque() @@ -2841,6 +3076,7 @@ if __name__ == '__main__': if cli_args.LOG_LEVEL: CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = log.config_logging(CONFIG['LOGGER']) + logger.info('\n\nCopyright (c) 2026 Joaquin Madrid Belando, EA5GVK ea5gvk@gmail.com') logger.info('\n\nCopyright (c) 2024-2026 Esteban Mackay, HP3ICC setcom40@gmail.com') logger.info('\n\nCopyright (c) 2020-2023 Simon G7RZU simon@gb7fr.org.uk') logger.info('Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019\n\tThe Regents of the K0USY Group. All rights reserved.\n') @@ -3126,6 +3362,28 @@ if __name__ == '__main__': killserver_task = task.LoopingCall(kill_server) killserver = killserver_task.start(5) killserver.addErrback(loopingErrHandle) + + for _ann_num in range(1, 5): + _prefix = 'ANNOUNCEMENT' if _ann_num == 1 else 'ANNOUNCEMENT{}'.format(_ann_num) + _label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num) + if CONFIG['GLOBAL']['{}_ENABLED'.format(_prefix)]: + _ann_mode = CONFIG['GLOBAL']['{}_MODE'.format(_prefix)] + if _ann_mode == 'hourly': + _ann_check_interval = 30 + else: + _ann_check_interval = CONFIG['GLOBAL']['{}_INTERVAL'.format(_prefix)] + _ann_task = task.LoopingCall(scheduledAnnouncement, _ann_num) + _ann_def = _ann_task.start(_ann_check_interval, now=False) + _ann_def.addErrback(loopingErrHandle) + logger.info('(%s) Scheduled announcements enabled - mode: %s, file: %s, TG: %s, TS: %s, lang: %s', + _label, + _ann_mode, + CONFIG['GLOBAL']['{}_FILE'.format(_prefix)], + CONFIG['GLOBAL']['{}_TG'.format(_prefix)], + CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)], + CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)]) + if _ann_mode == 'interval': + logger.info('(%s) Interval: every %s seconds', _label, _ann_check_interval) #Security downloads from central server init_security_downloads(CONFIG)