From cbfd8a6d527f556a8e9e1b5cb33aa07e905bdecc Mon Sep 17 00:00:00 2001 From: "Esteban Mackay Q." <49044505+hp3icc@users.noreply.github.com> Date: Fri, 24 Oct 2025 18:01:05 -0500 Subject: [PATCH] Implement global stream logging and monitoring Added global stream logging with signal quality information and a monitoring function for active streams. --- bridge_master.py | 111 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 25 deletions(-) diff --git a/bridge_master.py b/bridge_master.py index 12a0b12..c601407 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -76,8 +76,9 @@ logger = logging.getLogger(__name__) # ======================== # NUEVO: Registro global para deduplicación de streams desde múltiples OBP +# Ahora incluye información de calidad de señal # ======================== -GLOBAL_STREAM_LOG = {} +GLOBAL_STREAM_LOG = {} # Formato: {stream_key: {timestamp, src_system, src_server, ber, rssi, last_seen, packet_count}} # ======================== #REGEX @@ -399,7 +400,7 @@ def rule_timer_loop(): logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) elif _system['ACTIVE'] == True: _bridge_used = True - logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: if _system['SYSTEM'][0:3] != 'OBP': _bridge_used = True @@ -542,7 +543,7 @@ def stream_trimmer_loop(): logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) #Null stream_id - for loop control if _slot['RX_TIME'] < _now - 60: _slot['RX_STREAM_ID'] = b'\x00' @@ -552,7 +553,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) # OBP systems # We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -587,7 +588,7 @@ def stream_trimmer_loop(): logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) systems[system].STATUS[stream_id]['_to'] = True continue except Exception as e: @@ -1040,6 +1041,20 @@ def options_config(): # ======================== MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla +# ======================== +# NUEVO: Función para monitorear la calidad de los streams activos +# ======================== +def monitor_stream_quality(): + """Función para monitorear y reportar la calidad de los streams activos""" + logger.debug('(STREAM_MONITOR) Active streams quality report:') + for stream_key, stream_data in GLOBAL_STREAM_LOG.items(): + stream_id, tgid = stream_key + age = time() - stream_data['timestamp'] + logger.debug('(STREAM_MONITOR) Stream:%s TGID:%s Age:%.1fs BER:%s RSSI:%s Packets:%s Source:%s', + int_id(stream_id), int_id(tgid), age, stream_data['ber'], + stream_data['rssi'], stream_data['packet_count'], int_id(stream_data['src_server'])) +# ======================== + class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) @@ -1153,12 +1168,12 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) @@ -1278,25 +1293,71 @@ class routerOBP(OPENBRIDGE): _new_hops = (_hop_count + 1).to_bytes(1, 'big') # ======================== - # NUEVO: Deduplicación global de streams desde múltiples OBP + # NUEVO: Deduplicación global con priorización por calidad de señal # ======================== if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': _now = pkt_time _stream_key = (_stream_id, _dst_id) - - # Limpiar entradas antiguas (> 2 segundos) + + # Extraer valores de calidad de señal + _ber_value = int.from_bytes(_ber, 'big') if _ber else 255 # 255 = sin señal/peor + _rssi_value = int.from_bytes(_rssi, 'big', signed=True) if _rssi else -120 # -120 = sin señal/peor + + # Limpiar entradas antiguas (configurable, por defecto 3 segundos) + _cleanup_time = 3.0 for key in list(GLOBAL_STREAM_LOG.keys()): - if _now - GLOBAL_STREAM_LOG[key] > 2.0: + if _now - GLOBAL_STREAM_LOG[key]['last_seen'] > _cleanup_time: del GLOBAL_STREAM_LOG[key] - - # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar + + # Si ya existe este stream, decidir cuál mantener if _stream_key in GLOBAL_STREAM_LOG: - logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', - self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) - return - - # Registrar este stream como visto - GLOBAL_STREAM_LOG[_stream_key] = _now + _existing = GLOBAL_STREAM_LOG[_stream_key] + _existing_ber = _existing.get('ber', 255) + _existing_rssi = _existing.get('rssi', -120) + + # Calcular puntuación de calidad (BER tiene más peso) + _current_score = (_ber_value * 10) - (_rssi_value / 10) + _existing_score = (_existing_ber * 10) - (_existing_rssi / 10) + + # Si el stream actual tiene mejor calidad (puntuación menor) + if _current_score < _existing_score: + logger.info('(%s) SIGNAL QUALITY SWITCH: Better signal (BER:%s RSSI:%s vs BER:%s RSSI:%s) for STREAM_ID:%s TGID:%s from %s', + self._system, _ber_value, _rssi_value, _existing_ber, _existing_rssi, + int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) + + # Actualizar con el nuevo stream de mejor calidad + GLOBAL_STREAM_LOG[_stream_key] = { + 'timestamp': _now, + 'src_system': self._system, + 'src_server': _source_server, + 'ber': _ber_value, + 'rssi': _rssi_value, + 'last_seen': _now, + 'packet_count': 1 + } + else: + # Mantener el stream existente y descartar este + logger.debug('(%s) DUPLICATE STREAM: Keeping existing better quality stream (BER:%s RSSI:%s) from %s', + self._system, _existing_ber, _existing_rssi, int_id(_existing['src_server'])) + + # Actualizar solo el last_seen del stream existente + GLOBAL_STREAM_LOG[_stream_key]['last_seen'] = _now + GLOBAL_STREAM_LOG[_stream_key]['packet_count'] = GLOBAL_STREAM_LOG[_stream_key].get('packet_count', 0) + 1 + return # Descartar este paquete + else: + # Nuevo stream, registrar + logger.debug('(%s) NEW STREAM: Registering STREAM_ID:%s TGID:%s from %s (BER:%s RSSI:%s)', + self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server), _ber_value, _rssi_value) + + GLOBAL_STREAM_LOG[_stream_key] = { + 'timestamp': _now, + 'src_system': self._system, + 'src_server': _source_server, + 'ber': _ber_value, + 'rssi': _rssi_value, + 'last_seen': _now, + 'packet_count': 1 + } # ======================== # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP @@ -1615,7 +1676,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_stream_id]['START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1639,11 +1700,11 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID'])) continue - if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO): + if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue @@ -1687,7 +1748,7 @@ class routerHBP(HBSYSTEM): dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] - systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) + systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] @@ -1838,7 +1899,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *PRIVATE CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS %s, Duration: %.2f', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2204,7 +2265,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %.2f, Packet rate: %.2f/s, LOSS: %.2f%%', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss) if CONFIG['REPORTS']['REPORT']: - self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) #Reset back to False self.STATUS[_slot]['lastSeq'] = False self.STATUS[_slot]['lastData'] = False