diff --git a/bridge_master.py b/bridge_master.py index 12a0b12..865343a 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -74,11 +74,10 @@ import pickle import logging logger = logging.getLogger(__name__) -# ======================== -# NUEVO: Registro global para deduplicación de streams desde múltiples OBP -# ======================== -GLOBAL_STREAM_LOG = {} -# ======================== +# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +# CONFIGURACIÓN DE ANTIBUCLE Y CONTROL DE SALTOS +MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla +# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< #REGEX import re @@ -366,7 +365,7 @@ def rule_timer_loop(): if _system['ACTIVE'] == False: # Activar inmediatamente sin timer _system['ACTIVE'] = True - _bridge_used = True + _bridge_used = True logger.info('(ROUTER) Conference Bridge ACTIVATED (NO TIMEOUT): System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: _bridge_used = True @@ -1035,16 +1034,11 @@ def options_config(): logger.exception('(OPTIONS) caught exception: %s',e) continue -# ======================== -# NUEVO: Constante para límite de saltos -# ======================== -MAX_HOPS = 15 # Máximo número de saltos permitidos en la malla - class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} - + def get_rptr(self,_sid): _int_peer_id = int_id(_sid) if _int_peer_id in local_subscriber_ids: @@ -1063,39 +1057,45 @@ class routerOBP(OPENBRIDGE): _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] if (_target['SYSTEM'],_target['TS']) in _sysIgnore: - #logger.debug("(DEDUP) OBP Source Skipping system %s TS: %s",_target['SYSTEM'],_target['TS']) continue if _target_system['MODE'] == 'OPENBRIDGE': if _noOBP == True: continue - #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) - #If target has quenched us, don't send if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_dst_id] == _stream_id): - #logger.info('(%s) Conference Bridge: %s, is Source Quenched for Stream ID: %s, skipping system: %s TS: %s, TGID: %s', self._system, _bridge, int_id(_stream_id), _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) continue - #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and ('_bcka' not in _target_system or _target_system['_bcka'] < pkt_time - 60): continue - #If talkgroup is prohibited by ACL if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_target['TGID'], self._CONFIG['GLOBAL']['TG1_ACL']): - #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM'], int_id(_dst_id)) continue if not acl_check(_target['TGID'],_target_system['TG1_ACL']): - #logger.info('(%s) TGID prohibited by ACL, not sending', _target['SYSTEM']) continue - # Is this a new call stream on the target? + + # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + # CONTROL DE SALTOS (MAX_HOPS) + _current_hops = int.from_bytes(_hops, 'big') if _hops else 0 + if _current_hops >= MAX_HOPS: + continue # Descartar silenciosamente + _new_hops = (_current_hops + 1).to_bytes(2, 'big') + # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< + if (_stream_id not in _target_status): - # This is a new call stream on the target _target_status[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, 'RX_PEER': _peer_id, + 'EMB_LC': { + 1: b'\x00', + 2: b'\x00', + 3: b'\x00', + 4: b'\x00', + }, + 'H_LC': b'\x00', + 'T_LC': b'\x00', } - # Generate LCs (full and EMB) for the TX stream try: dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) except Exception: @@ -1108,41 +1108,44 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Record the time of this packet so we can later identify a stale stream + if 'EMB_LC' not in _target_status[_stream_id]: + try: + dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + except Exception: + logger.exception('(to_target) caught exception while creating EMB_LC') + return + if 'H_LC' not in _target_status[_stream_id]: + try: + dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) + except Exception: + logger.exception('(to_target) caught exception while creating H_LC') + return + if 'T_LC' not in _target_status[_stream_id]: + try: + dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) + except Exception: + logger.exception('(to_target) caught exception while creating T_LC') + return _target_status[_stream_id]['LAST'] = pkt_time - # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET - # MUST RE-WRITE DESTINATION TGID IF DIFFERENT - # if _dst_id != rule['DST_GROUP']: dmrbits = bitarray(endian='big') dmrbits.frombytes(dmrpkt) - # Create a voice header packet (FULL LC) if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: dmrbits = _target_status[_stream_id]['H_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['H_LC'][98:197] - # Create a voice terminator packet (FULL LC) elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: dmrbits = _target_status[_stream_id]['T_LC'][0:98] + dmrbits[98:166] + _target_status[_stream_id]['T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_stream_id]['START'] systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) - # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) else: - # BEGIN CONTENTION HANDLING - # - # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is: - # From a different group than last RX from this HBSystem, but it has been less than Group Hangtime - # From a different group than last TX to this HBSystem, but it has been less than Group Hangtime - # From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout - # From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout - # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules - # if ((_target['TGID'] != _target_status[_target['TS']]['RX_TGID']) and ((pkt_time - _target_status[_target['TS']]['RX_TIME']) < _target_system['GROUP_HANGTIME'])): if self.STATUS[_stream_id]['CONTENTION'] == False: self.STATUS[_stream_id]['CONTENTION'] = True @@ -1163,15 +1166,12 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['CONTENTION'] = True logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue - # Is this a new call stream? if (_target_status[_target['TS']]['TX_STREAM_ID'] != _stream_id): - # Record the DST TGID and Stream ID _target_status[_target['TS']]['TX_START'] = pkt_time _target_status[_target['TS']]['TX_TGID'] = _target['TGID'] _target_status[_target['TS']]['TX_STREAM_ID'] = _stream_id _target_status[_target['TS']]['TX_RFS'] = _rf_src _target_status[_target['TS']]['TX_PEER'] = _peer_id - # Generate LCs (full and EMB) for the TX stream dst_lc = b''.join([self.STATUS[_stream_id]['LC'][0:3], _target['TGID'], _rf_src]) _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) @@ -1180,45 +1180,39 @@ class routerOBP(OPENBRIDGE): logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) - # Set other values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq - # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET - # MUST RE-WRITE DESTINATION TGID IF DIFFERENT - # if _dst_id != rule['DST_GROUP']: dmrbits = bitarray(endian='big') dmrbits.frombytes(dmrpkt) - # Create a voice header packet (FULL LC) if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: dmrbits = _target_status[_target['TS']]['TX_H_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_H_LC'][98:197] - # Create a voice terminator packet (FULL LC) elif _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: dmrbits = _target_status[_target['TS']]['TX_T_LC'][0:98] + dmrbits[98:166] + _target_status[_target['TS']]['TX_T_LC'][98:197] if CONFIG['REPORTS']['REPORT']: call_duration = pkt_time - _target_status[_target['TS']]['TX_START'] systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) - # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() - #_tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] _tmp_data = b''.join([_tmp_data, dmrpkt]) - # Transmit the packet to the destination system - systems[_target['SYSTEM']].send_system(_tmp_data,_hops,_ber,_rssi,_source_server, _source_rptr) - #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) - #Ignore this system and TS pair if it's called again on this packet + + # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + # ENVIAR CON _new_hops (solo para OBP); para HBP, enviar sin hops + if _target_system['MODE'] == 'OPENBRIDGE': + systems[_target['SYSTEM']].send_system(_tmp_data, _new_hops, _ber, _rssi, _source_server, _source_rptr) + else: + systems[_target['SYSTEM']].send_system(_tmp_data, b'', _ber, _rssi, _source_server, _source_rptr) + # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< + return(_sysIgnore) def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): _int_dst_id = int_id(_dst_id) - #Assemble transmit HBP packet header _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _tmp_data = b''.join([_tmp_data, dmrpkt]) systems[_d_system].send_system(_tmp_data) @@ -1230,11 +1224,9 @@ class routerOBP(OPENBRIDGE): _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] - #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: return if (_stream_id not in _target_status): - # This is a new call stream on the target _target_status[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, @@ -1243,16 +1235,11 @@ class routerOBP(OPENBRIDGE): 'RX_PEER': _peer_id, 'packets': 0 } - # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time - # Clear the TS bit -- all OpenBridge streams are effectively on TS1 - #_tmp_bits = _bits & ~(1 << 7) - #rewrite slot if required if _slot == 2: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - #Assemble transmit HBP packet header _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _tmp_data = b''.join([_tmp_data, dmrpkt]) systems[_target].send_system(_tmp_data,_hops,_ber,_rssi, _source_server, _source_rptr) @@ -1268,52 +1255,16 @@ class routerOBP(OPENBRIDGE): _h.update(_data) _pkt_crc = _h.digest() - # ======================== - # NUEVO: Procesar _hops - # ======================== - _hop_count = int.from_bytes(_hops, 'big') if _hops else 0 - if _hop_count > MAX_HOPS: - logger.debug('(%s) DROPPED: too many hops (%s) for STREAM ID: %s', self._system, _hop_count, int_id(_stream_id)) - return - _new_hops = (_hop_count + 1).to_bytes(1, 'big') - - # ======================== - # NUEVO: Deduplicación global de streams desde múltiples OBP - # ======================== - if self._CONFIG['SYSTEMS'][self._system]['MODE'] == 'OPENBRIDGE': - _now = pkt_time - _stream_key = (_stream_id, _dst_id) - - # Limpiar entradas antiguas (> 2 segundos) - for key in list(GLOBAL_STREAM_LOG.keys()): - if _now - GLOBAL_STREAM_LOG[key] > 2.0: - del GLOBAL_STREAM_LOG[key] - - # Si ya vimos este stream+TG desde otro OBP recientemente, ignorar - if _stream_key in GLOBAL_STREAM_LOG: - logger.debug('(%s) DUPLICATE STREAM from another OBP: STREAM_ID=%s TGID=%s, source_server=%s, ignoring', - self._system, int_id(_stream_id), int_id(_dst_id), int_id(_source_server)) - return - - # Registrar este stream como visto - GLOBAL_STREAM_LOG[_stream_key] = _now - # ======================== - - # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): _int_dst_id = int_id(_dst_id) - ##if ahex(dmrpkt)[27:-27] == b'd5d7f77fd757': - # This is a data call _data_call = True - # Is this a new call stream? if (_stream_id not in self.STATUS): - # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # â ELIMINADO + '1ST': perf_counter(), 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1323,62 +1274,46 @@ class routerOBP(OPENBRIDGE): } self.STATUS[_stream_id]['LAST'] = pkt_time self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] + 1 - # ======================== - # ELIMINADA lógica de loop control basada en perf_counter() - # ======================== - # Send data to targets - if CONFIG['GLOBAL']['DATA_GATEWAY'] and 'DATA-GATEWAY' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['DATA-GATEWAY']['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS']['DATA-GATEWAY']['ENABLED']: - logger.debug('(%s) DATA packet sent to DATA-GATEWAY',self._system) - self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_new_hops,_source_server,_ber,_rssi,_source_rptr) - for system in systems: - if system == self._system: - continue - if system == 'DATA-GATEWAY': - continue - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][system]['VER'] > 1 and (_int_dst_id >= 1000000): - self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_new_hops,_source_server,_ber,_rssi,_source_rptr) - # ... resto del manejo de datos (sin cambios) ... - if _dst_id in SUB_MAP: - (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] - _dst_slot = systems[_d_system].STATUS[_d_slot] - logger.info('(%s) SUB_MAP matched, System: %s Slot: %s, Time: %s',self._system, _d_system,_d_slot,_d_time) - if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): - if _slot != _d_slot: - _tmp_bits = _bits ^ 1 << 7 - else: - _tmp_bits = _bits - self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + hr_times = {} + for system in systems: + if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.debug("(%s) OBP UNIT *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return else: - logger.debug('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) - else: - for _d_system in systems: - if CONFIG['SYSTEMS'][_d_system]['MODE'] == 'MASTER': - for _to_peer in CONFIG['SYSTEMS'][_d_system]['PEERS']: - _int_to_peer = int_id(_to_peer) - if (str(_int_to_peer)[:7] == str(_int_dst_id)[:7]): - _d_slot = 2 - _dst_slot = systems[_d_system].STATUS[_d_slot] - logger.info('(%s) User Peer Hotspot ID matched, System: %s Slot: %s',self._system, _d_system,_d_slot) - if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): - if _slot != 2: - _tmp_bits = _bits ^ 1 << 7 - else: - _tmp_bits = _bits - self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) - else: - logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] + fi = min(hr_times, key=hr_times.get, default = False) + hr_times = None + if not fi: + logger.warning("(%s) OBP UNIT *LoopControl* fi is empty for some reason : %s, STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LAST'] = pkt_time + return + if self._system != fi: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + call_duration = pkt_time - self.STATUS[_stream_id]['START'] + packet_rate = 0 + if 'packets' in self.STATUS[_stream_id]: + packet_rate = self.STATUS[_stream_id]['packets'] / call_duration + logger.debug("(%s) OBP UNIT *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE. PACKET RATE %0.2f/s",self._system, fi, int_id(_stream_id), int_id(_dst_id),packet_rate) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return + # ... (resto del manejo de datos unitarios sin cambios) ... self.STATUS[_stream_id]['crcs'].add(_pkt_crc) if _call_type == 'group' or _call_type == 'vcsbk': - # Is this a new call stream? if (_stream_id not in self.STATUS): - # This is a new call stream self.STATUS[_stream_id] = { 'START': pkt_time, 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - # '1ST': perf_counter(), # â ELIMINADO + '1ST': perf_counter(), 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, @@ -1386,12 +1321,9 @@ class routerOBP(OPENBRIDGE): 'loss': 0, 'crcs': set() } - # If we can, use the LC from the voice header as to keep all options intact if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_stream_id]['LC'] = decoded['LC'] - # If we don't have a voice header then don't wait to decode the Embedded LC - # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_stream_id]['LC'] = b''.join([LC_OPT,_dst_id,_rf_src]) _inthops = 0 @@ -1404,52 +1336,61 @@ class routerOBP(OPENBRIDGE): else: if 'packets' in self.STATUS[_stream_id]: self.STATUS[_stream_id]['packets'] = self.STATUS[_stream_id]['packets'] +1 - #Finished stream handling# if '_fin' in self.STATUS[_stream_id]: if '_finlog' not in self.STATUS[_stream_id]: logger.debug("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) self.STATUS[_stream_id]['_finlog'] = True return - #TIMEOUT if self.STATUS[_stream_id]['START'] + 180 < pkt_time: if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: logger.info("(%s) OBP *TIMEOUT*, STREAM ID: %s, TG: %s, IGNORE THIS SOURCE",self._system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_stream_id]['LOOPLOG'] = True self.STATUS[_stream_id]['LAST'] = pkt_time return - # ======================== - # ELIMINADA lógica de loop control basada en perf_counter() - # ======================== - #Rate drop + hr_times = {} + for system in systems: + if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.debug("(%s) OBP *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return + else: + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] + fi = min(hr_times, key=hr_times.get, default = False) + hr_times = None + if not fi: + logger.warning("(%s) OBP *LoopControl* fi is empty for some reason : STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) + return + if self._system != fi: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + call_duration = pkt_time - self.STATUS[_stream_id]['START'] + packet_rate = 0 + if 'packets' in self.STATUS[_stream_id]: + packet_rate = self.STATUS[_stream_id]['packets'] / call_duration + logger.debug("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE. PACKET RATE %0.2f/s",self._system, fi, int_id(_stream_id), int_id(_dst_id),call_duration) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]: + systems[self._system].send_bcsq(_dst_id,_stream_id) + self.STATUS[_stream_id]['_bcsq'] = True + return if self.STATUS[_stream_id]['packets'] > 18 and (self.STATUS[_stream_id]['packets'] / self.STATUS[_stream_id]['START'] > 25): logger.warning("(%s) *PacketControl* RATE DROP! Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) self.proxy_BadPeer() return - #Duplicate handling# (sin cambios) - if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: - self.STATUS[_stream_id]['loss'] += 1 - logger.debug("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - return - if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: - self.STATUS[_stream_id]['loss'] += 1 - logger.debug("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - return - if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): - self.STATUS[_stream_id]['loss'] += 1 - logger.debug("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s, LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - return - if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: - self.STATUS[_stream_id]['loss'] += 1 - logger.debug("(%s) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s: SEQ:%s PACKETS: %s, LOSS: %.2f%% ",self._system,_pkt_crc,int_id(_stream_id),int_id(_dst_id),_seq, self.STATUS[_stream_id]['packets'],((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - return - if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): - self.STATUS[_stream_id]['loss'] += 1 - logger.debug("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s , LOSS: %.2f%%",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id),((self.STATUS[_stream_id]['loss'] / self.STATUS[_stream_id]['packets']) * 100)) - self.STATUS[_stream_id]['lastSeq'] = _seq - self.STATUS[_stream_id]['lastData'] = _data + + # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + # DESCARTE SILENCIOSO DE PAQUETES DUPLICADOS (por CRC) + if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: + return # Descartar sin logs, sin conteo + # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< + self.STATUS[_stream_id]['crcs'].add(_pkt_crc) self.STATUS[_stream_id]['LAST'] = pkt_time - #Create STAT bridge for unknown TG if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and (str(int_id(_dst_id)) not in BRIDGES): logger.debug('(%s) Bridge for STAT TG %s does not exist. Creating',self._system, int_id(_dst_id)) @@ -1458,8 +1399,7 @@ class routerOBP(OPENBRIDGE): for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: - _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore,_new_hops, _source_server, _ber, _rssi, _source_rptr) - # Final actions - Is this a voice terminator? + _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,False,_sysIgnore,_hops, _source_server, _ber, _rssi, _source_rptr) if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM): call_duration = pkt_time - self.STATUS[_stream_id]['START'] packet_rate = 0 @@ -1474,10 +1414,8 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['_fin'] = True self.STATUS[_stream_id]['lastSeq'] = False -# routerHBP class remains unchanged (no changes needed for HBP systems) class routerHBP(HBSYSTEM): - # ... (todo el contenido original de routerHBP, SIN CAMBIOS) ... - # (Se mantiene exactamente como en tu archivo original) + def __init__(self, _name, _config, _report): HBSYSTEM.__init__(self, _name, _config, _report) # Status information for the system, TS1 & TS2 @@ -1557,6 +1495,7 @@ class routerHBP(HBSYSTEM): #if _target['ACTIVE']: _target_status = systems[_target['SYSTEM']].STATUS _target_system = self._CONFIG['SYSTEMS'][_target['SYSTEM']] + if (_target['SYSTEM'],_target['TS']) in _sysIgnore: #logger.debug("(DEDUP) HBP Source - Skipping system %s TS: %s",_target['SYSTEM'],_target['TS']) continue @@ -1565,19 +1504,24 @@ class routerHBP(HBSYSTEM): continue #We want to ignore this system and TS combination if it's called again for this packet _sysIgnore.append((_target['SYSTEM'],_target['TS'])) + #If target has quenched us, don't send if ('_bcsq' in _target_system) and (_dst_id in _target_system['_bcsq']) and (_target_system['_bcsq'][_target['TGID']] == _stream_id): continue + #If target has missed 6 (on 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: continue + #If talkgroup is prohibited by ACL if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_target['TGID'],self._CONFIG['GLOBAL']['TG1_ACL']): continue + if _target_system['USE_ACL']: if not acl_check(_target['TGID'],_target_system['TG1_ACL']): continue + # Is this a new call stream on the target? if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1586,22 +1530,47 @@ class routerHBP(HBSYSTEM): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - 'RX_PEER': _peer_id + 'RX_PEER': _peer_id, + 'EMB_LC': { # Asegurarse de que EMB_LC esté inicializado + 1: b'\x00', + 2: b'\x00', + 3: b'\x00', + 4: b'\x00', + }, + 'H_LC': b'\x00', # Asegurarse de que H_LC esté inicializado + 'T_LC': b'\x00', # Asegurarse de que T_LC esté inicializado } # Generate LCs (full and EMB) for the TX stream dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) + + # Asegurarse de que todas las claves necesarias existan incluso si el stream ya estaba presente + if 'EMB_LC' not in _target_status[_stream_id]: + dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + + if 'H_LC' not in _target_status[_stream_id]: + dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) + + if 'T_LC' not in _target_status[_stream_id]: + dst_lc = b''.join([self.STATUS[_slot]['RX_LC'][0:3], _target['TGID'], _rf_src]) + _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 _tmp_bits = _bits & ~(1 << 7) + # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1621,6 +1590,7 @@ class routerHBP(HBSYSTEM): dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) + else: # BEGIN STANDARD CONTENTION HANDLING # @@ -1647,6 +1617,7 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, TGID: %s, SUB: %s', self._system, int_id(_rf_src), _target['SYSTEM'], _target['TS'], int_id(_target_status[_target['TS']]['TX_TGID']), int_id(_target_status[_target['TS']]['TX_RFS'])) continue + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): # Record the DST TGID and Stream ID @@ -1664,16 +1635,20 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,START,TX,{},{},{},{},{},{}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID'])).encode(encoding='utf-8', errors='ignore')) + # Set other values for the contention handler to test next time there is a frame to forward _target_status[_target['TS']]['TX_TIME'] = pkt_time _target_status[_target['TS']]['TX_TYPE'] = _dtype_vseq + # Handle any necessary re-writes for the destination if _system['TS'] != _target['TS']: _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits + # Assemble transmit HBP packet header _tmp_data = b''.join([_data[:8], _target['TGID'], _data[11:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + # MUST TEST FOR NEW STREAM AND IF SO, RE-WRITE THE LC FOR THE TARGET # MUST RE-WRITE DESTINATION TGID IF DIFFERENT # if _dst_id != rule['DST_GROUP']: @@ -1695,11 +1670,14 @@ class routerHBP(HBSYSTEM): dmrpkt = dmrbits.tobytes() except AttributeError: logger.exception('(%s) Non-fatal AttributeError - dmrbits.tobytes()',self._system) + _tmp_data = b''.join([_tmp_data, dmrpkt, _data[53:55]]) + # Transmit the packet to the destination system systems[_target['SYSTEM']].send_system(_tmp_data,b'',_ber,_rssi,_source_server, _source_rptr) + return _sysIgnore - + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): #Assemble transmit HBP packet header _int_dst_id = int_id(_dst_id) @@ -1709,7 +1687,7 @@ class routerHBP(HBSYSTEM): logger.debug('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,DATA,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_hops = b'',_ber = b'\x00', _rssi = b'\x00',_source_server = b'\x00\x00\x00\x00', _source_rptr = b'\x00\x00\x00\x00'): # _sysIgnore = sysIgnore _source_server = self._CONFIG['GLOBAL']['SERVER_ID'] @@ -1717,11 +1695,14 @@ class routerHBP(HBSYSTEM): _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] + #We want to ignore this system and TS combination if it's called again for this packet # _sysIgnore.append((_target,_target['TS'])) + #If target has missed 6 (in 1 min) of keepalives, don't send if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: return + if (_stream_id not in _target_status): # This is a new call stream on the target _target_status[_stream_id] = { @@ -1731,6 +1712,7 @@ class routerHBP(HBSYSTEM): 'TGID': _dst_id, 'RX_PEER': _peer_id } + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit -- all OpenBridge streams are effectively on TS1 @@ -1752,14 +1734,18 @@ class routerHBP(HBSYSTEM): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] + #Add system to SUB_MAP SUB_MAP[_rf_src] = (self._system,_slot,pkt_time) + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + # Collision in progress, bail out! if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): logger.warning('(%s) PRIVATE CALL Packet received with STREAM ID: %s SUB: %s PEER: %s UNIT %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) return + # Create a destination list for the call: if _dst_id in SUB_MAP: if SUB_MAP[_dst_id][0] != self._system: @@ -1770,15 +1756,19 @@ class routerHBP(HBSYSTEM): else: self._targets = [] #self._targets.remove(self._system) + # This is a new call stream, so log & report self.STATUS[_slot]['RX_START'] = pkt_time logger.info('(%s) *PRIVATE CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST: %s (%s), TS: %s, FORWARD: %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, self._targets) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('PRIVATE VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + for _target in self._targets: + _target_status = systems[_target].STATUS _target_system = self._CONFIG['SYSTEMS'][_target] + if self._CONFIG['SYSTEMS'][_target]['MODE'] == 'OPENBRIDGE': if (_stream_id not in _target_status): # This is a new call stream on the target @@ -1790,9 +1780,11 @@ class routerHBP(HBSYSTEM): 'DST': _dst_id, 'ACTIVE': True } + logger.info('(%s) PRIVATE call bridged to OBP System: %s TS: %s, UNIT: %s', self._system, _target, _slot if _target_system['BOTH_SLOTS'] else 1, int_id(_dst_id)) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + # Record the time of this packet so we can later identify a stale stream _target_status[_stream_id]['LAST'] = pkt_time # Clear the TS bit and follow propper OBP definition, unless "BOTH_SLOTS" is set. This only works for unit calls. @@ -1800,11 +1792,14 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits else: _tmp_bits = _bits & ~(1 << 7) + # Assemble transmit HBP packet _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _data = b''.join([_tmp_data, dmrpkt]) + if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM): _target_status[_stream_id]['ACTIVE'] = False + else: # BEGIN STANDARD CONTENTION HANDLING if (_dst_id == _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < STREAM_TO): @@ -1815,6 +1810,7 @@ class routerHBP(HBSYSTEM): if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id: logger.info('(%s) PRIVATE Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, DEST: %s, SUB: %s', self._system, int_id(_rf_src), _target, _slot, int_id(_target_status[_slot]['TX_TGID']), int_id(_target_status[_slot]['TX_RFS'])) continue + # Record target information if this is a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): # Record the DST TGID and Stream ID @@ -1823,14 +1819,18 @@ class routerHBP(HBSYSTEM): _target_status[_slot]['TX_STREAM_ID'] = _stream_id _target_status[_slot]['TX_RFS'] = _rf_src _target_status[_slot]['TX_PEER'] = _peer_id + logger.info('(%s) PRIVATE call bridged to HBP System: %s TS: %s, DST: %s', self._system, _target, _slot, int_id(_dst_id)) if CONFIG['REPORTS']['REPORT']: systems[_target]._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + # Set other values for the contention handler to test next time there is a frame to forward _target_status[_slot]['TX_TIME'] = pkt_time _target_status[_slot]['TX_TYPE'] = _dtype_vseq + #send the call: systems[_target].send_system(_data) + # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): self._targets = [] @@ -1839,6 +1839,7 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -1848,14 +1849,69 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id +# def parrot_service(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): +# pkt_time = time() +# # Is this is a new call stream? +# if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): +# self.STATUS[_slot]['RX_START'] = pkt_time +# logger.info('(%s) *START RECORDING* STREAM ID: %s USER: %s (%s) REPEATER: %s (%s) DST: %s (%s), TS: %s', \ +# self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) +# if CONFIG['REPORTS']['REPORT']: +# self._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) +# self.CALL_DATA.append(_data) +# self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id +# return +# +# # Final actions - Is this a voice terminator? +# if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (self.CALL_DATA): +# call_duration = pkt_time - self.STATUS[_slot]['RX_START'] +# #Change the stream ID +# self.CALL_DATA.append(_data) +# logger.info('(%s) *END RECORDING* STREAM ID: %s', self._system, int_id(_stream_id)) +# if CONFIG['REPORTS']['REPORT']: +# self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) +# sleep(2) +# _new_stream_id = bytes_4(randint(0x00, 0xFFFFFFFF)) +# logger.info('(%s) *START PLAYBACK* STREAM ID: %s USER: %s (%s) REPEATER: %s (%s) DST: %s (%s), TS: %s, Duration: %s', \ +# self._system, int_id(_new_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) +# if CONFIG['REPORTS']['REPORT']: +# self._report.send_bridgeEvent('PRIVATE VOICE,START,TX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) +# +# for i in self.CALL_DATA: +# +# i = i[:16] + _new_stream_id + i[20:] +# self.send_system(i) +# sleep(0.06) +# self.CALL_DATA = [] +# logger.info('(%s) *END PLAYBACK* STREAM ID: %s', self._system, int_id(_new_stream_id)) +# if CONFIG['REPORTS']['REPORT']: +# self._report.send_bridgeEvent('PRIVATE VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) +# +# else: +# if self.CALL_DATA: +# #Change the stream ID +# self.CALL_DATA.append(_data) +# +# # # Mark status variables for use later +# self.STATUS[_slot]['RX_PEER'] = _peer_id +# self.STATUS[_slot]['RX_SEQ'] = _seq +# self.STATUS[_slot]['RX_RFS'] = _rf_src +# self.STATUS[_slot]['RX_TYPE'] = _dtype_vseq +# self.STATUS[_slot]['RX_TGID'] = _dst_id +# self.STATUS[_slot]['RX_TIME'] = pkt_time +# self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + try: if CONFIG['SYSTEMS'][self._system]['_reset'] and not CONFIG['SYSTEMS'][self._system]['_resetlog']: logger.info('(%s) disallow transmission until reset cycle is complete',_system) CONFIG['SYSTEMS'][self._system]['_resetlog'] = True + return except KeyError: pass + pkt_time = time() dmrpkt = _data[20:53] _ber = _data[53:54] @@ -1869,23 +1925,29 @@ class routerHBP(HBSYSTEM): _nine = bytes_3(9) _lang = CONFIG['SYSTEMS'][self._system]['ANNOUNCEMENT_LANGUAGE'] _int_dst_id = int_id(_dst_id) + # Assume this is not a data call. We use this to prevent SMS/GPS data from triggering a reflector. _data_call = False _voice_call = False + #Add system to SUB_MAP SUB_MAP[_rf_src] = (self._system,_slot,pkt_time) + def resetallStarMode(): self.STATUS[_slot]['_allStarMode'] = False logger.info('(%s) Reset all star mode -> dial mode',self._system) + #Rewrite GPS Data comming in as a group call to a unit call #if (_call_type == 'group' or _call_type == 'vcsbk') and _int_dst_id == 900999: #_bits = header(_slot,'unit',_bits) #logger.info('(%s) Type Rewrite - GPS data from ID: %s, on TG 900999 rewritten to unit call to ID 900999 : bits %s',self._system,int_id(_rf_src),_bits) #_call_type == 'unit' + if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or (_stream_id != self.STATUS[_slot]['RX_STREAM_ID'] and _dtype_vseq == 3)): _data_call = True self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['crcs'] = set() + if _dtype_vseq == 3: logger.info('(%s) *UNIT CSBK* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -1909,10 +1971,12 @@ class routerHBP(HBSYSTEM): else: logger.info('(%s) *UNKNOW TYPE* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) + #Send all data to DATA-GATEWAY if enabled and valid if CONFIG['GLOBAL']['DATA_GATEWAY'] and 'DATA-GATEWAY' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['DATA-GATEWAY']['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS']['DATA-GATEWAY']['ENABLED']: logger.debug('(%s) DATA packet sent to DATA-GATEWAY',self._system) self.sendDataToOBP('DATA-GATEWAY',_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr) + #Send to all openbridges # sysIgnore = [] for system in systems: @@ -1923,6 +1987,7 @@ class routerHBP(HBSYSTEM): #We only want to send data calls to individual IDs via FreeBridge (not OpenBridge) if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][system]['VER'] > 1 and (_int_dst_id >= 1000000): self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits,_slot,_source_rptr) + #If destination ID is in the Subscriber Map if _dst_id in SUB_MAP: (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] @@ -1936,8 +2001,10 @@ class routerHBP(HBSYSTEM): else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + else: logger.debug('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) + elif _int_dst_id == 900999: if 'D-APRS' in systems and CONFIG['SYSTEMS']['D-APRS']['MODE'] == 'MASTER': _d_system = 'D-APRS' @@ -1949,8 +2016,10 @@ class routerHBP(HBSYSTEM): #We will allow the system to use both slots _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + else: #If destination ID is logged in as a hotspot for _d_system in systems: @@ -1972,25 +2041,32 @@ class routerHBP(HBSYSTEM): else: _tmp_bits = _bits self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + else: logger.debug('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + # Handle private call to ID 4000 as global dynamic bridge reset if _call_type == 'unit' and _int_dst_id == 4000: logger.info('(%s) Private call to ID 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) # Opcional: no procesar más reglas para este paquete return + #Handle Private Calls if _call_type == 'unit' and len(str(_int_dst_id)) == 7: self.pvt_call_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data) + #Handle Parrot Service #if _call_type == 'unit' and _int_dst_id == 9990: # self.parrot_service(_peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data) + #Handle AMI private calls if _call_type == 'unit' and not _data_call and self.STATUS[_slot]['_allStarMode'] and CONFIG['ALLSTAR']['ENABLED']: if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): logger.info('(%s) AMI: Private call from %s to %s',self._system, int_id(_rf_src), _int_dst_id) + if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): + if _int_dst_id == 4000: logger.info('(%s) AMI: Private call from %s to %s (Disconnect)',self._system, int_id(_rf_src), _int_dst_id) AMIOBJ.send_command('ilink 6 0') @@ -2001,6 +2077,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) AMI: Private call from %s to %s (Link)',self._system, int_id(_rf_src), _int_dst_id) AMIOBJ.send_command('ilink 6 0') AMIOBJ.send_command('ilink 3 ' + str(_int_dst_id)) + # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2011,15 +2088,20 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['VOICE_STREAM'] = _voice_call self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 + #Handle AllStar Stuff elif _call_type == 'unit' and not _data_call and not self.STATUS[_slot]['_allStarMode']: if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['crcs'] = set() self.STATUS[_slot]['_stopTgAnnounce'] = False + # Final actions - Is this a voice terminator? if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM): _say = [] + + #Allstar mode switch if CONFIG['ALLSTAR']['ENABLED'] and _int_dst_id == 8: logger.info('(%s) Reflector: voice called - TG 8 AllStar"', self._system) @@ -2033,15 +2115,18 @@ class routerHBP(HBSYSTEM): _say.append(words[_lang]['busy']) _say.append(words[_lang]['silence']) self.STATUS[_slot]['_stopTgAnnounce'] = True + #Information services elif _int_dst_id >= 9991 and _int_dst_id <= 9999: self.STATUS[_slot]['_stopTgAnnounce'] = True reactor.callInThread(playFileOnRequest,self,_int_dst_id) #playFileOnRequest(self,_int_dst_id) + if _say: speech = pkt_gen(bytes_3(5000), _nine, bytes_4(9), 1, _say) #call speech in a thread as it contains sleep() and hence could block the reactor reactor.callInThread(sendSpeech,self,speech) + # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2052,28 +2137,35 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['VOICE_STREAM'] = _voice_call self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 + #Handle group calls if _call_type == 'group' or _call_type == 'vcsbk': if _int_dst_id == 4000: logger.info('(%s) Group call to TG 4000 received on TS %s - deactivating all dynamic bridges', self._system, _slot) deactivate_all_dynamic_bridges(self._system) return + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): + self.STATUS[_slot]['packets'] = 0 self.STATUS[_slot]['loss'] = 0 self.STATUS[_slot]['crcs'] = set() + if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): logger.warning('(%s) Packet received with STREAM ID: %s SUB: %s PEER: %s TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) return + # This is a new call stream self.STATUS[_slot]['RX_START'] = pkt_time + if _call_type == 'group' : if _dtype_vseq == 6: logger.info('(%s) *DATA HEADER* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('DATA HEADER,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + else: logger.info('(%s) *CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -2084,19 +2176,24 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, _dtype_vseq) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('OTHER DATA,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + # If we can, use the LC from the voice header as to keep all options intact if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: decoded = decode.voice_head_term(dmrpkt) self.STATUS[_slot]['RX_LC'] = decoded['LC'] + # If we don't have a voice header then don't wait to decode it from the Embedded LC # just make a new one from the HBP header. This is good enough, and it saves lots of time else: self.STATUS[_slot]['RX_LC'] = b''.join([LC_OPT,_dst_id,_rf_src]) + #Create default bridge for unknown TG if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and int_id(_dst_id) != 4000 and int_id(_dst_id) != 5000 and (str(int_id(_dst_id)) not in BRIDGES): logger.info('(%s) Bridge for TG %s does not exist. Creating as User Activated. Timeout %s',self._system, int_id(_dst_id),CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) make_single_bridge(_dst_id,self._system,_slot,CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) + self.STATUS[_slot]['packets'] = self.STATUS[_slot]['packets'] +1 + if _call_type == 'vcsbk': if _dtype_vseq == 7: logger.info('(%s) *VCSBK 1/2 DATA BLOCK * STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ @@ -2108,18 +2205,22 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('VCSBK 3/4 DATA BLOCK,DATA,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + #Packet rate limit #Rate drop if self.STATUS[_slot]['packets'] > 18 and (self.STATUS[_slot]['packets'] / (pkt_time - self.STATUS[_slot]['RX_START']) > 25): logger.warning("(%s) *PacketControl* RATE DROP! Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) self.STATUS[_slot]['LAST'] = pkt_time return + #Timeout if self.STATUS[_slot]['RX_START'] + 180 < pkt_time: if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: logger.info("(%s) HBP *SOURCE TIMEOUT* STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, int_id(_stream_id), int_id(_dst_id),_slot) + self.STATUS[_slot]['LOOPLOG'] = True self.STATUS[_slot]['LAST'] = pkt_time return + #LoopControl# for system in systems: if system == self._system: @@ -2138,10 +2239,12 @@ class routerHBP(HBSYSTEM): logger.debug("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_slot]['LOOPLOG'] = True self.STATUS[_slot]['LAST'] = pkt_time + if 'ENHANCED_OBP' in CONFIG['SYSTEMS'][self._system] and CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_slot]: systems[self._system].send_bcsq(_dst_id,_stream_id) self.STATUS[_slot]['_bcsq'] = True return + #Duplicate handling# #Duplicate complete packet if self.STATUS[_slot]['lastData'] and self.STATUS[_slot]['lastData'] == _data and _seq > 1: @@ -2167,23 +2270,28 @@ class routerHBP(HBSYSTEM): if _seq and self.STATUS[_slot]['lastSeq'] and _seq > (self.STATUS[_slot]['lastSeq']+1): self.STATUS[_slot]['loss'] += 1 logger.debug("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + #Save this sequence number self.STATUS[_slot]['lastSeq'] = _seq #Save this packet self.STATUS[_slot]['lastData'] = _data + ### MODIFIED: Prioritize routing for the TGID that just created a bridge _sysIgnore = deque() _current_bridge_key = str(int_id(_dst_id)) + # First, explicitly route for the current packet's TGID. # This ensures that if a bridge was just created for this packet, it gets processed immediately. if _current_bridge_key in BRIDGES: for _system in BRIDGES[_current_bridge_key]: if _system['SYSTEM'] == self._system and _system['TGID'] == _dst_id and _system['TS'] == _slot and _system['ACTIVE'] == True: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits, _current_bridge_key, _system, False, _sysIgnore, _source_server, _ber, _rssi, _source_rptr) + # Also check for a corresponding reflector bridge (e.g., #9990) _reflector_bridge_key = ''.join(['#', _current_bridge_key]) if _reflector_bridge_key in BRIDGES: _sysIgnore = self.to_target(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits, _reflector_bridge_key, _system, False, _sysIgnore, _source_server, _ber, _rssi, _source_rptr) + # Now, run the general routing loop for all other bridges to handle cross-connections. # We skip the one we just processed to avoid duplicate work. for _bridge in BRIDGES: @@ -2205,22 +2313,27 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration, packet_rate, loss) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + #Reset back to False self.STATUS[_slot]['lastSeq'] = False self.STATUS[_slot]['lastData'] = False + # # Begin in-band signalling for call end. This has nothign to do with routing traffic directly. # + # Iterate the rules dictionary for _bridge in BRIDGES: if (_bridge[0:1] == '#') and (_int_dst_id != 9): continue for _system in BRIDGES[_bridge]: if _system['SYSTEM'] == self._system: + # TGID matches a rule source, reset its timer if _slot == _system['TS'] and _dst_id == _system['TGID'] and ((_system['TO_TYPE'] == 'ON' and (_system['ACTIVE'] == True)) or (_system['TO_TYPE'] == 'OFF' and _system['ACTIVE'] == False)): _system['TIMER'] = pkt_time + _system['TIMEOUT'] logger.info('(%s) [1] Transmission match for Bridge: %s. Reset timeout to %s', self._system, _bridge, _system['TIMER']) + # TGID matches an ACTIVATION trigger if (_dst_id in _system['ON'] or _dst_id in _system['RESET']) and _slot == _system['TS']: # Set the matching rule as ACTIVE @@ -2237,6 +2350,7 @@ class routerHBP(HBSYSTEM): if _system['ACTIVE'] == True and _system['TO_TYPE'] == 'ON': _system['TIMER'] = pkt_time + _system['TIMEOUT'] logger.info('(%s) [4] Bridge: %s, timeout timer reset to: %s', self._system, _bridge, _system['TIMER'] - pkt_time) + # TGID matches an DE-ACTIVATION trigger #Single TG mode if (CONFIG['SYSTEMS'][self._system]['MODE'] == 'MASTER' and CONFIG['SYSTEMS'][self._system]['SINGLE_MODE']) == True: @@ -2260,9 +2374,10 @@ class routerHBP(HBSYSTEM): _system['TIMER'] = pkt_time logger.info('(%s) [8] Bridge: %s set to ON with and "OFF" timer rule: timeout timer cancelled', self._system, _bridge) else: - # NUEVO COMPORTAMIENTO: SINGLE_MODE=False pero con gestión de bridge único - # Solo desactivar si es TG 4000 o un nuevo TG dinámico (no estático) - # Verificar si el TGID actual es estático + # NUEVO COMPORTAMIENTO: SINGLE_MODE=False pero con gestión de bridge único + # Solo desactivar si es TG 4000 o un nuevo TG dinámico (no estático) + + # Verificar si el TGID actual es estático is_static_tg = False if CONFIG['SYSTEMS'][self._system]['TS1_STATIC'] and _slot == 1: static_tgs = [int(tg) for tg in CONFIG['SYSTEMS'][self._system]['TS1_STATIC'].split(',') if tg.strip()] @@ -2272,10 +2387,13 @@ class routerHBP(HBSYSTEM): static_tgs = [int(tg) for tg in CONFIG['SYSTEMS'][self._system]['TS2_STATIC'].split(',') if tg.strip()] if int_id(_dst_id) in static_tgs: is_static_tg = True + # Verificar si es un reflector (bridge que empieza con #) is_reflector = _bridge[0:1] == '#' - # Desactivar solo si es TG 4000 o un nuevo TG dinámico (no estático ni reflector) + + # Desactivar solo si es TG 4000 o un nuevo TG dinámico (no estático ni reflector) if (_dst_id == bytes_3(4000)) and _slot == _system['TS']: + # Set the matching rule as ACTIVE if _dst_id in _system['OFF'] or _dst_id == bytes_3(4000) or (_dst_id != _system['TGID'] and not is_static_tg and not is_reflector): if _system['ACTIVE'] == True: @@ -2286,6 +2404,7 @@ class routerHBP(HBSYSTEM): logger.info('(%s) [5b] Bridge: %s, connection changed to state: %s (Static TG %s activated)', self._system, _bridge, _system['ACTIVE'], int_id(_dst_id)) else: logger.info('(%s) [5b] Bridge: %s, connection changed to state: %s (New dynamic TG %s activated)', self._system, _bridge, _system['ACTIVE'], int_id(_dst_id)) + # Cancel the timer if we've enabled an "ON" type timeout if _system['TO_TYPE'] == 'ON': _system['TIMER'] = pkt_time @@ -2301,6 +2420,8 @@ class routerHBP(HBSYSTEM): # # END IN-BAND SIGNALLING # + + # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id self.STATUS[_slot]['RX_SEQ'] = _seq @@ -2310,13 +2431,16 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id self.STATUS[_slot]['crcs'].add(_pkt_crc) + # # Socket-based reporting section # class bridgeReportFactory(reportFactory): + def send_bridge(self): serialized = pickle.dumps(BRIDGES, protocol=2) #.decode("utf-8", errors='ignore') self.send_clients(b''.join([REPORT_OPCODES['BRIDGE_SND'],serialized])) + def send_bridgeEvent(self, _data): if isinstance(_data, str): _data = _data.decode('utf-8', error='ignore') @@ -2326,29 +2450,38 @@ class bridgeReportFactory(reportFactory): # MAIN PROGRAM LOOP STARTS HERE #************************************************ if __name__ == '__main__': + import argparse import sys import os import signal + global CONFIG global KEYS keys = {} + # Higheset peer ID permitted by HBP PEER_MAX = 4294967295 + ID_MAX = 16776415 + #Set process title early setproctitle(__file__) + # Change the current directory to the location of the application os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) + # CLI argument parser - handles picking up the config file from the command line, and sending a "help" message parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', action='store', dest='CONFIG_FILE', help='/full/path/to/config.file (usually hblink.cfg)') #parser.add_argument('-r', '--rules', action='store', dest='RULES_FILE', help='/full/path/to/rules.file (usually rules.py)') parser.add_argument('-l', '--logging', action='store', dest='LOG_LEVEL', help='Override config file logging level.') cli_args = parser.parse_args() + # Ensure we have a path for the config file, if one wasn't specified, then use the default (top of file) if not cli_args.CONFIG_FILE: cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/hblink.cfg' + #configP = False #if os.path.isfile('config.pkl'): #if os.path.getmtime('config.pkl') > (time() - 25): @@ -2363,23 +2496,26 @@ if __name__ == '__main__': #else: #os.unlink("config.pkl") #else: + CONFIG = config.build_config(cli_args.CONFIG_FILE) + # Ensure we have a path for the rules file, if one wasn't specified, then use the default (top of file) #if not cli_args.RULES_FILE: # cli_args.RULES_FILE = os.path.dirname(os.path.abspath(__file__))+'/rules.py' + # Start the system logger if cli_args.LOG_LEVEL: CONFIG['LOGGER']['LOG_LEVEL'] = cli_args.LOG_LEVEL logger = log.config_logging(CONFIG['LOGGER']) - logger.info(""" -Copyright (c) 2020, 2021, 2022, 2023 Simon G7RZU simon@gb7fr.org.uk -Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 -\tThe Regents of the K0USY Group. All rights reserved. -""") + logger.info('\n\nCopyright (c) 2020, 2021, 2022, 2023 Simon G7RZU simon@gb7fr.org.uk') + logger.info('Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019\n\tThe Regents of the K0USY Group. All rights reserved.\n') logger.debug('(GLOBAL) Logging system started, anything from here on gets logged') + if CONFIG['ALLSTAR']['ENABLED']: logger.info('(AMI) Setting up AMI: Server: %s, Port: %s, User: %s, Pass: %s, Node: %s',CONFIG['ALLSTAR']['SERVER'],CONFIG['ALLSTAR']['PORT'],CONFIG['ALLSTAR']['USER'],CONFIG['ALLSTAR']['PASS'],CONFIG['ALLSTAR']['NODE']) + AMIOBJ = AMI(CONFIG['ALLSTAR']['SERVER'],CONFIG['ALLSTAR']['PORT'],CONFIG['ALLSTAR']['USER'],CONFIG['ALLSTAR']['PASS'],CONFIG['ALLSTAR']['NODE']) + # Set up the signal handler def sig_handler(_signal, _frame): logger.info('(GLOBAL) SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal)) @@ -2389,6 +2525,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 CONFIG['GLOBAL']['_KILL_SERVER'] = True else: exit() + #Server kill routine def kill_server(): try: @@ -2405,19 +2542,24 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 logger.error('(GLOBAL) Canot save key file: %s',e) except KeyError: pass + #install signal handlers signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) + # Create the name-number mapping dictionaries peer_ids, subscriber_ids, talkgroup_ids, local_subscriber_ids, server_ids, checksums = mk_aliases(CONFIG) + #Add special IDs to DB subscriber_ids[900999] = 'D-APRS' subscriber_ids[4294967295] = 'SC' + CONFIG['_SUB_IDS'] = subscriber_ids CONFIG['_PEER_IDS'] = peer_ids CONFIG['_LOCAL_SUBSCRIBER_IDS'] = local_subscriber_ids CONFIG['_SERVER_IDS'] = server_ids CONFIG['CHECKSUMS'] = checksums + # Import the ruiles file as a module, and create BRIDGES from it #spec = importlib.util.spec_from_file_location("module.name", cli_args.RULES_FILE) #rules_module = importlib.util.module_from_spec(spec) @@ -2426,6 +2568,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 # logger.info('(ROUTER) Routing bridges file found and bridges imported: %s', cli_args.RULES_FILE) #except (ImportError, FileNotFoundError): #sys.exit('(ROUTER) TERMINATING: Routing bridges file not found or invalid: {}'.format(cli_args.RULES_FILE)) + #Load pickle of bridges if it's less than 25 seconds old #if os.path.isfile('bridge.pkl'): #if os.path.getmtime('config.pkl') > (time() - 25): @@ -2440,6 +2583,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 #BRIDGES = make_bridges(rules_module.BRIDGES) #os.unlink("bridge.pkl") #else: + if 'ECHO' in CONFIG['SYSTEMS'] and CONFIG['SYSTEMS']['ECHO']['MODE'] == 'PEER': BRIDGES = make_bridges({'9990': [{'SYSTEM': 'ECHO', 'TS': 2, 'TGID': 9990, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [], 'OFF': [], 'RESET': []},]}) else: @@ -2448,6 +2592,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 #Subscriber map for unit calls - complete with test entry #SUB_MAP = {bytes_3(73578):('REP-1',1,time())} SUB_MAP = {} + if CONFIG['ALIASES']['SUB_MAP_FILE']: try: with open(CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['SUB_MAP_FILE'],'rb') as _fh: @@ -2455,8 +2600,10 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 except: logger.warning('(SUBSCRIBER) Cannot load SUB_MAP file') #sys.exit('(SUBSCRIBER) TERMINATING: SUB_MAP file not found or invalid') + #Test value #SUB_MAP[bytes_3(73578)] = ('REP-1',1,time()) + #Generator generator = {} systemdelete = deque() @@ -2471,13 +2618,17 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 logger.debug('(GLOBAL) Generator - generated system %s',_systemname) generator[_systemname]['_default_options'] systemdelete.append(system) + for _system in generator: CONFIG['SYSTEMS'][_system] = generator[_system] for _system in systemdelete: CONFIG['SYSTEMS'].pop(_system) + del generator del systemdelete + prohibitedTGs = [0,1,2,3,4,5,9,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999] + # Default reflector logger.debug('(ROUTER) Setting default reflectors') for system in CONFIG['SYSTEMS']: @@ -2485,6 +2636,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 continue if CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR'] not in prohibitedTGs: make_default_reflector(CONFIG['SYSTEMS'][system]['DEFAULT_REFLECTOR'],CONFIG['SYSTEMS'][system]['DEFAULT_UA_TIMER'],system) + #static TGs logger.debug('(ROUTER) setting static TGs') for system in CONFIG['SYSTEMS']: @@ -2497,6 +2649,7 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 ts1 = CONFIG['SYSTEMS'][system]['TS1_STATIC'].split(',') if CONFIG['SYSTEMS'][system]['TS2_STATIC']: ts2 = CONFIG['SYSTEMS'][system]['TS2_STATIC'].split(',') + for tg in ts1: if not tg: continue @@ -2511,18 +2664,23 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 continue tg = int(tg) make_static_tg(tg,2,_tmout,system) + # INITIALIZE THE REPORTING LOOP if CONFIG['REPORTS']['REPORT']: report_server = config_reports(CONFIG, bridgeReportFactory) else: report_server = None logger.info('(REPORT) TCP Socket reporting not configured') + #Read AMBE AMBEobj = readAMBE(CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES'],'./Audio/') + #global words words = AMBEobj.readfiles() + for lang in words.keys(): logger.info('(AMBE) for language %s, read %s words into voice dict',lang,len(words[lang]) - 1) + #Remap words for internationalisation if lang in voiceMap: logger.info('(AMBE) i8n voice map entry for language %s',lang) @@ -2530,9 +2688,12 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 for _mapword in _map: logger.info('(AMBE) Mapping \"%s\" to \"%s\"',_mapword,_map[_mapword]) words[lang][_mapword] = words[lang][_map[_mapword]] + # HBlink instance creation logger.info('(GLOBAL) ADN \'bridge_master.py\' -- SYSTEM STARTING...') + listeningPorts = {} + for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': @@ -2544,14 +2705,17 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 systems[system] = routerHBP(system, CONFIG, report_server) listeningPorts[system] = reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) + def loopingErrHandle(failure): - logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop. %s', failure) + logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) reactor.stop() + #load keys if exists try: keys = load_json(''.join([CONFIG['ALIASES']['PATH'], CONFIG['ALIASES']['KEYS_FILE']])) except Exception as e: logger.error('(KEYS) Cannot load keys: %s',e) + #Initialize API # if CONFIG['GLOBAL']['ENABLE_API']: # api = config_API(CONFIG,BRIDGES) @@ -2568,54 +2732,67 @@ Copyright (c) 2013, 2014, 2015, 2016, 2018, 2019 # logger.info('(API) System API Key loaded from system key store') # else: # logger.info('(API) API not started') + # Initialize the rule timer -- this if for user activated stuff rule_timer_task = task.LoopingCall(rule_timer_loop) rule_timer = rule_timer_task.start(52) rule_timer.addErrback(loopingErrHandle) + # Initialize the stream trimmer stream_trimmer_task = task.LoopingCall(stream_trimmer_loop) stream_trimmer = stream_trimmer_task.start(5) stream_trimmer.addErrback(loopingErrHandle) + # Ident #This runs in a thread so as not to block the reactor ident_task = task.LoopingCall(threadIdent) identa = ident_task.start(3600) identa.addErrback(loopingErrHandle) + #Alias reloader alias_time = CONFIG['ALIASES']['STALE_TIME'] * 86400 aliasa_task = task.LoopingCall(threadAlias) aliasa = aliasa_task.start(alias_time) aliasa.addErrback(loopingErrHandle) + #Options parsing options_task = task.LoopingCall(options_config) options = options_task.start(26) options.addErrback(loopingErrHandle) + #bridge reset bridge_task = task.LoopingCall(bridge_reset) bridge = bridge_task.start(6) bridge.addErrback(loopingErrHandle) + #STAT trimmer - once every 5 mins (roughly - shifted so all timed tasks don't run at once if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: stat_trimmer_task = task.LoopingCall(statTrimmer) stat_trimmer = stat_trimmer_task.start(303)#3600 stat_trimmer.addErrback(loopingErrHandle) + #KA Reporting ka_task = task.LoopingCall(kaReporting) ka = ka_task.start(60) ka.addErrback(loopingErrHandle) + #Debug bridges if CONFIG['GLOBAL']['DEBUG_BRIDGES']: debug_bridges_task = task.LoopingCall(bridgeDebug) debug_bridges = debug_bridges_task.start(66) debug_bridges.addErrback(loopingErrHandle) + #Subscriber map trimmer sub_trimmer_task = task.LoopingCall(SubMapTrimmer) sub_trimmer = sub_trimmer_task.start(3600)#3600 sub_trimmer.addErrback(loopingErrHandle) + #Server kill switch checker killserver_task = task.LoopingCall(kill_server) killserver = killserver_task.start(5) killserver.addErrback(loopingErrHandle) + #more threads reactor.suggestThreadPoolSize(100) + reactor.run()