From d1cee872e564d2f25bda2589ee73db081bda181e Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 2 Aug 2022 00:47:15 +0100 Subject: [PATCH] Topo first pass --- bridge_master.py | 35 +++++++++++++++++++++++++++++ const.py | 3 ++- hblink.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index bc4ad41..237fdae 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -538,6 +538,26 @@ def stream_trimmer_loop(): else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) +def topoTrimmer(): + logger.debug('(TOPO) Trimming stale entries') + _now = time() + _toprem = [] + for _src in TOPO: + _dstrem = [] + for _dst in TOPO[_src]: + if TOPO[_src][_dst]['time'] - +now > 1800: + _dstrem.append(_dst) + for _remove in _dstrem: + TOPO[_src].pop(_remove) + if len(TOPO[_src]) == 0: + _toprem.append(_src) + for _remove in _toprem: + TOPO.pop(_remove) + + print(TOPO) + + + def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -1876,6 +1896,13 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['_fin'] = True self.STATUS[_stream_id]['lastSeq'] = False + + def process_bcto(self,_src,_dst,_ver): + TOPO[_src][_dst] = { + 'ver' : _ver, + 'time' : time() + } + class routerHBP(HBSYSTEM): @@ -2850,6 +2877,8 @@ if __name__ == '__main__': #os.unlink("config.pkl") #else: + TOPO = {} + CONFIG = config.build_config(cli_args.CONFIG_FILE) # Ensure we have a path for the rules file, if one wasn't specified, then use the default (top of file) @@ -3118,6 +3147,12 @@ if __name__ == '__main__': sub_trimmer = sub_trimmer_task.start(3600)#3600 sub_trimmer.addErrback(loopingErrHandle) + #topography trimmer + topo_trimmer_task = task.LoopingCall(topoTrimmer) + topo_trimmer = topo_trimmer_task.start(610) + topo_trimmer.addErrback(loopingErrHandle) + + #more threads reactor.suggestThreadPoolSize(100) diff --git a/const.py b/const.py index 630b76f..862555a 100755 --- a/const.py +++ b/const.py @@ -83,9 +83,10 @@ BCKA = b'BCKA' BCSQ = b'BCSQ' BCST = b'BCST' BCVE = b'BCVE' +BCTO = b'BCTO' #Protocol version -VER = 5 +VER = 6 # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 diff --git a/hblink.py b/hblink.py index d2a4a52..8611954 100755 --- a/hblink.py +++ b/hblink.py @@ -163,6 +163,11 @@ class OPENBRIDGE(DatagramProtocol): self._bcve = self._bcve_task.start(60) self._bcve.addErrback(self.loopingErrHandle) + logger.debug('(%s) *BridgeControl* starting topography timer',self._system) + self._bcto_task = task.LoopingCall(self.send_my_bcto) + self._bcto = self._bcto_task.start(600) + self._bcto.addErrback(self.loopingErrHandle) + def dereg(self): logger.info('(%s) is mode OPENBRIDGE. No De-Registration required, continuing shutdown', self._system) @@ -262,12 +267,44 @@ class OPENBRIDGE(DatagramProtocol): logger.trace('(%s) *BridgeControl* sent BCVE. Ver: %s',self._system,VER) else: logger.trace('(%s) *BridgeControl* not sending BCVE, TARGET_IP currently not known',self._system) + + def send_my_bcto(self): + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['VER'] > 5: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + _packet = b''.join([BCTO,self._CONFIG['GLOBAL']['SERVER_ID'],self._CONFIG['SYSTEMS'][system]['NETWORK_ID'],self._CONFIG['SYSTEMS'][system]['VER'].to_bytes(1,"big")]) + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* sent BCTO. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + else: + logger.trace('(%s) *BridgeControl* not sending BCTO, TARGET_IP currently not known. DST: %s, VER: %s ',self._system,int_id(self._CONFIG['SYSTEMS'][system]['NETWORK_ID']),self._CONFIG['SYSTEMS'][system]['VER']) + + + def retransmit_bcto(self,_string): + for system in self._CONFIG['SYSTEMS']: + if self._CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and self._CONFIG['SYSTEMS'][system]['VER'] > 5: + if self._config['ENHANCED_OBP'] and self._config['TARGET_IP']: + _packet = b''.join([BCTO,_string]) + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet) + _hash = _h.digest() + _packet = b''.join([_packet,_hash]) + self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + logger.trace('(%s) *BridgeControl* retransmitted BCTO.') + else: + logger.trace('(%s) *BridgeControl* not retransmitting BCTO, TARGET_IP currently not known.',self._system) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash,_hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): pass #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) + + def process_bcto(self,src,dst,ver): + pass def datagramReceived(self, _packet, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! @@ -289,7 +326,7 @@ class OPENBRIDGE(DatagramProtocol): if compare_digest(_hash, _ckhs) and (_sockaddr == self._config['TARGET_SOCK'] or self._config['RELAX_CHECKS']): _peer_id = _data[11:15] if self._config['NETWORK_ID'] != _peer_id: - logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Peer ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) + logger.error('(%s) OpenBridge packet discarded because NETWORK_ID: %s Does not match sent Server ID: %s', self._system, int_id(self._config['NETWORK_ID']), int_id(_peer_id)) return #This is a v1 packet, so all the extended stuff we can set to default @@ -704,6 +741,24 @@ class OPENBRIDGE(DatagramProtocol): else: h,p = _sockaddr logger.warning('(%s) *ProtoControl* BCVE invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]), repr(_packet[53:]),h,p) + + if _packet[:4] == BCTO: + if self._config['VER'] > 5: + _hash = _packet[13:] + _h = blake2b(key=self._config['PASSPHRASE'], digest_size=16) + _h.update(_packet[:13]) + _hash2 = _h.digest() + _src[4:8] + _dst[8:12] + _ver[12:13] + if _hash == _hash2: + logger.trace('(%s) *ProtoControl* BCTO received: %s connected to %s with proto ver. %s'. self._system, int_id(_src), int_id(_dst), int.from_bytes_ver,'big') + self.retransmit_bcto(_packet[:13]) + self.process_bcto(src,dst,ver) + else: + h,p = _sockaddr + logger.warning('(%s) *ProtoControl* BCTO invalid, packet discarded - OPCODE: %s DATA: %s HMAC LENGTH: %s HMAC: %s SRC IP: %s SRC PORT: %s', self._system, _packet[:4], repr(_packet[:53]), len(_packet[53:]),repr(_packet[53:]),h,p) +