Update bridge_master.py

pull/26/head
Joaquin Madrid Belando 3 weeks ago committed by GitHub
parent acfd947ada
commit 962e7e3374
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -36,6 +36,7 @@ This program currently only works with group voice calls.
# Python modules we need
import sys
import os
from bitarray import bitarray
from time import time,sleep,perf_counter
import importlib.util
@ -79,7 +80,7 @@ from i8n_voice_map import voiceMap
# Stuff for socket reporting
import pickle
# REMOVE LATER from datetime import datetime
from datetime import datetime
# The module needs logging, but handlers, etc. are controlled by the parent
import logging
logger = logging.getLogger(__name__)
@ -99,8 +100,8 @@ __author__ = 'Cortney T. Buffington, N0MJS, Forked by Simon Adlem - G7RZU, F
__copyright__ = 'Copyright (c) 2016-2019 Cortney T. Buffington, N0MJS and the K0USY Group, Simon Adlem G7RZU 2020-2023, Esteban Mackay, HP3ICC 2024-2026'
__credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4IRR; Jonathan Naylor, G4KLX; Hans Barthen, DL5DI; Torsten Shultze, DG1HT; Jon Lee, G4TSN; Norman Williams, M6NBP, Eric Craw KF7EEL, Simon Adlem - G7RZU, Bruno Farias CS8ABG, Esteban Mackay HP3ICC, Joaquin Madrid Belando EA5GVK'
__license__ = 'GNU GPLv3'
__maintainer__ = 'Esteban Mackay, HP3ICC'
__email__ = 'setcom40@gmail.com'
__maintainer__ = 'Esteban Mackay, HP3ICC - Joaquin Madrid, EA5GVK'
__email__ = 'setcom40@gmail.com - ea5gvk@gmail.com'
#Set header bits
#used for slot rewrite and type rewrite
@ -873,6 +874,238 @@ def ident():
_pkt_time = time()
reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot)
_announcement_last_hour = {1: -1, 2: -1, 3: -1, 4: -1}
_announcement_running = {1: False, 2: False, 3: False, 4: False}
_FRAME_INTERVAL = 0.054
_RECORDING_MAX_FRAMES = 2750
_recording_state = {
'active': False,
'stream_id': None,
'bursts': bitarray(endian='big'),
'start_time': 0,
'frames': 0,
'rf_src': None
}
def _handleRecording(dmrpkt, _frame_type, _dtype_vseq, _stream_id, pkt_time, _rf_src, _int_dst_id, _slot):
global _recording_state
if not CONFIG['GLOBAL']['RECORDING_ENABLED']:
return
if _int_dst_id != CONFIG['GLOBAL']['RECORDING_TG'] or _slot != CONFIG['GLOBAL']['RECORDING_TIMESLOT']:
return
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD:
if _recording_state['active'] and _recording_state['stream_id'] != _stream_id:
logger.info('(GRABACION) Nueva transmision detectada, guardando grabacion anterior (%d frames)', _recording_state['frames'])
_saveRecording()
_recording_state['active'] = True
_recording_state['stream_id'] = _stream_id
_recording_state['bursts'] = bitarray(endian='big')
_recording_state['start_time'] = pkt_time
_recording_state['frames'] = 0
_recording_state['rf_src'] = _rf_src
logger.info('(GRABACION) Grabacion iniciada - SUB: %s, TG: %s, TS: %s', int_id(_rf_src), _int_dst_id, _slot)
return
if not _recording_state['active'] or _recording_state['stream_id'] != _stream_id:
return
if _frame_type in (HBPF_VOICE, HBPF_VOICE_SYNC):
_bits_data = bitarray(endian='big')
_bits_data.frombytes(dmrpkt)
_recording_state['bursts'].extend(_bits_data[:108])
_recording_state['bursts'].extend(_bits_data[156:264])
_recording_state['frames'] += 1
if _recording_state['frames'] >= _RECORDING_MAX_FRAMES:
logger.info('(GRABACION) Duracion maxima alcanzada (%d frames), guardando', _recording_state['frames'])
_saveRecording()
return
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM:
_saveRecording()
return
def _saveRecording():
global _recording_state
if not _recording_state['active'] or _recording_state['frames'] == 0:
_recording_state['active'] = False
_recording_state['stream_id'] = None
logger.warning('(GRABACION) No hay frames grabados, descartando')
return
_lang = CONFIG['GLOBAL']['RECORDING_LANGUAGE']
_file = CONFIG['GLOBAL']['RECORDING_FILE']
_dir = './Audio/{}/ondemand/'.format(_lang)
_path = _dir + _file + '.ambe'
os.makedirs(_dir, exist_ok=True)
with open(_path, 'wb') as f:
f.write(_recording_state['bursts'].tobytes())
_duration = time() - _recording_state['start_time']
logger.info('(GRABACION) Grabacion guardada: %s (%d frames, %.1f segundos, SUB: %s)', _path, _recording_state['frames'], _duration, int_id(_recording_state['rf_src']))
_recording_state['active'] = False
_recording_state['stream_id'] = None
_recording_state['bursts'] = bitarray(endian='big')
_recording_state['frames'] = 0
_recording_state['rf_src'] = None
def _announcementSendBroadcast(_targets, _pkts, _pkt_idx, _source_id, _dst_id, _tg, _ts, _ann_num=1, _next_time=None):
global _announcement_running
_label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num)
if _pkt_idx >= len(_pkts):
for _t in _targets:
try:
for _sid in list(_t['sys_obj'].STATUS.keys()):
if _sid not in (1, 2):
del _t['sys_obj'].STATUS[_sid]
except:
pass
_announcement_running[_ann_num] = False
logger.info('(%s) Broadcast complete: %s packets sent to %s systems', _label, len(_pkts), len(_targets))
return
_now = time()
pkt = _pkts[_pkt_idx]
_stream_id = pkt[16:20]
for _t in _targets:
try:
_sys_obj = _t['sys_obj']
_slot = _t['slot']
if _stream_id not in _sys_obj.STATUS:
_sys_obj.STATUS[_stream_id] = {
'START': _now,
'CONTENTION':False,
'RFS': _source_id,
'TGID': _dst_id,
'LAST': _now
}
_slot['TX_TGID'] = _dst_id
else:
_sys_obj.STATUS[_stream_id]['LAST'] = _now
_slot['TX_TIME'] = _now
_sys_obj.send_system(pkt)
except Exception as e:
logger.error('(%s) Error sending packet %s to %s: %s', _label, _pkt_idx, _t['name'], e)
_elapsed = time() - _now
if _next_time is None:
_next_time = _now + _FRAME_INTERVAL
else:
_next_time = _next_time + _FRAME_INTERVAL
_delay = max(0.001, _next_time - time())
if _pkt_idx < 3:
logger.debug('(%s) Packet %s/%s broadcast to %s systems (proc: %.1fms, delay: %.1fms)', _label, _pkt_idx + 1, len(_pkts), len(_targets), _elapsed * 1000, _delay * 1000)
reactor.callLater(_delay, _announcementSendBroadcast, _targets, _pkts, _pkt_idx + 1, _source_id, _dst_id, _tg, _ts, _ann_num, _next_time)
def scheduledAnnouncement(_ann_num=1):
global _announcement_last_hour, _announcement_running
_prefix = 'ANNOUNCEMENT' if _ann_num == 1 else 'ANNOUNCEMENT{}'.format(_ann_num)
_label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num)
if not CONFIG['GLOBAL']['{}_ENABLED'.format(_prefix)]:
return
if _announcement_running[_ann_num]:
logger.debug('(%s) Previous announcement still running, skipping', _label)
return
_mode = CONFIG['GLOBAL']['{}_MODE'.format(_prefix)]
if _mode == 'hourly':
_now = datetime.now()
if _now.minute != 0:
return
if _now.hour == _announcement_last_hour[_ann_num]:
return
_announcement_last_hour[_ann_num] = _now.hour
_file = CONFIG['GLOBAL']['{}_FILE'.format(_prefix)]
_tg = CONFIG['GLOBAL']['{}_TG'.format(_prefix)]
_timeslot = CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)]
_lang = CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)]
_slot_index = 2 if _timeslot == 2 else 1
_dst_id = bytes_3(_tg)
_source_id = bytes_3(5000)
_peer_id = CONFIG['GLOBAL']['SERVER_ID']
logger.info('(%s) Playing file: %s to TG %s TS%s (mode: %s, lang: %s)', _label, _file, _tg, _timeslot, _mode, _lang)
_say = []
try:
_say.append(AMBEobj.readSingleFile(''.join(['/', _lang, '/ondemand/', str(_file), '.ambe'])))
except IOError:
logger.warning('(%s) Cannot read AMBE file: Audio/%s/ondemand/%s.ambe', _label, _lang, _file)
return
except Exception as e:
logger.error('(%s) Error reading AMBE file: %s', _label, e)
return
logger.debug('(%s) AMBE file loaded, %s words', _label, len(_say))
_excluded = ['ECHO', 'D-APRS']
_targets = []
for _sn in list(systems.keys()):
if _sn in _excluded or any(_sn.startswith(ex + '-') for ex in _excluded):
continue
if _sn not in CONFIG['SYSTEMS']:
continue
if CONFIG['SYSTEMS'][_sn]['MODE'] != 'MASTER':
continue
if 'PEERS' not in CONFIG['SYSTEMS'][_sn]:
continue
_has_peers = False
try:
for _pid in CONFIG['SYSTEMS'][_sn]['PEERS']:
if CONFIG['SYSTEMS'][_sn]['PEERS'][_pid]['CALLSIGN']:
_has_peers = True
break
except (KeyError, TypeError, RuntimeError):
continue
if not _has_peers:
continue
if _sn not in systems:
continue
_slot = systems[_sn].STATUS[_slot_index]
if (_slot['RX_TYPE'] != HBPF_SLT_VTERM) or (_slot['TX_TYPE'] != HBPF_SLT_VTERM):
logger.debug('(%s) System %s busy, skipping', _label, _sn)
continue
_targets.append({
'sys_obj': systems[_sn],
'name': _sn,
'slot': _slot
})
if not _targets:
logger.info('(%s) No systems with connected peers to send to', _label)
return
_pkts = list(pkt_gen(_source_id, _dst_id, _peer_id, _slot_index - 1, _say))
_sys_names = ', '.join([t['name'] for t in _targets[:5]])
if len(_targets) > 5:
_sys_names += ', ... +{}'.format(len(_targets) - 5)
logger.info('(%s) Broadcasting %s packets to %s systems simultaneously: %s', _label, len(_pkts), len(_targets), _sys_names)
_announcement_running[_ann_num] = True
reactor.callLater(1.0, _announcementSendBroadcast, _targets, _pkts, 0, _source_id, _dst_id, _tg, _timeslot, _ann_num)
def bridge_reset():
logger.debug('(BRIDGERESET) Running bridge resetter')
for _system in CONFIG['SYSTEMS']:
@ -2609,6 +2842,8 @@ class routerHBP(HBSYSTEM):
self.STATUS[_slot]['lastSeq'] = _seq
#Save this packet
self.STATUS[_slot]['lastData'] = _data
_handleRecording(dmrpkt, _frame_type, _dtype_vseq, _stream_id, pkt_time, _rf_src, _int_dst_id, _slot)
### MODIFIED: Prioritize routing for the TGID that just created a bridge
_sysIgnore = deque()
@ -2841,6 +3076,7 @@ if __name__ == '__main__':
if cli_args.LOG_LEVEL:
CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL
logger = log.config_logging(CONFIG['LOGGER'])
logger.info('\n\nCopyright (c) 2026 Joaquin Madrid Belando, EA5GVK ea5gvk@gmail.com')
logger.info('\n\nCopyright (c) 2024-2026 Esteban Mackay, HP3ICC setcom40@gmail.com')
logger.info('\n\nCopyright (c) 2020-2023 Simon G7RZU simon@gb7fr.org.uk')
logger.info('Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019\n\tThe Regents of the K0USY Group. All rights reserved.\n')
@ -3126,6 +3362,28 @@ if __name__ == '__main__':
killserver_task = task.LoopingCall(kill_server)
killserver = killserver_task.start(5)
killserver.addErrback(loopingErrHandle)
for _ann_num in range(1, 5):
_prefix = 'ANNOUNCEMENT' if _ann_num == 1 else 'ANNOUNCEMENT{}'.format(_ann_num)
_label = 'LOCUCION' if _ann_num == 1 else 'LOCUCION-{}'.format(_ann_num)
if CONFIG['GLOBAL']['{}_ENABLED'.format(_prefix)]:
_ann_mode = CONFIG['GLOBAL']['{}_MODE'.format(_prefix)]
if _ann_mode == 'hourly':
_ann_check_interval = 30
else:
_ann_check_interval = CONFIG['GLOBAL']['{}_INTERVAL'.format(_prefix)]
_ann_task = task.LoopingCall(scheduledAnnouncement, _ann_num)
_ann_def = _ann_task.start(_ann_check_interval, now=False)
_ann_def.addErrback(loopingErrHandle)
logger.info('(%s) Scheduled announcements enabled - mode: %s, file: %s, TG: %s, TS: %s, lang: %s',
_label,
_ann_mode,
CONFIG['GLOBAL']['{}_FILE'.format(_prefix)],
CONFIG['GLOBAL']['{}_TG'.format(_prefix)],
CONFIG['GLOBAL']['{}_TIMESLOT'.format(_prefix)],
CONFIG['GLOBAL']['{}_LANGUAGE'.format(_prefix)])
if _ann_mode == 'interval':
logger.info('(%s) Interval: every %s seconds', _label, _ann_check_interval)
#Security downloads from central server
init_security_downloads(CONFIG)

Loading…
Cancel
Save

Powered by TurnKey Linux.