diff --git a/bridge_master.py b/bridge_master.py index bc4ad41..5049f25 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -38,6 +38,7 @@ from time import time,sleep,perf_counter import importlib.util import re import copy +import json from setproctitle import setproctitle #from crccheck.crc import Crc32 @@ -126,6 +127,7 @@ def config_reports(_config, _factory): i = i +1 logger.info('(REPORT) %s systems have at least one peer',i) logger.info('(REPORT) Subscriber Map has %s entries',len(SUB_MAP)) + logger.info('(REPORT) %s SERVER_ID\'s seen by topography system', len(TOPO)) logger.info('(REPORT) HBlink TCP reporting server configured') @@ -409,6 +411,27 @@ def subMapWrite(): except: logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file') +def topoWrite(): + try: + _fh = open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['TOPO_FILE'],'w') + json.dump(TOPO,_fh) + _fh.close() + logger.info('(TOPO) Writing topography file to disk') + except: + logger.warning('(TOPO) Cannot write topography file to disk') + +def topoRead(): + try: + _fh = open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['TOPO_FILE'],'r') + _topo = {} + _topo = json.load(_fh) + _fh.close() + logger.info('(TOPO) Reading topography file from disk') + except: + logger.warning('(TOPO) Cannot read topography file from disk') + finally: + return(_topo) + #Subscriber Map trimmer loop def SubMapTrimmer(): logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') @@ -538,6 +561,25 @@ def stream_trimmer_loop(): else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) +def topoTrimmer(): + logger.debug('(TOPO) Trimming stale entries') + _now = time() + _toprem = [] + for _src in TOPO: + _dstrem = [] + for _dst in TOPO[_src]: + if _now - TOPO[_src][_dst]['time'] > 1800: + _dstrem.append(_dst) + for _remove in _dstrem: + TOPO[_src].pop(_remove) + if len(TOPO[_src]) == 0: + _toprem.append(_src) + for _remove in _toprem: + TOPO.pop(_remove) + topoWrite() + + + def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -1876,6 +1918,30 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['_fin'] = True self.STATUS[_stream_id]['lastSeq'] = False + + def process_bcto(self,_uid,_src,_dst,_ver,_hops): + _uid = int_id(_uid) + _src = int_id(_src) + _dst = int_id(_dst) + _ver = int.from_bytes(_ver,'big') + _hops = int.from_bytes(_hops,'big') + if _src not in TOPO: + TOPO[_src] = {} + TOPO[_src][_dst] = { + 'ver' : _ver, + 'time' : time(), + 'uid' : _uid, + 'hops' : _hops, + } + + def check_bcto_uid(self,_uid): + _uid = int_id(_uid) + for src in TOPO: + for dst in TOPO[src]: + if TOPO[src][dst]['uid'] == _uid: + return(True) + return(False) + class routerHBP(HBSYSTEM): @@ -2904,6 +2970,9 @@ if __name__ == '__main__': reactor.stop() if CONFIG['ALIASES']['SUB_MAP_FILE']: subMapWrite() + if CONFIG['ALIASES']['TOPO_FILE']: + topoWrite() + # Set signal handers so that we can gracefully exit if need be for sig in [signal.SIGINT, signal.SIGTERM]: @@ -2921,7 +2990,7 @@ if __name__ == '__main__': CONFIG['_LOCAL_SUBSCRIBER_IDS'] = local_subscriber_ids CONFIG['_SERVER_IDS'] = server_ids - + TOPO = topoRead() # Import the ruiles file as a module, and create BRIDGES from it spec = importlib.util.spec_from_file_location("module.name", cli_args.RULES_FILE) @@ -3118,6 +3187,12 @@ if __name__ == '__main__': sub_trimmer = sub_trimmer_task.start(3600)#3600 sub_trimmer.addErrback(loopingErrHandle) + #topography trimmer + topo_trimmer_task = task.LoopingCall(topoTrimmer) + topo_trimmer = topo_trimmer_task.start(610)#610 + topo_trimmer.addErrback(loopingErrHandle) + + #more threads reactor.suggestThreadPoolSize(100) diff --git a/config.py b/config.py index 15caf6d..504b9c9 100755 --- a/config.py +++ b/config.py @@ -148,7 +148,8 @@ def build_config(_config_file): 'ANNOUNCEMENT_LANGUAGES': config.get(section, 'ANNOUNCEMENT_LANGUAGES'), 'SERVER_ID': config.getint(section, 'SERVER_ID').to_bytes(4, 'big'), 'DATA_GATEWAY': config.getboolean(section, 'DATA_GATEWAY'), - 'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS') + 'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS'), + 'ISO_COUNTRY_CODE' : config.get(section, 'ISO_COUNTRY_CODE'), }) if not CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES']: @@ -186,7 +187,9 @@ def build_config(_config_file): 'SUB_MAP_FILE': config.get(section, 'SUB_MAP_FILE'), 'LOCAL_SUBSCRIBER_FILE': config.get(section, 'LOCAL_SUBSCRIBER_FILE'), 'SERVER_ID_URL': config.get(section, 'SERVER_ID_URL'), - 'SERVER_ID_FILE': config.get(section, 'SERVER_ID_FILE') + 'SERVER_ID_FILE': config.get(section, 'SERVER_ID_FILE'), + 'TOPO_FILE': config.get(section, 'TOPO_FILE'), + }) diff --git a/const.py b/const.py index 630b76f..862555a 100755 --- a/const.py +++ b/const.py @@ -83,9 +83,10 @@ BCKA = b'BCKA' BCSQ = b'BCSQ' BCST = b'BCST' BCVE = b'BCVE' +BCTO = b'BCTO' #Protocol version -VER = 5 +VER = 6 # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 diff --git a/hblink.py b/hblink.py index d2a4a52..a17f3ae 100755 --- a/hblink.py +++ b/hblink.py @@ -163,6 +163,11 @@ class OPENBRIDGE(DatagramProtocol): self._bcve = self._bcve_task.start(60) self._bcve.addErrback(self.loopingErrHandle) + logger.debug('(%s) *BridgeControl* starting topography timer',self._system) + self._bcto_task = task.LoopingCall(self.send_my_bcto) + self._bcto = self._bcto_task.start(604)#600 + self._bcto.addErrback(self.loopingErrHandle) + def dereg(self): logger.info('(%s) is mode OPENBRIDGE. No De-Registration required, continuing shutdown', self._system) @@ -262,12 +267,52 @@ class OPENBRIDGE(DatagramProtocol): logger.trace('(%s) *BridgeControl* sent BCVE. Ver: %s',self._system,VER) else: logger.trace('(%s) *BridgeControl* not sending BCVE, TARGET_IP currently not known',self._system) + + def send_my_bcto(self): + if self._config['VER'] > 5: + _hops = 1 + _hops = _hops.to_bytes(1,'big') + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['ENABLED']: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + if '_bcka' in self._CONFIG['SYSTEMS'][system] and self._CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: + continue + _uid = bytes_4(randint(0x00, 0xFFFFFFFF)) + _packet = b''.join([BCTO,_uid,self._CONFIG['GLOBAL']['SERVER_ID'],self._CONFIG['SYSTEMS'][system]['NETWORK_ID'],self._CONFIG['SYSTEMS'][system]['VER'].to_bytes(1,"big"),_hops]) + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* sent BCTO. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + else: + logger.trace('(%s) *BridgeControl* not sending BCTO, TARGET_IP currently not known. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + + + def retransmit_bcto(self,_string,_hops): + _hops += 1 + _hops = _hops.to_bytes(1,'big') + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['VER'] > 5: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + _packet = b''.join([BCTO,_string,_hops]) + _h = blake2b(key=self._CONFIG['SYSTEMS'][system]['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._CONFIG['SYSTEMS'][system]['TARGET_IP'], self._CONFIG['SYSTEMS'][system]['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* retransmitted BCTO.',self._system) + else: + logger.trace('(%s) *BridgeControl* not retransmitting BCTO, TARGET_IP currently not known.',self._system) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash,_hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): pass #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) + + def process_bcto(self,src,dst,ver): + pass def datagramReceived(self, _packet, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! @@ -289,7 +334,7 @@ class OPENBRIDGE(DatagramProtocol): if compare_digest(_hash, _ckhs) and (_sockaddr == self._config['TARGET_SOCK'] or self._config['RELAX_CHECKS']): _peer_id = _data[11:15] if self._config['NETWORK_ID'] != _peer_id: - logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Peer ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) + logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Server ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) return #This is a v1 packet, so all the extended stuff we can set to default @@ -704,6 +749,30 @@ class OPENBRIDGE(DatagramProtocol): else: h,p = _sockaddr logger.warning('(%s) *ProtoControl* BCVE invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) + + if _packet[:4] == BCTO: + if self._config['VER'] > 5: + _hash = _packet[18:] + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet[:18]) + _hash2 = _h.digest() + _uid = _packet[4:8] + _src = _packet[8:12] + _dst = _packet[12:16] + _ver = _packet[16:17] + _hops = _packet[17:18] + if _hash == _hash2: + logger.trace('(%s) *ProtoControl* BCTO received: %s connected to %s with proto ver. %s. HOPS: %s ',self._system, int_id(_src), int_id(_dst), int.from_bytes(_ver,'big'), int.from_bytes(_hops,'big')) + if int.from_bytes(_hops,'big') < 10 and _src != self._CONFIG['GLOBAL']['SERVER_ID'] and not self.check_bcto_uid(_uid): + self.retransmit_bcto(_packet[4:17],int.from_bytes(_hops,'big')) + else: + logger.trace('(%s) *BridgeControl* not retransmitting BCTO - hop count exceeded, already seen or my packet',self._system) + if not self.check_bcto_uid(_uid): + self.process_bcto(_uid,_src,_dst,_ver,_hops) + else: + h,p = _sockaddr + logger.warning('(%s) *ProtoControl* BCTO invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:18]), len(_packet[18:]),repr(_packet[18:]),h,p) +