From 7eccd3e307f64d50afef5f70a46139cecfd6a225 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 9 Aug 2022 13:17:13 +0100 Subject: [PATCH] Auto topography discovery system: new config directive: under [ALIAS] TOPO_FILE: topography.json A JSON file is output showing all of the SERVER_IDs seen, at the top level, and where they are bridged to. Example: { "2341" : { #Top-level server ID seen "7301" : { #Is bridged with server 7301 "hops" : 2, #Hops it took for the topography packet to reach us "time" : 1660047369.11104, #Timestamp on which the packet was received "ver" : 5, #Version of FBP this bridge speaks "uid" : 1820181884 #Unique ID to identify the topography packet. Mostly so we don't duplicate. } } Squashed commit of the following: commit 950f3e1c3e5157023707ab90df8d51dac2fd1489 Author: Simon Date: Tue Aug 9 13:16:40 2022 +0100 Topo ready for merge commit 4dc9f817a7b4060b9fdb3278c1e99287f9113f6d Author: Simon Date: Tue Aug 9 02:01:08 2022 +0100 Fix trimmer commit 48c520c86377f1f56e9ad841db14596a5f5eaf9c Author: Simon Date: Mon Aug 8 01:35:43 2022 +0100 Think this may be ready :) commit 8a860bb382b1a86aeff1ae47044ef41aa8b91df9 Author: Simon Date: Mon Aug 8 00:29:14 2022 +0100 Fix back to operational params commit 3657760112f39baa5d88aa21a3e7a4b45cf33d12 Author: Simon Date: Mon Aug 8 00:18:18 2022 +0100 UID typo commit 047b1df5a43bb41fc34e5270ef33a300121bd783 Author: Simon Date: Mon Aug 8 00:17:12 2022 +0100 Fix BCTO commit dfe21bfaa3fffe16d52cf8d92640416fb5732eea Author: Simon Date: Mon Aug 8 00:12:48 2022 +0100 Testing with 10 sec commit 8431b4dcf63f571020a96ade1362b05f6e9c7599 Author: Simon Date: Fri Aug 5 00:10:28 2022 +0100 Use UID in BCTO commit 620a9818bfa9966e96522d385869712a229d00f3 Author: Simon Date: Thu Aug 4 02:38:59 2022 +0100 dfkldlk commit 1f92e1b78439a86d9bb6d60f36e473556ae689d7 Author: Simon Date: Thu Aug 4 02:16:48 2022 +0100 retrans with correct passphrase commit 445ae4474c921a94b9dce4abfb62ffe7c9cb16c2 Author: Simon Date: Thu Aug 4 01:47:01 2022 +0100 Retransmit to right systems! commit ba0da271a2837096854ea35912a400618b556367 Author: Simon Date: Thu Aug 4 01:08:56 2022 +0100 my hops commit 80e42f7837c5c0937469848a266e75be44a40090 Author: Simon Date: Thu Aug 4 00:51:47 2022 +0100 dfdf commit 0991670323fdd6f9a2401ba85656ec8786a6831d Author: Simon Date: Thu Aug 4 00:49:10 2022 +0100 fkjkj commit 275306af50a2b384d2f3ba758fca31e2e3fc205c Author: Simon Date: Thu Aug 4 00:48:12 2022 +0100 f commit c9cb71accd851882e53bcef9e66f09c86d304b8a Author: Simon Date: Thu Aug 4 00:47:10 2022 +0100 hoppy commit d01019fa6138163f94e861ac0a6a62a217bdaf59 Author: Simon Date: Thu Aug 4 00:23:39 2022 +0100 bctony commit f3de53d47bb045e39dca31d862389c46ea41c463 Author: Simon Date: Thu Aug 4 00:04:50 2022 +0100 fklflk commit 752b8407a812f37cbfa23dde066e062401895b7a Author: Simon Date: Wed Aug 3 23:59:17 2022 +0100 lkflkk commit dd92b059eef3af67e47332639314ebdee7bf7ef6 Author: Simon Date: Wed Aug 3 23:58:14 2022 +0100 dlfklk commit 220d5bc6bbb5e36a92c0f2e3bcc437bafe371c37 Author: Simon Date: Wed Aug 3 23:57:25 2022 +0100 dlkfdlkf commit d9e46764c1a988415e2ad953ede24baeefa91c39 Author: Simon Date: Wed Aug 3 23:49:52 2022 +0100 dflklxk commit 9aba9d2fdc4ae8f03361bfb8aab35d73f91c2a8e Author: Simon Date: Wed Aug 3 23:48:06 2022 +0100 ;fl;sl commit 0d2d6fc5ea61de554d827f35e4ca71b5a56b0c99 Author: Simon Date: Wed Aug 3 23:46:20 2022 +0100 lskdlk commit aad811c080d15d912fc21dd6b4c7fa96c82ab65e Author: Simon Date: Wed Aug 3 23:39:59 2022 +0100 flklk commit a531b5f2b814c3aba20ae42fd054000a02946d1f Author: Simon Date: Wed Aug 3 23:39:00 2022 +0100 hoppy commit 648bd3d77ccad944763b6dfb0d19ad916bd3cb1c Author: Simon Date: Wed Aug 3 23:37:27 2022 +0100 slfkdlsk commit 32f70379732e4dd82eaa6b3603778582de8181b2 Author: Simon Date: Wed Aug 3 23:35:55 2022 +0100 dflkldk commit e1e58321c198e062d593eef390f7c3545b9b76f7 Author: Simon Date: Wed Aug 3 23:33:49 2022 +0100 djklk commit f7985a6119d3f080a48d7a3556bd47441f15349c Author: Simon Date: Wed Aug 3 23:32:59 2022 +0100 f f commit dfcf4184f438fde07ba9943a3b80873c705534d0 Author: Simon Date: Wed Aug 3 23:31:58 2022 +0100 fix fix commit dd76e6ce4e47f7e24d24e1ba773719ab8a7494a9 Author: Simon Date: Wed Aug 3 23:30:35 2022 +0100 fix hmac commit 5306f366752b0bc416ab81dfd0a485f4b6b93d11 Author: Simon Date: Wed Aug 3 23:28:40 2022 +0100 10 sec test commit 549f8245e68bff09c183ce17c59e169798f52430 Author: Simon Date: Wed Aug 3 23:23:54 2022 +0100 packet commit 011db46f479cad700dc34d56b7805f09ce443974 Author: Simon Date: Wed Aug 3 23:10:44 2022 +0100 Lets test the TOPO! commit d1cee872e564d2f25bda2589ee73db081bda181e Author: Simon Date: Tue Aug 2 00:47:15 2022 +0100 Topo first pass --- bridge_master.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++- config.py | 7 +++-- const.py | 3 +- hblink.py | 71 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 153 insertions(+), 5 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index bc4ad41..5049f25 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -38,6 +38,7 @@ from time import time,sleep,perf_counter import importlib.util import re import copy +import json from setproctitle import setproctitle #from crccheck.crc import Crc32 @@ -126,6 +127,7 @@ def config_reports(_config, _factory): i = i +1 logger.info('(REPORT) %s systems have at least one peer',i) logger.info('(REPORT) Subscriber Map has %s entries',len(SUB_MAP)) + logger.info('(REPORT) %s SERVER_ID\'s seen by topography system', len(TOPO)) logger.info('(REPORT) HBlink TCP reporting server configured') @@ -409,6 +411,27 @@ def subMapWrite(): except: logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file') +def topoWrite(): + try: + _fh = open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['TOPO_FILE'],'w') + json.dump(TOPO,_fh) + _fh.close() + logger.info('(TOPO) Writing topography file to disk') + except: + logger.warning('(TOPO) Cannot write topography file to disk') + +def topoRead(): + try: + _fh = open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['TOPO_FILE'],'r') + _topo = {} + _topo = json.load(_fh) + _fh.close() + logger.info('(TOPO) Reading topography file from disk') + except: + logger.warning('(TOPO) Cannot read topography file from disk') + finally: + return(_topo) + #Subscriber Map trimmer loop def SubMapTrimmer(): logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') @@ -538,6 +561,25 @@ def stream_trimmer_loop(): else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) +def topoTrimmer(): + logger.debug('(TOPO) Trimming stale entries') + _now = time() + _toprem = [] + for _src in TOPO: + _dstrem = [] + for _dst in TOPO[_src]: + if _now - TOPO[_src][_dst]['time'] > 1800: + _dstrem.append(_dst) + for _remove in _dstrem: + TOPO[_src].pop(_remove) + if len(TOPO[_src]) == 0: + _toprem.append(_src) + for _remove in _toprem: + TOPO.pop(_remove) + topoWrite() + + + def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -1876,6 +1918,30 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['_fin'] = True self.STATUS[_stream_id]['lastSeq'] = False + + def process_bcto(self,_uid,_src,_dst,_ver,_hops): + _uid = int_id(_uid) + _src = int_id(_src) + _dst = int_id(_dst) + _ver = int.from_bytes(_ver,'big') + _hops = int.from_bytes(_hops,'big') + if _src not in TOPO: + TOPO[_src] = {} + TOPO[_src][_dst] = { + 'ver' : _ver, + 'time' : time(), + 'uid' : _uid, + 'hops' : _hops, + } + + def check_bcto_uid(self,_uid): + _uid = int_id(_uid) + for src in TOPO: + for dst in TOPO[src]: + if TOPO[src][dst]['uid'] == _uid: + return(True) + return(False) + class routerHBP(HBSYSTEM): @@ -2904,6 +2970,9 @@ if __name__ == '__main__': reactor.stop() if CONFIG['ALIASES']['SUB_MAP_FILE']: subMapWrite() + if CONFIG['ALIASES']['TOPO_FILE']: + topoWrite() + # Set signal handers so that we can gracefully exit if need be for sig in [signal.SIGINT, signal.SIGTERM]: @@ -2921,7 +2990,7 @@ if __name__ == '__main__': CONFIG['_LOCAL_SUBSCRIBER_IDS'] = local_subscriber_ids CONFIG['_SERVER_IDS'] = server_ids - + TOPO = topoRead() # Import the ruiles file as a module, and create BRIDGES from it spec = importlib.util.spec_from_file_location("module.name", cli_args.RULES_FILE) @@ -3118,6 +3187,12 @@ if __name__ == '__main__': sub_trimmer = sub_trimmer_task.start(3600)#3600 sub_trimmer.addErrback(loopingErrHandle) + #topography trimmer + topo_trimmer_task = task.LoopingCall(topoTrimmer) + topo_trimmer = topo_trimmer_task.start(610)#610 + topo_trimmer.addErrback(loopingErrHandle) + + #more threads reactor.suggestThreadPoolSize(100) diff --git a/config.py b/config.py index 15caf6d..504b9c9 100755 --- a/config.py +++ b/config.py @@ -148,7 +148,8 @@ def build_config(_config_file): 'ANNOUNCEMENT_LANGUAGES': config.get(section, 'ANNOUNCEMENT_LANGUAGES'), 'SERVER_ID': config.getint(section, 'SERVER_ID').to_bytes(4, 'big'), 'DATA_GATEWAY': config.getboolean(section, 'DATA_GATEWAY'), - 'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS') + 'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS'), + 'ISO_COUNTRY_CODE' : config.get(section, 'ISO_COUNTRY_CODE'), }) if not CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES']: @@ -186,7 +187,9 @@ def build_config(_config_file): 'SUB_MAP_FILE': config.get(section, 'SUB_MAP_FILE'), 'LOCAL_SUBSCRIBER_FILE': config.get(section, 'LOCAL_SUBSCRIBER_FILE'), 'SERVER_ID_URL': config.get(section, 'SERVER_ID_URL'), - 'SERVER_ID_FILE': config.get(section, 'SERVER_ID_FILE') + 'SERVER_ID_FILE': config.get(section, 'SERVER_ID_FILE'), + 'TOPO_FILE': config.get(section, 'TOPO_FILE'), + }) diff --git a/const.py b/const.py index 630b76f..862555a 100755 --- a/const.py +++ b/const.py @@ -83,9 +83,10 @@ BCKA = b'BCKA' BCSQ = b'BCSQ' BCST = b'BCST' BCVE = b'BCVE' +BCTO = b'BCTO' #Protocol version -VER = 5 +VER = 6 # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 diff --git a/hblink.py b/hblink.py index d2a4a52..a17f3ae 100755 --- a/hblink.py +++ b/hblink.py @@ -163,6 +163,11 @@ class OPENBRIDGE(DatagramProtocol): self._bcve = self._bcve_task.start(60) self._bcve.addErrback(self.loopingErrHandle) + logger.debug('(%s) *BridgeControl* starting topography timer',self._system) + self._bcto_task = task.LoopingCall(self.send_my_bcto) + self._bcto = self._bcto_task.start(604)#600 + self._bcto.addErrback(self.loopingErrHandle) + def dereg(self): logger.info('(%s) is mode OPENBRIDGE. No De-Registration required, continuing shutdown', self._system) @@ -262,12 +267,52 @@ class OPENBRIDGE(DatagramProtocol): logger.trace('(%s) *BridgeControl* sent BCVE. Ver: %s',self._system,VER) else: logger.trace('(%s) *BridgeControl* not sending BCVE, TARGET_IP currently not known',self._system) + + def send_my_bcto(self): + if self._config['VER'] > 5: + _hops = 1 + _hops = _hops.to_bytes(1,'big') + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['ENABLED']: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + if '_bcka' in self._CONFIG['SYSTEMS'][system] and self._CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: + continue + _uid = bytes_4(randint(0x00, 0xFFFFFFFF)) + _packet = b''.join([BCTO,_uid,self._CONFIG['GLOBAL']['SERVER_ID'],self._CONFIG['SYSTEMS'][system]['NETWORK_ID'],self._CONFIG['SYSTEMS'][system]['VER'].to_bytes(1,"big"),_hops]) + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* sent BCTO. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + else: + logger.trace('(%s) *BridgeControl* not sending BCTO, TARGET_IP currently not known. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + + + def retransmit_bcto(self,_string,_hops): + _hops += 1 + _hops = _hops.to_bytes(1,'big') + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['VER'] > 5: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + _packet = b''.join([BCTO,_string,_hops]) + _h = blake2b(key=self._CONFIG['SYSTEMS'][system]['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._CONFIG['SYSTEMS'][system]['TARGET_IP'], self._CONFIG['SYSTEMS'][system]['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* retransmitted BCTO.',self._system) + else: + logger.trace('(%s) *BridgeControl* not retransmitting BCTO, TARGET_IP currently not known.',self._system) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash,_hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): pass #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) + + def process_bcto(self,src,dst,ver): + pass def datagramReceived(self, _packet, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! @@ -289,7 +334,7 @@ class OPENBRIDGE(DatagramProtocol): if compare_digest(_hash, _ckhs) and (_sockaddr == self._config['TARGET_SOCK'] or self._config['RELAX_CHECKS']): _peer_id = _data[11:15] if self._config['NETWORK_ID'] != _peer_id: - logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Peer ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) + logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Server ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) return #This is a v1 packet, so all the extended stuff we can set to default @@ -704,6 +749,30 @@ class OPENBRIDGE(DatagramProtocol): else: h,p = _sockaddr logger.warning('(%s) *ProtoControl* BCVE invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) + + if _packet[:4] == BCTO: + if self._config['VER'] > 5: + _hash = _packet[18:] + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet[:18]) + _hash2 = _h.digest() + _uid = _packet[4:8] + _src = _packet[8:12] + _dst = _packet[12:16] + _ver = _packet[16:17] + _hops = _packet[17:18] + if _hash == _hash2: + logger.trace('(%s) *ProtoControl* BCTO received: %s connected to %s with proto ver. %s. HOPS: %s ',self._system, int_id(_src), int_id(_dst), int.from_bytes(_ver,'big'), int.from_bytes(_hops,'big')) + if int.from_bytes(_hops,'big') < 10 and _src != self._CONFIG['GLOBAL']['SERVER_ID'] and not self.check_bcto_uid(_uid): + self.retransmit_bcto(_packet[4:17],int.from_bytes(_hops,'big')) + else: + logger.trace('(%s) *BridgeControl* not retransmitting BCTO - hop count exceeded, already seen or my packet',self._system) + if not self.check_bcto_uid(_uid): + self.process_bcto(_uid,_src,_dst,_ver,_hops) + else: + h,p = _sockaddr + logger.warning('(%s) *ProtoControl* BCTO invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:18]), len(_packet[18:]),repr(_packet[18:]),h,p) +