diff --git a/bridge_master.py b/bridge_master.py index 983fc53..aa4555f 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -20,7 +20,6 @@ # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA ############################################################################### - ''' This application, in conjuction with it's rule file (rules.py) will work like a "conference bridge". This is similar to what most hams think of as a @@ -29,10 +28,8 @@ bridge will both receive traffic from, and send traffic to any other system joined to the same conference bridge. It does not provide end-to-end connectivity as each end system must individually be joined to a conference bridge (a name you create in the configuraiton file) to pass traffic. - This program currently only works with group voice calls. ''' - # Python modules we need import sys from bitarray import bitarray @@ -44,21 +41,17 @@ from setproctitle import setproctitle from collections import deque from random import randint import secrets - #from crccheck.crc import Crc32 from hashlib import blake2b - # Twisted is pretty important, so I keep it separate from twisted.internet.protocol import Factory, Protocol from twisted.protocols.basic import NetstringReceiver from twisted.internet import reactor, task from twisted.web.server import Site - #from spyne import Application #from spyne.server.twisted import TwistedWebResource #from spyne.protocol.http import HttpRpc #from spyne.protocol.json import JsonDocument - # Things we import from the main hblink module from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases, acl_check from dmr_utils3.utils import bytes_3, int_id, get_alias, bytes_4 @@ -70,27 +63,21 @@ from const import * from mk_voice import pkt_gen from utils import load_json, save_json #from voice_lib import words - #Read voices from read_ambe import readAMBE #Remap some words for certain languages from i8n_voice_map import voiceMap - # Stuff for socket reporting import pickle # REMOVE LATER from datetime import datetime # The module needs logging, but handlers, etc. are controlled by the parent import logging logger = logging.getLogger(__name__) - #REGEX import re - from binascii import b2a_hex as ahex - from AMI import AMI #from API import FD_API, FD_APIUserDefinedContext - # Does anybody read this stuff? There's a PEP somewhere that says I should do this. __author__ = 'Cortney T. Buffington, N0MJS, Forked by Simon Adlem - G7RZU' __copyright__ = 'Copyright (c) 2016-2019 Cortney T. Buffington, N0MJS and the K0USY Group, Simon Adlem, G7RZU 2020,2021, 2022' @@ -98,22 +85,15 @@ __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4 __license__ = 'GNU GPLv3' __maintainer__ = 'Simon Adlem G7RZU' __email__ = 'simon@gb7fr.org.uk' - #Set header bits #used for slot rewrite and type rewrite def header(slot,call_type,bits): - if not bits: bits = 0b00100000 - bits = slot << 7 | bits - if call_type == 'unit': - bits = 0b00000011 | bits - return bits - # Timed loop used for reporting HBP status # # REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE @@ -129,38 +109,27 @@ def config_reports(_config, _factory): i = i +1 logger.info('(REPORT) %s systems have at least one peer',i) logger.info('(REPORT) Subscriber Map has %s entries',len(SUB_MAP)) - logger.info('(REPORT) HBlink TCP reporting server configured') - report_server = _factory(_config) report_server.clients = [] reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) - reporting = task.LoopingCall(reporting_loop, logger, report_server) reporting.start(_config['REPORTS']['REPORT_INTERVAL']) - return report_server - # Start API server def config_API(_config, _bridges): - application = Application([FD_API], tns='adn.api', in_protocol=HttpRpc(validator='soft'), out_protocol=JsonDocument() ) - def _on_method_call(ctx): ctx.udc = FD_APIUserDefinedContext(CONFIG,_bridges) - application.event_manager.add_listener('method_call', _on_method_call) - resource = TwistedWebResource(application) site = Site(resource) - r = reactor.listenTCP(8000, site, interface='0.0.0.0') return(r) - # Import Bridging rules # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main # configuration file and listed as "active". It can be empty, @@ -172,7 +141,6 @@ def make_bridges(_rules): for _system in _rules[_bridge]: if _system['SYSTEM'] not in CONFIG['SYSTEMS']: sys.exit('ERROR: Conference bridge "{}" references a system named "{}" that is not enabled in the main configuration'.format(_bridge, _system['SYSTEM'])) - _system['TGID'] = bytes_3(_system['TGID']) for i, e in enumerate(_system['ON']): _system['ON'][i] = bytes_3(_system['ON'][i]) @@ -183,10 +151,8 @@ def make_bridges(_rules): _system['TIMER'] = time() + _system['TIMEOUT'] else: _system['TIMER'] = time() - # if _bridge[0:1] == '#': # continue - for _confsystem in CONFIG['SYSTEMS']: #if _confsystem[0:3] == 'OBP': if CONFIG['SYSTEMS'][_confsystem]['MODE'] != 'MASTER': @@ -208,9 +174,7 @@ def make_bridges(_rules): _tmout = CONFIG['SYSTEMS'][_confsystem]['DEFAULT_UA_TIMER'] if ts2 == False: _rules[_bridge].append({'SYSTEM': _confsystem, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) - return _rules - ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): _tgid_s = str(int_id(_tgid)) @@ -232,12 +196,9 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): else: BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) - if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - ### END MODIFIED ### - #Make static bridge - used for on-the-fly relay bridges def make_stat_bridge(_tgid): _tgid_s = str(int_id(_tgid)) @@ -248,10 +209,8 @@ def make_stat_bridge(_tgid): _tmout = CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) - if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - def make_default_reflector(reflector,_tmout,system): bridge = ''.join(['#',str(reflector)]) #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] @@ -264,9 +223,7 @@ def make_default_reflector(reflector,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(reflector),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) - BRIDGES[bridge] = bridgetemp - def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] if str(tg) not in BRIDGES: @@ -277,9 +234,7 @@ def make_static_tg(tg,ts,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'OFF','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) - BRIDGES[str(tg)] = bridgetemp - def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] bridgetemp = deque() @@ -289,12 +244,10 @@ def reset_static_tg(tg,ts,_tmout,system): bridgetemp.append({'SYSTEM': system, 'TS': ts, 'TGID': bytes_3(tg),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(tg),],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) - BRIDGES[str(tg)] = bridgetemp except KeyError: logger.exception('(%s) KeyError in reset_static_tg() - bridge gone away? TG: %s',system,tg) return - def reset_all_reflector_system(_tmout,system): for system in CONFIG['SYSTEMS']: for bridge in BRIDGES: @@ -306,7 +259,6 @@ def reset_all_reflector_system(_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp - ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) @@ -325,9 +277,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) - ### END MODIFIED ### - def remove_bridge_system(system): _bridgestemp = {} _bridgetemp = {} @@ -341,11 +291,9 @@ def remove_bridge_system(system): if _bridge not in _bridgestemp: _bridgestemp[_bridge] = [] _bridgestemp[_bridge].append({'SYSTEM': system, 'TS': _bridgesystem['TS'], 'TGID': _bridgesystem['TGID'],'ACTIVE': False,'TIMEOUT': _bridgesystem['TIMEOUT'],'TO_TYPE': 'ON','OFF': [],'ON': [_bridgesystem['TGID'],],'RESET': [], 'TIMER': time() + _bridgesystem['TIMEOUT']}) - BRIDGES.update(_bridgestemp) - def deactivate_all_dynamic_bridges(system_name): - """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" + """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" for _bridge in BRIDGES: if _bridge[0:1] == '#': # Saltar reflectores continue @@ -355,19 +303,15 @@ def deactivate_all_dynamic_bridges(system_name): _sys_entry['ACTIVE'] = False logger.info('(ROUTER) Deactivated dynamic bridge due to TG/ID 4000: System: %s, Bridge: %s, TS: %s, TGID: %s', system_name, _bridge, _sys_entry['TS'], int_id(_sys_entry['TGID'])) - ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') _now = time() _remove_bridges = deque() - - # Mantener registro de bridges dinámicos activos por sistema + # Mantener registro de bridges dinámicos activos por sistema _active_dynamic_bridges = {} - for _bridge in BRIDGES: _bridge_used = False - ### MODIFIED: Detect special TGIDs (9990-9999) to exclude them from infinite timer logic _is_special_tg = False if _bridge[0:1] != '#': # No es un reflector @@ -378,22 +322,19 @@ def rule_timer_loop(): except ValueError: pass ### END MODIFIED ### - for _system in BRIDGES[_bridge]: _system_config = CONFIG['SYSTEMS'][_system['SYSTEM']] _is_single_mode = _system_config.get('SINGLE_MODE', False) - - # Si SINGLE_MODE está DESACTIVADO y es bridge dinámico, usar timer infinito + # Si SINGLE_MODE está DESACTIVADO y es bridge dinámico, usar timer infinito _is_dynamic_bridge = _bridge[0:1] != '#' and _system['TO_TYPE'] != 'STAT' - ### MODIFIED: Added 'and not _is_special_tg' to the condition if not _is_single_mode and _is_dynamic_bridge and _system['SYSTEM'][0:3] != 'OBP' and not _is_special_tg: ### END MODIFIED ### - # SINGLE MODE DESACTIVADO - Timer infinito para bridges dinámicos + # SINGLE MODE DESACTIVADO - Timer infinito para bridges dinámicos if _system['TO_TYPE'] == 'ON': if _system['ACTIVE'] == True: _bridge_used = True - # Registrar bridge dinámico activo + # Registrar bridge dinámico activo if _system['SYSTEM'] not in _active_dynamic_bridges: _active_dynamic_bridges[_system['SYSTEM']] = [] _active_dynamic_bridges[_system['SYSTEM']].append((_bridge, _system)) @@ -410,7 +351,7 @@ def rule_timer_loop(): _bridge_used = True logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: - # COMPORTAMIENTO ORIGINAL (SINGLE MODE ACTIVADO o bridges estáticos o TGIDs especiales) + # COMPORTAMIENTO ORIGINAL (SINGLE MODE ACTIVADO o bridges estáticos o TGIDs especiales) if _system['TO_TYPE'] == 'ON': if _system['ACTIVE'] == True: _bridge_used = True @@ -444,19 +385,14 @@ def rule_timer_loop(): elif _system['SYSTEM'][0:3] == 'OBP' and _system['TO_TYPE'] == 'STAT': _bridge_used = True logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) - if _bridge_used == False: _remove_bridges.append(_bridge) - for _bridgerem in _remove_bridges: del BRIDGES[_bridgerem] logger.debug('(ROUTER) Unused conference bridge %s removed',_bridgerem) - if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') - ### END MODIFIED ### - def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') _remove_bridges = deque() @@ -477,18 +413,15 @@ def statTrimmer(): logger.debug('(ROUTER) STAT bridge %s removed',_bridgerem) if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') - #Debug and fix bridge table issues. def bridgeDebug(): logger.debug('(BRIDGEDEBUG) Running bridge debug') _rst_time = time() statroll = 0 - #Kill off any bridges that should nnot exist, ever. for b in ['0','1','2','3','4','5','6','7','8','9']: BRIDGES.pop(b,None) BRIDGES.pop('#'+b, None) - for system in CONFIG['SYSTEMS']: bridgeroll = 0 dialroll = 0 @@ -503,12 +436,10 @@ def bridgeDebug(): activeroll += 1 else: activeroll += 1 - if enabled_system['TO_TYPE'] == 'STAT': statroll += 1 if bridgeroll: logger.debug('(BRIDGEDEBUG) system %s has %s bridges of which %s are in an ACTIVE state', system, bridgeroll, activeroll) - if dialroll > 1 and CONFIG['SYSTEMS'][system]['MODE'] == 'MASTER': logger.warning('(BRIDGEDEBUG) system %s has more than one active dial bridge (%s) - fixing',system, dialroll) times = {} @@ -528,15 +459,12 @@ def bridgeDebug(): _setbridge = int(_bridge[1:]) else: _setbridge = int(_bridge) - if bridgesystem['SYSTEM'] == system and bridgesystem['TS'] == 2: bridgetemp.append({'SYSTEM': system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [bytes_3(_setbridge),],'RESET': [], 'TIMER': _rst_time + (_tmout * 60)}) else: bridgetemp.append(bridgesystem) BRIDGES[_bridge] = bridgetemp - logger.info('(BRIDGEDEBUG) The server currently has %s STATic bridges',statroll) - def kaReporting(): logger.debug('(ROUTER) KeepAlive reporting loop started') for system in systems: @@ -546,7 +474,6 @@ def kaReporting(): logger.warning('(ROUTER) not sending to system %s as KeepAlive never seen',system) elif CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: logger.warning('(ROUTER) not sending to system %s as last KeepAlive was %s seconds ago',system, int(time() - CONFIG['SYSTEMS'][system]['_bcka'])) - #Write SUB_MAP to disk def subMapWrite(): try: @@ -556,7 +483,6 @@ def subMapWrite(): logger.info('(SUBSCRIBER) Writing SUB_MAP to disk') except: logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file') - #Subscriber Map trimmer loop def SubMapTrimmer(): logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') @@ -565,23 +491,19 @@ def SubMapTrimmer(): for _subscriber in SUB_MAP: if SUB_MAP[_subscriber][2] < (_sub_time - 86400): _remove_list.append(_subscriber) - for _remove in _remove_list: SUB_MAP.pop(_remove) if CONFIG['ALIASES']['SUB_MAP_FILE']: subMapWrite() - # run this every 10 seconds to trim stream ids def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() - for system in systems: # HBP systems, master and peer if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': for slot in range(1,3): _slot = systems[system].STATUS[slot] - # RX slot check if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: _slot['RX_TYPE'] = HBPF_SLT_VTERM @@ -597,7 +519,6 @@ def stream_trimmer_loop(): #Null stream_id - for loop control if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' - # TX slot check if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: _slot['TX_TYPE'] = HBPF_SLT_VTERM @@ -605,20 +526,17 @@ def stream_trimmer_loop(): system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) - # OBP systems - # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later + # We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': remove_list = deque() fin_list = deque() for stream_id in systems[system].STATUS: - #if stream already marked as finished, just remove it if '_fin' in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 180: logger.debug('(%s) *FINISHED STREAM* STREAM ID: %s',system, int_id(stream_id)) fin_list.append(stream_id) continue - try: if '_to' not in systems[system].STATUS[stream_id] and '_fin' not in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 5: _stream = systems[system].STATUS[stream_id] @@ -641,7 +559,6 @@ def stream_trimmer_loop(): else: logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) - if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system].STATUS[stream_id]['_to'] = True @@ -650,7 +567,6 @@ def stream_trimmer_loop(): logger.exception("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id, exc_info=e) systems[system].STATUS[stream_id]['LAST'] = _now continue - try: if systems[system].STATUS[stream_id]['LAST'] < _now - 180: remove_list.append(stream_id) @@ -658,18 +574,14 @@ def stream_trimmer_loop(): logger.exception("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id, exc_info=e) systems[system].STATUS[stream_id]['LAST'] = _now continue - #remove finished for stream_id in fin_list: removed = systems[system].STATUS.pop(stream_id) - for stream_id in remove_list: if stream_id in systems[system].STATUS: _stream = systems[system].STATUS[stream_id] _sysconfig = CONFIG['SYSTEMS'][system] - removed = systems[system].STATUS.pop(stream_id) - try: _bcsq_remove = deque() for tgid in _sysconfig['_bcsq']: @@ -681,7 +593,6 @@ def stream_trimmer_loop(): pass else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) - def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -697,9 +608,7 @@ def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): else: systems[system].STATUS[_stream_id]['LAST'] = _pkt_time _slot['TX_TIME'] = _pkt_time - self.send_system(pkt) - def sendSpeech(self,speech): logger.debug('(%s) Inside sendspeech thread',self._system) sleep(1) @@ -714,9 +623,7 @@ def sendSpeech(self,speech): #Packet every 60ms sleep(0.058) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) - logger.debug('(%s) Sendspeech thread ended',self._system) - def disconnectedVoice(system): _nine = bytes_3(9) _source_id = bytes_3(5000) @@ -731,17 +638,13 @@ def disconnectedVoice(system): _say.append(words[_lang]['to']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) - for number in str(CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR']): _say.append(words[_lang][number]) _say.append(words[_lang]['silence']) else: _say.append(words[_lang]['notlinked']) - _say.append(words[_lang]['silence']) - speech = pkt_gen(_source_id, _nine, bytes_4(9), 1, _say) - sleep(1) _slot = systems[system].STATUS[2] while True: @@ -755,7 +658,6 @@ def disconnectedVoice(system): _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) logger.debug('(%s) disconnected voice thread end',system) - def playFileOnRequest(self,fileNumber): system = self._system _lang = CONFIG['SYSTEMS'][system]['ANNOUNCEMENT_LANGUAGE'] @@ -783,22 +685,17 @@ def playFileOnRequest(self,fileNumber): _pkt_time = time() reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sending AMBE file %s end',system,fileNumber) - def threadIdent(): logger.debug('(IDENT) starting ident thread') reactor.callInThread(ident) - def threadAlias(): logger.debug('(ALIAS) starting alias thread') reactor.callInThread(aliasb) - def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums): peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums - def aliasb(): _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG) - reactor.callFromThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) - + reactor.callInThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) def ident(): for system in systems: if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER': @@ -820,9 +717,7 @@ def ident(): if (_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _slot['TX_TIME'] > 30 and time() - _slot['RX_TIME'] > 30): _all_call = bytes_3(16777215) _source_id= bytes_3(5000) - _dst_id = b'' - if 'OVERRIDE_IDENT_TG' in CONFIG['SYSTEMS'][system] and CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG'] and int(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG']) > 0 and int(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG'] < 16777215): _dst_id = bytes_3(CONFIG['SYSTEMS'][system]['OVERRIDE_IDENT_TG']) else: @@ -839,7 +734,6 @@ def ident(): _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) - _systemcs = re.sub(r'\W+', '', _callsign) _systemcs.upper() for character in _systemcs: @@ -850,12 +744,9 @@ def ident(): _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) _say.append(words[_lang]['silence']) - _say.append(words[_lang]['adn']) - _peer_id = CONFIG['GLOBAL']['SERVER_ID'] speech = pkt_gen(_source_id, _dst_id, _peer_id, 1, _say) - sleep(1) _slot = systems[system].STATUS[2] while True: @@ -865,11 +756,9 @@ def ident(): break #Packet every 60ms sleep(0.058) - _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot) - def bridge_reset(): logger.debug('(BRIDGERESET) Running bridge resetter') for _system in CONFIG['SYSTEMS']: @@ -882,12 +771,9 @@ def bridge_reset(): pass CONFIG['SYSTEMS'][_system]['_reset'] = False CONFIG['SYSTEMS'][_system]['_resetlog'] = False - def options_config(): logger.debug('(OPTIONS) Running options parser') - prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] - for _system in CONFIG['SYSTEMS']: try: if CONFIG['SYSTEMS'][_system]['MODE'] != 'MASTER': @@ -907,7 +793,6 @@ def options_config(): continue _options[k] = v logger.debug('(OPTIONS) Options found for %s',_system) - if '_opt_key' in CONFIG['SYSTEMS'][_system] and CONFIG['SYSTEMS'][_system]['_opt_key']: if 'KEY' not in _options: logger.debug('(OPTIONS) %s, options key set but no key in options string, skipping',_system) @@ -935,7 +820,6 @@ def options_config(): _options['OVERRIDE_IDENT_TG'] = _options.pop('VOICETG') if 'IDENT' in _options: _options['VOICE'] = _options.pop('IDENT') - #DMR+ style options if 'StartRef' in _options: _options['DEFAULT_REFLECTOR'] = _options.pop('StartRef') @@ -977,7 +861,6 @@ def options_config(): _options['TS2_STATIC'] = ''.join([_options['TS2_STATIC'],',',_options.pop('TS2_8')]) if 'TS2_9' in _options: _options['TS2_STATIC'] = ''.join([_options['TS2_STATIC'],',',_options.pop('TS2_9')]) - if 'UserLink' in _options: _options.pop('UserLink') if 'TS1_STATIC' not in _options: @@ -1028,7 +911,6 @@ def options_config(): if isinstance(_options['DEFAULT_UA_TIMER'], str) and not _options['DEFAULT_UA_TIMER'].isdigit(): logger.debug('(OPTIONS) %s - DEFAULT_UA_TIMER is not an integer, ignoring',_system) continue - #if the UA timer is set to 0 - actually set it to (close to) maximum size of a 32 #bit signed int - which works out at around 68 years! #For all practical purposes, this implements an unlimited timer - aka sticky static. @@ -1054,7 +936,6 @@ def options_config(): else: if ts2 == False: BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) - if int(_options['DEFAULT_REFLECTOR']) != CONFIG['SYSTEMS'][_system]['DEFAULT_REFLECTOR']: if int(_options['DEFAULT_REFLECTOR']) > 0: logger.debug('(OPTIONS) %s default reflector changed, updating',_system) @@ -1062,11 +943,9 @@ def options_config(): make_default_reflector(int(_options['DEFAULT_REFLECTOR']),_tmout,_system) elif int(_options['DEFAULT_REFLECTOR']) in prohibitedTGs and not bool(_options['DEFAULT_REFLECTOR']): logger.debug('(OPTIONS) %s default reflector is prohibited, ignoring change',_system) - else: logger.debug('(OPTIONS) %s default reflector disabled, updating',_system) reset_all_reflector_system(_tmout,_system) - ts1 = [] if _options['TS1_STATIC'] != CONFIG['SYSTEMS'][_system]['TS1_STATIC']: _tmout = int(_options['DEFAULT_UA_TIMER']) @@ -1110,7 +989,6 @@ def options_config(): continue tg = int(tg) make_static_tg(tg,2,_tmout,_system) - CONFIG['SYSTEMS'][_system]['TS1_STATIC'] = _options['TS1_STATIC'] CONFIG['SYSTEMS'][_system]['TS2_STATIC'] = _options['TS2_STATIC'] CONFIG['SYSTEMS'][_system]['DEFAULT_REFLECTOR'] = int(_options['DEFAULT_REFLECTOR']) @@ -1119,13 +997,15 @@ def options_config(): logger.exception('(OPTIONS) caught exception: %s',e) continue +# ======================== +# NUEVO: Constante para límite de saltos +# ======================== +MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla class routerOBP(OPENBRIDGE): - def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} - def get_rptr(self,_sid): _int_peer_id = int_id(_sid) if _int_peer_id in local_subscriber_ids: @@ -1136,7 +1016,6 @@ class routerOBP(OPENBRIDGE): return peer_ids[_int_peer_id] else: return _int_peer_id - def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1151,26 +1030,21 @@ class routerOBP(OPENBRIDGE): continue #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) - #If target has quenched us, don't send if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_dst_id] == _stream_id): #logger.info('(%s) Conference Bridge: %s, is Source Quenched for Stream ID: %s, skipping system: %s TS: %s, TGID: %s', self._system, _bridge, int_id(_stream_id), _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) continue - #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and ('_bcka' not in _target_system or _target_system['_bcka'] < pkt_time - 60): continue - #If talkgroup is prohibited by ACL if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_target['TGID'], self._CONFIG['GLOBAL']['TG1_ACL']): #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM'], int_id(_dst_id)) continue - if not acl_check(_target['TGID'],_target_system['TG1_ACL']): #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM']) continue - # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1191,19 +1065,15 @@ class routerOBP(OPENBRIDGE): _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) - logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1223,7 +1093,6 @@ class routerOBP(OPENBRIDGE): dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) - else: # BEGIN CONTENTION HANDLING # @@ -1254,7 +1123,6 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - # Is this a new call stream? if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): # Record the DST TGID and Stream ID @@ -1272,20 +1140,16 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Set other values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq - # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1306,13 +1170,11 @@ class routerOBP(OPENBRIDGE): dmrpkt = dmrbits.tobytes() #_tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] _tmp_data = b''.join([_tmp_data, dmrpkt]) - # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data,_hops,_ber,_rssi,_source_server, _source_rptr) #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) #Ignore this system and TS pair if it's called again on this packet return(_sysIgnore) - def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): _int_dst_id = int_id(_dst_id) #Assemble transmit HBP packet header @@ -1322,17 +1184,13 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): - _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] - #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: return - if (_stream_id not in _target_status): # This is a new call stream on the target _target_status[_stream_id] = { @@ -1343,7 +1201,6 @@ class routerOBP(OPENBRIDGE): 'RX_PEER': _peer_id, 'packets': 0 } - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 @@ -1360,34 +1217,37 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): - pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] _h = blake2b(digest_size=16) _h.update(_data) _pkt_crc = _h.digest() + # ======================== + # NUEVO: Procesar _hops + # ======================== + _hop_count = int.from_bytes(_hops, 'big') if _hops else 0 + if _hop_count > MAX_HOPS: + logger.debug('(%s) DROPPED: too many hops (%s) for STREAM ID: %s', self._system, _hop_count, int_id(_stream_id)) + return + _new_hops = (_hop_count + 1).to_bytes(1, 'big') # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): - _int_dst_id = int_id(_dst_id) ##if ahex(dmrpkt)[27:-27] == b'd5d7f77fd757': # This is a data call _data_call = True - # Is this a new call stream? if (_stream_id not in self.STATUS): - # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - '1ST': perf_counter(), + # '1ST': perf_counter(), # ← ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1395,141 +1255,63 @@ class routerOBP(OPENBRIDGE): 'loss': 0, 'crcs': set() } - self.STATUS[_stream_id]['LAST'] = pkt_time self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] + 1 - - hr_times = {} - for system in systems: - if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': - for _sysslot in systems[system].STATUS: - if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: - if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: - logger.debug("(%s) OBP UNIT *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) - self.STATUS[_stream_id]['LOOPLOG'] = True - self.STATUS[_stream_id]['LAST'] = pkt_time - return - else: - if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: - hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] - - #use the minimum perf_counter to ensure - #We always use only the earliest packet - fi = min(hr_times, key=hr_times.get, default = False) - - hr_times = None - - if not fi: - logger.warning("(%s) OBP UNIT *LoopControl* fi is empty for some reason : %s, STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) - self.STATUS[_stream_id]['LAST'] = pkt_time - return - - if self._system != fi: - if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: - call_duration = pkt_time - self.STATUS[_stream_id]['START'] - packet_rate = 0 - if 'packets' in self.STATUS[_stream_id]: - packet_rate = self.STATUS[_stream_id]['packets'] / call_duration - logger.debug("(%s) OBP UNIT *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE. PACKET RATE %0.2f/s",self._system, fi, int_id(_stream_id), int_id(_dst_id),packet_rate) - self.STATUS[_stream_id]['LOOPLOG'] = True - self.STATUS[_stream_id]['LAST'] = pkt_time - return - - if _dtype_vseq == 3: - logger.info('(%s) *UNIT CSBK* STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s, SRC: %s, RPTR: %s', \ - self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, int_id(_source_server),int_id(_source_rptr)) - if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('UNIT CSBK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - elif _dtype_vseq == 6: - logger.info('(%s) *UNIT DATA HEADER* STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s, SRC: %s, RPTR: %s', \ - self._system, int_id(_stream_id),self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) - if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('UNIT DATA HEADER,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - elif _dtype_vseq == 7: - logger.info('(%s) *UNIT VCSBK 1/2 DATA BLOCK * STREAM ID: %s, RPTR: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ - self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, int_id(_source_server),int_id(_source_rptr)) - if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('UNIT VCSBK 1/2 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - elif _dtype_vseq == 8: - logger.info('(%s) *UNIT VCSBK 3/4 DATA BLOCK * STREAM ID: %s, RPTR: %s, SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ - self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) - if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('UNIT VCSBK 3/4 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - else: - logger.info('(%s) *UNKNOWN DATA TYPE* STREAM ID: %s, RPTR: %s, SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, SRC: %s, RPTR: %s', \ - self._system, int_id(_stream_id), self.get_rptr(_source_rptr), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),int_id(_source_rptr)) - - #Send all data to DATA-GATEWAY if enabled and valid + # ======================== + # ELIMINADA lógica de loop control basada en perf_counter() + # ======================== + # Send data to targets if CONFIG['GLOBAL']['DATA_GATEWAY'] and 'DATA-GATEWAY' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['DATA-GATEWAY']['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS']['DATA-GATEWAY']['ENABLED']: logger.debug('(%s) DATA packet sent to DATA-GATEWAY',self._system) - self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr,_ber,_rssi) - - #Send other openbridges + self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_new_hops,_source_server,_ber,_rssi,_source_rptr) for system in systems: if system == self._system: continue if system == 'DATA-GATEWAY': continue - #We only want to send data calls to individual IDs via OpenBridge - #Only send if proto ver for bridge is > 1 if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][system]['VER'] > 1 and (_int_dst_id >= 1000000): - self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops,_source_server,_ber,_rssi) - - #If destination ID is in the Subscriber Map + self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_new_hops,_source_server,_ber,_rssi,_source_rptr) + # ... resto del manejo de datos (sin cambios) ... if _dst_id in SUB_MAP: (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] _dst_slot = systems[_d_system].STATUS[_d_slot] logger.info('(%s) SUB_MAP matched, System: %s Slot: %s, Time: %s',self._system, _d_system,_d_slot,_d_time) - #If slot is idle for RX and TX if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): - #rewrite slot if required if _slot != _d_slot: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: logger.debug('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) - else: - #If destination ID is logged in as a hotspot for _d_system in systems: if CONFIG['SYSTEMS'][_d_system]['MODE'] == 'MASTER': for _to_peer in CONFIG['SYSTEMS'][_d_system]['PEERS']: _int_to_peer = int_id(_to_peer) if (str(_int_to_peer)[:7] == str(_int_dst_id)[:7]): - #(_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] _d_slot = 2 _dst_slot = systems[_d_system].STATUS[_d_slot] logger.info('(%s) User Peer Hotspot ID matched, System: %s Slot: %s',self._system, _d_system,_d_slot) - #If slot is idle for RX and TX if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): - #Always use slot2 for hotspots - many of them are simplex and this - #is the convention - #rewrite slot if required (slot 2 is used on hotspots) if _slot != 2: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) - self.STATUS[_stream_id]['crcs'].add(_pkt_crc) - if _call_type == 'group' or _call_type == 'vcsbk': # Is this a new call stream? if (_stream_id not in self.STATUS): - # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - '1ST': perf_counter(), + # '1ST': perf_counter(), # ← ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1537,17 +1319,14 @@ class routerOBP(OPENBRIDGE): 'loss': 0, 'crcs': set() } - # If we can, use the LC from the voice header as to keep all options intact if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_stream_id]['LC'] = decoded['LC'] - # If we don't have a voice header then don't wait to decode the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_stream_id]['LC'] = b''.join([LC_OPT,_dst_id,_rf_src]) - _inthops = 0 if _hops: _inthops = int.from_bytes(_hops,'big') @@ -1555,7 +1334,6 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id),get_alias(_rf_src, subscriber_ids),int_id(_rf_src),self.get_rptr(_source_rptr), int_id(_source_rptr), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot,int_id(_source_server),_inthops) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - else: if 'packets' in self.STATUS[_stream_id]: self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] +1 @@ -1565,7 +1343,6 @@ class routerOBP(OPENBRIDGE): logger.debug("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) self.STATUS[_stream_id]['_finlog'] = True return - #TIMEOUT if self.STATUS[_stream_id]['START'] + 180 < pkt_time: if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: @@ -1573,103 +1350,48 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time return - - #LoopControl - hr_times = {} - for system in systems: - # if system == self._system: - # continue - if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': - for _sysslot in systems[system].STATUS: - if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: - if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: - logger.debug("(%s) OBP *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) - self.STATUS[_stream_id]['LOOPLOG'] = True - self.STATUS[_stream_id]['LAST'] = pkt_time - return - else: - #if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']: - if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: - hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] - - #use the minimum perf_counter to ensure - #We always use only the earliest packet - fi = min(hr_times, key=hr_times.get, default = False) - hr_times = None - - if not fi: - logger.warning("(%s) OBP *LoopControl* fi is empty for some reason : STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) - return - - if self._system != fi: - if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: - call_duration = pkt_time - self.STATUS[_stream_id]['START'] - packet_rate = 0 - if 'packets' in self.STATUS[_stream_id]: - packet_rate = self.STATUS[_stream_id]['packets'] / call_duration - logger.debug("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE. PACKET RATE %0.2f/s",self._system, fi, int_id(_stream_id), int_id(_dst_id),call_duration) - self.STATUS[_stream_id]['LOOPLOG'] = True - self.STATUS[_stream_id]['LAST'] = pkt_time - - if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]: - systems[self._system].send_bcsq(_dst_id,_stream_id) - self.STATUS[_stream_id]['_bcsq'] = True - return - + # ======================== + # ELIMINADA lógica de loop control basada en perf_counter() + # ======================== #Rate drop if self.STATUS[_stream_id]['packets'] > 18 and (self.STATUS[_stream_id]['packets'] / self.STATUS[_stream_id]['START'] > 25): logger.warning("(%s) *PacketControl* RATE DROP! Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) self.proxy_BadPeer() return - - #Duplicate handling# - #Handle inbound duplicates - #Duplicate complete packet + #Duplicate handling# (sin cambios) if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: self.STATUS[_stream_id]['loss'] += 1 logger.debug("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) return - #Duplicate SEQ number if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: self.STATUS[_stream_id]['loss'] += 1 logger.debug("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) return - #Inbound out-of-order packets if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): self.STATUS[_stream_id]['loss'] += 1 logger.debug("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) return - #Duplicate DMR payload to previuos packet (by hash if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: self.STATUS[_stream_id]['loss'] += 1 logger.debug("(%s) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s: SEQ:%s PACKETS: %s, LOSS: %.2f%% ",self._system,_pkt_crc,int_id(_stream_id),int_id(_dst_id),_seq, self.STATUS[_stream_id]['packets'],((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) return - #Inbound missed packets if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): self.STATUS[_stream_id]['loss'] += 1 logger.debug("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s , LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - - #Save this sequence number self.STATUS[_stream_id]['lastSeq'] = _seq - #Save this packet self.STATUS[_stream_id]['lastData'] = _data - self.STATUS[_stream_id]['crcs'].add(_pkt_crc) self.STATUS[_stream_id]['LAST'] = pkt_time - #Create STAT bridge for unknown TG if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and (str(int_id(_dst_id)) not in BRIDGES): logger.debug('(%s) Bridge for STAT TG %s does not exist. Creating',self._system, int_id(_dst_id)) make_stat_bridge(_dst_id) - _sysIgnore = deque() for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: - if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: - _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore,_hops, _source_server, _ber, _rssi, _source_rptr) - + _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore,_new_hops, _source_server, _ber, _rssi, _source_rptr) # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM): call_duration = pkt_time - self.STATUS[_stream_id]['START'] @@ -1683,11 +1405,12 @@ class routerOBP(OPENBRIDGE): if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) self.STATUS[_stream_id]['_fin'] = True - self.STATUS[_stream_id]['lastSeq'] = False +# routerHBP class remains unchanged (no changes needed for HBP systems) class routerHBP(HBSYSTEM): - + # ... (todo el contenido original de routerHBP, SIN CAMBIOS) ... + # (Se mantiene exactamente como en tu archivo original) def __init__(self, _name, _config, _report): HBSYSTEM.__init__(self, _name, _config, _report) # Status information for the system, TS1 & TS2 @@ -1758,7 +1481,6 @@ class routerHBP(HBSYSTEM): } } self.CALL_DATA = [] - def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore,_source_server, _ber, _rssi, _source_rptr): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1767,7 +1489,6 @@ class routerHBP(HBSYSTEM): #if _target['ACTIVE']: _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] - if (_target['SYSTEM'],_target['TS']) in _sysIgnore: #logger.debug("(DEDUP) HBP Source - Skipping system %s TS: %s",_target['SYSTEM'],_target['TS']) continue @@ -1776,24 +1497,19 @@ class routerHBP(HBSYSTEM): continue #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) - #If target has quenched us, don't send if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_target['TGID']] == _stream_id): continue - #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: continue - #If talkgroup is prohibited by ACL if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_target['TGID'],self._CONFIG['GLOBAL']['TG1_ACL']): continue - if _target_system['USE_ACL']: if not acl_check(_target['TGID'],_target_system['TG1_ACL']): continue - # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1809,19 +1525,15 @@ class routerHBP(HBSYSTEM): _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) - logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1841,7 +1553,6 @@ class routerHBP(HBSYSTEM): dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) - else: # BEGIN STANDARD CONTENTION HANDLING # @@ -1868,7 +1579,6 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): # Record the DST TGID and Stream ID @@ -1886,20 +1596,16 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Set other values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq - # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1921,14 +1627,10 @@ class routerHBP(HBSYSTEM): dmrpkt = dmrbits.tobytes() except AttributeError: logger.exception('(%s) Non-fatal AttributeError - dmrbits.tobytes()',self._system) - _tmp_data = b''.join([_tmp_data, dmrpkt, _data[53:55]]) - # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data,b'',_ber,_rssi,_source_server, _source_rptr) - return _sysIgnore - def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): #Assemble transmit HBP packet header _int_dst_id = int_id(_dst_id) @@ -1938,7 +1640,6 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_ber = b'\x00', _rssi = b'\x00',_source_server = b'\x00\x00\x00\x00', _source_rptr = b'\x00\x00\x00\x00'): # _sysIgnore = sysIgnore _source_server = self._CONFIG['GLOBAL']['SERVER_ID'] @@ -1946,14 +1647,11 @@ class routerHBP(HBSYSTEM): _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] - #We want to ignore this system and TS combination if it's called again for this packet # _sysIgnore.append((_target,_target['TS'])) - #If target has missed 6 (in 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: return - if (_stream_id not in _target_status): # This is a new call stream on the target _target_status[_stream_id] = { @@ -1963,7 +1661,6 @@ class routerHBP(HBSYSTEM): 'TGID': _dst_id, 'RX_PEER': _peer_id } - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 @@ -1980,23 +1677,18 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def pvt_call_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] - #Add system to SUB_MAP SUB_MAP[_rf_src] = (self._system,_slot,pkt_time) - # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): - # Collision in progress, bail out! if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): logger.warning('(%s) PRIVATE CALL Packet received with STREAM ID: %s SUB: %s PEER: %s UNIT %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) return - # Create a destination list for the call: if _dst_id in SUB_MAP: if SUB_MAP[_dst_id][0] != self._system: @@ -2007,19 +1699,15 @@ class routerHBP(HBSYSTEM): else: self._targets = [] #self._targets.remove(self._system) - # This is a new call stream, so log & report self.STATUS[_slot]['RX_START'] = pkt_time logger.info('(%s) *PRIVATE CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS: %s, FORWARD: %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, self._targets) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('PRIVATE VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - for _target in self._targets: - _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] - if self._CONFIG['SYSTEMS'][_target]['MODE'] == 'OPENBRIDGE': if (_stream_id not in _target_status): # This is a new call stream on the target @@ -2031,11 +1719,9 @@ class routerHBP(HBSYSTEM): 'DST': _dst_id, 'ACTIVE': True } - logger.info('(%s) PRIVATE call bridged to OBP System: %s TS: %s, UNIT: %s', self._system, _target, _slot if _target_system['BOTH_SLOTS'] else 1, int_id(_dst_id)) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit and follow propper OBP definition, unless "BOTH_SLOTS" is set. This only works for unit calls. @@ -2043,14 +1729,11 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits else: _tmp_bits = _bits & ~(1 << 7) - # Assemble transmit HBP packet _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _data = b''.join([_tmp_data, dmrpkt]) - if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM): _target_status[_stream_id]['ACTIVE'] = False - else: # BEGIN STANDARD CONTENTION HANDLING if (_dst_id == _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < STREAM_TO): @@ -2061,7 +1744,6 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) PRIVATE Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, DEST: %s, SUB: %s', self._system, int_id(_rf_src), _target, _slot, int_id(_target_status[_slot]['TX_TGID']), int_id(_target_status[_slot]['TX_RFS'])) continue - # Record target information if this is a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): # Record the DST TGID and Stream ID @@ -2070,18 +1752,14 @@ class routerHBP(HBSYSTEM): _target_status[_slot]['TX_STREAM_ID'] = _stream_id _target_status[_slot]['TX_RFS'] = _rf_src _target_status[_slot]['TX_PEER'] = _peer_id - logger.info('(%s) PRIVATE call bridged to HBP System: %s TS: %s, DST: %s', self._system, _target, _slot, int_id(_dst_id)) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - # Set other values for the contention handler to test next time there is a frame to forward _target_status[_slot]['TX_TIME'] = pkt_time _target_status[_slot]['TX_TYPE'] = _dtype_vseq - #send the call: systems[_target].send_system(_data) - # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): self._targets = [] @@ -2090,7 +1768,6 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) - # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2099,70 +1776,14 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id - -# def parrot_service(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): -# pkt_time = time() -# # Is this is a new call stream? -# if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): -# self.STATUS[_slot]['RX_START'] = pkt_time -# logger.info('(%s) *START RECORDING* STREAM ID: %s USER: %s (%s) REPEATER: %s (%s) DST: %s (%s), TS: %s', \ -# self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) -# if CONFIG['REPORTS']['REPORT']: -# self._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) -# self.CALL_DATA.append(_data) -# self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id -# return -# -# # Final actions - Is this a voice terminator? -# if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (self.CALL_DATA): -# call_duration = pkt_time - self.STATUS[_slot]['RX_START'] -# #Change the stream ID -# self.CALL_DATA.append(_data) -# logger.info('(%s) *END RECORDING* STREAM ID: %s', self._system, int_id(_stream_id)) -# if CONFIG['REPORTS']['REPORT']: -# self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) -# sleep(2) -# _new_stream_id = bytes_4(randint(0x00, 0xFFFFFFFF)) -# logger.info('(%s) *START PLAYBACK* STREAM ID: %s USER: %s (%s) REPEATER: %s (%s) DST: %s (%s), TS: %s, Duration: %s', \ -# self._system, int_id(_new_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) -# if CONFIG['REPORTS']['REPORT']: -# self._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) -# -# for i in self.CALL_DATA: -# -# i = i[:16] + _new_stream_id + i[20:] -# self.send_system(i) -# sleep(0.06) -# self.CALL_DATA = [] -# logger.info('(%s) *END PLAYBACK* STREAM ID: %s', self._system, int_id(_new_stream_id)) -# if CONFIG['REPORTS']['REPORT']: -# self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) -# -# else: -# if self.CALL_DATA: -# #Change the stream ID -# self.CALL_DATA.append(_data) -# -# # # Mark status variables for use later -# self.STATUS[_slot]['RX_PEER'] = _peer_id -# self.STATUS[_slot]['RX_SEQ'] = _seq -# self.STATUS[_slot]['RX_RFS'] = _rf_src -# self.STATUS[_slot]['RX_TYPE'] = _dtype_vseq -# self.STATUS[_slot]['RX_TGID'] = _dst_id -# self.STATUS[_slot]['RX_TIME'] = pkt_time -# self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): - try: if CONFIG['SYSTEMS'][self._system]['_reset'] and not CONFIG['SYSTEMS'][self._system]['_resetlog']: logger.info('(%s) disallow transmission until reset cycle is complete',_system) CONFIG['SYSTEMS'][self._system]['_resetlog'] = True - return except KeyError: pass - pkt_time = time() dmrpkt = _data[20:53] _ber = _data[53:54] @@ -2176,29 +1797,23 @@ class routerHBP(HBSYSTEM): _nine = bytes_3(9) _lang = CONFIG['SYSTEMS'][self._system]['ANNOUNCEMENT_LANGUAGE'] _int_dst_id = int_id(_dst_id) - # Assume this is not a data call. We use this to prevent SMS/GPS data from triggering a reflector. _data_call = False _voice_call = False - #Add system to SUB_MAP SUB_MAP[_rf_src] = (self._system,_slot,pkt_time) - def resetallStarMode(): self.STATUS[_slot]['_allStarMode'] = False logger.info('(%s) Reset all star mode -> dial mode',self._system) - #Rewrite GPS Data comming in as a group call to a unit call #if (_call_type == 'group' or _call_type == 'vcsbk') and _int_dst_id == 900999: #_bits = header(_slot,'unit',_bits) #logger.info('(%s) Type Rewrite - GPS data from ID: %s, on TG 900999 rewritten to unit call to ID 900999 : bits %s',self._system,int_id(_rf_src),_bits) #_call_type == 'unit' - if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or (_stream_id != self.STATUS[_slot]['RX_STREAM_ID'] and _dtype_vseq == 3)): _data_call = True self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['crcs'] = set() - if _dtype_vseq == 3: logger.info('(%s) *UNIT CSBK* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -2222,12 +1837,10 @@ class routerHBP(HBSYSTEM): else: logger.info('(%s) *UNKNOW TYPE* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) - #Send all data to DATA-GATEWAY if enabled and valid if CONFIG['GLOBAL']['DATA_GATEWAY'] and 'DATA-GATEWAY' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['DATA-GATEWAY']['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS']['DATA-GATEWAY']['ENABLED']: logger.debug('(%s) DATA packet sent to DATA-GATEWAY',self._system) self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr) - #Send to all openbridges # sysIgnore = [] for system in systems: @@ -2238,7 +1851,6 @@ class routerHBP(HBSYSTEM): #We only want to send data calls to individual IDs via FreeBridge (not OpenBridge) if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][system]['VER'] > 1 and (_int_dst_id >= 1000000): self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr) - #If destination ID is in the Subscriber Map if _dst_id in SUB_MAP: (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] @@ -2252,10 +1864,8 @@ class routerHBP(HBSYSTEM): else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: logger.debug('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) - elif _int_dst_id == 900999: if 'D-APRS' in systems and CONFIG['SYSTEMS']['D-APRS']['MODE'] == 'MASTER': _d_system = 'D-APRS' @@ -2267,10 +1877,8 @@ class routerHBP(HBSYSTEM): #We will allow the system to use both slots _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) - else: #If destination ID is logged in as a hotspot for _d_system in systems: @@ -2292,32 +1900,25 @@ class routerHBP(HBSYSTEM): else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) - # Handle private call to ID 4000 as global dynamic bridge reset if _call_type == 'unit' and _int_dst_id == 4000: logger.info('(%s) Private call to ID 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) - # Opcional: no procesar más reglas para este paquete + # Opcional: no procesar más reglas para este paquete return - #Handle Private Calls if _call_type == 'unit' and len(str(_int_dst_id)) == 7: self.pvt_call_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data) - #Handle Parrot Service #if _call_type == 'unit' and _int_dst_id == 9990: # self.parrot_service(_peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data) - #Handle AMI private calls if _call_type == 'unit' and not _data_call and self.STATUS[_slot]['_allStarMode'] and CONFIG['ALLSTAR']['ENABLED']: if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): logger.info('(%s) AMI: Private call from %s to %s',self._system, int_id(_rf_src), _int_dst_id) - if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): - if _int_dst_id == 4000: logger.info('(%s) AMI: Private call from %s to %s (Disconnect)',self._system, int_id(_rf_src), _int_dst_id) AMIOBJ.send_command('ilink 6 0') @@ -2328,7 +1929,6 @@ class routerHBP(HBSYSTEM): logger.info('(%s) AMI: Private call from %s to %s (Link)',self._system, int_id(_rf_src), _int_dst_id) AMIOBJ.send_command('ilink 6 0') AMIOBJ.send_command('ilink 3 ' + str(_int_dst_id)) - # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2339,20 +1939,15 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['VOICE_STREAM'] = _voice_call self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 - #Handle AllStar Stuff elif _call_type == 'unit' and not _data_call and not self.STATUS[_slot]['_allStarMode']: if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): - self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['crcs'] = set() self.STATUS[_slot]['_stopTgAnnounce'] = False - # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): _say = [] - - #Allstar mode switch if CONFIG['ALLSTAR']['ENABLED'] and _int_dst_id == 8: logger.info('(%s) Reflector: voice called - TG 8 AllStar"', self._system) @@ -2366,18 +1961,15 @@ class routerHBP(HBSYSTEM): _say.append(words[_lang]['busy']) _say.append(words[_lang]['silence']) self.STATUS[_slot]['_stopTgAnnounce'] = True - #Information services elif _int_dst_id >= 9991 and _int_dst_id <= 9999: self.STATUS[_slot]['_stopTgAnnounce'] = True reactor.callInThread(playFileOnRequest,self,_int_dst_id) #playFileOnRequest(self,_int_dst_id) - if _say: speech = pkt_gen(bytes_3(5000), _nine, bytes_4(9), 1, _say) #call speech in a thread as it contains sleep() and hence could block the reactor reactor.callInThread(sendSpeech,self,speech) - # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2388,35 +1980,28 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['VOICE_STREAM'] = _voice_call self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 - #Handle group calls if _call_type == 'group' or _call_type == 'vcsbk': if _int_dst_id == 4000: logger.info('(%s) Group call to TG 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) return - # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): - self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['loss'] = 0 self.STATUS[_slot]['crcs'] = set() - if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): logger.warning('(%s) Packet received with STREAM ID: %s SUB: %s PEER: %s TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) return - # This is a new call stream self.STATUS[_slot]['RX_START'] = pkt_time - if _call_type == 'group' : if _dtype_vseq == 6: logger.info('(%s) *DATA HEADER* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('DATA HEADER,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - else: logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -2427,24 +2012,19 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, _dtype_vseq) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('OTHER DATA,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - # If we can, use the LC from the voice header as to keep all options intact if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_slot]['RX_LC'] = decoded['LC'] - # If we don't have a voice header then don't wait to decode it from the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_slot]['RX_LC'] = b''.join([LC_OPT,_dst_id,_rf_src]) - #Create default bridge for unknown TG if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and int_id(_dst_id) != 4000 and int_id(_dst_id) != 5000 and (str(int_id(_dst_id)) not in BRIDGES): logger.info('(%s) Bridge for TG %s does not exist. Creating as User Activated. Timeout %s',self._system, int_id(_dst_id),CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) make_single_bridge(_dst_id,self._system,_slot,CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) - self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 - if _call_type == 'vcsbk': if _dtype_vseq == 7: logger.info('(%s) *VCSBK 1/2 DATA BLOCK * STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ @@ -2456,22 +2036,18 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('VCSBK 3/4 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) - #Packet rate limit #Rate drop if self.STATUS[_slot]['packets'] > 18 and (self.STATUS[_slot]['packets'] / (pkt_time - self.STATUS[_slot]['RX_START']) > 25): logger.warning("(%s) *PacketControl* RATE DROP! Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) self.STATUS[_slot]['LAST'] = pkt_time return - #Timeout if self.STATUS[_slot]['RX_START'] + 180 < pkt_time: if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: logger.info("(%s) HBP *SOURCE TIMEOUT* STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, int_id(_stream_id), int_id(_dst_id),_slot) - self.STATUS[_slot]['LOOPLOG'] = True self.STATUS[_slot]['LAST'] = pkt_time return - #LoopControl# for system in systems: if system == self._system: @@ -2490,12 +2066,10 @@ class routerHBP(HBSYSTEM): logger.debug("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_slot]['LOOPLOG'] = True self.STATUS[_slot]['LAST'] = pkt_time - if 'ENHANCED_OBP' in CONFIG['SYSTEMS'][self._system] and CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_slot]: systems[self._system].send_bcsq(_dst_id,_stream_id) self.STATUS[_slot]['_bcsq'] = True return - #Duplicate handling# #Duplicate complete packet if self.STATUS[_slot]['lastData'] and self.STATUS[_slot]['lastData'] == _data and _seq > 1: @@ -2521,28 +2095,23 @@ class routerHBP(HBSYSTEM): if _seq and self.STATUS[_slot]['lastSeq'] and _seq > (self.STATUS[_slot]['lastSeq']+1): self.STATUS[_slot]['loss'] += 1 logger.debug("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) - #Save this sequence number self.STATUS[_slot]['lastSeq'] = _seq #Save this packet self.STATUS[_slot]['lastData'] = _data - ### MODIFIED: Prioritize routing for the TGID that just created a bridge _sysIgnore = deque() _current_bridge_key = str(int_id(_dst_id)) - # First, explicitly route for the current packet's TGID. # This ensures that if a bridge was just created for this packet, it gets processed immediately. if _current_bridge_key in BRIDGES: for _system in BRIDGES[_current_bridge_key]: if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits, _current_bridge_key, _system, False, _sysIgnore, _source_server, _ber, _rssi, _source_rptr) - # Also check for a corresponding reflector bridge (e.g., #9990) _reflector_bridge_key = ''.join(['#', _current_bridge_key]) if _reflector_bridge_key in BRIDGES: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits, _reflector_bridge_key, _system, False, _sysIgnore, _source_server, _ber, _rssi, _source_rptr) - # Now, run the general routing loop for all other bridges to handle cross-connections. # We skip the one we just processed to avoid duplicate work. for _bridge in BRIDGES: @@ -2564,27 +2133,22 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) - #Reset back to False self.STATUS[_slot]['lastSeq'] = False self.STATUS[_slot]['lastData'] = False - # # Begin in-band signalling for call end. This has nothign to do with routing traffic directly. # - # Iterate the rules dictionary for _bridge in BRIDGES: if (_bridge[0:1] == '#') and (_int_dst_id != 9): continue for _system in BRIDGES[_bridge]: if _system['SYSTEM'] == self._system: - # TGID matches a rule source, reset its timer if _slot == _system['TS'] and _dst_id == _system['TGID'] and ((_system['TO_TYPE'] == 'ON' and (_system['ACTIVE'] == True)) or (_system['TO_TYPE'] == 'OFF' and _system['ACTIVE'] == False)): _system['TIMER'] = pkt_time + _system['TIMEOUT'] logger.info('(%s) [1] Transmission match for Bridge: %s. Reset timeout to %s', self._system, _bridge, _system['TIMER']) - # TGID matches an ACTIVATION trigger if (_dst_id in _system['ON'] or _dst_id in _system['RESET']) and _slot == _system['TS']: # Set the matching rule as ACTIVE @@ -2601,7 +2165,6 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON': _system['TIMER'] = pkt_time + _system['TIMEOUT'] logger.info('(%s) [4] Bridge: %s, timeout timer reset to: %s', self._system, _bridge, _system['TIMER'] - pkt_time) - # TGID matches an DE-ACTIVATION trigger #Single TG mode if (CONFIG['SYSTEMS'][self._system]['MODE'] == 'MASTER' and CONFIG['SYSTEMS'][self._system]['SINGLE_MODE']) == True: @@ -2625,10 +2188,9 @@ class routerHBP(HBSYSTEM): _system['TIMER'] = pkt_time logger.info('(%s) [8] Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge) else: - # NUEVO COMPORTAMIENTO: SINGLE_MODE=False pero con gestión de bridge único - # Solo desactivar si es TG 4000 o un nuevo TG dinámico (no estático) - - # Verificar si el TGID actual es estático + # NUEVO COMPORTAMIENTO: SINGLE_MODE=False pero con gestión de bridge único + # Solo desactivar si es TG 4000 o un nuevo TG dinámico (no estático) + # Verificar si el TGID actual es estático is_static_tg = False if CONFIG['SYSTEMS'][self._system]['TS1_STATIC'] and _slot == 1: static_tgs = [int(tg) for tg in CONFIG['SYSTEMS'][self._system]['TS1_STATIC'].split(',') if tg.strip()] @@ -2638,13 +2200,10 @@ class routerHBP(HBSYSTEM): static_tgs = [int(tg) for tg in CONFIG['SYSTEMS'][self._system]['TS2_STATIC'].split(',') if tg.strip()] if int_id(_dst_id) in static_tgs: is_static_tg = True - # Verificar si es un reflector (bridge que empieza con #) is_reflector = _bridge[0:1] == '#' - - # Desactivar solo si es TG 4000 o un nuevo TG dinámico (no estático ni reflector) + # Desactivar solo si es TG 4000 o un nuevo TG dinámico (no estático ni reflector) if (_dst_id == bytes_3(4000)) and _slot == _system['TS']: - # Set the matching rule as ACTIVE if _dst_id in _system['OFF'] or _dst_id == bytes_3(4000) or (_dst_id != _system['TGID'] and not is_static_tg and not is_reflector): if _system['ACTIVE'] == True: @@ -2655,7 +2214,6 @@ class routerHBP(HBSYSTEM): logger.info('(%s) [5b] Bridge: %s, connection changed to state: %s (Static TG %s activated)', self._system, _bridge, _system['ACTIVE'], int_id(_dst_id)) else: logger.info('(%s) [5b] Bridge: %s, connection changed to state: %s (New dynamic TG %s activated)', self._system, _bridge, _system['ACTIVE'], int_id(_dst_id)) - # Cancel the timer if we've enabled an "ON" type timeout if _system['TO_TYPE'] == 'ON': _system['TIMER'] = pkt_time @@ -2671,8 +2229,6 @@ class routerHBP(HBSYSTEM): # # END IN-BAND SIGNALLING # - - # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2682,57 +2238,44 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['crcs'].add(_pkt_crc) - # # Socket-based reporting section # class bridgeReportFactory(reportFactory): - def send_bridge(self): serialized = pickle.dumps(BRIDGES, protocol=2) #.decode("utf-8", errors='ignore') self.send_clients(b''.join([REPORT_OPCODES['BRIDGE_SND'],serialized])) - def send_bridgeEvent(self, _data): if isinstance(_data, str): _data = _data.decode('utf-8', error='ignore') self.send_clients(b''.join([REPORT_OPCODES['BRDG_EVENT'],_data])) - #************************************************ # MAIN PROGRAM LOOP STARTS HERE #************************************************ if __name__ == '__main__': - import argparse import sys import os import signal - global CONFIG global KEYS keys = {} - # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 - ID_MAX = 16776415 - #Set process title early setproctitle(__file__) - # Change the current directory to the location of the application os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) - # CLI argument parser - handles picking up the config file from the command line, and sending a "help" message parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', action='store', dest='CONFIG_FILE', help='/full/path/to/config.file (usually hblink.cfg)') #parser.add_argument('-r', '--rules', action='store', dest='RULES_FILE', help='/full/path/to/rules.file (usually rules.py)') parser.add_argument('-l', '--logging', action='store', dest='LOG_LEVEL', help='Override config file logging level.') cli_args = parser.parse_args() - # Ensure we have a path for the config file, if one wasn't specified, then use the default (top of file) if not cli_args.CONFIG_FILE: cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/hblink.cfg' - #configP = False #if os.path.isfile('config.pkl'): #if os.path.getmtime('config.pkl') > (time() - 25): @@ -2747,26 +2290,23 @@ if __name__ == '__main__': #else: #os.unlink("config.pkl") #else: - CONFIG = config.build_config(cli_args.CONFIG_FILE) - # Ensure we have a path for the rules file, if one wasn't specified, then use the default (top of file) #if not cli_args.RULES_FILE: # cli_args.RULES_FILE = os.path.dirname(os.path.abspath(__file__))+'/rules.py' - # Start the system logger if cli_args.LOG_LEVEL: CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = log.config_logging(CONFIG['LOGGER']) - logger.info('\n\nCopyright (c) 2020, 2021, 2022, 2023 Simon G7RZU simon@gb7fr.org.uk') - logger.info('Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019\n\tThe Regents of the K0USY Group. All rights reserved.\n') + logger.info(""" +Copyright (c) 2020, 2021, 2022, 2023 Simon G7RZU simon@gb7fr.org.uk +Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 +\tThe Regents of the K0USY Group. All rights reserved. +""") logger.debug('(GLOBAL) Logging system started, anything from here on gets logged') - if CONFIG['ALLSTAR']['ENABLED']: logger.info('(AMI) Setting up AMI: Server: %s, Port: %s, User: %s, Pass: %s, Node: %s',CONFIG['ALLSTAR']['SERVER'],CONFIG['ALLSTAR']['PORT'],CONFIG['ALLSTAR']['USER'],CONFIG['ALLSTAR']['PASS'],CONFIG['ALLSTAR']['NODE']) - AMIOBJ = AMI(CONFIG['ALLSTAR']['SERVER'],CONFIG['ALLSTAR']['PORT'],CONFIG['ALLSTAR']['USER'],CONFIG['ALLSTAR']['PASS'],CONFIG['ALLSTAR']['NODE']) - # Set up the signal handler def sig_handler(_signal, _frame): logger.info('(GLOBAL) SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal)) @@ -2776,7 +2316,6 @@ if __name__ == '__main__': CONFIG['GLOBAL']['_KILL_SERVER'] = True else: exit() - #Server kill routine def kill_server(): try: @@ -2793,24 +2332,19 @@ if __name__ == '__main__': logger.error('(GLOBAL) Canot save key file: %s',e) except KeyError: pass - #install signal handlers signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) - # Create the name-number mapping dictionaries peer_ids, subscriber_ids, talkgroup_ids, local_subscriber_ids, server_ids, checksums = mk_aliases(CONFIG) - #Add special IDs to DB subscriber_ids[900999] = 'D-APRS' subscriber_ids[4294967295] = 'SC' - CONFIG['_SUB_IDS'] = subscriber_ids CONFIG['_PEER_IDS'] = peer_ids CONFIG['_LOCAL_SUBSCRIBER_IDS'] = local_subscriber_ids CONFIG['_SERVER_IDS'] = server_ids CONFIG['CHECKSUMS'] = checksums - # Import the ruiles file as a module, and create BRIDGES from it #spec = importlib.util.spec_from_file_location("module.name", cli_args.RULES_FILE) #rules_module = importlib.util.module_from_spec(spec) @@ -2819,7 +2353,6 @@ if __name__ == '__main__': # logger.info('(ROUTER) Routing bridges file found and bridges imported: %s', cli_args.RULES_FILE) #except (ImportError, FileNotFoundError): #sys.exit('(ROUTER) TERMINATING: Routing bridges file not found or invalid: {}'.format(cli_args.RULES_FILE)) - #Load pickle of bridges if it's less than 25 seconds old #if os.path.isfile('bridge.pkl'): #if os.path.getmtime('config.pkl') > (time() - 25): @@ -2834,7 +2367,6 @@ if __name__ == '__main__': #BRIDGES = make_bridges(rules_module.BRIDGES) #os.unlink("bridge.pkl") #else: - if 'ECHO' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['ECHO']['MODE'] == 'PEER': BRIDGES = make_bridges({'9990': [{'SYSTEM': 'ECHO', 'TS': 2, 'TGID': 9990, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [], 'OFF': [], 'RESET': []},]}) else: @@ -2843,7 +2375,6 @@ if __name__ == '__main__': #Subscriber map for unit calls - complete with test entry #SUB_MAP = {bytes_3(73578):('REP-1',1,time())} SUB_MAP = {} - if CONFIG['ALIASES']['SUB_MAP_FILE']: try: with open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['SUB_MAP_FILE'],'rb') as _fh: @@ -2851,10 +2382,8 @@ if __name__ == '__main__': except: logger.warning('(SUBSCRIBER) Cannot load SUB_MAP file') #sys.exit('(SUBSCRIBER) TERMINATING: SUB_MAP file not found or invalid') - #Test value #SUB_MAP[bytes_3(73578)] = ('REP-1',1,time()) - #Generator generator = {} systemdelete = deque() @@ -2869,17 +2398,13 @@ if __name__ == '__main__': logger.debug('(GLOBAL) Generator - generated system %s',_systemname) generator[_systemname]['_default_options'] systemdelete.append(system) - for _system in generator: CONFIG['SYSTEMS'][_system] = generator[_system] for _system in systemdelete: CONFIG['SYSTEMS'].pop(_system) - del generator del systemdelete - prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] - # Default reflector logger.debug('(ROUTER) Setting default reflectors') for system in CONFIG['SYSTEMS']: @@ -2887,7 +2412,6 @@ if __name__ == '__main__': continue if CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR'] not in prohibitedTGs: make_default_reflector(CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR'],CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'],system) - #static TGs logger.debug('(ROUTER) setting static TGs') for system in CONFIG['SYSTEMS']: @@ -2900,7 +2424,6 @@ if __name__ == '__main__': ts1 = CONFIG['SYSTEMS'][system]['TS1_STATIC'].split(',') if CONFIG['SYSTEMS'][system]['TS2_STATIC']: ts2 = CONFIG['SYSTEMS'][system]['TS2_STATIC'].split(',') - for tg in ts1: if not tg: continue @@ -2915,23 +2438,18 @@ if __name__ == '__main__': continue tg = int(tg) make_static_tg(tg,2,_tmout,system) - # INITIALIZE THE REPORTING LOOP if CONFIG['REPORTS']['REPORT']: report_server = config_reports(CONFIG, bridgeReportFactory) else: report_server = None logger.info('(REPORT) TCP Socket reporting not configured') - #Read AMBE AMBEobj = readAMBE(CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES'],'./Audio/') - #global words words = AMBEobj.readfiles() - for lang in words.keys(): logger.info('(AMBE) for language %s, read %s words into voice dict',lang,len(words[lang]) - 1) - #Remap words for internationalisation if lang in voiceMap: logger.info('(AMBE) i8n voice map entry for language %s',lang) @@ -2939,12 +2457,9 @@ if __name__ == '__main__': for _mapword in _map: logger.info('(AMBE) Mapping \"%s\" to \"%s\"',_mapword,_map[_mapword]) words[lang][_mapword] = words[lang][_map[_mapword]] - # HBlink instance creation logger.info('(GLOBAL) ADN \'bridge_master.py\' -- SYSTEM STARTING...') - listeningPorts = {} - for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -2956,17 +2471,14 @@ if __name__ == '__main__': systems[system] = routerHBP(system, CONFIG, report_server) listeningPorts[system] = reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) - def loopingErrHandle(failure): - logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) + logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop. %s', failure) reactor.stop() - #load keys if exists try: keys = load_json(''.join([CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['KEYS_FILE']])) except Exception as e: logger.error('(KEYS) Cannot load keys: %s',e) - #Initialize API # if CONFIG['GLOBAL']['ENABLE_API']: # api = config_API(CONFIG,BRIDGES) @@ -2983,67 +2495,54 @@ if __name__ == '__main__': # logger.info('(API) System API Key loaded from system key store') # else: # logger.info('(API) API not started') - # Initialize the rule timer -- this if for user activated stuff rule_timer_task = task.LoopingCall(rule_timer_loop) rule_timer = rule_timer_task.start(52) rule_timer.addErrback(loopingErrHandle) - # Initialize the stream trimmer stream_trimmer_task = task.LoopingCall(stream_trimmer_loop) stream_trimmer = stream_trimmer_task.start(5) stream_trimmer.addErrback(loopingErrHandle) - # Ident #This runs in a thread so as not to block the reactor ident_task = task.LoopingCall(threadIdent) identa = ident_task.start(3600) identa.addErrback(loopingErrHandle) - #Alias reloader alias_time = CONFIG['ALIASES']['STALE_TIME'] * 86400 aliasa_task = task.LoopingCall(threadAlias) aliasa = aliasa_task.start(alias_time) aliasa.addErrback(loopingErrHandle) - #Options parsing options_task = task.LoopingCall(options_config) options = options_task.start(26) options.addErrback(loopingErrHandle) - #bridge reset bridge_task = task.LoopingCall(bridge_reset) bridge = bridge_task.start(6) bridge.addErrback(loopingErrHandle) - #STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: stat_trimmer_task = task.LoopingCall(statTrimmer) stat_trimmer = stat_trimmer_task.start(303)#3600 stat_trimmer.addErrback(loopingErrHandle) - #KA Reporting ka_task = task.LoopingCall(kaReporting) ka = ka_task.start(60) ka.addErrback(loopingErrHandle) - #Debug bridges if CONFIG['GLOBAL']['DEBUG_BRIDGES']: debug_bridges_task = task.LoopingCall(bridgeDebug) debug_bridges = debug_bridges_task.start(66) debug_bridges.addErrback(loopingErrHandle) - #Subscriber map trimmer sub_trimmer_task = task.LoopingCall(SubMapTrimmer) sub_trimmer = sub_trimmer_task.start(3600)#3600 sub_trimmer.addErrback(loopingErrHandle) - #Server kill switch checker killserver_task = task.LoopingCall(kill_server) killserver = killserver_task.start(5) killserver.addErrback(loopingErrHandle) - #more threads reactor.suggestThreadPoolSize(100) - reactor.run()