diff --git a/bridge_master.py b/bridge_master.py index 32a87e0..a5ac6ea 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -1378,6 +1378,50 @@ class routerOBP(OPENBRIDGE): #logger.debug('(%s) Packet routed by bridge: %s to system: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) #Ignore this system and TS pair if it's called again on this packet return(_sysIgnore) + + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): + _int_dst_id = int_id(_dst_id) + #Assemble transmit HBP packet header + _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + _tmp_data = b''.join([_tmp_data, dmrpkt]) + systems[_d_system].send_system(_tmp_data) + logger.info('(%s) UNIT Data Bridged to HBP on slot 1: %s DST_ID: %s',self._system,_d_system,_int_dst_id) + if CONFIG['REPORTS']['REPORT']: + systems[_d_system]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits): + + _int_dst_id = int_id(_dst_id) + _target_status = systems[_target].STATUS + _target_system = self._CONFIG['SYSTEMS'][_target] + + + #If target has missed 6 (on 1 min) of keepalives, don't send + if _target_system['ENHANCED_OBP'] and '_bcka' in _target_system and _target_system['_bcka'] < pkt_time - 60: + return + + if (_stream_id not in _target_status): + # This is a new call stream on the target + _target_status[_stream_id] = { + 'START': pkt_time, + 'CONTENTION':False, + 'RFS': _rf_src, + 'TGID': _dst_id, + 'RX_PEER': _peer_id + } + + # Record the time of this packet so we can later identify a stale stream + _target_status[_stream_id]['LAST'] = pkt_time + # Clear the TS bit -- all OpenBridge streams are effectively on TS1 + _tmp_bits = _bits & ~(1 << 7) + #Assemble transmit HBP packet header + _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) + _tmp_data = b''.join([_tmp_data, dmrpkt]) + systems[_target].send_system(_tmp_data) + logger.info('(%s) UNIT Data Bridged to OBP System: %s DST_ID: %s', self._system, _target,_int_dst_id) + if CONFIG['REPORTS']['REPORT']: + systems[system]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() @@ -1386,9 +1430,64 @@ class routerOBP(OPENBRIDGE): # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in out UNIT_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): + + _int_dst_id = int_id(_dst_id) ## if ahex(dmrpkt)[27:-27] == b'd5d7f77fd757': # This is a data call _data_call = True + + # Is this a new call stream? + if (_stream_id not in self.STATUS): + + # This is a new call stream + self.STATUS[_stream_id] = { + 'START': pkt_time, + 'CONTENTION':False, + 'RFS': _rf_src, + 'TGID': _dst_id, + '1ST': perf_counter(), + 'lastSeq': False, + 'lastData': False, + 'RX_PEER': _peer_id + + } + + self.STATUS[_stream_id]['LAST'] = pkt_time + + hr_times = {} + for system in systems: + if system != self._system and CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.warning("(%s) OBP UNIT *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return + else: + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + hr_times[system] = systems[system].STATUS[_stream_id]['1ST'] + + #use the minimum perf_counter to ensure + #We always use only the earliest packet + fi = min(hr_times, key=hr_times.get, default = False) + + hr_times = None + + if not fi: + logger.warning("(%s) OBP UNIT *LoopControl* fi is empty for some reason : %s, STREAM ID: %s, TG: %s, TS: %s",self._system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LAST'] = pkt_time + return + + if self._system != fi: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.warning("(%s) OBP UNIT *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, fi, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return + + + if _dtype_vseq == 3: logger.info('(%s) *UNIT CSBK* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -1413,7 +1512,70 @@ class routerOBP(OPENBRIDGE): logger.info('(%s) *UNKNOWN DATA TYPE* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) - + #Send other openbridges + for system in systems: + if system == self._system: + continue + #We only want to send data calls to individual IDs via OpenBridge + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and _int_dst_id >= 1000000: + self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits) + + #If destination ID is in the Subscriber Map + if _dst_id in SUB_MAP: + (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] + _dst_slot = systems[_d_system].STATUS[_d_slot] + logger.info('(%s) SUB_MAP matched, System: %s Slot: %s, Time: %s',self._system, _d_system,_d_slot,_d_time) + #If slot is idle for RX and TX + if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): + #rewrite slot if required + if _slot != _d_slot: + _tmp_bits = _bits ^ 1 << 7 + else: + _tmp_bits = _bits + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + + else: + logger.info('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) + + elif _int_dst_id == 900999: + if 'D-APRS' in systems and CONFIG['SYSTEMS']['D-APRS']['MODE'] == 'MASTER': + _d_system = 'D-APRS' + _d_slot = _slot + _dst_slot = systems['D-APRS'].STATUS[_slot] + logger.info('(%s) D-APRS ID matched, System: %s Slot: %s',self._system, _d_system,_slot) + #If slot is idle for RX and TX + if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): + #We will allow the system to use both slots + _tmp_bits = _bits + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + + else: + logger.info('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + + else: + #If destination ID is logged in as a hotspot + for _d_system in systems: + if CONFIG['SYSTEMS'][_d_system]['MODE'] == 'MASTER': + for _to_peer in CONFIG['SYSTEMS'][_d_system]['PEERS']: + _int_to_peer = int_id(_to_peer) + if (str(_int_to_peer)[:7] == str(_int_dst_id)[:7]): + #(_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] + _d_slot = 2 + _dst_slot = systems[_d_system].STATUS[_d_slot] + logger.info('(%s) User Peer Hotspot ID matched, System: %s Slot: %s',self._system, _d_system,_d_slot) + #If slot is idle for RX and TX + if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): + #Always use slot2 for hotspots - many of them are simplex and this + #is the convention + #rewrite slot if required (slot 2 is used on hotspots) + if _slot != 2: + _tmp_bits = _bits ^ 1 << 7 + else: + _tmp_bits = _bits + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) + + else: + logger.info('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) if _call_type == 'group' or _call_type == 'vcsbk': # Is this a new call stream? @@ -1785,8 +1947,9 @@ class routerHBP(HBSYSTEM): return _sysIgnore - def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src): + def sendDataToHBP(self,_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id): #Assemble transmit HBP packet header + _int_dst_id = int_id(_dst_id) _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) _tmp_data = b''.join([_tmp_data, dmrpkt]) systems[_d_system].send_system(_tmp_data) @@ -1794,7 +1957,7 @@ class routerHBP(HBSYSTEM): if CONFIG['REPORTS']['REPORT']: systems[_d_system]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(_d_system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def sendDatatoOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits): + def sendDataToOBP(self,_target,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits): # _sysIgnore = sysIgnore _int_dst_id = int_id(_dst_id) _target_status = systems[_target].STATUS @@ -1829,8 +1992,6 @@ class routerHBP(HBSYSTEM): if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - #return(_sysIgnore) - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): @@ -1880,35 +2041,14 @@ class routerHBP(HBSYSTEM): #Send to all openbridges # sysIgnore = [] - #Don't forward if ID is local - if _dst_id not in SUB_MAP: - for system in systems: - if system == self._system: - continue - #We only want to send data calls to individual IDs vis OpenBridge - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and _int_dst_id >= 1000000: - #Disabled in master for now - self.sendDatatoOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits) - - #Send UNIT data to data gateway - #if CONFIG['GLOBAL']['DATA_GATEWAY'] and (CONFIG['GLOBAL']['DATA_GATEWAY'] in systems) \ - #and CONFIG['SYSTEMS'][CONFIG['GLOBAL']['DATA_GATEWAY']]['MODE'] == 'OPENBRIDGE': - #Clear the TS bit -- all OpenBridge streams are effectively on TS1 - #_tmp_bits = _bits & ~(1 << 7) - #Assemble transmit HBP packet header - #_tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]]) - #_tmp_data = b''.join([_tmp_data, dmrpkt]) - #systems[CONFIG['GLOBAL']['DATA_GATEWAY']].send_system(_tmp_data) - #logger.info('(%s) UNIT Data Bridged to DATA_GATEWAY: %s DST_ID: %s', self._system,CONFIG['GLOBAL']['DATA_GATEWAY'],_int_dst_id) - #if CONFIG['REPORTS']['REPORT']: - #systems[system]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - #else: - #if not bool(CONFIG['GLOBAL']['DATA_GATEWAY']): - #logger.info('(%s) UNIT Data not Bridged - no DATA_GATEWAY: %s, DST_ID: %s',self._system,_int_dst_id) - #elif CONFIG['GLOBAL']['DATA_GATEWAY'] not in systems: - #logger.warning('(%s) UNIT Data not Bridged - DATA_GATEWAY: %s not valid. DST_ID: %s',self._system, CONFIG['GLOBAL']['DATA_GATEWAY'],_int_dst_id) - #elif CONFIG['SYSTEMS'][CONFIG['GLOBAL']['DATA_GATEWAY']]['MODE'] != 'OPENBRIDGE': - #logger.warning('(%s) UNIT Data not Bridged - DATA_GATEWAY: %s not OPENBRIDGE. DST_ID: %s',self._system, CONFIG['GLOBAL']['DATA_GATEWAY'],_int_dst_id) + for system in systems: + if system == self._system: + continue + #We only want to send data calls to individual IDs via OpenBridge + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE' and _int_dst_id >= 1000000: + #Disabled in master for now + self.sendDataToOBP(system,_data,dmrpkt,pkt_time,_stream_id,_dst_id,_peer_id,_rf_src,_bits) + #If destination ID is in the Subscriber Map if _dst_id in SUB_MAP: @@ -1922,7 +2062,7 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - #self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src) + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) else: logger.info('(%s) UNIT Data not bridged to HBP on slot 1 - target busy: %s DST_ID: %s',self._system,_d_system,_int_dst_id) @@ -1937,7 +2077,7 @@ class routerHBP(HBSYSTEM): if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): #We will allow the system to use both slots _tmp_bits = _bits - self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src) + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) else: logger.info('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) @@ -1962,7 +2102,7 @@ class routerHBP(HBSYSTEM): _tmp_bits = _bits ^ 1 << 7 else: _tmp_bits = _bits - self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src) + self.sendDataToHBP(_d_system,_d_slot,_dst_id,_tmp_bits,_data,dmrpkt,_rf_src,_stream_id,_peer_id) else: logger.info('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) diff --git a/config.py b/config.py index cecd047..51b6c4e 100755 --- a/config.py +++ b/config.py @@ -145,8 +145,7 @@ def build_config(_config_file): 'GEN_STAT_BRIDGES': config.getboolean(section, 'GEN_STAT_BRIDGES'), 'ALLOW_NULL_PASSPHRASE': config.getboolean(section, 'ALLOW_NULL_PASSPHRASE'), 'ANNOUNCEMENT_LANGUAGES': config.get(section, 'ANNOUNCEMENT_LANGUAGES'), - 'SERVER_ID': config.getint(section, 'SERVER_ID').to_bytes(4, 'big'), - 'DATA_GATEWAY': config.get(section, 'DATA_GATEWAY') + 'SERVER_ID': config.getint(section, 'SERVER_ID').to_bytes(4, 'big') }) if not CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES']: