From aa80f20318d029b957eb0d8a717a8f7fdf3043f5 Mon Sep 17 00:00:00 2001 From: Joaquin Madrid Belando Date: Tue, 2 Jun 2026 21:23:15 +0200 Subject: [PATCH] Talker Alias: update hblink.py --- hblink.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/hblink.py b/hblink.py index 8647d49..6590fdf 100755 --- a/hblink.py +++ b/hblink.py @@ -47,6 +47,7 @@ import config from const import * from utils import mk_id_dict, try_download,load_json,blake2bsum from dmr_utils3.utils import int_id, bytes_4 +import talker_alias as ta # Imports for the reporting server import pickle @@ -690,6 +691,9 @@ class HBSYSTEM(DatagramProtocol): self._report = _report self._config = self._CONFIG['SYSTEMS'][self._system] self._laststrid = {1: b'', 2: b''} + # DMR Talker Alias: per-stream inbound DMRA buffers and stream lookup + self._dmra_by_stream = {} + self._dmra_rf_stream = {} # Define shortcuts and generic function names based on the type of system we are @@ -730,6 +734,7 @@ class HBSYSTEM(DatagramProtocol): # Aliased in __init__ to maintenance_loop if system is a master def master_maintenance_loop(self): logger.trace('(%s) Master maintenance loop started', self._system) + self.trim_dmra_streams() remove_list = deque() for peer in self._peers: _this_peer = self._peers[peer] @@ -813,6 +818,80 @@ class HBSYSTEM(DatagramProtocol): # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! #logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet)) + # ---- DMR Talker Alias (TA) support ------------------------------------ + def note_dmrd_stream(self, _peer_id, _rf_src, _stream_id): + # Associate the latest stream with its RF source so buffered DMRA + # (Talker Alias) packets can be matched to the right call. + try: + self._dmra_rf_stream[(_peer_id, _rf_src)] = (_stream_id, time()) + except Exception: + pass + + def store_dmra_packet(self, _data, _sockaddr=None): + try: + parsed = ta.parse_dmra_packet(_data) + if not parsed: + return + _rf_src, _block_id, _payload = parsed + # Resolve the originating peer from the socket address when possible, + # so the alias is matched to the right peer's stream (not just RID). + _peer_id = None + if _sockaddr is not None and hasattr(self, '_peers'): + for _pid, _pinfo in self._peers.items(): + if _pinfo.get('SOCKADDR') == _sockaddr: + _peer_id = _pid + break + # Find the most recent stream from this RF source (peer-scoped if known) + _stream_id = None + _best_t = -1 + for (_pid, _src), (_sid, _t) in self._dmra_rf_stream.items(): + if _src == _rf_src and (_peer_id is None or _pid == _peer_id) and _t > _best_t: + _stream_id = _sid + _best_t = _t + if _stream_id is None: + _stream_id = _rf_src # fallback key until a DMRD arrives + entry = self._dmra_by_stream.setdefault(_stream_id, {'BLOCKS': {}, 'TIME': time(), 'RFS': _rf_src}) + entry['BLOCKS'][_block_id] = _payload + entry['TIME'] = time() + entry['RFS'] = _rf_src + except Exception: + logger.exception('(%s) Talker Alias store failed', self._system) + + def get_dmra_blocks(self, _stream_id): + entry = self._dmra_by_stream.get(_stream_id) + if entry: + return entry['BLOCKS'] + return None + + def clear_dmra_stream(self, _stream_id): + self._dmra_by_stream.pop(_stream_id, None) + + def trim_dmra_streams(self, _max_age=180): + now = time() + for _sid in [k for k, v in self._dmra_by_stream.items() if now - v.get('TIME', 0) > _max_age]: + self._dmra_by_stream.pop(_sid, None) + for _key in [k for k, v in self._dmra_rf_stream.items() if now - v[1] > _max_age]: + self._dmra_rf_stream.pop(_key, None) + + def send_dmra_to_peers(self, _packets, _exclude=None): + if not hasattr(self, '_peers'): + return + for _peer in self._peers: + if _exclude is not None and _peer == _exclude: + continue + for _pkt in _packets: + self.transport.write(_pkt, self._peers[_peer]['SOCKADDR']) + + def send_dmra_system(self, _packets): + try: + if self._config['MODE'] == 'MASTER': + self.send_dmra_to_peers(_packets) + else: + for _pkt in _packets: + self.transport.write(_pkt, self._config['MASTER_SOCKADDR']) + except Exception: + logger.exception('(%s) Talker Alias send failed', self._system) + def send_xlxmaster(self, radio, xlx, mastersock): radio3 = int.from_bytes(radio, 'big').to_bytes(3, 'big') radio4 = int.from_bytes(radio, 'big').to_bytes(4, 'big') @@ -958,6 +1037,20 @@ class HBSYSTEM(DatagramProtocol): self.transport.write(b''.join(pkt), self._peers[_peer]['SOCKADDR']) #logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to peer: %s (%s) [Stream ID: %s]', self._system, _slot, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id), int_id(_dst_id), self._peers[_peer]['CALLSIGN'], int_id(_peer), int_id(_stream_id)) + # DMR Talker Alias handling (opt-in via GLOBAL TALKER_ALIAS) + if ta.ta_enabled(self._CONFIG, self._system): + self.note_dmrd_stream(_peer_id, _rf_src, _stream_id) + if self._config['REPEAT'] == True and _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: + try: + _resolved = ta.resolve_ta(self._CONFIG, self._system, _rf_src, self.get_dmra_blocks(_stream_id)) + _ta_pkts = ta.ta_dmra_packets(_rf_src, _resolved) + if _ta_pkts: + self.send_dmra_to_peers(_ta_pkts, _exclude=_peer_id) + except Exception: + logger.exception('(%s) Talker Alias local repeat failed', self._system) + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: + self.clear_dmra_stream(_stream_id) + # Userland actions -- typically this is the function you subclass for an application self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) @@ -1138,6 +1231,8 @@ class HBSYSTEM(DatagramProtocol): elif _command == DMRA: _peer_id = _data[4:8] logger.debug('(%s) Peer has sent Talker Alias packet %s', self._system, _data) + if ta.ta_enabled(self._CONFIG, self._system): + self.store_dmra_packet(_data, _sockaddr) elif _command == PRIN: logger.info('(%s) *ProxyInfo* Connection from IP:Port: %s', self._system, _data.decode('utf8')[4:])