diff --git a/bridge_master.py b/bridge_master.py index eae9af7..2d55088 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -71,6 +71,7 @@ import log from const import * from mk_voice import pkt_gen from utils import load_json, save_json +import talker_alias as ta #from voice_lib import words #Read voices @@ -838,6 +839,11 @@ def threadAlias(): def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums): peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums + # Build Talker Alias subscriber profiles (id -> {callsign}) for inject mode + try: + CONFIG['_TA_PROFILES'] = {rid: {'callsign': cs} for rid, cs in (_subscriber_ids or {}).items()} + except Exception: + CONFIG['_TA_PROFILES'] = {} def aliasb(): _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG) @@ -1985,6 +1991,12 @@ class routerOBP(OPENBRIDGE): _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + try: + if ta.ta_enabled(self._CONFIG, self._system): + _ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None + ta.init_embed_state(_target_status[_stream_id], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks) + except Exception: + logger.exception('(%s) Talker Alias embed init (OBP target) failed', self._system) logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: @@ -2039,7 +2051,14 @@ class routerOBP(OPENBRIDGE): systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] + _ta_st = _target_status[_stream_id] + if _ta_st.get('TA_EMB'): + try: + dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'EMB_LC') + except Exception: + dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264] + else: + dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) @@ -2087,6 +2106,16 @@ class routerOBP(OPENBRIDGE): _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) + try: + if ta.ta_enabled(self._CONFIG, self._system): + _ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None + _ta_resolved = ta.init_embed_state(_target_status[_target['TS']], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks) + if _target_system['MODE'] in ('MASTER', 'PEER'): + _ta_pkts = ta.ta_dmra_packets(_rf_src, _ta_resolved) + if _ta_pkts and hasattr(systems[_target['SYSTEM']], 'send_dmra_system'): + systems[_target['SYSTEM']].send_dmra_system(_ta_pkts) + except Exception: + logger.exception('(%s) Talker Alias embed init (HBP target) failed', self._system) logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: @@ -2121,7 +2150,14 @@ class routerOBP(OPENBRIDGE): systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] + _ta_st = _target_status[_target['TS']] + if _ta_st.get('TA_EMB'): + try: + dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'TX_EMB_LC') + except Exception: + dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] + else: + dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() #_tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55] _tmp_data = b''.join([_tmp_data, dmrpkt]) @@ -2653,6 +2689,12 @@ class routerHBP(HBSYSTEM): _target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc) + try: + if ta.ta_enabled(self._CONFIG, self._system): + _ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None + ta.init_embed_state(_target_status[_stream_id], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks) + except Exception: + logger.exception('(%s) Talker Alias embed init (OBP target) failed', self._system) logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: @@ -2695,7 +2737,14 @@ class routerHBP(HBSYSTEM): systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264] + _ta_st = _target_status[_stream_id] + if _ta_st.get('TA_EMB'): + try: + dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'EMB_LC') + except Exception: + dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264] + else: + dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264] dmrpkt = dmrbits.tobytes() _tmp_data = b''.join([_tmp_data, dmrpkt]) @@ -2739,6 +2788,16 @@ class routerHBP(HBSYSTEM): _target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc) _target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc) _target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc) + try: + if ta.ta_enabled(self._CONFIG, self._system): + _ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None + _ta_resolved = ta.init_embed_state(_target_status[_target['TS']], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks) + if _target_system['MODE'] in ('MASTER', 'PEER'): + _ta_pkts = ta.ta_dmra_packets(_rf_src, _ta_resolved) + if _ta_pkts and hasattr(systems[_target['SYSTEM']], 'send_dmra_system'): + systems[_target['SYSTEM']].send_dmra_system(_ta_pkts) + except Exception: + logger.exception('(%s) Talker Alias embed init (HBP target) failed', self._system) logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID'])) if CONFIG['REPORTS']['REPORT']: @@ -2773,7 +2832,14 @@ class routerHBP(HBSYSTEM): systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore')) # Create a Burst B-E packet (Embedded LC) elif _dtype_vseq in [1,2,3,4]: - dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] + _ta_st = _target_status[_target['TS']] + if _ta_st.get('TA_EMB'): + try: + dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'TX_EMB_LC') + except Exception: + dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] + else: + dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264] try: dmrpkt = dmrbits.tobytes() except AttributeError: diff --git a/config.py b/config.py index 6c36e4d..a1be2fd 100755 --- a/config.py +++ b/config.py @@ -159,6 +159,9 @@ def build_config(_config_file): 'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS', fallback=True), 'DEBUG_BRIDGES' : config.getboolean(section, 'DEBUG_BRIDGES', fallback=False), 'ENABLE_API' : config.getboolean(section, 'ENABLE_API', fallback=False), + 'TALKER_ALIAS': config.getboolean(section, 'TALKER_ALIAS', fallback=False), + 'TALKER_ALIAS_MODE': config.get(section, 'TALKER_ALIAS_MODE', fallback='both'), + 'TALKER_ALIAS_FORMAT': config.get(section, 'TALKER_ALIAS_FORMAT', fallback='{callsign} {fname}'), }) if not CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES']: CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES'] = languages diff --git a/config/ADN-SAMPLE-commented.cfg b/config/ADN-SAMPLE-commented.cfg index debc536..ce4f959 100755 --- a/config/ADN-SAMPLE-commented.cfg +++ b/config/ADN-SAMPLE-commented.cfg @@ -17,15 +17,15 @@ # The 'action' May be PERMIT|DENY # Each entry may be a single radio id, or a hypenated range (e.g. 1-2999) # Format: -# ACL = 'action:id|start-end|,id|start-end,....' -# --for example-- -# SUB_ACL: DENY:1,1000-2000,4500-60000,17 +# ACL = 'action:id|start-end|,id|start-end,....' +# --for example-- +# SUB_ACL: DENY:1,1000-2000,4500-60000,17 # # ACL Types: -# REG_ACL: peer radio IDs for registration (only used on HBP master systems) -# SUB_ACL: subscriber IDs for end-users -# TGID_TS1_ACL: destination talkgroup IDs on Timeslot 1 -# TGID_TS2_ACL: destination talkgroup IDs on Timeslot 2 +# REG_ACL: peer radio IDs for registration (only used on HBP master systems) +# SUB_ACL: subscriber IDs for end-users +# TGID_TS1_ACL: destination talkgroup IDs on Timeslot 1 +# TGID_TS2_ACL: destination talkgroup IDs on Timeslot 2 # # ACLs may be repeated for individual systems if needed for granularity # Global ACLs will be processed BEFORE the system level ACLs @@ -51,6 +51,15 @@ SERVER_ID: 0000 DATA_GATEWAY: False VALIDATE_SERVER_IDS: True +# DMR Talker Alias (apagado por defecto) +# TALKER_ALIAS - True para activar el reenvio/insercion de Talker Alias (DMRA + LC embebida) +# TALKER_ALIAS_MODE - passthrough (solo reenvia el TA recibido), inject (genera el TA desde el alias del suscriptor) +# o both (reenvia si llega completo, si no lo genera). Por defecto: both +# TALKER_ALIAS_FORMAT - plantilla del texto generado en modo inject/both. Campos: {callsign} {fname} {surname} {id} +TALKER_ALIAS: False +TALKER_ALIAS_MODE: both +TALKER_ALIAS_FORMAT: {callsign} {fname} + # Servidor de seguridad centralizado URL_SECURITY: PORT_SECURITY: diff --git a/config/ADN-SAMPLE.cfg b/config/ADN-SAMPLE.cfg index 1171db0..21f50c6 100755 --- a/config/ADN-SAMPLE.cfg +++ b/config/ADN-SAMPLE.cfg @@ -13,6 +13,11 @@ SERVER_ID: 0000 DATA_GATEWAY: False VALIDATE_SERVER_IDS: True +# DMR Talker Alias (apagado por defecto). MODE: passthrough | inject | both +TALKER_ALIAS: False +TALKER_ALIAS_MODE: both +TALKER_ALIAS_FORMAT: {callsign} {fname} + # Servidor de seguridad centralizado URL_SECURITY: PORT_SECURITY: diff --git a/config/adn.cfg b/config/adn.cfg index 6c6facc..9844e22 100644 --- a/config/adn.cfg +++ b/config/adn.cfg @@ -13,6 +13,13 @@ SERVER_ID: 0000 DATA_GATEWAY: False VALIDATE_SERVER_IDS: True +# DMR Talker Alias (apagado por defecto). Pon True para activarlo. +# TALKER_ALIAS_MODE: passthrough | inject | both +# TALKER_ALIAS_FORMAT: plantilla con campos {callsign} {fname} {surname} {id} +TALKER_ALIAS: False +TALKER_ALIAS_MODE: both +TALKER_ALIAS_FORMAT: {callsign} {fname} + # Servidor de seguridad centralizado URL_SECURITY: PORT_SECURITY: diff --git a/hblink.py b/hblink.py index 8647d49..6590fdf 100755 --- a/hblink.py +++ b/hblink.py @@ -47,6 +47,7 @@ import config from const import * from utils import mk_id_dict, try_download,load_json,blake2bsum from dmr_utils3.utils import int_id, bytes_4 +import talker_alias as ta # Imports for the reporting server import pickle @@ -690,6 +691,9 @@ class HBSYSTEM(DatagramProtocol): self._report = _report self._config = self._CONFIG['SYSTEMS'][self._system] self._laststrid = {1: b'', 2: b''} + # DMR Talker Alias: per-stream inbound DMRA buffers and stream lookup + self._dmra_by_stream = {} + self._dmra_rf_stream = {} # Define shortcuts and generic function names based on the type of system we are @@ -730,6 +734,7 @@ class HBSYSTEM(DatagramProtocol): # Aliased in __init__ to maintenance_loop if system is a master def master_maintenance_loop(self): logger.trace('(%s) Master maintenance loop started', self._system) + self.trim_dmra_streams() remove_list = deque() for peer in self._peers: _this_peer = self._peers[peer] @@ -813,6 +818,80 @@ class HBSYSTEM(DatagramProtocol): # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! #logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet)) + # ---- DMR Talker Alias (TA) support ------------------------------------ + def note_dmrd_stream(self, _peer_id, _rf_src, _stream_id): + # Associate the latest stream with its RF source so buffered DMRA + # (Talker Alias) packets can be matched to the right call. + try: + self._dmra_rf_stream[(_peer_id, _rf_src)] = (_stream_id, time()) + except Exception: + pass + + def store_dmra_packet(self, _data, _sockaddr=None): + try: + parsed = ta.parse_dmra_packet(_data) + if not parsed: + return + _rf_src, _block_id, _payload = parsed + # Resolve the originating peer from the socket address when possible, + # so the alias is matched to the right peer's stream (not just RID). + _peer_id = None + if _sockaddr is not None and hasattr(self, '_peers'): + for _pid, _pinfo in self._peers.items(): + if _pinfo.get('SOCKADDR') == _sockaddr: + _peer_id = _pid + break + # Find the most recent stream from this RF source (peer-scoped if known) + _stream_id = None + _best_t = -1 + for (_pid, _src), (_sid, _t) in self._dmra_rf_stream.items(): + if _src == _rf_src and (_peer_id is None or _pid == _peer_id) and _t > _best_t: + _stream_id = _sid + _best_t = _t + if _stream_id is None: + _stream_id = _rf_src # fallback key until a DMRD arrives + entry = self._dmra_by_stream.setdefault(_stream_id, {'BLOCKS': {}, 'TIME': time(), 'RFS': _rf_src}) + entry['BLOCKS'][_block_id] = _payload + entry['TIME'] = time() + entry['RFS'] = _rf_src + except Exception: + logger.exception('(%s) Talker Alias store failed', self._system) + + def get_dmra_blocks(self, _stream_id): + entry = self._dmra_by_stream.get(_stream_id) + if entry: + return entry['BLOCKS'] + return None + + def clear_dmra_stream(self, _stream_id): + self._dmra_by_stream.pop(_stream_id, None) + + def trim_dmra_streams(self, _max_age=180): + now = time() + for _sid in [k for k, v in self._dmra_by_stream.items() if now - v.get('TIME', 0) > _max_age]: + self._dmra_by_stream.pop(_sid, None) + for _key in [k for k, v in self._dmra_rf_stream.items() if now - v[1] > _max_age]: + self._dmra_rf_stream.pop(_key, None) + + def send_dmra_to_peers(self, _packets, _exclude=None): + if not hasattr(self, '_peers'): + return + for _peer in self._peers: + if _exclude is not None and _peer == _exclude: + continue + for _pkt in _packets: + self.transport.write(_pkt, self._peers[_peer]['SOCKADDR']) + + def send_dmra_system(self, _packets): + try: + if self._config['MODE'] == 'MASTER': + self.send_dmra_to_peers(_packets) + else: + for _pkt in _packets: + self.transport.write(_pkt, self._config['MASTER_SOCKADDR']) + except Exception: + logger.exception('(%s) Talker Alias send failed', self._system) + def send_xlxmaster(self, radio, xlx, mastersock): radio3 = int.from_bytes(radio, 'big').to_bytes(3, 'big') radio4 = int.from_bytes(radio, 'big').to_bytes(4, 'big') @@ -958,6 +1037,20 @@ class HBSYSTEM(DatagramProtocol): self.transport.write(b''.join(pkt), self._peers[_peer]['SOCKADDR']) #logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to peer: %s (%s) [Stream ID: %s]', self._system, _slot, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id), int_id(_dst_id), self._peers[_peer]['CALLSIGN'], int_id(_peer), int_id(_stream_id)) + # DMR Talker Alias handling (opt-in via GLOBAL TALKER_ALIAS) + if ta.ta_enabled(self._CONFIG, self._system): + self.note_dmrd_stream(_peer_id, _rf_src, _stream_id) + if self._config['REPEAT'] == True and _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD: + try: + _resolved = ta.resolve_ta(self._CONFIG, self._system, _rf_src, self.get_dmra_blocks(_stream_id)) + _ta_pkts = ta.ta_dmra_packets(_rf_src, _resolved) + if _ta_pkts: + self.send_dmra_to_peers(_ta_pkts, _exclude=_peer_id) + except Exception: + logger.exception('(%s) Talker Alias local repeat failed', self._system) + if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM: + self.clear_dmra_stream(_stream_id) + # Userland actions -- typically this is the function you subclass for an application self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) @@ -1138,6 +1231,8 @@ class HBSYSTEM(DatagramProtocol): elif _command == DMRA: _peer_id = _data[4:8] logger.debug('(%s) Peer has sent Talker Alias packet %s', self._system, _data) + if ta.ta_enabled(self._CONFIG, self._system): + self.store_dmra_packet(_data, _sockaddr) elif _command == PRIN: logger.info('(%s) *ProxyInfo* Connection from IP:Port: %s', self._system, _data.decode('utf8')[4:]) diff --git a/talker_alias.py b/talker_alias.py new file mode 100644 index 0000000..73dcc3f --- /dev/null +++ b/talker_alias.py @@ -0,0 +1,360 @@ +############################################################################### +# ADN DMR Peer Server - DMR Talker Alias (ETSI / MMDVMHost) +# +# Talker Alias (ETSI TS 102 361-2): short alphanumeric label carried in the +# DMR voice stream. Two transports are supported here: +# 1) Standalone HBP "DMRA" UDP packets (15 bytes, up to 4 blocks per TX). +# 2) Embedded LC inside the forwarded DMRD voice (FLCO 4-7, bursts B-E), +# which is what MMDVMHost / Pi-Star actually decode. +# +# Ported from the clean-architecture implementation in the ce5rpy fork to +# this monolithic codebase. UTF-8 encoding (format 2), same as MMDVMHost +# DMRTA.cpp. Everything here is opt-in via the GLOBAL TALKER_ALIAS switch; +# when disabled the bridge forwarding path behaves exactly as before. +############################################################################### + +TALKER_ALIAS_MAX_LEN = 29 + +TA_FORMAT_7BIT = 0 +TA_FORMAT_ISO8 = 1 +TA_FORMAT_UTF8 = 2 + +DMRA_OPCODE = b'DMRA' +DMRA_PACKET_LEN = 15 +DMRA_BLOCK_COUNT = 4 +DMRA_PAYLOAD_LEN = 7 +DMRA_BUF_LEN = DMRA_BLOCK_COUNT * DMRA_PAYLOAD_LEN # 28 + +FLCO_TALKER_ALIAS_HEADER = 4 # FLCO 4 = TA block 0; 5,6,7 = blocks 1,2,3 + +VALID_MODES = frozenset({'inject', 'passthrough', 'both'}) + + +def _int_id(_bytes): + return int.from_bytes(_bytes, 'big') + + +# --------------------------------------------------------------------------- +# Encode / decode +# --------------------------------------------------------------------------- + +def truncate_talker_alias(text): + """Hard-cap at protocol maximum (not user-configurable).""" + if len(text) <= TALKER_ALIAS_MAX_LEN: + return text + return text[:TALKER_ALIAS_MAX_LEN] + + +def decode_ta(buf): + """Decode a 28-byte TA buffer (formats 0-3). Mirrors MMDVMHost CDMRTA::decodeTA.""" + if len(buf) < 1: + return '' + ta_format = (buf[0] >> 6) & 0x03 + ta_size = (buf[0] >> 1) & 0x1F + if ta_format == TA_FORMAT_ISO8 or ta_format == TA_FORMAT_UTF8: + raw = buf[1:1 + ta_size] + return raw.decode('latin-1', errors='replace') + if ta_format != TA_FORMAT_7BIT: + return '' + out = bytearray() + t1 = 0 + t2 = 0 + c = 0 + for i in range(min(32, len(buf))): + if t2 >= ta_size: + break + for j in range(7, -1, -1): + c = ((c << 1) | ((buf[i] >> j) & 1)) & 0xFF + t1 += 1 + if t1 == 7: + if i > 0: + out.append(c & 0x7F) + t2 += 1 + if t2 >= ta_size: + break + t1 = 0 + c = 0 + return out[:ta_size].decode('ascii', errors='replace') + + +def encode_utf8(text): + """Encode text into a 28-byte TA buffer (format 2 / UTF-8).""" + text = truncate_talker_alias(text) + raw = text.encode('utf-8') + if len(raw) > 27: + raw = text.encode('utf-8')[:27].decode('utf-8', errors='ignore').encode('utf-8') + size = len(raw) + header = (TA_FORMAT_UTF8 << 6) | (size << 1) | 0 + buf = bytearray(DMRA_BUF_LEN) + buf[0] = header + buf[1:1 + size] = raw + return bytes(buf) + + +def blocks_from_buffer(buf): + """Split a 28-byte encoded buffer into four 7-byte DMRA payloads.""" + buf = buf.ljust(DMRA_BUF_LEN, b'\x00')[:DMRA_BUF_LEN] + return [buf[i * DMRA_PAYLOAD_LEN:(i + 1) * DMRA_PAYLOAD_LEN] for i in range(DMRA_BLOCK_COUNT)] + + +def required_ta_block_count(buf): + """Number of blocks 1-4 actually needed (skip trailing zero payloads).""" + blocks = blocks_from_buffer(buf) + last = 0 + for i in range(DMRA_BLOCK_COUNT): + if blocks[i] != b'\x00' * DMRA_PAYLOAD_LEN: + last = i + return max(1, last + 1) + + +def buffer_from_blocks(blocks): + """Merge up to four block payloads (dict block_id->payload) into a 28-byte buffer.""" + buf = bytearray(DMRA_BUF_LEN) + for block_id, payload in blocks.items(): + if 0 <= block_id < DMRA_BLOCK_COUNT and payload: + start = block_id * DMRA_PAYLOAD_LEN + buf[start:start + DMRA_PAYLOAD_LEN] = payload[:DMRA_PAYLOAD_LEN] + return bytes(buf) + + +# --------------------------------------------------------------------------- +# Standalone DMRA packet builders / parser +# --------------------------------------------------------------------------- + +def build_dmra_packets(rf_src, text): + """Build the 1-4 HBP DMRA packets needed to carry 'text' (server injection).""" + rf = rf_src[:3] if len(rf_src) >= 3 else rf_src.ljust(3, b'\x00')[:3] + encoded = encode_utf8(text) + blocks = blocks_from_buffer(encoded) + count = required_ta_block_count(encoded) + packets = [] + for block_id in range(count): + packets.append(DMRA_OPCODE + rf + bytes([block_id]) + blocks[block_id]) + return packets + + +def build_dmra_packet(rf_src, block_id, payload): + """Build one 15-byte DMRA packet (pass-through of a buffered block).""" + rf = rf_src[:3] if len(rf_src) >= 3 else rf_src.ljust(3, b'\x00')[:3] + block = max(0, min(3, int(block_id))) + pl = payload[:DMRA_PAYLOAD_LEN].ljust(DMRA_PAYLOAD_LEN, b'\x00') + return DMRA_OPCODE + rf + bytes([block]) + pl + + +def parse_dmra_packet(data): + """Parse a DMRA packet; returns (rf_src, block_id, payload) or None.""" + if len(data) < DMRA_PACKET_LEN or data[:4] != DMRA_OPCODE: + return None + rf_src = data[4:7] + block_id = data[7] + payload = data[8:15] + return rf_src, block_id, payload + + +# --------------------------------------------------------------------------- +# Embedded LC (FLCO 4-7) builders +# --------------------------------------------------------------------------- + +def talker_alias_lc_bytes(block_id, payload7): + """9-byte embedded LC for one TA fragment (FLCO 4-7 + FID 0x00 + 7 payload).""" + block = max(0, min(3, int(block_id))) + payload = payload7[:DMRA_PAYLOAD_LEN].ljust(DMRA_PAYLOAD_LEN, b'\x00') + return bytes([FLCO_TALKER_ALIAS_HEADER + block, 0x00]) + payload + + +def encode_talker_alias_emblc(text): + """Embedded-LC dicts for TA blocks 0..N-1 plus block count N (1-4).""" + from dmr_utils3 import bptc + encoded = encode_utf8(text) + blocks = blocks_from_buffer(encoded) + count = required_ta_block_count(encoded) + emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, blocks[i])) for i in range(count)] + return emblcs, count + + +def encode_talker_alias_emblc_from_blocks(blocks): + """Build embedded TA LC dicts from buffered (passthrough) DMRA payloads.""" + from dmr_utils3 import bptc + buf = buffer_from_blocks(blocks) + buf_blocks = blocks_from_buffer(buf) + count = required_ta_block_count(buf) + emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, buf_blocks[i])) for i in range(count)] + return emblcs, count + + +# --------------------------------------------------------------------------- +# Policy: settings, text formatting, inject vs passthrough +# --------------------------------------------------------------------------- + +def ta_settings(CONFIG, system_name=None): + """Effective Talker Alias settings (GLOBAL with optional per-system override).""" + g = CONFIG.get('GLOBAL', {}) + sys_cfg = CONFIG.get('SYSTEMS', {}).get(system_name, {}) if system_name else {} + enabled = sys_cfg.get('TALKER_ALIAS') + if enabled is None: + enabled = g.get('TALKER_ALIAS', False) + mode = sys_cfg.get('TALKER_ALIAS_MODE') + if mode is None: + mode = g.get('TALKER_ALIAS_MODE', 'both') + if mode not in VALID_MODES: + mode = 'both' + fmt = sys_cfg.get('TALKER_ALIAS_FORMAT') + if fmt is None: + fmt = g.get('TALKER_ALIAS_FORMAT', '{callsign} {fname}') + return {'enabled': bool(enabled), 'mode': mode, 'format': str(fmt)} + + +def ta_enabled(CONFIG, system_name=None): + return ta_settings(CONFIG, system_name)['enabled'] + + +def format_ta_text(CONFIG, rf_src): + """Build the display string from the subscriber profile + template.""" + settings = ta_settings(CONFIG) + template = settings['format'] + rid = _int_id(rf_src) + profiles = CONFIG.get('_TA_PROFILES', {}) + profile = profiles.get(rid, {}) + callsign = profile.get('callsign') or '' + fname = profile.get('fname') or '' + surname = profile.get('surname') or '' + if profile.get('talker_alias'): + text = str(profile['talker_alias']) + else: + try: + text = template.format(callsign=callsign, fname=fname, surname=surname, id=rid) + except (KeyError, ValueError, IndexError): + text = callsign or 'DMR ID:{}'.format(rid) + text = ' '.join(text.split()) + if not text.strip(): + text = callsign or 'DMR ID:{}'.format(rid) + return truncate_talker_alias(text) + + +def required_blocks_from_header(block0): + """Blocks (1-4) needed for a TA whose header is in block 0's first byte.""" + if not block0: + return DMRA_BLOCK_COUNT + header = block0[0] + ta_format = (header >> 6) & 0x03 + ta_size = (header >> 1) & 0x1F + if ta_format in (TA_FORMAT_ISO8, TA_FORMAT_UTF8): + needed_bytes = 1 + ta_size # header byte + payload bytes + else: + needed_bytes = 1 + (((ta_size * 7) + 7) // 8) # 7-bit packed chars + count = (needed_bytes + DMRA_PAYLOAD_LEN - 1) // DMRA_PAYLOAD_LEN + return max(1, min(DMRA_BLOCK_COUNT, count)) + + +def passthrough_complete(blocks): + """True when every TA block implied by the header (block 0) was received.""" + if not blocks or 0 not in blocks: + return False + block0 = blocks[0] + if not block0 or len(block0) < 1: + return False + needed = required_blocks_from_header(block0) + return all(i in blocks and blocks[i] and len(blocks[i]) >= DMRA_PAYLOAD_LEN for i in range(needed)) + + +def passthrough_packets_from_blocks(rf_src, blocks): + """Rebuild four DMRA packets from buffered block payloads.""" + packets = [] + for block_id in range(DMRA_BLOCK_COUNT): + payload = blocks.get(block_id, b'\x00' * DMRA_PAYLOAD_LEN) + packets.append(build_dmra_packet(rf_src, block_id, payload)) + return packets + + +def resolve_ta(CONFIG, system_name, rf_src, blocks): + """Decide what TA to emit for this stream. + + Returns a dict {'source': 'passthrough'|'inject', 'text': str, + 'blocks': dict|None} or None when TA is disabled / nothing to send. + """ + settings = ta_settings(CONFIG, system_name) + if not settings['enabled']: + return None + mode = settings['mode'] + have_passthrough = passthrough_complete(blocks) + if mode == 'passthrough': + if not have_passthrough: + return None + return {'source': 'passthrough', 'blocks': dict(blocks), + 'text': decode_ta(buffer_from_blocks(blocks))} + if mode == 'inject': + return {'source': 'inject', 'blocks': None, + 'text': format_ta_text(CONFIG, rf_src)} + # both + if have_passthrough: + return {'source': 'passthrough', 'blocks': dict(blocks), + 'text': decode_ta(buffer_from_blocks(blocks))} + return {'source': 'inject', 'blocks': None, + 'text': format_ta_text(CONFIG, rf_src)} + + +def ta_dmra_packets(rf_src, resolved): + """Standalone DMRA packets for a resolved TA (or [] ).""" + if not resolved: + return [] + if resolved['source'] == 'passthrough': + return passthrough_packets_from_blocks(rf_src, resolved['blocks']) + return build_dmra_packets(rf_src, resolved['text']) + + +def ta_emblc(resolved): + """Embedded-LC (dicts, count) for a resolved TA, or None.""" + if not resolved: + return None + if resolved['source'] == 'passthrough': + return encode_talker_alias_emblc_from_blocks(resolved['blocks']) + return encode_talker_alias_emblc(resolved['text']) + + +# --------------------------------------------------------------------------- +# Per-stream embedded-LC state helpers (stored inside the target STATUS dict) +# --------------------------------------------------------------------------- + +def init_embed_state(st, CONFIG, system_name, rf_src, stream_id, blocks): + """Prepare alternating embedded TA state on a target stream status dict. + + Returns the resolved TA dict (so callers can also emit standalone DMRA), + or None when TA is disabled / nothing to inject. + """ + clear_embed_state(st) + resolved = resolve_ta(CONFIG, system_name, rf_src, blocks) + emb = ta_emblc(resolved) + if emb: + emblcs, count = emb + st['TA_EMB'] = emblcs + st['TA_COUNT'] = count + st['TA_PHASE'] = 0 + # First B-E supercycle carries the normal group LC; TA on the next one. + st['TA_ON'] = False + return resolved + + +def clear_embed_state(st): + for k in ('TA_EMB', 'TA_PHASE', 'TA_ON', 'TA_COUNT'): + st.pop(k, None) + + +def rewrite_embed_lc(dmrbits, st, dtype_vseq, emb_key): + """Return dmrbits with the embedded LC rewritten on bursts B-E (dtype 1-4), + alternating one supercycle of group LC and one supercycle of TA blocks. + + Only call this when st has 'TA_EMB' set; otherwise use the legacy path. + """ + ta_emb = st.get('TA_EMB') + if ta_emb is not None and st.get('TA_ON'): + phase = st.get('TA_PHASE', 0) + count = st.get('TA_COUNT', DMRA_BLOCK_COUNT) + frag = ta_emb[phase][dtype_vseq] + if dtype_vseq == 4: + st['TA_ON'] = False + st['TA_PHASE'] = (phase + 1) % max(1, count) + else: + frag = st[emb_key][dtype_vseq] + if dtype_vseq == 4 and ta_emb is not None: + st['TA_ON'] = True + return dmrbits[0:116] + frag + dmrbits[148:264]