diff --git a/bridge_master.py b/bridge_master.py index 865343a..00c06be 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -20,6 +20,7 @@ # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA ############################################################################### + ''' This application, in conjuction with it's rule file (rules.py) will work like a "conference bridge". This is similar to what most hams think of as a @@ -28,8 +29,10 @@ bridge will both receive traffic from, and send traffic to any other system joined to the same conference bridge. It does not provide end-to-end connectivity as each end system must individually be joined to a conference bridge (a name you create in the configuraiton file) to pass traffic. + This program currently only works with group voice calls. ''' + # Python modules we need import sys from bitarray import bitarray @@ -41,17 +44,21 @@ from setproctitle import setproctitle from collections import deque from random import randint import secrets + #from crccheck.crc import Crc32 from hashlib import blake2b + # Twisted is pretty important, so I keep it separate from twisted.internet.protocol import Factory, Protocol from twisted.protocols.basic import NetstringReceiver from twisted.internet import reactor, task from twisted.web.server import Site + #from spyne import Application #from spyne.server.twisted import TwistedWebResource #from spyne.protocol.http import HttpRpc #from spyne.protocol.json import JsonDocument + # Things we import from the main hblink module from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases, acl_check from dmr_utils3.utils import bytes_3, int_id, get_alias, bytes_4 @@ -63,10 +70,12 @@ from const import * from mk_voice import pkt_gen from utils import load_json, save_json #from voice_lib import words + #Read voices from read_ambe import readAMBE #Remap some words for certain languages from i8n_voice_map import voiceMap + # Stuff for socket reporting import pickle # REMOVE LATER from datetime import datetime @@ -74,16 +83,14 @@ import pickle import logging logger = logging.getLogger(__name__) -# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -# CONFIGURACIÓN DE ANTIBUCLE Y CONTROL DE SALTOS -MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla -# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - #REGEX import re + from binascii import b2a_hex as ahex + from AMI import AMI #from API import FD_API, FD_APIUserDefinedContext + # Does anybody read this stuff? There's a PEP somewhere that says I should do this. __author__ = 'Cortney T. Buffington, N0MJS, Forked by Simon Adlem - G7RZU' __copyright__ = 'Copyright (c) 2016-2019 Cortney T. Buffington, N0MJS and the K0USY Group, Simon Adlem, G7RZU 2020,2021, 2022' @@ -95,13 +102,18 @@ __email__ = 'simon@gb7fr.org.uk' #Set header bits #used for slot rewrite and type rewrite def header(slot,call_type,bits): + if not bits: bits = 0b00100000 + bits = slot << 7 | bits + if call_type == 'unit': + bits = 0b00000011 | bits + return bits - + # Timed loop used for reporting HBP status # # REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE @@ -117,26 +129,35 @@ def config_reports(_config, _factory): i = i +1 logger.info('(REPORT) %s systems have at least one peer',i) logger.info('(REPORT) Subscriber Map has %s entries',len(SUB_MAP)) + logger.info('(REPORT) HBlink TCP reporting server configured') + report_server = _factory(_config) report_server.clients = [] reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) + reporting = task.LoopingCall(reporting_loop, logger, report_server) reporting.start(_config['REPORTS']['REPORT_INTERVAL']) + return report_server # Start API server def config_API(_config, _bridges): + application = Application([FD_API], tns='adn.api', in_protocol=HttpRpc(validator='soft'), out_protocol=JsonDocument() ) + def _on_method_call(ctx): ctx.udc = FD_APIUserDefinedContext(CONFIG,_bridges) + application.event_manager.add_listener('method_call', _on_method_call) + resource = TwistedWebResource(application) site = Site(resource) + r = reactor.listenTCP(8000, site, interface='0.0.0.0') return(r) @@ -151,6 +172,7 @@ def make_bridges(_rules): for _system in _rules[_bridge]: if _system['SYSTEM'] not in CONFIG['SYSTEMS']: sys.exit('ERROR: Conference bridge "{}" references a system named "{}" that is not enabled in the main configuration'.format(_bridge, _system['SYSTEM'])) + _system['TGID'] = bytes_3(_system['TGID']) for i, e in enumerate(_system['ON']): _system['ON'][i] = bytes_3(_system['ON'][i]) @@ -161,8 +183,10 @@ def make_bridges(_rules): _system['TIMER'] = time() + _system['TIMEOUT'] else: _system['TIMER'] = time() + # if _bridge[0:1] == '#': # continue + for _confsystem in CONFIG['SYSTEMS']: #if _confsystem[0:3] == 'OBP': if CONFIG['SYSTEMS'][_confsystem]['MODE'] != 'MASTER': @@ -184,6 +208,7 @@ def make_bridges(_rules): _tmout = CONFIG['SYSTEMS'][_confsystem]['DEFAULT_UA_TIMER'] if ts2 == False: _rules[_bridge].append({'SYSTEM': _confsystem, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) + return _rules ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout @@ -207,8 +232,10 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): else: BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) + if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + ### END MODIFIED ### #Make static bridge - used for on-the-fly relay bridges @@ -221,6 +248,7 @@ def make_stat_bridge(_tgid): _tmout = CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) + if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) @@ -236,8 +264,9 @@ def make_default_reflector(reflector,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) + BRIDGES[bridge] = bridgetemp - + def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] if str(tg) not in BRIDGES: @@ -248,8 +277,9 @@ def make_static_tg(tg,ts,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) + BRIDGES[str(tg)] = bridgetemp - + def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] bridgetemp = deque() @@ -259,6 +289,7 @@ def reset_static_tg(tg,ts,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) + BRIDGES[str(tg)] = bridgetemp except KeyError: logger.exception('(%s) KeyError in reset_static_tg() - bridge gone away? TG: %s',system,tg) @@ -275,7 +306,7 @@ def reset_all_reflector_system(_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp - + ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) @@ -294,6 +325,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + ### END MODIFIED ### def remove_bridge_system(system): @@ -309,10 +341,11 @@ def remove_bridge_system(system): if _bridge not in _bridgestemp: _bridgestemp[_bridge] = [] _bridgestemp[_bridge].append({'SYSTEM': system, 'TS': _bridgesystem['TS'], 'TGID': _bridgesystem['TGID'],'ACTIVE': False,'TIMEOUT': _bridgesystem['TIMEOUT'],'TO_TYPE': 'ON','OFF': [],'ON': [_bridgesystem['TGID'],],'RESET': [], 'TIMER': time() + _bridgesystem['TIMEOUT']}) + BRIDGES.update(_bridgestemp) def deactivate_all_dynamic_bridges(system_name): - """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" + """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" for _bridge in BRIDGES: if _bridge[0:1] == '#': # Saltar reflectores continue @@ -322,16 +355,19 @@ def deactivate_all_dynamic_bridges(system_name): _sys_entry['ACTIVE'] = False logger.info('(ROUTER) Deactivated dynamic bridge due to TG/ID 4000: System: %s, Bridge: %s, TS: %s, TGID: %s', system_name, _bridge, _sys_entry['TS'], int_id(_sys_entry['TGID'])) - + ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() _remove_bridges = deque() - # Mantener registro de bridges dinámicos activos por sistema + + # Mantener registro de bridges dinámicos activos por sistema _active_dynamic_bridges = {} + for _bridge in BRIDGES: _bridge_used = False + ### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic _is_special_tg = False if _bridge[0:1] != '#': # No es un reflector @@ -342,19 +378,22 @@ def rule_timer_loop(): except ValueError: pass ### END MODIFIED ### + for _system in BRIDGES[_bridge]: _system_config = CONFIG['SYSTEMS'][_system['SYSTEM']] _is_single_mode = _system_config.get('SINGLE_MODE', False) - # Si SINGLE_MODE está DESACTIVADO y es bridge dinámico, usar timer infinito + + # Si SINGLE_MODE está DESACTIVADO y es bridge dinámico, usar timer infinito _is_dynamic_bridge = _bridge[0:1] != '#' and _system['TO_TYPE'] != 'STAT' + ### MODIFIED: Added 'and not _is_special_tg' to the condition if not _is_single_mode and _is_dynamic_bridge and _system['SYSTEM'][0:3] != 'OBP' and not _is_special_tg: ### END MODIFIED ### - # SINGLE MODE DESACTIVADO - Timer infinito para bridges dinámicos + # SINGLE MODE DESACTIVADO - Timer infinito para bridges dinámicos if _system['TO_TYPE'] == 'ON': if _system['ACTIVE'] == True: _bridge_used = True - # Registrar bridge dinámico activo + # Registrar bridge dinámico activo if _system['SYSTEM'] not in _active_dynamic_bridges: _active_dynamic_bridges[_system['SYSTEM']] = [] _active_dynamic_bridges[_system['SYSTEM']].append((_bridge, _system)) @@ -371,7 +410,7 @@ def rule_timer_loop(): _bridge_used = True logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: - # COMPORTAMIENTO ORIGINAL (SINGLE MODE ACTIVADO o bridges estáticos o TGIDs especiales) + # COMPORTAMIENTO ORIGINAL (SINGLE MODE ACTIVADO o bridges estáticos o TGIDs especiales) if _system['TO_TYPE'] == 'ON': if _system['ACTIVE'] == True: _bridge_used = True @@ -405,13 +444,17 @@ def rule_timer_loop(): elif _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT': _bridge_used = True logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + if _bridge_used == False: _remove_bridges.append(_bridge) + for _bridgerem in _remove_bridges: del BRIDGES[_bridgerem] logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) + if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') + ### END MODIFIED ### def statTrimmer(): @@ -440,10 +483,12 @@ def bridgeDebug(): logger.debug('(BRIDGEDEBUG) Running bridge debug') _rst_time = time() statroll = 0 + #Kill off any bridges that should nnot exist, ever. for b in ['0','1','2','3','4','5','6','7','8','9']: BRIDGES.pop(b,None) BRIDGES.pop('#'+b, None) + for system in CONFIG['SYSTEMS']: bridgeroll = 0 dialroll = 0 @@ -458,10 +503,12 @@ def bridgeDebug(): activeroll += 1 else: activeroll += 1 + if enabled_system['TO_TYPE'] == 'STAT': statroll += 1 if bridgeroll: logger.debug('(BRIDGEDEBUG) system %s has %s bridges of which %s are in an ACTIVE state', system, bridgeroll, activeroll) + if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER': logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll) times = {} @@ -481,11 +528,13 @@ def bridgeDebug(): _setbridge = int(_bridge[1:]) else: _setbridge = int(_bridge) + if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(_setbridge),],'RESET': [], 'TIMER': _rst_time + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) BRIDGES[_bridge] = bridgetemp + logger.info('(BRIDGEDEBUG) The server currently has %s STATic bridges',statroll) def kaReporting(): @@ -516,6 +565,7 @@ def SubMapTrimmer(): for _subscriber in SUB_MAP: if SUB_MAP[_subscriber][2] < (_sub_time - 86400): _remove_list.append(_subscriber) + for _remove in _remove_list: SUB_MAP.pop(_remove) if CONFIG['ALIASES']['SUB_MAP_FILE']: @@ -525,11 +575,13 @@ def SubMapTrimmer(): def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() + for system in systems: # HBP systems, master and peer if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': for slot in range(1,3): _slot = systems[system].STATUS[slot] + # RX slot check if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: _slot['RX_TYPE'] = HBPF_SLT_VTERM @@ -545,6 +597,7 @@ def stream_trimmer_loop(): #Null stream_id - for loop control if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' + # TX slot check if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: _slot['TX_TYPE'] = HBPF_SLT_VTERM @@ -552,17 +605,20 @@ def stream_trimmer_loop(): system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) + # OBP systems - # We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later + # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': remove_list = deque() fin_list = deque() for stream_id in systems[system].STATUS: + #if stream already marked as finished, just remove it if '_fin' in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 180: logger.debug('(%s) *FINISHED STREAM* STREAM ID: %s',system, int_id(stream_id)) fin_list.append(stream_id) continue + try: if '_to' not in systems[system].STATUS[stream_id] and '_fin' not in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 5: _stream = systems[system].STATUS[stream_id] @@ -585,6 +641,7 @@ def stream_trimmer_loop(): else: logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) + if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system].STATUS[stream_id]['_to'] = True @@ -593,6 +650,7 @@ def stream_trimmer_loop(): logger.exception("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id, exc_info=e) systems[system].STATUS[stream_id]['LAST'] = _now continue + try: if systems[system].STATUS[stream_id]['LAST'] < _now - 180: remove_list.append(stream_id) @@ -600,14 +658,18 @@ def stream_trimmer_loop(): logger.exception("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id, exc_info=e) systems[system].STATUS[stream_id]['LAST'] = _now continue + #remove finished for stream_id in fin_list: removed = systems[system].STATUS.pop(stream_id) + for stream_id in remove_list: if stream_id in systems[system].STATUS: _stream = systems[system].STATUS[stream_id] _sysconfig = CONFIG['SYSTEMS'][system] + removed = systems[system].STATUS.pop(stream_id) + try: _bcsq_remove = deque() for tgid in _sysconfig['_bcsq']: @@ -635,8 +697,9 @@ def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): else: systems[system].STATUS[_stream_id]['LAST'] = _pkt_time _slot['TX_TIME'] = _pkt_time + self.send_system(pkt) - + def sendSpeech(self,speech): logger.debug('(%s) Inside sendspeech thread',self._system) sleep(1) @@ -651,6 +714,7 @@ def sendSpeech(self,speech): #Packet every 60ms sleep(0.058) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) + logger.debug('(%s) Sendspeech thread ended',self._system) def disconnectedVoice(system): @@ -667,13 +731,17 @@ def disconnectedVoice(system): _say.append(words[_lang]['to']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) + for number in str(CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR']): _say.append(words[_lang][number]) _say.append(words[_lang]['silence']) else: _say.append(words[_lang]['notlinked']) + _say.append(words[_lang]['silence']) + speech = pkt_gen(_source_id, _nine, bytes_4(9), 1, _say) + sleep(1) _slot = systems[system].STATUS[2] while True: @@ -719,17 +787,17 @@ def playFileOnRequest(self,fileNumber): def threadIdent(): logger.debug('(IDENT) starting ident thread') reactor.callInThread(ident) - + def threadAlias(): logger.debug('(ALIAS) starting alias thread') reactor.callInThread(aliasb) def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums): peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums - + def aliasb(): _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG) - reactor.callInThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) + reactor.callFromThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) def ident(): for system in systems: @@ -752,7 +820,9 @@ def ident(): if (_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _slot['TX_TIME'] > 30 and time() - _slot['RX_TIME'] > 30): _all_call = bytes_3(16777215) _source_id= bytes_3(5000) + _dst_id = b'' + if 'OVERRIDE_IDENT_TG' in CONFIG['SYSTEMS'][system] and CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG'] and int(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG']) > 0 and int(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG'] < 16777215): _dst_id = bytes_3(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG']) else: @@ -769,6 +839,7 @@ def ident(): _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) + _systemcs = re.sub(r'\W+', '', _callsign) _systemcs.upper() for character in _systemcs: @@ -779,9 +850,12 @@ def ident(): _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) + _say.append(words[_lang]['adn']) + _peer_id = CONFIG['GLOBAL']['SERVER_ID'] speech = pkt_gen(_source_id, _dst_id, _peer_id, 1, _say) + sleep(1) _slot = systems[system].STATUS[2] while True: @@ -791,6 +865,7 @@ def ident(): break #Packet every 60ms sleep(0.058) + _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot) @@ -810,7 +885,9 @@ def bridge_reset(): def options_config(): logger.debug('(OPTIONS) Running options parser') + prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] + for _system in CONFIG['SYSTEMS']: try: if CONFIG['SYSTEMS'][_system]['MODE'] != 'MASTER': @@ -830,6 +907,7 @@ def options_config(): continue _options[k] = v logger.debug('(OPTIONS) Options found for %s',_system) + if '_opt_key' in CONFIG['SYSTEMS'][_system] and CONFIG['SYSTEMS'][_system]['_opt_key']: if 'KEY' not in _options: logger.debug('(OPTIONS) %s, options key set but no key in options string, skipping',_system) @@ -857,6 +935,7 @@ def options_config(): _options['OVERRIDE_IDENT_TG'] = _options.pop('VOICETG') if 'IDENT' in _options: _options['VOICE'] = _options.pop('IDENT') + #DMR+ style options if 'StartRef' in _options: _options['DEFAULT_REFLECTOR'] = _options.pop('StartRef') @@ -898,6 +977,7 @@ def options_config(): _options['TS2_STATIC'] = ''.join([_options['TS2_STATIC'],',',_options.pop('TS2_8')]) if 'TS2_9' in _options: _options['TS2_STATIC'] = ''.join([_options['TS2_STATIC'],',',_options.pop('TS2_9')]) + if 'UserLink' in _options: _options.pop('UserLink') if 'TS1_STATIC' not in _options: @@ -948,6 +1028,7 @@ def options_config(): if isinstance(_options['DEFAULT_UA_TIMER'], str) and not _options['DEFAULT_UA_TIMER'].isdigit(): logger.debug('(OPTIONS) %s - DEFAULT_UA_TIMER is not an integer, ignoring',_system) continue + #if the UA timer is set to 0 - actually set it to (close to) maximum size of a 32 #bit signed int - which works out at around 68 years! #For all practical purposes, this implements an unlimited timer - aka sticky static. @@ -973,6 +1054,7 @@ def options_config(): else: if ts2 == False: BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) + if int(_options['DEFAULT_REFLECTOR']) != CONFIG['SYSTEMS'][_system]['DEFAULT_REFLECTOR']: if int(_options['DEFAULT_REFLECTOR']) > 0: logger.debug('(OPTIONS) %s default reflector changed, updating',_system) @@ -980,9 +1062,11 @@ def options_config(): make_default_reflector(int(_options['DEFAULT_REFLECTOR']),_tmout,_system) elif int(_options['DEFAULT_REFLECTOR']) in prohibitedTGs and not bool(_options['DEFAULT_REFLECTOR']): logger.debug('(OPTIONS) %s default reflector is prohibited, ignoring change',_system) + else: logger.debug('(OPTIONS) %s default reflector disabled, updating',_system) reset_all_reflector_system(_tmout,_system) + ts1 = [] if _options['TS1_STATIC'] != CONFIG['SYSTEMS'][_system]['TS1_STATIC']: _tmout = int(_options['DEFAULT_UA_TIMER']) @@ -1026,6 +1110,7 @@ def options_config(): continue tg = int(tg) make_static_tg(tg,2,_tmout,_system) + CONFIG['SYSTEMS'][_system]['TS1_STATIC'] = _options['TS1_STATIC'] CONFIG['SYSTEMS'][_system]['TS2_STATIC'] = _options['TS2_STATIC'] CONFIG['SYSTEMS'][_system]['DEFAULT_REFLECTOR'] = int(_options['DEFAULT_REFLECTOR']) @@ -1034,11 +1119,13 @@ def options_config(): logger.exception('(OPTIONS) caught exception: %s',e) continue + class routerOBP(OPENBRIDGE): + def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} - + def get_rptr(self,_sid): _int_peer_id = int_id(_sid) if _int_peer_id in local_subscriber_ids: @@ -1049,7 +1136,7 @@ class routerOBP(OPENBRIDGE): return peer_ids[_int_peer_id] else: return _int_peer_id - + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1057,45 +1144,52 @@ class routerOBP(OPENBRIDGE): _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] if (_target['SYSTEM'],_target['TS']) in _sysIgnore: + #logger.debug("(DEDUP) OBP Source Skipping system %s TS: %s",_target['SYSTEM'],_target['TS']) continue if _target_system['MODE'] == 'OPENBRIDGE': if _noOBP == True: continue + #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) + + #If target has quenched us, don't send if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_dst_id] == _stream_id): + #logger.info('(%s) Conference Bridge: %s, is Source Quenched for Stream ID: %s, skipping system: %s TS: %s, TGID: %s', self._system, _bridge, int_id(_stream_id), _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) continue + + #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and ('_bcka' not in _target_system or _target_system['_bcka'] < pkt_time - 60): continue + + #If talkgroup is prohibited by ACL if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_target['TGID'], self._CONFIG['GLOBAL']['TG1_ACL']): + #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM'], int_id(_dst_id)) continue + if not acl_check(_target['TGID'],_target_system['TG1_ACL']): + #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM']) continue - - # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - # CONTROL DE SALTOS (MAX_HOPS) - _current_hops = int.from_bytes(_hops, 'big') if _hops else 0 - if _current_hops >= MAX_HOPS: - continue # Descartar silenciosamente - _new_hops = (_current_hops + 1).to_bytes(2, 'big') - # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - + + # Is this a new call stream on the target? if (_stream_id not in _target_status): + # This is a new call stream on the target _target_status[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, 'RX_PEER': _peer_id, - 'EMB_LC': { + 'EMB_LC': { # Asegurarse de que EMB_LC esté inicializado 1: b'\x00', 2: b'\x00', 3: b'\x00', 4: b'\x00', }, - 'H_LC': b'\x00', - 'T_LC': b'\x00', + 'H_LC': b'\x00', # Asegurarse de que H_LC esté inicializado + 'T_LC': b'\x00', # Asegurarse de que T_LC esté inicializado } + # Generate LCs (full and EMB) for the TX stream try: dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) except Exception: @@ -1105,9 +1199,12 @@ class routerOBP(OPENBRIDGE): _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) + + # Asegurarse de que todas las claves necesarias existan incluso si el stream ya estaba presente if 'EMB_LC' not in _target_status[_stream_id]: try: dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) @@ -1115,6 +1212,7 @@ class routerOBP(OPENBRIDGE): except Exception: logger.exception('(to_target) caught exception while creating EMB_LC') return + if 'H_LC' not in _target_status[_stream_id]: try: dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) @@ -1122,6 +1220,7 @@ class routerOBP(OPENBRIDGE): except Exception: logger.exception('(to_target) caught exception while creating H_LC') return + if 'T_LC' not in _target_status[_stream_id]: try: dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) @@ -1129,23 +1228,45 @@ class routerOBP(OPENBRIDGE): except Exception: logger.exception('(to_target) caught exception while creating T_LC') return + + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time + # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) + + # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET + # MUST RE-WRITE DESTINATION TGID IF DIFFERENT + # if _dst_id != rule['DST_GROUP']: dmrbits = bitarray(endian='big') dmrbits.frombytes(dmrpkt) + # Create a voice header packet (FULL LC) if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: dmrbits = _target_status[_stream_id]['H_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['H_LC'][98:197] + # Create a voice terminator packet (FULL LC) elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_stream_id]['START'] systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) + else: + # BEGIN CONTENTION HANDLING + # + # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: + # From a different group than last RX from this HBSystem, but it has been less than Group Hangtime + # From a different group than last TX to this HBSystem, but it has been less than Group Hangtime + # From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout + # From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout + # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules + # if ((_target['TGID'] != _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < _target_system['GROUP_HANGTIME'])): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True @@ -1166,12 +1287,16 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue + + # Is this a new call stream? if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): + # Record the DST TGID and Stream ID _target_status[_target['TS']]['TX_START'] = pkt_time _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id _target_status[_target['TS']]['TX_RFS'] = _rf_src _target_status[_target['TS']]['TX_PEER'] = _peer_id + # Generate LCs (full and EMB) for the TX stream dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) @@ -1180,53 +1305,69 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) + + # Set other values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq + + # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits + + # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET + # MUST RE-WRITE DESTINATION TGID IF DIFFERENT + # if _dst_id != rule['DST_GROUP']: dmrbits = bitarray(endian='big') dmrbits.frombytes(dmrpkt) + # Create a voice header packet (FULL LC) if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197] + # Create a voice terminator packet (FULL LC) elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() + #_tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] _tmp_data = b''.join([_tmp_data, dmrpkt]) - - # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - # ENVIAR CON _new_hops (solo para OBP); para HBP, enviar sin hops - if _target_system['MODE'] == 'OPENBRIDGE': - systems[_target['SYSTEM']].send_system(_tmp_data, _new_hops, _ber, _rssi, _source_server, _source_rptr) - else: - systems[_target['SYSTEM']].send_system(_tmp_data, b'', _ber, _rssi, _source_server, _source_rptr) - # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - + + # Transmit the packet to the destination system + systems[_target['SYSTEM']].send_system(_tmp_data,_hops,_ber,_rssi,_source_server, _source_rptr) + #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) + #Ignore this system and TS pair if it's called again on this packet return(_sysIgnore) def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): _int_dst_id = int_id(_dst_id) + #Assemble transmit HBP packet header _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _tmp_data = b''.join([_tmp_data, dmrpkt]) systems[_d_system].send_system(_tmp_data) logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): + _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] + + #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: return + if (_stream_id not in _target_status): + # This is a new call stream on the target _target_status[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, @@ -1235,11 +1376,17 @@ class routerOBP(OPENBRIDGE): 'RX_PEER': _peer_id, 'packets': 0 } + + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time + # Clear the TS bit -- all OpenBridge streams are effectively on TS1 + #_tmp_bits = _bits & ~(1 << 7) + #rewrite slot if required if _slot == 2: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits + #Assemble transmit HBP packet header _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _tmp_data = b''.join([_tmp_data, dmrpkt]) systems[_target].send_system(_tmp_data,_hops,_ber,_rssi, _source_server, _source_rptr) @@ -1248,6 +1395,7 @@ class routerOBP(OPENBRIDGE): systems[_target]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): + pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] @@ -1255,10 +1403,18 @@ class routerOBP(OPENBRIDGE): _h.update(_data) _pkt_crc = _h.digest() + # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): + _int_dst_id = int_id(_dst_id) + ##if ahex(dmrpkt)[27:-27] == b'd5d7f77fd757': + # This is a data call _data_call = True + + # Is this a new call stream? if (_stream_id not in self.STATUS): + + # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, @@ -1272,8 +1428,10 @@ class routerOBP(OPENBRIDGE): 'loss': 0, 'crcs': set() } + self.STATUS[_stream_id]['LAST'] = pkt_time self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] + 1 + hr_times = {} for system in systems: if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': @@ -1287,12 +1445,18 @@ class routerOBP(OPENBRIDGE): else: if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] + + #use the minimum perf_counter to ensure + #We always use only the earliest packet fi = min(hr_times, key=hr_times.get, default = False) + hr_times = None + if not fi: logger.warning("(%s) OBP UNIT *LoopControl* fi is empty for some reason : %s, STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) self.STATUS[_stream_id]['LAST'] = pkt_time return + if self._system != fi: if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: call_duration = pkt_time - self.STATUS[_stream_id]['START'] @@ -1303,11 +1467,96 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time return - # ... (resto del manejo de datos unitarios sin cambios) ... + + if _dtype_vseq == 3: + logger.info('(%s) *UNIT CSBK* STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s, SRC: %s, RPTR: %s', \ + self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, int_id(_source_server),int_id(_source_rptr)) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT CSBK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + elif _dtype_vseq == 6: + logger.info('(%s) *UNIT DATA HEADER* STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s, SRC: %s, RPTR: %s', \ + self._system, int_id(_stream_id),self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT DATA HEADER,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + elif _dtype_vseq == 7: + logger.info('(%s) *UNIT VCSBK 1/2 DATA BLOCK * STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ + self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, int_id(_source_server),int_id(_source_rptr)) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT VCSBK 1/2 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + elif _dtype_vseq == 8: + logger.info('(%s) *UNIT VCSBK 3/4 DATA BLOCK * STREAM ID: %s, RPTR: %s, SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ + self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) + if CONFIG['REPORTS']['REPORT']: + self._report.send_bridgeEvent('UNIT VCSBK 3/4 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + else: + logger.info('(%s) *UNKNOWN DATA TYPE* STREAM ID: %s, RPTR: %s, SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ + self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) + + #Send all data to DATA-GATEWAY if enabled and valid + if CONFIG['GLOBAL']['DATA_GATEWAY'] and 'DATA-GATEWAY' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['DATA-GATEWAY']['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS']['DATA-GATEWAY']['ENABLED']: + logger.debug('(%s) DATA packet sent to DATA-GATEWAY',self._system) + self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr,_ber,_rssi) + + #Send other openbridges + for system in systems: + if system == self._system: + continue + if system == 'DATA-GATEWAY': + continue + #We only want to send data calls to individual IDs via OpenBridge + #Only send if proto ver for bridge is > 1 + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][system]['VER'] > 1 and (_int_dst_id >= 1000000): + self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops,_source_server,_ber,_rssi) + + #If destination ID is in the Subscriber Map + if _dst_id in SUB_MAP: + (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] + _dst_slot = systems[_d_system].STATUS[_d_slot] + logger.info('(%s) SUB_MAP matched, System: %s Slot: %s, Time: %s',self._system, _d_system,_d_slot,_d_time) + #If slot is idle for RX and TX + if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): + #rewrite slot if required + if _slot != _d_slot: + _tmp_bits = _bits ^ 1 << 7 + else: + _tmp_bits = _bits + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + + else: + logger.debug('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) + + else: + #If destination ID is logged in as a hotspot + for _d_system in systems: + if CONFIG['SYSTEMS'][_d_system]['MODE'] == 'MASTER': + for _to_peer in CONFIG['SYSTEMS'][_d_system]['PEERS']: + _int_to_peer = int_id(_to_peer) + if (str(_int_to_peer)[:7] == str(_int_dst_id)[:7]): + #(_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] + _d_slot = 2 + _dst_slot = systems[_d_system].STATUS[_d_slot] + logger.info('(%s) User Peer Hotspot ID matched, System: %s Slot: %s',self._system, _d_system,_d_slot) + #If slot is idle for RX and TX + if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): + #Always use slot2 for hotspots - many of them are simplex and this + #is the convention + #rewrite slot if required (slot 2 is used on hotspots) + if _slot != 2: + _tmp_bits = _bits ^ 1 << 7 + else: + _tmp_bits = _bits + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + + else: + logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + self.STATUS[_stream_id]['crcs'].add(_pkt_crc) - + if _call_type == 'group' or _call_type == 'vcsbk': + # Is this a new call stream? if (_stream_id not in self.STATUS): + + # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, @@ -1321,11 +1570,17 @@ class routerOBP(OPENBRIDGE): 'loss': 0, 'crcs': set() } + + # If we can, use the LC from the voice header as to keep all options intact if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_stream_id]['LC'] = decoded['LC'] + + # If we don't have a voice header then don't wait to decode the Embedded LC + # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_stream_id]['LC'] = b''.join([LC_OPT,_dst_id,_rf_src]) + _inthops = 0 if _hops: _inthops = int.from_bytes(_hops,'big') @@ -1333,22 +1588,30 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id),get_alias(_rf_src, subscriber_ids),int_id(_rf_src),self.get_rptr(_source_rptr), int_id(_source_rptr), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),_inthops) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + else: if 'packets' in self.STATUS[_stream_id]: self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] +1 + #Finished stream handling# if '_fin' in self.STATUS[_stream_id]: if '_finlog' not in self.STATUS[_stream_id]: logger.debug("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) self.STATUS[_stream_id]['_finlog'] = True return + + #TIMEOUT if self.STATUS[_stream_id]['START'] + 180 < pkt_time: if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: logger.info("(%s) OBP *TIMEOUT*, STREAM ID: %s, TG: %s, IGNORE THIS SOURCE",self._system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time return + + #LoopControl hr_times = {} for system in systems: + # if system == self._system: + # continue if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': for _sysslot in systems[system].STATUS: if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: @@ -1358,13 +1621,19 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['LAST'] = pkt_time return else: + #if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']: if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] + + #use the minimum perf_counter to ensure + #We always use only the earliest packet fi = min(hr_times, key=hr_times.get, default = False) hr_times = None + if not fi: logger.warning("(%s) OBP *LoopControl* fi is empty for some reason : STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) return + if self._system != fi: if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: call_duration = pkt_time - self.STATUS[_stream_id]['START'] @@ -1374,32 +1643,67 @@ class routerOBP(OPENBRIDGE): logger.debug("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE. PACKET RATE %0.2f/s",self._system, fi, int_id(_stream_id), int_id(_dst_id),call_duration) self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]: systems[self._system].send_bcsq(_dst_id,_stream_id) self.STATUS[_stream_id]['_bcsq'] = True return + + #Rate drop if self.STATUS[_stream_id]['packets'] > 18 and (self.STATUS[_stream_id]['packets'] / self.STATUS[_stream_id]['START'] > 25): logger.warning("(%s) *PacketControl* RATE DROP! Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) self.proxy_BadPeer() return - - # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - # DESCARTE SILENCIOSO DE PAQUETES DUPLICADOS (por CRC) - if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: - return # Descartar sin logs, sin conteo - # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - + + #Duplicate handling# + #Handle inbound duplicates + #Duplicate complete packet + if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: + self.STATUS[_stream_id]['loss'] += 1 + logger.debug("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) + return + #Duplicate SEQ number + if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: + self.STATUS[_stream_id]['loss'] += 1 + logger.debug("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) + return + #Inbound out-of-order packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): + self.STATUS[_stream_id]['loss'] += 1 + logger.debug("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) + return + #Duplicate DMR payload to previuos packet (by hash + if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: + self.STATUS[_stream_id]['loss'] += 1 + logger.debug("(%s) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s: SEQ:%s PACKETS: %s, LOSS: %.2f%% ",self._system,_pkt_crc,int_id(_stream_id),int_id(_dst_id),_seq, self.STATUS[_stream_id]['packets'],((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) + return + #Inbound missed packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): + self.STATUS[_stream_id]['loss'] += 1 + logger.debug("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s , LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) + + #Save this sequence number + self.STATUS[_stream_id]['lastSeq'] = _seq + #Save this packet + self.STATUS[_stream_id]['lastData'] = _data + self.STATUS[_stream_id]['crcs'].add(_pkt_crc) self.STATUS[_stream_id]['LAST'] = pkt_time + + #Create STAT bridge for unknown TG if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and (str(int_id(_dst_id)) not in BRIDGES): logger.debug('(%s) Bridge for STAT TG %s does not exist. Creating',self._system, int_id(_dst_id)) make_stat_bridge(_dst_id) + _sysIgnore = deque() for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: + if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore,_hops, _source_server, _ber, _rssi, _source_rptr) + + # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM): call_duration = pkt_time - self.STATUS[_stream_id]['START'] packet_rate = 0 @@ -1412,6 +1716,7 @@ class routerOBP(OPENBRIDGE): if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) self.STATUS[_stream_id]['_fin'] = True + self.STATUS[_stream_id]['lastSeq'] = False class routerHBP(HBSYSTEM):