Implement global stream deduplication and TGID updates

Added global stream logging to deduplicate streams from multiple OBPs
pull/15/head
Esteban Mackay Q. 2 months ago committed by GitHub
parent 225fb56707
commit bb63139e0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -73,6 +73,13 @@ import pickle
# The module needs logging, but handlers, etc. are controlled by the parent
import logging
logger = logging.getLogger(__name__)
# ========================
# NUEVO: Registro global para deduplicación de streams desde múltiples OBP
# ========================
GLOBAL_STREAM_LOG = {}
# ========================
#REGEX
import re
from binascii import b2a_hex as ahex
@ -85,6 +92,7 @@ __credits__ = 'Colin Durbridge, G4EML, Steve Zingman, N4IRS; Mike Zingman, N4
__license__ = 'GNU GPLv3'
__maintainer__ = 'Simon Adlem G7RZU'
__email__ = 'simon@gb7fr.org.uk'
#Set header bits
#used for slot rewrite and type rewrite
def header(slot,call_type,bits):
@ -94,6 +102,7 @@ def header(slot,call_type,bits):
if call_type == 'unit':
bits = 0b00000011 | bits
return bits
# Timed loop used for reporting HBP status
#
# REPORT BASED ON THE TYPE SELECTED IN THE MAIN CONFIG FILE
@ -116,6 +125,7 @@ def config_reports(_config, _factory):
reporting = task.LoopingCall(reporting_loop, logger, report_server)
reporting.start(_config['REPORTS']['REPORT_INTERVAL'])
return report_server
# Start API server
def config_API(_config, _bridges):
application = Application([FD_API],
@ -130,6 +140,7 @@ def config_API(_config, _bridges):
site = Site(resource)
r = reactor.listenTCP(8000, site, interface='0.0.0.0')
return(r)
# Import Bridging rules
# Note: A stanza *must* exist for any MASTER or CLIENT configured in the main
# configuration file and listed as "active". It can be empty,
@ -175,6 +186,7 @@ def make_bridges(_rules):
if ts2 == False:
_rules[_bridge].append({'SYSTEM': _confsystem, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [bytes_3(4000)],'ON': [],'RESET': [], 'TIMER': time()})
return _rules
### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout
def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout):
_tgid_s = str(int_id(_tgid))
@ -199,6 +211,7 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout):
if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)):
BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()})
### END MODIFIED ###
#Make static bridge - used for on-the-fly relay bridges
def make_stat_bridge(_tgid):
_tgid_s = str(int_id(_tgid))
@ -211,6 +224,7 @@ def make_stat_bridge(_tgid):
BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()})
if _system[0:3] == 'OBP':
BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'STAT','OFF': [],'ON': [],'RESET': [], 'TIMER': time()})
def make_default_reflector(reflector,_tmout,system):
bridge = ''.join(['#',str(reflector)])
#_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER']
@ -224,6 +238,7 @@ def make_default_reflector(reflector,_tmout,system):
else:
bridgetemp.append(bridgesystem)
BRIDGES[bridge] = bridgetemp
def make_static_tg(tg,ts,_tmout,system):
#_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER']
if str(tg) not in BRIDGES:
@ -235,6 +250,7 @@ def make_static_tg(tg,ts,_tmout,system):
else:
bridgetemp.append(bridgesystem)
BRIDGES[str(tg)] = bridgetemp
def reset_static_tg(tg,ts,_tmout,system):
#_tmout = CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER']
bridgetemp = deque()
@ -248,6 +264,7 @@ def reset_static_tg(tg,ts,_tmout,system):
except KeyError:
logger.exception('(%s) KeyError in reset_static_tg() - bridge gone away? TG: %s',system,tg)
return
def reset_all_reflector_system(_tmout,system):
for system in CONFIG['SYSTEMS']:
for bridge in BRIDGES:
@ -259,6 +276,7 @@ def reset_all_reflector_system(_tmout,system):
else:
bridgetemp.append(bridgesystem)
BRIDGES[bridge] = bridgetemp
### MODIFIED: Updated to handle all special TGIDs (9990-9999) with a 1-minute timeout
def make_single_reflector(_tgid,_tmout,_sourcesystem):
_tgid_s = str(int_id(_tgid))
@ -278,6 +296,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem):
if _system[0:3] == 'OBP' and (int_id(_tgid) >= 79 and (int_id(_tgid) < 9990 or int_id(_tgid) > 9999)):
BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()})
### END MODIFIED ###
def remove_bridge_system(system):
_bridgestemp = {}
_bridgetemp = {}
@ -292,6 +311,7 @@ def remove_bridge_system(system):
_bridgestemp[_bridge] = []
_bridgestemp[_bridge].append({'SYSTEM': system, 'TS': _bridgesystem['TS'], 'TGID': _bridgesystem['TGID'],'ACTIVE': False,'TIMEOUT': _bridgesystem['TIMEOUT'],'TO_TYPE': 'ON','OFF': [],'ON': [_bridgesystem['TGID'],],'RESET': [], 'TIMER': time() + _bridgesystem['TIMEOUT']})
BRIDGES.update(_bridgestemp)
def deactivate_all_dynamic_bridges(system_name):
"""Desactiva todos los bridges dinámicos (no estáticos, no reflectores) de un sistema."""
for _bridge in BRIDGES:
@ -303,6 +323,7 @@ def deactivate_all_dynamic_bridges(system_name):
_sys_entry['ACTIVE'] = False
logger.info('(ROUTER) Deactivated dynamic bridge due to TG/ID 4000: System: %s, Bridge: %s, TS: %s, TGID: %s',
system_name, _bridge, _sys_entry['TS'], int_id(_sys_entry['TGID']))
### MODIFIED: Core logic updated to handle special TGIDs (9990-9999) correctly with SINGLE_MODE
def rule_timer_loop():
logger.debug('(ROUTER) routerHBP Rule timer loop started')
@ -345,7 +366,7 @@ def rule_timer_loop():
if _system['ACTIVE'] == False:
# Activar inmediatamente sin timer
_system['ACTIVE'] = True
_bridge_used = True
_bridge_used = True
logger.info('(ROUTER) Conference Bridge ACTIVATED (NO TIMEOUT): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
else:
_bridge_used = True
@ -393,6 +414,7 @@ def rule_timer_loop():
if CONFIG['REPORTS']['REPORT']:
report_server.send_clients(b'bridge updated')
### END MODIFIED ###
def statTrimmer():
logger.debug('(ROUTER) STAT trimmer loop started')
_remove_bridges = deque()
@ -413,6 +435,7 @@ def statTrimmer():
logger.debug('(ROUTER) STAT bridge %s removed',_bridgerem)
if CONFIG['REPORTS']['REPORT']:
report_server.send_clients(b'bridge updated')
#Debug and fix bridge table issues.
def bridgeDebug():
logger.debug('(BRIDGEDEBUG) Running bridge debug')
@ -465,6 +488,7 @@ def bridgeDebug():
bridgetemp.append(bridgesystem)
BRIDGES[_bridge] = bridgetemp
logger.info('(BRIDGEDEBUG) The server currently has %s STATic bridges',statroll)
def kaReporting():
logger.debug('(ROUTER) KeepAlive reporting loop started')
for system in systems:
@ -474,6 +498,7 @@ def kaReporting():
logger.warning('(ROUTER) not sending to system %s as KeepAlive never seen',system)
elif CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60:
logger.warning('(ROUTER) not sending to system %s as last KeepAlive was %s seconds ago',system, int(time() - CONFIG['SYSTEMS'][system]['_bcka']))
#Write SUB_MAP to disk
def subMapWrite():
try:
@ -483,6 +508,7 @@ def subMapWrite():
logger.info('(SUBSCRIBER) Writing SUB_MAP to disk')
except:
logger.warning('(SUBSCRIBER) Cannot write SUB_MAP to file')
#Subscriber Map trimmer loop
def SubMapTrimmer():
logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started')
@ -495,6 +521,7 @@ def SubMapTrimmer():
SUB_MAP.pop(_remove)
if CONFIG['ALIASES']['SUB_MAP_FILE']:
subMapWrite()
# run this every 10 seconds to trim stream ids
def stream_trimmer_loop():
logger.debug('(ROUTER) Trimming inactive stream IDs from system lists')
@ -593,6 +620,7 @@ def stream_trimmer_loop():
pass
else:
logger.debug('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot):
_stream_id = pkt[16:20]
_pkt_time = time()
@ -609,6 +637,7 @@ def sendVoicePacket(self,pkt,_source_id,_dest_id,_slot):
systems[system].STATUS[_stream_id]['LAST'] = _pkt_time
_slot['TX_TIME'] = _pkt_time
self.send_system(pkt)
def sendSpeech(self,speech):
logger.debug('(%s) Inside sendspeech thread',self._system)
sleep(1)
@ -624,6 +653,7 @@ def sendSpeech(self,speech):
sleep(0.058)
reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot)
logger.debug('(%s) Sendspeech thread ended',self._system)
def disconnectedVoice(system):
_nine = bytes_3(9)
_source_id = bytes_3(5000)
@ -658,6 +688,7 @@ def disconnectedVoice(system):
_pkt_time = time()
reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_nine,_slot)
logger.debug('(%s) disconnected voice thread end',system)
def playFileOnRequest(self,fileNumber):
system = self._system
_lang = CONFIG['SYSTEMS'][system]['ANNOUNCEMENT_LANGUAGE']
@ -685,17 +716,22 @@ def playFileOnRequest(self,fileNumber):
_pkt_time = time()
reactor.callFromThread(sendVoicePacket,self,pkt,_source_id,_nine,_slot)
logger.debug('(%s) Sending AMBE file %s end',system,fileNumber)
def threadIdent():
logger.debug('(IDENT) starting ident thread')
reactor.callInThread(ident)
def threadAlias():
logger.debug('(ALIAS) starting alias thread')
reactor.callInThread(aliasb)
def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums):
peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums
def aliasb():
_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG)
reactor.callInThread(setAlias,_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums)
def ident():
for system in systems:
if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER':
@ -759,6 +795,7 @@ def ident():
_stream_id = pkt[16:20]
_pkt_time = time()
reactor.callFromThread(sendVoicePacket,systems[system],pkt,_source_id,_dst_id,_slot)
def bridge_reset():
logger.debug('(BRIDGERESET) Running bridge resetter')
for _system in CONFIG['SYSTEMS']:
@ -771,6 +808,7 @@ def bridge_reset():
pass
CONFIG['SYSTEMS'][_system]['_reset'] = False
CONFIG['SYSTEMS'][_system]['_resetlog'] = False
def options_config():
logger.debug('(OPTIONS) Running options parser')
prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999]
@ -1006,6 +1044,7 @@ class routerOBP(OPENBRIDGE):
def __init__(self, _name, _config, _report):
OPENBRIDGE.__init__(self, _name, _config, _report)
self.STATUS = {}
def get_rptr(self,_sid):
_int_peer_id = int_id(_sid)
if _int_peer_id in local_subscriber_ids:
@ -1016,6 +1055,7 @@ class routerOBP(OPENBRIDGE):
return peer_ids[_int_peer_id]
else:
return _int_peer_id
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'):
_sysIgnore = sysIgnore
for _target in BRIDGES[_bridge]:
@ -1175,6 +1215,7 @@ class routerOBP(OPENBRIDGE):
#logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
#Ignore this system and TS pair if it's called again on this packet
return(_sysIgnore)
def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id):
_int_dst_id = int_id(_dst_id)
#Assemble transmit HBP packet header
@ -1184,6 +1225,7 @@ class routerOBP(OPENBRIDGE):
logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id)
if CONFIG['REPORTS']['REPORT']:
systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore'))
def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'):
_int_dst_id = int_id(_dst_id)
_target_status = systems[_target].STATUS
@ -1217,6 +1259,7 @@ class routerOBP(OPENBRIDGE):
logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id)
if CONFIG['REPORTS']['REPORT']:
systems[_target]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore'))
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash, _hops = b'', _source_server = b'\x00\x00\x00\x00', _ber = b'\x00', _rssi = b'\x00', _source_rptr = b'\x00\x00\x00\x00'):
pkt_time = time()
dmrpkt = _data[20:53]
@ -1224,6 +1267,7 @@ class routerOBP(OPENBRIDGE):
_h = blake2b(digest_size=16)
_h.update(_data)
_pkt_crc = _h.digest()
# ========================
# NUEVO: Procesar _hops
# ========================
@ -1233,6 +1277,28 @@ class routerOBP(OPENBRIDGE):
return
_new_hops = (_hop_count + 1).to_bytes(1, 'big')
# ========================
# NUEVO: Deduplicación global de streams desde múltiples OBP
# ========================
if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE':
_now = pkt_time
_stream_key = (_stream_id, _dst_id)
# Limpiar entradas antiguas (> 2 segundos)
for key in list(GLOBAL_STREAM_LOG.keys()):
if _now - GLOBAL_STREAM_LOG[key] > 2.0:
del GLOBAL_STREAM_LOG[key]
# Si ya vimos este stream+TG desde otro OBP recientemente, ignorar
if _stream_key in GLOBAL_STREAM_LOG:
logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring',
self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server))
return
# Registrar este stream como visto
GLOBAL_STREAM_LOG[_stream_key] = _now
# ========================
# Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP
if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)):
_int_dst_id = int_id(_dst_id)
@ -1247,7 +1313,7 @@ class routerOBP(OPENBRIDGE):
'CONTENTION':False,
'RFS': _rf_src,
'TGID': _dst_id,
# '1ST': perf_counter(), # ELIMINADO
# '1ST': perf_counter(), # â ELIMINADO
'lastSeq': False,
'lastData': False,
'RX_PEER': _peer_id,
@ -1302,6 +1368,7 @@ class routerOBP(OPENBRIDGE):
else:
logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id)
self.STATUS[_stream_id]['crcs'].add(_pkt_crc)
if _call_type == 'group' or _call_type == 'vcsbk':
# Is this a new call stream?
if (_stream_id not in self.STATUS):
@ -1311,7 +1378,7 @@ class routerOBP(OPENBRIDGE):
'CONTENTION':False,
'RFS': _rf_src,
'TGID': _dst_id,
# '1ST': perf_counter(), # ELIMINADO
# '1ST': perf_counter(), # â ELIMINADO
'lastSeq': False,
'lastData': False,
'RX_PEER': _peer_id,
@ -1481,6 +1548,7 @@ class routerHBP(HBSYSTEM):
}
}
self.CALL_DATA = []
def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore,_source_server, _ber, _rssi, _source_rptr):
_sysIgnore = sysIgnore
for _target in BRIDGES[_bridge]:
@ -1631,6 +1699,7 @@ class routerHBP(HBSYSTEM):
# Transmit the packet to the destination system
systems[_target['SYSTEM']].send_system(_tmp_data,b'',_ber,_rssi,_source_server, _source_rptr)
return _sysIgnore
def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id):
#Assemble transmit HBP packet header
_int_dst_id = int_id(_dst_id)
@ -1640,6 +1709,7 @@ class routerHBP(HBSYSTEM):
logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id)
if CONFIG['REPORTS']['REPORT']:
systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore'))
def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_ber = b'\x00', _rssi = b'\x00',_source_server = b'\x00\x00\x00\x00', _source_rptr = b'\x00\x00\x00\x00'):
# _sysIgnore = sysIgnore
_source_server = self._CONFIG['GLOBAL']['SERVER_ID']
@ -1677,6 +1747,7 @@ class routerHBP(HBSYSTEM):
logger.debug('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id)
if CONFIG['REPORTS']['REPORT']:
systems[system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore'))
def pvt_call_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data):
pkt_time = time()
dmrpkt = _data[20:53]
@ -1776,6 +1847,7 @@ class routerHBP(HBSYSTEM):
self.STATUS[_slot]['RX_TGID'] = _dst_id
self.STATUS[_slot]['RX_TIME'] = pkt_time
self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
try:
if CONFIG['SYSTEMS'][self._system]['_reset'] and not CONFIG['SYSTEMS'][self._system]['_resetlog']:
@ -1906,7 +1978,7 @@ class routerHBP(HBSYSTEM):
if _call_type == 'unit' and _int_dst_id == 4000:
logger.info('(%s) Private call to ID 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot)
deactivate_all_dynamic_bridges(self._system)
# Opcional: no procesar más reglas para este paquete
# Opcional: no procesar más reglas para este paquete
return
#Handle Private Calls
if _call_type == 'unit' and len(str(_int_dst_id)) == 7:
@ -2249,6 +2321,7 @@ class bridgeReportFactory(reportFactory):
if isinstance(_data, str):
_data = _data.decode('utf-8', error='ignore')
self.send_clients(b''.join([REPORT_OPCODES['BRDG_EVENT'],_data]))
#************************************************
# MAIN PROGRAM LOOP STARTS HERE
#************************************************

Loading…
Cancel
Save

Powered by TurnKey Linux.