Implement global stream logging and monitoring

Added global stream logging with signal quality information and a monitoring function for active streams.
pull/15/head
Esteban Mackay Q. 2 months ago committed by GitHub
parent bb63139e0b
commit cbfd8a6d52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -76,8 +76,9 @@ logger = logging.getLogger(__name__)
# ========================
# NUEVO: Registro global para deduplicación de streams desde múltiples OBP
# Ahora incluye información de calidad de señal
# ========================
GLOBAL_STREAM_LOG = {}
GLOBAL_STREAM_LOG = {} # Formato: {stream_key: {timestamp, src_system, src_server, ber, rssi, last_seen, packet_count}}
# ========================
#REGEX
@ -399,7 +400,7 @@ def rule_timer_loop():
logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in)
elif _system['ACTIVE'] == True:
_bridge_used = True
logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
else:
if _system['SYSTEM'][0:3] != 'OBP':
_bridge_used = True
@ -542,7 +543,7 @@ def stream_trimmer_loop():
logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \
system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START'])
if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore'))
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore'))
#Null stream_id - for loop control
if _slot['RX_TIME'] < _now - 60:
_slot['RX_STREAM_ID'] = b'\x00'
@ -552,7 +553,7 @@ def stream_trimmer_loop():
logger.debug('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \
system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START'])
if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore'))
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore'))
# OBP systems
# We can't delete items from a dicationary that's being iterated, so we have to make a temporarly list of entrys to remove later
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
@ -587,7 +588,7 @@ def stream_trimmer_loop():
logger.debug('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \
system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_stream['RX_PEER']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START'])
if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore'))
systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_stream['RX_PEER']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore'))
systems[system].STATUS[stream_id]['_to'] = True
continue
except Exception as e:
@ -1040,6 +1041,20 @@ def options_config():
# ========================
MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla
# ========================
# NUEVO: Función para monitorear la calidad de los streams activos
# ========================
def monitor_stream_quality():
"""Función para monitorear y reportar la calidad de los streams activos"""
logger.debug('(STREAM_MONITOR) Active streams quality report:')
for stream_key, stream_data in GLOBAL_STREAM_LOG.items():
stream_id, tgid = stream_key
age = time() - stream_data['timestamp']
logger.debug('(STREAM_MONITOR) Stream:%s TGID:%s Age:%.1fs BER:%s RSSI:%s Packets:%s Source:%s',
int_id(stream_id), int_id(tgid), age, stream_data['ber'],
stream_data['rssi'], stream_data['packet_count'], int_id(stream_data['src_server']))
# ========================
class routerOBP(OPENBRIDGE):
def __init__(self, _name, _config, _report):
OPENBRIDGE.__init__(self, _name, _config, _report)
@ -1153,12 +1168,12 @@ class routerOBP(OPENBRIDGE):
self.STATUS[_stream_id]['CONTENTION'] = True
logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']))
continue
if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO):
if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)):
if self.STATUS[_stream_id]['CONTENTION'] == False:
self.STATUS[_stream_id]['CONTENTION'] = True
logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID']))
continue
if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO):
if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)):
if self.STATUS[_stream_id]['CONTENTION'] == False:
self.STATUS[_stream_id]['CONTENTION'] = True
logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
@ -1278,25 +1293,71 @@ class routerOBP(OPENBRIDGE):
_new_hops = (_hop_count + 1).to_bytes(1, 'big')
# ========================
# NUEVO: Deduplicación global de streams desde múltiples OBP
# NUEVO: Deduplicación global con priorización por calidad de señal
# ========================
if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE':
_now = pkt_time
_stream_key = (_stream_id, _dst_id)
# Limpiar entradas antiguas (> 2 segundos)
# Extraer valores de calidad de señal
_ber_value = int.from_bytes(_ber, 'big') if _ber else 255 # 255 = sin señal/peor
_rssi_value = int.from_bytes(_rssi, 'big', signed=True) if _rssi else -120 # -120 = sin señal/peor
# Limpiar entradas antiguas (configurable, por defecto 3 segundos)
_cleanup_time = 3.0
for key in list(GLOBAL_STREAM_LOG.keys()):
if _now - GLOBAL_STREAM_LOG[key] > 2.0:
if _now - GLOBAL_STREAM_LOG[key]['last_seen'] > _cleanup_time:
del GLOBAL_STREAM_LOG[key]
# Si ya vimos este stream+TG desde otro OBP recientemente, ignorar
# Si ya existe este stream, decidir cuál mantener
if _stream_key in GLOBAL_STREAM_LOG:
logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring',
self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server))
return
# Registrar este stream como visto
GLOBAL_STREAM_LOG[_stream_key] = _now
_existing = GLOBAL_STREAM_LOG[_stream_key]
_existing_ber = _existing.get('ber', 255)
_existing_rssi = _existing.get('rssi', -120)
# Calcular puntuación de calidad (BER tiene más peso)
_current_score = (_ber_value * 10) - (_rssi_value / 10)
_existing_score = (_existing_ber * 10) - (_existing_rssi / 10)
# Si el stream actual tiene mejor calidad (puntuación menor)
if _current_score < _existing_score:
logger.info('(%s) SIGNAL QUALITY SWITCH: Better signal (BER:%s RSSI:%s vs BER:%s RSSI:%s) for STREAM_ID:%s TGID:%s from %s',
self._system, _ber_value, _rssi_value, _existing_ber, _existing_rssi,
int_id(_stream_id), int_id(_dst_id), int_id(_source_server))
# Actualizar con el nuevo stream de mejor calidad
GLOBAL_STREAM_LOG[_stream_key] = {
'timestamp': _now,
'src_system': self._system,
'src_server': _source_server,
'ber': _ber_value,
'rssi': _rssi_value,
'last_seen': _now,
'packet_count': 1
}
else:
# Mantener el stream existente y descartar este
logger.debug('(%s) DUPLICATE STREAM: Keeping existing better quality stream (BER:%s RSSI:%s) from %s',
self._system, _existing_ber, _existing_rssi, int_id(_existing['src_server']))
# Actualizar solo el last_seen del stream existente
GLOBAL_STREAM_LOG[_stream_key]['last_seen'] = _now
GLOBAL_STREAM_LOG[_stream_key]['packet_count'] = GLOBAL_STREAM_LOG[_stream_key].get('packet_count', 0) + 1
return # Descartar este paquete
else:
# Nuevo stream, registrar
logger.debug('(%s) NEW STREAM: Registering STREAM_ID:%s TGID:%s from %s (BER:%s RSSI:%s)',
self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server), _ber_value, _rssi_value)
GLOBAL_STREAM_LOG[_stream_key] = {
'timestamp': _now,
'src_system': self._system,
'src_server': _source_server,
'ber': _ber_value,
'rssi': _rssi_value,
'last_seen': _now,
'packet_count': 1
}
# ========================
# Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP
@ -1615,7 +1676,7 @@ class routerHBP(HBSYSTEM):
dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197]
if CONFIG['REPORTS']['REPORT']:
call_duration = pkt_time - _target_status[_stream_id]['START']
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
@ -1639,11 +1700,11 @@ class routerHBP(HBSYSTEM):
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
logger.info('(%s) Call not routed to TGID%s, target in group hangtime: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']))
continue
if (_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO):
if ((_target['TGID'] == _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < STREAM_TO)):
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
logger.info('(%s) Call not routed to TGID%s, matching call already active on target: HBSystem: %s, TS: %s, TGID: %s', self._system, int_id(_target['TGID']), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['RX_TGID']))
continue
if (_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO):
if ((_target['TGID'] == _target_status[_target['TS']]['TX_TGID']) and (_rf_src != _target_status[_target['TS']]['TX_RFS']) and ((pkt_time - _target_status[_target['TS']]['TX_TIME']) < STREAM_TO)):
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS']))
continue
@ -1687,7 +1748,7 @@ class routerHBP(HBSYSTEM):
dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197]
if CONFIG['REPORTS']['REPORT']:
call_duration = pkt_time - _target_status[_target['TS']]['TX_START']
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{:{:.2f}}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
@ -1838,7 +1899,7 @@ class routerHBP(HBSYSTEM):
logger.info('(%s) *PRIVATE CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS %s, Duration: %.2f', \
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
if CONFIG['REPORTS']['REPORT']:
self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
# Mark status variables for use later
self.STATUS[_slot]['RX_PEER'] = _peer_id
self.STATUS[_slot]['RX_SEQ'] = _seq
@ -2204,7 +2265,7 @@ class routerHBP(HBSYSTEM):
logger.info('(%s) *CALL END* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s, Duration: %.2f, Packet rate: %.2f/s, LOSS: %.2f%%', \
self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss)
if CONFIG['REPORTS']['REPORT']:
self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{:{:.2f}}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
#Reset back to False
self.STATUS[_slot]['lastSeq'] = False
self.STATUS[_slot]['lastData'] = False

Loading…
Cancel
Save

Powered by TurnKey Linux.