From bb63139e0bed3927ae47624e40b332ebb3cb4a94 Mon Sep 17 00:00:00 2001 From: "Esteban Mackay Q." <49044505+hp3icc@users.noreply.github.com> Date: Fri, 24 Oct 2025 17:21:24 -0500 Subject: [PATCH 1/3] Implement global stream deduplication and TGID updates Added global stream logging to deduplicate streams from multiple OBPs --- bridge_master.py | 81 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index aa4555f..12a0b12 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -73,6 +73,13 @@ import pickle # The module needs logging, but handlers, etc. are controlled by the parent import logging logger = logging.getLogger(__name__) + +# ======================== +# NUEVO: Registro global para deduplicación de streams desde múltiples OBP +# ======================== +GLOBAL_STREAM_LOG = {} +# ======================== + #REGEX import re from binascii import b2a_hex as ahex @@ -85,6 +92,7 @@ __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4 __license__ = 'GNU GPLv3' __maintainer__ = 'Simon Adlem G7RZU' __email__ = 'simon@gb7fr.org.uk' + #Set header bits #used for slot rewrite and type rewrite def header(slot,call_type,bits): @@ -94,6 +102,7 @@ def header(slot,call_type,bits): if call_type == 'unit': bits = 0b00000011 | bits return bits + # Timed loop used for reporting HBP status # # REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE @@ -116,6 +125,7 @@ def config_reports(_config, _factory): reporting = task.LoopingCall(reporting_loop, logger, report_server) reporting.start(_config['REPORTS']['REPORT_INTERVAL']) return report_server + # Start API server def config_API(_config, _bridges): application = Application([FD_API], @@ -130,6 +140,7 @@ def config_API(_config, _bridges): site = Site(resource) r = reactor.listenTCP(8000, site, interface='0.0.0.0') return(r) + # Import Bridging rules # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main # configuration file and listed as "active". It can be empty, @@ -175,6 +186,7 @@ def make_bridges(_rules): if ts2 == False: _rules[_bridge].append({'SYSTEM': _confsystem, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()}) return _rules + ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): _tgid_s = str(int_id(_tgid)) @@ -199,6 +211,7 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) ### END MODIFIED ### + #Make static bridge - used for on-the-fly relay bridges def make_stat_bridge(_tgid): _tgid_s = str(int_id(_tgid)) @@ -211,6 +224,7 @@ def make_stat_bridge(_tgid): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) if _system[0:3] == 'OBP': BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) + def make_default_reflector(reflector,_tmout,system): bridge = ''.join(['#',str(reflector)]) #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] @@ -224,6 +238,7 @@ def make_default_reflector(reflector,_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp + def make_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] if str(tg) not in BRIDGES: @@ -235,6 +250,7 @@ def make_static_tg(tg,ts,_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[str(tg)] = bridgetemp + def reset_static_tg(tg,ts,_tmout,system): #_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'] bridgetemp = deque() @@ -248,6 +264,7 @@ def reset_static_tg(tg,ts,_tmout,system): except KeyError: logger.exception('(%s) KeyError in reset_static_tg() - bridge gone away? TG: %s',system,tg) return + def reset_all_reflector_system(_tmout,system): for system in CONFIG['SYSTEMS']: for bridge in BRIDGES: @@ -259,6 +276,7 @@ def reset_all_reflector_system(_tmout,system): else: bridgetemp.append(bridgesystem) BRIDGES[bridge] = bridgetemp + ### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout def make_single_reflector(_tgid,_tmout,_sourcesystem): _tgid_s = str(int_id(_tgid)) @@ -278,6 +296,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) ### END MODIFIED ### + def remove_bridge_system(system): _bridgestemp = {} _bridgetemp = {} @@ -292,6 +311,7 @@ def remove_bridge_system(system): _bridgestemp[_bridge] = [] _bridgestemp[_bridge].append({'SYSTEM': system, 'TS': _bridgesystem['TS'], 'TGID': _bridgesystem['TGID'],'ACTIVE': False,'TIMEOUT': _bridgesystem['TIMEOUT'],'TO_TYPE': 'ON','OFF': [],'ON': [_bridgesystem['TGID'],],'RESET': [], 'TIMER': time() + _bridgesystem['TIMEOUT']}) BRIDGES.update(_bridgestemp) + def deactivate_all_dynamic_bridges(system_name): """Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema.""" for _bridge in BRIDGES: @@ -303,6 +323,7 @@ def deactivate_all_dynamic_bridges(system_name): _sys_entry['ACTIVE'] = False logger.info('(ROUTER) Deactivated dynamic bridge due to TG/ID 4000: System: %s, Bridge: %s, TS: %s, TGID: %s', system_name, _bridge, _sys_entry['TS'], int_id(_sys_entry['TGID'])) + ### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE def rule_timer_loop(): logger.debug('(ROUTER) routerHBP Rule timer loop started') @@ -345,7 +366,7 @@ def rule_timer_loop(): if _system['ACTIVE'] == False: # Activar inmediatamente sin timer _system['ACTIVE'] = True - _bridge_used = True + _bridge_used = True logger.info('(ROUTER) Conference Bridge ACTIVATED (NO TIMEOUT): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: _bridge_used = True @@ -393,6 +414,7 @@ def rule_timer_loop(): if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') ### END MODIFIED ### + def statTrimmer(): logger.debug('(ROUTER) STAT trimmer loop started') _remove_bridges = deque() @@ -413,6 +435,7 @@ def statTrimmer(): logger.debug('(ROUTER) STAT bridge %s removed',_bridgerem) if CONFIG['REPORTS']['REPORT']: report_server.send_clients(b'bridge updated') + #Debug and fix bridge table issues. def bridgeDebug(): logger.debug('(BRIDGEDEBUG) Running bridge debug') @@ -465,6 +488,7 @@ def bridgeDebug(): bridgetemp.append(bridgesystem) BRIDGES[_bridge] = bridgetemp logger.info('(BRIDGEDEBUG) The server currently has %s STATic bridges',statroll) + def kaReporting(): logger.debug('(ROUTER) KeepAlive reporting loop started') for system in systems: @@ -474,6 +498,7 @@ def kaReporting(): logger.warning('(ROUTER) not sending to system %s as KeepAlive never seen',system) elif CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: logger.warning('(ROUTER) not sending to system %s as last KeepAlive was %s seconds ago',system, int(time() - CONFIG['SYSTEMS'][system]['_bcka'])) + #Write SUB_MAP to disk def subMapWrite(): try: @@ -483,6 +508,7 @@ def subMapWrite(): logger.info('(SUBSCRIBER) Writing SUB_MAP to disk') except: logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file') + #Subscriber Map trimmer loop def SubMapTrimmer(): logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') @@ -495,6 +521,7 @@ def SubMapTrimmer(): SUB_MAP.pop(_remove) if CONFIG['ALIASES']['SUB_MAP_FILE']: subMapWrite() + # run this every 10 seconds to trim stream ids def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') @@ -593,6 +620,7 @@ def stream_trimmer_loop(): pass else: logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) + def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): _stream_id = pkt[16:20] _pkt_time = time() @@ -609,6 +637,7 @@ def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot): systems[system].STATUS[_stream_id]['LAST'] = _pkt_time _slot['TX_TIME'] = _pkt_time self.send_system(pkt) + def sendSpeech(self,speech): logger.debug('(%s) Inside sendspeech thread',self._system) sleep(1) @@ -624,6 +653,7 @@ def sendSpeech(self,speech): sleep(0.058) reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sendspeech thread ended',self._system) + def disconnectedVoice(system): _nine = bytes_3(9) _source_id = bytes_3(5000) @@ -658,6 +688,7 @@ def disconnectedVoice(system): _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot) logger.debug('(%s) disconnected voice thread end',system) + def playFileOnRequest(self,fileNumber): system = self._system _lang = CONFIG['SYSTEMS'][system]['ANNOUNCEMENT_LANGUAGE'] @@ -685,17 +716,22 @@ def playFileOnRequest(self,fileNumber): _pkt_time = time() reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot) logger.debug('(%s) Sending AMBE file %s end',system,fileNumber) + def threadIdent(): logger.debug('(IDENT) starting ident thread') reactor.callInThread(ident) + def threadAlias(): logger.debug('(ALIAS) starting alias thread') reactor.callInThread(aliasb) + def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums): peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums + def aliasb(): _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG) reactor.callInThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums) + def ident(): for system in systems: if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER': @@ -759,6 +795,7 @@ def ident(): _stream_id = pkt[16:20] _pkt_time = time() reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot) + def bridge_reset(): logger.debug('(BRIDGERESET) Running bridge resetter') for _system in CONFIG['SYSTEMS']: @@ -771,6 +808,7 @@ def bridge_reset(): pass CONFIG['SYSTEMS'][_system]['_reset'] = False CONFIG['SYSTEMS'][_system]['_resetlog'] = False + def options_config(): logger.debug('(OPTIONS) Running options parser') prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] @@ -1006,6 +1044,7 @@ class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} + def get_rptr(self,_sid): _int_peer_id = int_id(_sid) if _int_peer_id in local_subscriber_ids: @@ -1016,6 +1055,7 @@ class routerOBP(OPENBRIDGE): return peer_ids[_int_peer_id] else: return _int_peer_id + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1175,6 +1215,7 @@ class routerOBP(OPENBRIDGE): #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) #Ignore this system and TS pair if it's called again on this packet return(_sysIgnore) + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): _int_dst_id = int_id(_dst_id) #Assemble transmit HBP packet header @@ -1184,6 +1225,7 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS @@ -1217,6 +1259,7 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'): pkt_time = time() dmrpkt = _data[20:53] @@ -1224,6 +1267,7 @@ class routerOBP(OPENBRIDGE): _h = blake2b(digest_size=16) _h.update(_data) _pkt_crc = _h.digest() + # ======================== # NUEVO: Procesar _hops # ======================== @@ -1233,6 +1277,28 @@ class routerOBP(OPENBRIDGE): return _new_hops = (_hop_count + 1).to_bytes(1, 'big') + # ======================== + # NUEVO: Deduplicación global de streams desde múltiples OBP + # ======================== + if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': + _now = pkt_time + _stream_key = (_stream_id, _dst_id) + + # Limpiar entradas antiguas (> 2 segundos) + for key in list(GLOBAL_STREAM_LOG.keys()): + if _now - GLOBAL_STREAM_LOG[key] > 2.0: + del GLOBAL_STREAM_LOG[key] + + # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar + if _stream_key in GLOBAL_STREAM_LOG: + logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', + self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) + return + + # Registrar este stream como visto + GLOBAL_STREAM_LOG[_stream_key] = _now + # ======================== + # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): _int_dst_id = int_id(_dst_id) @@ -1247,7 +1313,7 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # ← ELIMINADO + # '1ST': perf_counter(), # â ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1302,6 +1368,7 @@ class routerOBP(OPENBRIDGE): else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) self.STATUS[_stream_id]['crcs'].add(_pkt_crc) + if _call_type == 'group' or _call_type == 'vcsbk': # Is this a new call stream? if (_stream_id not in self.STATUS): @@ -1311,7 +1378,7 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # ← ELIMINADO + # '1ST': perf_counter(), # â ELIMINADO 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1481,6 +1548,7 @@ class routerHBP(HBSYSTEM): } } self.CALL_DATA = [] + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore,_source_server, _ber, _rssi, _source_rptr): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1631,6 +1699,7 @@ class routerHBP(HBSYSTEM): # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data,b'',_ber,_rssi,_source_server, _source_rptr) return _sysIgnore + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): #Assemble transmit HBP packet header _int_dst_id = int_id(_dst_id) @@ -1640,6 +1709,7 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_ber = b'\x00', _rssi = b'\x00',_source_server = b'\x00\x00\x00\x00', _source_rptr = b'\x00\x00\x00\x00'): # _sysIgnore = sysIgnore _source_server = self._CONFIG['GLOBAL']['SERVER_ID'] @@ -1677,6 +1747,7 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def pvt_call_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() dmrpkt = _data[20:53] @@ -1776,6 +1847,7 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): try: if CONFIG['SYSTEMS'][self._system]['_reset'] and not CONFIG['SYSTEMS'][self._system]['_resetlog']: @@ -1906,7 +1978,7 @@ class routerHBP(HBSYSTEM): if _call_type == 'unit' and _int_dst_id == 4000: logger.info('(%s) Private call to ID 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) - # Opcional: no procesar más reglas para este paquete + # Opcional: no procesar más reglas para este paquete return #Handle Private Calls if _call_type == 'unit' and len(str(_int_dst_id)) == 7: @@ -2249,6 +2321,7 @@ class bridgeReportFactory(reportFactory): if isinstance(_data, str): _data = _data.decode('utf-8', error='ignore') self.send_clients(b''.join([REPORT_OPCODES['BRDG_EVENT'],_data])) + #************************************************ # MAIN PROGRAM LOOP STARTS HERE #************************************************ From cbfd8a6d527f556a8e9e1b5cb33aa07e905bdecc Mon Sep 17 00:00:00 2001 From: "Esteban Mackay Q." <49044505+hp3icc@users.noreply.github.com> Date: Fri, 24 Oct 2025 18:01:05 -0500 Subject: [PATCH 2/3] Implement global stream logging and monitoring Added global stream logging with signal quality information and a monitoring function for active streams. --- bridge_master.py | 111 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 25 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index 12a0b12..c601407 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -76,8 +76,9 @@ logger = logging.getLogger(__name__) # ======================== # NUEVO: Registro global para deduplicación de streams desde múltiples OBP +# Ahora incluye información de calidad de señal # ======================== -GLOBAL_STREAM_LOG = {} +GLOBAL_STREAM_LOG = {} # Formato: {stream_key: {timestamp, src_system, src_server, ber, rssi, last_seen, packet_count}} # ======================== #REGEX @@ -399,7 +400,7 @@ def rule_timer_loop(): logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) elif _system['ACTIVE'] == True: _bridge_used = True - logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: if _system['SYSTEM'][0:3] != 'OBP': _bridge_used = True @@ -542,7 +543,7 @@ def stream_trimmer_loop(): logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) #Null stream_id - for loop control if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' @@ -552,7 +553,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) # OBP systems # We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -587,7 +588,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system].STATUS[stream_id]['_to'] = True continue except Exception as e: @@ -1040,6 +1041,20 @@ def options_config(): # ======================== MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla +# ======================== +# NUEVO: Función para monitorear la calidad de los streams activos +# ======================== +def monitor_stream_quality(): + """Función para monitorear y reportar la calidad de los streams activos""" + logger.debug('(STREAM_MONITOR) Active streams quality report:') + for stream_key, stream_data in GLOBAL_STREAM_LOG.items(): + stream_id, tgid = stream_key + age = time() - stream_data['timestamp'] + logger.debug('(STREAM_MONITOR) Stream:%s TGID:%s Age:%.1fs BER:%s RSSI:%s Packets:%s Source:%s', + int_id(stream_id), int_id(tgid), age, stream_data['ber'], + stream_data['rssi'], stream_data['packet_count'], int_id(stream_data['src_server'])) +# ======================== + class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) @@ -1153,12 +1168,12 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) @@ -1278,25 +1293,71 @@ class routerOBP(OPENBRIDGE): _new_hops = (_hop_count + 1).to_bytes(1, 'big') # ======================== - # NUEVO: Deduplicación global de streams desde múltiples OBP + # NUEVO: Deduplicación global con priorización por calidad de señal # ======================== if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': _now = pkt_time _stream_key = (_stream_id, _dst_id) - - # Limpiar entradas antiguas (> 2 segundos) + + # Extraer valores de calidad de señal + _ber_value = int.from_bytes(_ber, 'big') if _ber else 255 # 255 = sin señal/peor + _rssi_value = int.from_bytes(_rssi, 'big', signed=True) if _rssi else -120 # -120 = sin señal/peor + + # Limpiar entradas antiguas (configurable, por defecto 3 segundos) + _cleanup_time = 3.0 for key in list(GLOBAL_STREAM_LOG.keys()): - if _now - GLOBAL_STREAM_LOG[key] > 2.0: + if _now - GLOBAL_STREAM_LOG[key]['last_seen'] > _cleanup_time: del GLOBAL_STREAM_LOG[key] - - # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar + + # Si ya existe este stream, decidir cuál mantener if _stream_key in GLOBAL_STREAM_LOG: - logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', - self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) - return - - # Registrar este stream como visto - GLOBAL_STREAM_LOG[_stream_key] = _now + _existing = GLOBAL_STREAM_LOG[_stream_key] + _existing_ber = _existing.get('ber', 255) + _existing_rssi = _existing.get('rssi', -120) + + # Calcular puntuación de calidad (BER tiene más peso) + _current_score = (_ber_value * 10) - (_rssi_value / 10) + _existing_score = (_existing_ber * 10) - (_existing_rssi / 10) + + # Si el stream actual tiene mejor calidad (puntuación menor) + if _current_score < _existing_score: + logger.info('(%s) SIGNAL QUALITY SWITCH: Better signal (BER:%s RSSI:%s vs BER:%s RSSI:%s) for STREAM_ID:%s TGID:%s from %s', + self._system, _ber_value, _rssi_value, _existing_ber, _existing_rssi, + int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) + + # Actualizar con el nuevo stream de mejor calidad + GLOBAL_STREAM_LOG[_stream_key] = { + 'timestamp': _now, + 'src_system': self._system, + 'src_server': _source_server, + 'ber': _ber_value, + 'rssi': _rssi_value, + 'last_seen': _now, + 'packet_count': 1 + } + else: + # Mantener el stream existente y descartar este + logger.debug('(%s) DUPLICATE STREAM: Keeping existing better quality stream (BER:%s RSSI:%s) from %s', + self._system, _existing_ber, _existing_rssi, int_id(_existing['src_server'])) + + # Actualizar solo el last_seen del stream existente + GLOBAL_STREAM_LOG[_stream_key]['last_seen'] = _now + GLOBAL_STREAM_LOG[_stream_key]['packet_count'] = GLOBAL_STREAM_LOG[_stream_key].get('packet_count', 0) + 1 + return # Descartar este paquete + else: + # Nuevo stream, registrar + logger.debug('(%s) NEW STREAM: Registering STREAM_ID:%s TGID:%s from %s (BER:%s RSSI:%s)', + self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server), _ber_value, _rssi_value) + + GLOBAL_STREAM_LOG[_stream_key] = { + 'timestamp': _now, + 'src_system': self._system, + 'src_server': _source_server, + 'ber': _ber_value, + 'rssi': _rssi_value, + 'last_seen': _now, + 'packet_count': 1 + } # ======================== # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP @@ -1615,7 +1676,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_stream_id]['START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1639,11 +1700,11 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue @@ -1687,7 +1748,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1838,7 +1899,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *PRIVATE CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS %s, Duration: %.2f', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2204,7 +2265,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %.2f, Packet rate: %.2f/s, LOSS: %.2f%%', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) #Reset back to False self.STATUS[_slot]['lastSeq'] = False self.STATUS[_slot]['lastData'] = False From 166a52815708a16dbcb720ab501d13d1f2e9e887 Mon Sep 17 00:00:00 2001 From: "Esteban Mackay Q." <49044505+hp3icc@users.noreply.github.com> Date: Fri, 24 Oct 2025 18:51:29 -0500 Subject: [PATCH 3/3] Refactor logging and stream quality handling --- bridge_master.py | 111 +++++++++++------------------------------------ 1 file changed, 25 insertions(+), 86 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index c601407..12a0b12 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -76,9 +76,8 @@ logger = logging.getLogger(__name__) # ======================== # NUEVO: Registro global para deduplicación de streams desde múltiples OBP -# Ahora incluye información de calidad de señal # ======================== -GLOBAL_STREAM_LOG = {} # Formato: {stream_key: {timestamp, src_system, src_server, ber, rssi, last_seen, packet_count}} +GLOBAL_STREAM_LOG = {} # ======================== #REGEX @@ -400,7 +399,7 @@ def rule_timer_loop(): logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) elif _system['ACTIVE'] == True: _bridge_used = True - logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: if _system['SYSTEM'][0:3] != 'OBP': _bridge_used = True @@ -543,7 +542,7 @@ def stream_trimmer_loop(): logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) #Null stream_id - for loop control if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' @@ -553,7 +552,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) # OBP systems # We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -588,7 +587,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system].STATUS[stream_id]['_to'] = True continue except Exception as e: @@ -1041,20 +1040,6 @@ def options_config(): # ======================== MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla -# ======================== -# NUEVO: Función para monitorear la calidad de los streams activos -# ======================== -def monitor_stream_quality(): - """Función para monitorear y reportar la calidad de los streams activos""" - logger.debug('(STREAM_MONITOR) Active streams quality report:') - for stream_key, stream_data in GLOBAL_STREAM_LOG.items(): - stream_id, tgid = stream_key - age = time() - stream_data['timestamp'] - logger.debug('(STREAM_MONITOR) Stream:%s TGID:%s Age:%.1fs BER:%s RSSI:%s Packets:%s Source:%s', - int_id(stream_id), int_id(tgid), age, stream_data['ber'], - stream_data['rssi'], stream_data['packet_count'], int_id(stream_data['src_server'])) -# ======================== - class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) @@ -1168,12 +1153,12 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): + if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): + if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) @@ -1293,71 +1278,25 @@ class routerOBP(OPENBRIDGE): _new_hops = (_hop_count + 1).to_bytes(1, 'big') # ======================== - # NUEVO: Deduplicación global con priorización por calidad de señal + # NUEVO: Deduplicación global de streams desde múltiples OBP # ======================== if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': _now = pkt_time _stream_key = (_stream_id, _dst_id) - - # Extraer valores de calidad de señal - _ber_value = int.from_bytes(_ber, 'big') if _ber else 255 # 255 = sin señal/peor - _rssi_value = int.from_bytes(_rssi, 'big', signed=True) if _rssi else -120 # -120 = sin señal/peor - - # Limpiar entradas antiguas (configurable, por defecto 3 segundos) - _cleanup_time = 3.0 + + # Limpiar entradas antiguas (> 2 segundos) for key in list(GLOBAL_STREAM_LOG.keys()): - if _now - GLOBAL_STREAM_LOG[key]['last_seen'] > _cleanup_time: + if _now - GLOBAL_STREAM_LOG[key] > 2.0: del GLOBAL_STREAM_LOG[key] - - # Si ya existe este stream, decidir cuál mantener + + # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar if _stream_key in GLOBAL_STREAM_LOG: - _existing = GLOBAL_STREAM_LOG[_stream_key] - _existing_ber = _existing.get('ber', 255) - _existing_rssi = _existing.get('rssi', -120) - - # Calcular puntuación de calidad (BER tiene más peso) - _current_score = (_ber_value * 10) - (_rssi_value / 10) - _existing_score = (_existing_ber * 10) - (_existing_rssi / 10) - - # Si el stream actual tiene mejor calidad (puntuación menor) - if _current_score < _existing_score: - logger.info('(%s) SIGNAL QUALITY SWITCH: Better signal (BER:%s RSSI:%s vs BER:%s RSSI:%s) for STREAM_ID:%s TGID:%s from %s', - self._system, _ber_value, _rssi_value, _existing_ber, _existing_rssi, - int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) - - # Actualizar con el nuevo stream de mejor calidad - GLOBAL_STREAM_LOG[_stream_key] = { - 'timestamp': _now, - 'src_system': self._system, - 'src_server': _source_server, - 'ber': _ber_value, - 'rssi': _rssi_value, - 'last_seen': _now, - 'packet_count': 1 - } - else: - # Mantener el stream existente y descartar este - logger.debug('(%s) DUPLICATE STREAM: Keeping existing better quality stream (BER:%s RSSI:%s) from %s', - self._system, _existing_ber, _existing_rssi, int_id(_existing['src_server'])) - - # Actualizar solo el last_seen del stream existente - GLOBAL_STREAM_LOG[_stream_key]['last_seen'] = _now - GLOBAL_STREAM_LOG[_stream_key]['packet_count'] = GLOBAL_STREAM_LOG[_stream_key].get('packet_count', 0) + 1 - return # Descartar este paquete - else: - # Nuevo stream, registrar - logger.debug('(%s) NEW STREAM: Registering STREAM_ID:%s TGID:%s from %s (BER:%s RSSI:%s)', - self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server), _ber_value, _rssi_value) - - GLOBAL_STREAM_LOG[_stream_key] = { - 'timestamp': _now, - 'src_system': self._system, - 'src_server': _source_server, - 'ber': _ber_value, - 'rssi': _rssi_value, - 'last_seen': _now, - 'packet_count': 1 - } + logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', + self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) + return + + # Registrar este stream como visto + GLOBAL_STREAM_LOG[_stream_key] = _now # ======================== # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP @@ -1676,7 +1615,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_stream_id]['START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1700,11 +1639,11 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): + if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): + if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue @@ -1748,7 +1687,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1899,7 +1838,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *PRIVATE CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS %s, Duration: %.2f', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2265,7 +2204,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %.2f, Packet rate: %.2f/s, LOSS: %.2f%%', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) #Reset back to False self.STATUS[_slot]['lastSeq'] = False self.STATUS[_slot]['lastData'] = False