From bb63139e0bed3927ae47624e40b332ebb3cb4a94 Mon Sep 17 00:00:00 2001 From: "Esteban Mackay Q." <49044505+hp3icc@users.noreply.github.com> Date: Fri, 24 Oct 2025 17:21:24 -0500 Subject: [PATCH] Implement global stream deduplication and TGID updates Added global stream logging to deduplicate streams from multiple OBPs --- bridge_master.py | 81 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index aa4555f..12a0b12 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -73,6 +73,13 @@ import pickle # The module needs logging, but handlers, etc. are controlled by the parent import logging logger = logging.getLogger(__name__) + +# ======================== +# NUEVO: Registro global para deduplicación de streams desde múltiples OBP +# ======================== +GLOBAL_STREAM_LOG = {} +# ======================== + #REGEX import re from binascii import b2a_hex as ahex @@ -85,6 +92,7 @@ __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4 __license__ = 'GNU GPLv3' __maintainer__ = 'Simon Adlem G7RZU' __email__ = 'simon@gb7fr.org.uk' + #Set header bits #used for slot rewrite and type rewrite def header(slot,call_type,bits): @@ -94,6 +102,7 @@ def header(slot,call_type,bits): if call_type == 'unit': bits = 0b00000011 | bits return bits + # Timed loop used for reporting HBP status # # REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE @@ -116,6 +125,7 @@ def config_reports(_config, _factory): reporting = task.LoopingCall(reporting_loop, logger, report_server) reporting.start(_config['REPORTS']['REPORT_INTERVAL']) return report_server + # Start API server def config_API(_config, _bridges): application = Application([FD_API], @@ -130,6 +140,7 @@ def config_API(_config, _bridges): site = Site(resource) r = reactor.listenTCP(8000, site, interface='0.0.0.0') return(r) + # Import Bridging rules # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main # configuration file and listed as "active". It can be empty, @@ -175,6 +186,7 @@ def make_bridges(_rules): if ts2 == False: _rules[_bridge].append({'SYSTEM': _confsystem, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) return _rules + ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): _tgid_s = str(int_id(_tgid)) @@ -199,6 +211,7 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) ### END MODIFIED ### + #Make static bridge - used for on-the-fly relay bridges def make_stat_bridge(_tgid): _tgid_s = str(int_id(_tgid)) @@ -211,6 +224,7 @@ def make_stat_bridge(_tgid): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + def make_default_reflector(reflector,_tmout,system): bridge = ''.join(['#',str(reflector)]) #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] @@ -224,6 +238,7 @@ def make_default_reflector(reflector,_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp + def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] if str(tg) not in BRIDGES: @@ -235,6 +250,7 @@ def make_static_tg(tg,ts,_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[str(tg)] = bridgetemp + def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] bridgetemp = deque() @@ -248,6 +264,7 @@ def reset_static_tg(tg,ts,_tmout,system): except KeyError: logger.exception('(%s) KeyError in reset_static_tg() - bridge gone away? TG: %s',system,tg) return + def reset_all_reflector_system(_tmout,system): for system in CONFIG['SYSTEMS']: for bridge in BRIDGES: @@ -259,6 +276,7 @@ def reset_all_reflector_system(_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp + ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) @@ -278,6 +296,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) ### END MODIFIED ### + def remove_bridge_system(system): _bridgestemp = {} _bridgetemp = {} @@ -292,6 +311,7 @@ def remove_bridge_system(system): _bridgestemp[_bridge] = [] _bridgestemp[_bridge].append({'SYSTEM': system, 'TS': _bridgesystem['TS'], 'TGID': _bridgesystem['TGID'],'ACTIVE': False,'TIMEOUT': _bridgesystem['TIMEOUT'],'TO_TYPE': 'ON','OFF': [],'ON': [_bridgesystem['TGID'],],'RESET': [], 'TIMER': time() + _bridgesystem['TIMEOUT']}) BRIDGES.update(_bridgestemp) + def deactivate_all_dynamic_bridges(system_name): """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" for _bridge in BRIDGES: @@ -303,6 +323,7 @@ def deactivate_all_dynamic_bridges(system_name): _sys_entry['ACTIVE'] = False logger.info('(ROUTER) Deactivated dynamic bridge due to TG/ID 4000: System: %s, Bridge: %s, TS: %s, TGID: %s', system_name, _bridge, _sys_entry['TS'], int_id(_sys_entry['TGID'])) + ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') @@ -345,7 +366,7 @@ def rule_timer_loop(): if _system['ACTIVE'] == False: # Activar inmediatamente sin timer _system['ACTIVE'] = True - _bridge_used = True + _bridge_used = True logger.info('(ROUTER) Conference Bridge ACTIVATED (NO TIMEOUT): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: _bridge_used = True @@ -393,6 +414,7 @@ def rule_timer_loop(): if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') ### END MODIFIED ### + def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') _remove_bridges = deque() @@ -413,6 +435,7 @@ def statTrimmer(): logger.debug('(ROUTER) STAT bridge %s removed',_bridgerem) if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') + #Debug and fix bridge table issues. def bridgeDebug(): logger.debug('(BRIDGEDEBUG) Running bridge debug') @@ -465,6 +488,7 @@ def bridgeDebug(): bridgetemp.append(bridgesystem) BRIDGES[_bridge] = bridgetemp logger.info('(BRIDGEDEBUG) The server currently has %s STATic bridges',statroll) + def kaReporting(): logger.debug('(ROUTER) KeepAlive reporting loop started') for system in systems: @@ -474,6 +498,7 @@ def kaReporting(): logger.warning('(ROUTER) not sending to system %s as KeepAlive never seen',system) elif CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: logger.warning('(ROUTER) not sending to system %s as last KeepAlive was %s seconds ago',system, int(time() - CONFIG['SYSTEMS'][system]['_bcka'])) + #Write SUB_MAP to disk def subMapWrite(): try: @@ -483,6 +508,7 @@ def subMapWrite(): logger.info('(SUBSCRIBER) Writing SUB_MAP to disk') except: logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file') + #Subscriber Map trimmer loop def SubMapTrimmer(): logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') @@ -495,6 +521,7 @@ def SubMapTrimmer(): SUB_MAP.pop(_remove) if CONFIG['ALIASES']['SUB_MAP_FILE']: subMapWrite() + # run this every 10 seconds to trim stream ids def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') @@ -593,6 +620,7 @@ def stream_trimmer_loop(): pass else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) + def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -609,6 +637,7 @@ def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): systems[system].STATUS[_stream_id]['LAST'] = _pkt_time _slot['TX_TIME'] = _pkt_time self.send_system(pkt) + def sendSpeech(self,speech): logger.debug('(%s) Inside sendspeech thread',self._system) sleep(1) @@ -624,6 +653,7 @@ def sendSpeech(self,speech): sleep(0.058) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sendspeech thread ended',self._system) + def disconnectedVoice(system): _nine = bytes_3(9) _source_id = bytes_3(5000) @@ -658,6 +688,7 @@ def disconnectedVoice(system): _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) logger.debug('(%s) disconnected voice thread end',system) + def playFileOnRequest(self,fileNumber): system = self._system _lang = CONFIG['SYSTEMS'][system]['ANNOUNCEMENT_LANGUAGE'] @@ -685,17 +716,22 @@ def playFileOnRequest(self,fileNumber): _pkt_time = time() reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sending AMBE file %s end',system,fileNumber) + def threadIdent(): logger.debug('(IDENT) starting ident thread') reactor.callInThread(ident) + def threadAlias(): logger.debug('(ALIAS) starting alias thread') reactor.callInThread(aliasb) + def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums): peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums + def aliasb(): _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG) reactor.callInThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) + def ident(): for system in systems: if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER': @@ -759,6 +795,7 @@ def ident(): _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot) + def bridge_reset(): logger.debug('(BRIDGERESET) Running bridge resetter') for _system in CONFIG['SYSTEMS']: @@ -771,6 +808,7 @@ def bridge_reset(): pass CONFIG['SYSTEMS'][_system]['_reset'] = False CONFIG['SYSTEMS'][_system]['_resetlog'] = False + def options_config(): logger.debug('(OPTIONS) Running options parser') prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] @@ -1006,6 +1044,7 @@ class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} + def get_rptr(self,_sid): _int_peer_id = int_id(_sid) if _int_peer_id in local_subscriber_ids: @@ -1016,6 +1055,7 @@ class routerOBP(OPENBRIDGE): return peer_ids[_int_peer_id] else: return _int_peer_id + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1175,6 +1215,7 @@ class routerOBP(OPENBRIDGE): #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) #Ignore this system and TS pair if it's called again on this packet return(_sysIgnore) + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): _int_dst_id = int_id(_dst_id) #Assemble transmit HBP packet header @@ -1184,6 +1225,7 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS @@ -1217,6 +1259,7 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): pkt_time = time() dmrpkt = _data[20:53] @@ -1224,6 +1267,7 @@ class routerOBP(OPENBRIDGE): _h = blake2b(digest_size=16) _h.update(_data) _pkt_crc = _h.digest() + # ======================== # NUEVO: Procesar _hops # ======================== @@ -1233,6 +1277,28 @@ class routerOBP(OPENBRIDGE): return _new_hops = (_hop_count + 1).to_bytes(1, 'big') + # ======================== + # NUEVO: Deduplicación global de streams desde múltiples OBP + # ======================== + if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': + _now = pkt_time + _stream_key = (_stream_id, _dst_id) + + # Limpiar entradas antiguas (> 2 segundos) + for key in list(GLOBAL_STREAM_LOG.keys()): + if _now - GLOBAL_STREAM_LOG[key] > 2.0: + del GLOBAL_STREAM_LOG[key] + + # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar + if _stream_key in GLOBAL_STREAM_LOG: + logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', + self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) + return + + # Registrar este stream como visto + GLOBAL_STREAM_LOG[_stream_key] = _now + # ======================== + # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): _int_dst_id = int_id(_dst_id) @@ -1247,7 +1313,7 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # ← ELIMINADO + # '1ST': perf_counter(), # â ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1302,6 +1368,7 @@ class routerOBP(OPENBRIDGE): else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) self.STATUS[_stream_id]['crcs'].add(_pkt_crc) + if _call_type == 'group' or _call_type == 'vcsbk': # Is this a new call stream? if (_stream_id not in self.STATUS): @@ -1311,7 +1378,7 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # ← ELIMINADO + # '1ST': perf_counter(), # â ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1481,6 +1548,7 @@ class routerHBP(HBSYSTEM): } } self.CALL_DATA = [] + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore,_source_server, _ber, _rssi, _source_rptr): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1631,6 +1699,7 @@ class routerHBP(HBSYSTEM): # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data,b'',_ber,_rssi,_source_server, _source_rptr) return _sysIgnore + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): #Assemble transmit HBP packet header _int_dst_id = int_id(_dst_id) @@ -1640,6 +1709,7 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_ber = b'\x00', _rssi = b'\x00',_source_server = b'\x00\x00\x00\x00', _source_rptr = b'\x00\x00\x00\x00'): # _sysIgnore = sysIgnore _source_server = self._CONFIG['GLOBAL']['SERVER_ID'] @@ -1677,6 +1747,7 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def pvt_call_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() dmrpkt = _data[20:53] @@ -1776,6 +1847,7 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): try: if CONFIG['SYSTEMS'][self._system]['_reset'] and not CONFIG['SYSTEMS'][self._system]['_resetlog']: @@ -1906,7 +1978,7 @@ class routerHBP(HBSYSTEM): if _call_type == 'unit' and _int_dst_id == 4000: logger.info('(%s) Private call to ID 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) - # Opcional: no procesar más reglas para este paquete + # Opcional: no procesar más reglas para este paquete return #Handle Private Calls if _call_type == 'unit' and len(str(_int_dst_id)) == 7: @@ -2249,6 +2321,7 @@ class bridgeReportFactory(reportFactory): if isinstance(_data, str): _data = _data.decode('utf-8', error='ignore') self.send_clients(b''.join([REPORT_OPCODES['BRDG_EVENT'],_data])) + #************************************************ # MAIN PROGRAM LOOP STARTS HERE #************************************************