Merge pull request #36 from ea5gvk/pruebas

Añadida Funcion Talker Alias
pull/37/head
Joaquin Madrid Belando 2 days ago committed by GitHub
commit 9be63326c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -71,6 +71,7 @@ import log
from const import *
from mk_voice import pkt_gen
from utils import load_json, save_json
import talker_alias as ta
#from voice_lib import words
#Read voices
@ -838,6 +839,11 @@ def threadAlias():
def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums):
peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums
# Build Talker Alias subscriber profiles (id -> {callsign}) for inject mode
try:
CONFIG['_TA_PROFILES'] = {rid: {'callsign': cs} for rid, cs in (_subscriber_ids or {}).items()}
except Exception:
CONFIG['_TA_PROFILES'] = {}
def aliasb():
_peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums = mk_aliases(CONFIG)
@ -1985,6 +1991,12 @@ class routerOBP(OPENBRIDGE):
_target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc)
_target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc)
_target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc)
try:
if ta.ta_enabled(self._CONFIG, self._system):
_ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None
ta.init_embed_state(_target_status[_stream_id], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks)
except Exception:
logger.exception('(%s) Talker Alias embed init (OBP target) failed', self._system)
logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
if CONFIG['REPORTS']['REPORT']:
@ -2039,7 +2051,14 @@ class routerOBP(OPENBRIDGE):
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
_ta_st = _target_status[_stream_id]
if _ta_st.get('TA_EMB'):
try:
dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'EMB_LC')
except Exception:
dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
else:
dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
dmrpkt = dmrbits.tobytes()
_tmp_data = b''.join([_tmp_data, dmrpkt])
@ -2087,6 +2106,16 @@ class routerOBP(OPENBRIDGE):
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
try:
if ta.ta_enabled(self._CONFIG, self._system):
_ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None
_ta_resolved = ta.init_embed_state(_target_status[_target['TS']], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks)
if _target_system['MODE'] in ('MASTER', 'PEER'):
_ta_pkts = ta.ta_dmra_packets(_rf_src, _ta_resolved)
if _ta_pkts and hasattr(systems[_target['SYSTEM']], 'send_dmra_system'):
systems[_target['SYSTEM']].send_dmra_system(_ta_pkts)
except Exception:
logger.exception('(%s) Talker Alias embed init (HBP target) failed', self._system)
logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
if CONFIG['REPORTS']['REPORT']:
@ -2121,7 +2150,14 @@ class routerOBP(OPENBRIDGE):
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
_ta_st = _target_status[_target['TS']]
if _ta_st.get('TA_EMB'):
try:
dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'TX_EMB_LC')
except Exception:
dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
else:
dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
dmrpkt = dmrbits.tobytes()
#_tmp_data = b''.join([_tmp_data, dmrpkt, b'\x00\x00']) # Add two bytes of nothing since OBP doesn't include BER & RSSI bytes #_data[53:55]
_tmp_data = b''.join([_tmp_data, dmrpkt])
@ -2653,6 +2689,12 @@ class routerHBP(HBSYSTEM):
_target_status[_stream_id]['H_LC'] = bptc.encode_header_lc(dst_lc)
_target_status[_stream_id]['T_LC'] = bptc.encode_terminator_lc(dst_lc)
_target_status[_stream_id]['EMB_LC'] = bptc.encode_emblc(dst_lc)
try:
if ta.ta_enabled(self._CONFIG, self._system):
_ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None
ta.init_embed_state(_target_status[_stream_id], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks)
except Exception:
logger.exception('(%s) Talker Alias embed init (OBP target) failed', self._system)
logger.debug('(%s) Conference Bridge: %s, Call Bridged to OBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
if CONFIG['REPORTS']['REPORT']:
@ -2695,7 +2737,14 @@ class routerHBP(HBSYSTEM):
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_stream_id]['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
_ta_st = _target_status[_stream_id]
if _ta_st.get('TA_EMB'):
try:
dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'EMB_LC')
except Exception:
dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
else:
dmrbits = dmrbits[0:116] + _ta_st['EMB_LC'][_dtype_vseq] + dmrbits[148:264]
dmrpkt = dmrbits.tobytes()
_tmp_data = b''.join([_tmp_data, dmrpkt])
@ -2739,6 +2788,16 @@ class routerHBP(HBSYSTEM):
_target_status[_target['TS']]['TX_H_LC'] = bptc.encode_header_lc(dst_lc)
_target_status[_target['TS']]['TX_T_LC'] = bptc.encode_terminator_lc(dst_lc)
_target_status[_target['TS']]['TX_EMB_LC'] = bptc.encode_emblc(dst_lc)
try:
if ta.ta_enabled(self._CONFIG, self._system):
_ta_blocks = self.get_dmra_blocks(_stream_id) if hasattr(self, 'get_dmra_blocks') else None
_ta_resolved = ta.init_embed_state(_target_status[_target['TS']], self._CONFIG, self._system, _rf_src, _stream_id, _ta_blocks)
if _target_system['MODE'] in ('MASTER', 'PEER'):
_ta_pkts = ta.ta_dmra_packets(_rf_src, _ta_resolved)
if _ta_pkts and hasattr(systems[_target['SYSTEM']], 'send_dmra_system'):
systems[_target['SYSTEM']].send_dmra_system(_ta_pkts)
except Exception:
logger.exception('(%s) Talker Alias embed init (HBP target) failed', self._system)
logger.debug('(%s) Generating TX FULL and EMB LCs for HomeBrew destination: System: %s, TS: %s, TGID: %s', self._system, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
logger.debug('(%s) Conference Bridge: %s, Call Bridged to HBP System: %s TS: %s, TGID: %s', self._system, _bridge, _target['SYSTEM'], _target['TS'], int_id(_target['TGID']))
if CONFIG['REPORTS']['REPORT']:
@ -2773,7 +2832,14 @@ class routerHBP(HBSYSTEM):
systems[_target['SYSTEM']]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(_target['SYSTEM'], int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _target['TS'], int_id(_target['TGID']), call_duration).encode(encoding='utf-8', errors='ignore'))
# Create a Burst B-E packet (Embedded LC)
elif _dtype_vseq in [1,2,3,4]:
dmrbits = dmrbits[0:116] + _target_status[_target['TS']]['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
_ta_st = _target_status[_target['TS']]
if _ta_st.get('TA_EMB'):
try:
dmrbits = ta.rewrite_embed_lc(dmrbits, _ta_st, _dtype_vseq, 'TX_EMB_LC')
except Exception:
dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
else:
dmrbits = dmrbits[0:116] + _ta_st['TX_EMB_LC'][_dtype_vseq] + dmrbits[148:264]
try:
dmrpkt = dmrbits.tobytes()
except AttributeError:

@ -159,6 +159,9 @@ def build_config(_config_file):
'VALIDATE_SERVER_IDS': config.getboolean(section, 'VALIDATE_SERVER_IDS', fallback=True),
'DEBUG_BRIDGES' : config.getboolean(section, 'DEBUG_BRIDGES', fallback=False),
'ENABLE_API' : config.getboolean(section, 'ENABLE_API', fallback=False),
'TALKER_ALIAS': config.getboolean(section, 'TALKER_ALIAS', fallback=False),
'TALKER_ALIAS_MODE': config.get(section, 'TALKER_ALIAS_MODE', fallback='both'),
'TALKER_ALIAS_FORMAT': config.get(section, 'TALKER_ALIAS_FORMAT', fallback='{callsign} {fname}'),
})
if not CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES']:
CONFIG['GLOBAL']['ANNOUNCEMENT_LANGUAGES'] = languages

@ -17,15 +17,15 @@
# The 'action' May be PERMIT|DENY
# Each entry may be a single radio id, or a hypenated range (e.g. 1-2999)
# Format:
# ACL = 'action:id|start-end|,id|start-end,....'
# --for example--
# SUB_ACL: DENY:1,1000-2000,4500-60000,17
# ACL = 'action:id|start-end|,id|start-end,....'
# --for example--
# SUB_ACL: DENY:1,1000-2000,4500-60000,17
#
# ACL Types:
# REG_ACL: peer radio IDs for registration (only used on HBP master systems)
# SUB_ACL: subscriber IDs for end-users
# TGID_TS1_ACL: destination talkgroup IDs on Timeslot 1
# TGID_TS2_ACL: destination talkgroup IDs on Timeslot 2
# REG_ACL: peer radio IDs for registration (only used on HBP master systems)
# SUB_ACL: subscriber IDs for end-users
# TGID_TS1_ACL: destination talkgroup IDs on Timeslot 1
# TGID_TS2_ACL: destination talkgroup IDs on Timeslot 2
#
# ACLs may be repeated for individual systems if needed for granularity
# Global ACLs will be processed BEFORE the system level ACLs
@ -51,6 +51,15 @@ SERVER_ID: 0000
DATA_GATEWAY: False
VALIDATE_SERVER_IDS: True
# DMR Talker Alias (apagado por defecto)
# TALKER_ALIAS - True para activar el reenvio/insercion de Talker Alias (DMRA + LC embebida)
# TALKER_ALIAS_MODE - passthrough (solo reenvia el TA recibido), inject (genera el TA desde el alias del suscriptor)
# o both (reenvia si llega completo, si no lo genera). Por defecto: both
# TALKER_ALIAS_FORMAT - plantilla del texto generado en modo inject/both. Campos: {callsign} {fname} {surname} {id}
TALKER_ALIAS: False
TALKER_ALIAS_MODE: both
TALKER_ALIAS_FORMAT: {callsign} {fname}
# Servidor de seguridad centralizado
URL_SECURITY:
PORT_SECURITY:

@ -13,6 +13,11 @@ SERVER_ID: 0000
DATA_GATEWAY: False
VALIDATE_SERVER_IDS: True
# DMR Talker Alias (apagado por defecto). MODE: passthrough | inject | both
TALKER_ALIAS: False
TALKER_ALIAS_MODE: both
TALKER_ALIAS_FORMAT: {callsign} {fname}
# Servidor de seguridad centralizado
URL_SECURITY:
PORT_SECURITY:

@ -13,6 +13,13 @@ SERVER_ID: 0000
DATA_GATEWAY: False
VALIDATE_SERVER_IDS: True
# DMR Talker Alias (apagado por defecto). Pon True para activarlo.
# TALKER_ALIAS_MODE: passthrough | inject | both
# TALKER_ALIAS_FORMAT: plantilla con campos {callsign} {fname} {surname} {id}
TALKER_ALIAS: False
TALKER_ALIAS_MODE: both
TALKER_ALIAS_FORMAT: {callsign} {fname}
# Servidor de seguridad centralizado
URL_SECURITY:
PORT_SECURITY:

@ -47,6 +47,7 @@ import config
from const import *
from utils import mk_id_dict, try_download,load_json,blake2bsum
from dmr_utils3.utils import int_id, bytes_4
import talker_alias as ta
# Imports for the reporting server
import pickle
@ -690,6 +691,9 @@ class HBSYSTEM(DatagramProtocol):
self._report = _report
self._config = self._CONFIG['SYSTEMS'][self._system]
self._laststrid = {1: b'', 2: b''}
# DMR Talker Alias: per-stream inbound DMRA buffers and stream lookup
self._dmra_by_stream = {}
self._dmra_rf_stream = {}
# Define shortcuts and generic function names based on the type of system we are
@ -730,6 +734,7 @@ class HBSYSTEM(DatagramProtocol):
# Aliased in __init__ to maintenance_loop if system is a master
def master_maintenance_loop(self):
logger.trace('(%s) Master maintenance loop started', self._system)
self.trim_dmra_streams()
remove_list = deque()
for peer in self._peers:
_this_peer = self._peers[peer]
@ -813,6 +818,80 @@ class HBSYSTEM(DatagramProtocol):
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet))
# ---- DMR Talker Alias (TA) support ------------------------------------
def note_dmrd_stream(self, _peer_id, _rf_src, _stream_id):
# Associate the latest stream with its RF source so buffered DMRA
# (Talker Alias) packets can be matched to the right call.
try:
self._dmra_rf_stream[(_peer_id, _rf_src)] = (_stream_id, time())
except Exception:
pass
def store_dmra_packet(self, _data, _sockaddr=None):
try:
parsed = ta.parse_dmra_packet(_data)
if not parsed:
return
_rf_src, _block_id, _payload = parsed
# Resolve the originating peer from the socket address when possible,
# so the alias is matched to the right peer's stream (not just RID).
_peer_id = None
if _sockaddr is not None and hasattr(self, '_peers'):
for _pid, _pinfo in self._peers.items():
if _pinfo.get('SOCKADDR') == _sockaddr:
_peer_id = _pid
break
# Find the most recent stream from this RF source (peer-scoped if known)
_stream_id = None
_best_t = -1
for (_pid, _src), (_sid, _t) in self._dmra_rf_stream.items():
if _src == _rf_src and (_peer_id is None or _pid == _peer_id) and _t > _best_t:
_stream_id = _sid
_best_t = _t
if _stream_id is None:
_stream_id = _rf_src # fallback key until a DMRD arrives
entry = self._dmra_by_stream.setdefault(_stream_id, {'BLOCKS': {}, 'TIME': time(), 'RFS': _rf_src})
entry['BLOCKS'][_block_id] = _payload
entry['TIME'] = time()
entry['RFS'] = _rf_src
except Exception:
logger.exception('(%s) Talker Alias store failed', self._system)
def get_dmra_blocks(self, _stream_id):
entry = self._dmra_by_stream.get(_stream_id)
if entry:
return entry['BLOCKS']
return None
def clear_dmra_stream(self, _stream_id):
self._dmra_by_stream.pop(_stream_id, None)
def trim_dmra_streams(self, _max_age=180):
now = time()
for _sid in [k for k, v in self._dmra_by_stream.items() if now - v.get('TIME', 0) > _max_age]:
self._dmra_by_stream.pop(_sid, None)
for _key in [k for k, v in self._dmra_rf_stream.items() if now - v[1] > _max_age]:
self._dmra_rf_stream.pop(_key, None)
def send_dmra_to_peers(self, _packets, _exclude=None):
if not hasattr(self, '_peers'):
return
for _peer in self._peers:
if _exclude is not None and _peer == _exclude:
continue
for _pkt in _packets:
self.transport.write(_pkt, self._peers[_peer]['SOCKADDR'])
def send_dmra_system(self, _packets):
try:
if self._config['MODE'] == 'MASTER':
self.send_dmra_to_peers(_packets)
else:
for _pkt in _packets:
self.transport.write(_pkt, self._config['MASTER_SOCKADDR'])
except Exception:
logger.exception('(%s) Talker Alias send failed', self._system)
def send_xlxmaster(self, radio, xlx, mastersock):
radio3 = int.from_bytes(radio, 'big').to_bytes(3, 'big')
radio4 = int.from_bytes(radio, 'big').to_bytes(4, 'big')
@ -958,6 +1037,20 @@ class HBSYSTEM(DatagramProtocol):
self.transport.write(b''.join(pkt), self._peers[_peer]['SOCKADDR'])
#logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to peer: %s (%s) [Stream ID: %s]', self._system, _slot, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id), int_id(_dst_id), self._peers[_peer]['CALLSIGN'], int_id(_peer), int_id(_stream_id))
# DMR Talker Alias handling (opt-in via GLOBAL TALKER_ALIAS)
if ta.ta_enabled(self._CONFIG, self._system):
self.note_dmrd_stream(_peer_id, _rf_src, _stream_id)
if self._config['REPEAT'] == True and _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD:
try:
_resolved = ta.resolve_ta(self._CONFIG, self._system, _rf_src, self.get_dmra_blocks(_stream_id))
_ta_pkts = ta.ta_dmra_packets(_rf_src, _resolved)
if _ta_pkts:
self.send_dmra_to_peers(_ta_pkts, _exclude=_peer_id)
except Exception:
logger.exception('(%s) Talker Alias local repeat failed', self._system)
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM:
self.clear_dmra_stream(_stream_id)
# Userland actions -- typically this is the function you subclass for an application
self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
@ -1138,6 +1231,8 @@ class HBSYSTEM(DatagramProtocol):
elif _command == DMRA:
_peer_id = _data[4:8]
logger.debug('(%s) Peer has sent Talker Alias packet %s', self._system, _data)
if ta.ta_enabled(self._CONFIG, self._system):
self.store_dmra_packet(_data, _sockaddr)
elif _command == PRIN:
logger.info('(%s) *ProxyInfo* Connection from IP:Port: %s', self._system, _data.decode('utf8')[4:])

@ -0,0 +1,360 @@
###############################################################################
# ADN DMR Peer Server - DMR Talker Alias (ETSI / MMDVMHost)
#
# Talker Alias (ETSI TS 102 361-2): short alphanumeric label carried in the
# DMR voice stream. Two transports are supported here:
# 1) Standalone HBP "DMRA" UDP packets (15 bytes, up to 4 blocks per TX).
# 2) Embedded LC inside the forwarded DMRD voice (FLCO 4-7, bursts B-E),
# which is what MMDVMHost / Pi-Star actually decode.
#
# Ported from the clean-architecture implementation in the ce5rpy fork to
# this monolithic codebase. UTF-8 encoding (format 2), same as MMDVMHost
# DMRTA.cpp. Everything here is opt-in via the GLOBAL TALKER_ALIAS switch;
# when disabled the bridge forwarding path behaves exactly as before.
###############################################################################
TALKER_ALIAS_MAX_LEN = 29
TA_FORMAT_7BIT = 0
TA_FORMAT_ISO8 = 1
TA_FORMAT_UTF8 = 2
DMRA_OPCODE = b'DMRA'
DMRA_PACKET_LEN = 15
DMRA_BLOCK_COUNT = 4
DMRA_PAYLOAD_LEN = 7
DMRA_BUF_LEN = DMRA_BLOCK_COUNT * DMRA_PAYLOAD_LEN # 28
FLCO_TALKER_ALIAS_HEADER = 4 # FLCO 4 = TA block 0; 5,6,7 = blocks 1,2,3
VALID_MODES = frozenset({'inject', 'passthrough', 'both'})
def _int_id(_bytes):
return int.from_bytes(_bytes, 'big')
# ---------------------------------------------------------------------------
# Encode / decode
# ---------------------------------------------------------------------------
def truncate_talker_alias(text):
"""Hard-cap at protocol maximum (not user-configurable)."""
if len(text) <= TALKER_ALIAS_MAX_LEN:
return text
return text[:TALKER_ALIAS_MAX_LEN]
def decode_ta(buf):
"""Decode a 28-byte TA buffer (formats 0-3). Mirrors MMDVMHost CDMRTA::decodeTA."""
if len(buf) < 1:
return ''
ta_format = (buf[0] >> 6) & 0x03
ta_size = (buf[0] >> 1) & 0x1F
if ta_format == TA_FORMAT_ISO8 or ta_format == TA_FORMAT_UTF8:
raw = buf[1:1 + ta_size]
return raw.decode('latin-1', errors='replace')
if ta_format != TA_FORMAT_7BIT:
return ''
out = bytearray()
t1 = 0
t2 = 0
c = 0
for i in range(min(32, len(buf))):
if t2 >= ta_size:
break
for j in range(7, -1, -1):
c = ((c << 1) | ((buf[i] >> j) & 1)) & 0xFF
t1 += 1
if t1 == 7:
if i > 0:
out.append(c & 0x7F)
t2 += 1
if t2 >= ta_size:
break
t1 = 0
c = 0
return out[:ta_size].decode('ascii', errors='replace')
def encode_utf8(text):
"""Encode text into a 28-byte TA buffer (format 2 / UTF-8)."""
text = truncate_talker_alias(text)
raw = text.encode('utf-8')
if len(raw) > 27:
raw = text.encode('utf-8')[:27].decode('utf-8', errors='ignore').encode('utf-8')
size = len(raw)
header = (TA_FORMAT_UTF8 << 6) | (size << 1) | 0
buf = bytearray(DMRA_BUF_LEN)
buf[0] = header
buf[1:1 + size] = raw
return bytes(buf)
def blocks_from_buffer(buf):
"""Split a 28-byte encoded buffer into four 7-byte DMRA payloads."""
buf = buf.ljust(DMRA_BUF_LEN, b'\x00')[:DMRA_BUF_LEN]
return [buf[i * DMRA_PAYLOAD_LEN:(i + 1) * DMRA_PAYLOAD_LEN] for i in range(DMRA_BLOCK_COUNT)]
def required_ta_block_count(buf):
"""Number of blocks 1-4 actually needed (skip trailing zero payloads)."""
blocks = blocks_from_buffer(buf)
last = 0
for i in range(DMRA_BLOCK_COUNT):
if blocks[i] != b'\x00' * DMRA_PAYLOAD_LEN:
last = i
return max(1, last + 1)
def buffer_from_blocks(blocks):
"""Merge up to four block payloads (dict block_id->payload) into a 28-byte buffer."""
buf = bytearray(DMRA_BUF_LEN)
for block_id, payload in blocks.items():
if 0 <= block_id < DMRA_BLOCK_COUNT and payload:
start = block_id * DMRA_PAYLOAD_LEN
buf[start:start + DMRA_PAYLOAD_LEN] = payload[:DMRA_PAYLOAD_LEN]
return bytes(buf)
# ---------------------------------------------------------------------------
# Standalone DMRA packet builders / parser
# ---------------------------------------------------------------------------
def build_dmra_packets(rf_src, text):
"""Build the 1-4 HBP DMRA packets needed to carry 'text' (server injection)."""
rf = rf_src[:3] if len(rf_src) >= 3 else rf_src.ljust(3, b'\x00')[:3]
encoded = encode_utf8(text)
blocks = blocks_from_buffer(encoded)
count = required_ta_block_count(encoded)
packets = []
for block_id in range(count):
packets.append(DMRA_OPCODE + rf + bytes([block_id]) + blocks[block_id])
return packets
def build_dmra_packet(rf_src, block_id, payload):
"""Build one 15-byte DMRA packet (pass-through of a buffered block)."""
rf = rf_src[:3] if len(rf_src) >= 3 else rf_src.ljust(3, b'\x00')[:3]
block = max(0, min(3, int(block_id)))
pl = payload[:DMRA_PAYLOAD_LEN].ljust(DMRA_PAYLOAD_LEN, b'\x00')
return DMRA_OPCODE + rf + bytes([block]) + pl
def parse_dmra_packet(data):
"""Parse a DMRA packet; returns (rf_src, block_id, payload) or None."""
if len(data) < DMRA_PACKET_LEN or data[:4] != DMRA_OPCODE:
return None
rf_src = data[4:7]
block_id = data[7]
payload = data[8:15]
return rf_src, block_id, payload
# ---------------------------------------------------------------------------
# Embedded LC (FLCO 4-7) builders
# ---------------------------------------------------------------------------
def talker_alias_lc_bytes(block_id, payload7):
"""9-byte embedded LC for one TA fragment (FLCO 4-7 + FID 0x00 + 7 payload)."""
block = max(0, min(3, int(block_id)))
payload = payload7[:DMRA_PAYLOAD_LEN].ljust(DMRA_PAYLOAD_LEN, b'\x00')
return bytes([FLCO_TALKER_ALIAS_HEADER + block, 0x00]) + payload
def encode_talker_alias_emblc(text):
"""Embedded-LC dicts for TA blocks 0..N-1 plus block count N (1-4)."""
from dmr_utils3 import bptc
encoded = encode_utf8(text)
blocks = blocks_from_buffer(encoded)
count = required_ta_block_count(encoded)
emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, blocks[i])) for i in range(count)]
return emblcs, count
def encode_talker_alias_emblc_from_blocks(blocks):
"""Build embedded TA LC dicts from buffered (passthrough) DMRA payloads."""
from dmr_utils3 import bptc
buf = buffer_from_blocks(blocks)
buf_blocks = blocks_from_buffer(buf)
count = required_ta_block_count(buf)
emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, buf_blocks[i])) for i in range(count)]
return emblcs, count
# ---------------------------------------------------------------------------
# Policy: settings, text formatting, inject vs passthrough
# ---------------------------------------------------------------------------
def ta_settings(CONFIG, system_name=None):
"""Effective Talker Alias settings (GLOBAL with optional per-system override)."""
g = CONFIG.get('GLOBAL', {})
sys_cfg = CONFIG.get('SYSTEMS', {}).get(system_name, {}) if system_name else {}
enabled = sys_cfg.get('TALKER_ALIAS')
if enabled is None:
enabled = g.get('TALKER_ALIAS', False)
mode = sys_cfg.get('TALKER_ALIAS_MODE')
if mode is None:
mode = g.get('TALKER_ALIAS_MODE', 'both')
if mode not in VALID_MODES:
mode = 'both'
fmt = sys_cfg.get('TALKER_ALIAS_FORMAT')
if fmt is None:
fmt = g.get('TALKER_ALIAS_FORMAT', '{callsign} {fname}')
return {'enabled': bool(enabled), 'mode': mode, 'format': str(fmt)}
def ta_enabled(CONFIG, system_name=None):
return ta_settings(CONFIG, system_name)['enabled']
def format_ta_text(CONFIG, rf_src):
"""Build the display string from the subscriber profile + template."""
settings = ta_settings(CONFIG)
template = settings['format']
rid = _int_id(rf_src)
profiles = CONFIG.get('_TA_PROFILES', {})
profile = profiles.get(rid, {})
callsign = profile.get('callsign') or ''
fname = profile.get('fname') or ''
surname = profile.get('surname') or ''
if profile.get('talker_alias'):
text = str(profile['talker_alias'])
else:
try:
text = template.format(callsign=callsign, fname=fname, surname=surname, id=rid)
except (KeyError, ValueError, IndexError):
text = callsign or 'DMR ID:{}'.format(rid)
text = ' '.join(text.split())
if not text.strip():
text = callsign or 'DMR ID:{}'.format(rid)
return truncate_talker_alias(text)
def required_blocks_from_header(block0):
"""Blocks (1-4) needed for a TA whose header is in block 0's first byte."""
if not block0:
return DMRA_BLOCK_COUNT
header = block0[0]
ta_format = (header >> 6) & 0x03
ta_size = (header >> 1) & 0x1F
if ta_format in (TA_FORMAT_ISO8, TA_FORMAT_UTF8):
needed_bytes = 1 + ta_size # header byte + payload bytes
else:
needed_bytes = 1 + (((ta_size * 7) + 7) // 8) # 7-bit packed chars
count = (needed_bytes + DMRA_PAYLOAD_LEN - 1) // DMRA_PAYLOAD_LEN
return max(1, min(DMRA_BLOCK_COUNT, count))
def passthrough_complete(blocks):
"""True when every TA block implied by the header (block 0) was received."""
if not blocks or 0 not in blocks:
return False
block0 = blocks[0]
if not block0 or len(block0) < 1:
return False
needed = required_blocks_from_header(block0)
return all(i in blocks and blocks[i] and len(blocks[i]) >= DMRA_PAYLOAD_LEN for i in range(needed))
def passthrough_packets_from_blocks(rf_src, blocks):
"""Rebuild four DMRA packets from buffered block payloads."""
packets = []
for block_id in range(DMRA_BLOCK_COUNT):
payload = blocks.get(block_id, b'\x00' * DMRA_PAYLOAD_LEN)
packets.append(build_dmra_packet(rf_src, block_id, payload))
return packets
def resolve_ta(CONFIG, system_name, rf_src, blocks):
"""Decide what TA to emit for this stream.
Returns a dict {'source': 'passthrough'|'inject', 'text': str,
'blocks': dict|None} or None when TA is disabled / nothing to send.
"""
settings = ta_settings(CONFIG, system_name)
if not settings['enabled']:
return None
mode = settings['mode']
have_passthrough = passthrough_complete(blocks)
if mode == 'passthrough':
if not have_passthrough:
return None
return {'source': 'passthrough', 'blocks': dict(blocks),
'text': decode_ta(buffer_from_blocks(blocks))}
if mode == 'inject':
return {'source': 'inject', 'blocks': None,
'text': format_ta_text(CONFIG, rf_src)}
# both
if have_passthrough:
return {'source': 'passthrough', 'blocks': dict(blocks),
'text': decode_ta(buffer_from_blocks(blocks))}
return {'source': 'inject', 'blocks': None,
'text': format_ta_text(CONFIG, rf_src)}
def ta_dmra_packets(rf_src, resolved):
"""Standalone DMRA packets for a resolved TA (or [] )."""
if not resolved:
return []
if resolved['source'] == 'passthrough':
return passthrough_packets_from_blocks(rf_src, resolved['blocks'])
return build_dmra_packets(rf_src, resolved['text'])
def ta_emblc(resolved):
"""Embedded-LC (dicts, count) for a resolved TA, or None."""
if not resolved:
return None
if resolved['source'] == 'passthrough':
return encode_talker_alias_emblc_from_blocks(resolved['blocks'])
return encode_talker_alias_emblc(resolved['text'])
# ---------------------------------------------------------------------------
# Per-stream embedded-LC state helpers (stored inside the target STATUS dict)
# ---------------------------------------------------------------------------
def init_embed_state(st, CONFIG, system_name, rf_src, stream_id, blocks):
"""Prepare alternating embedded TA state on a target stream status dict.
Returns the resolved TA dict (so callers can also emit standalone DMRA),
or None when TA is disabled / nothing to inject.
"""
clear_embed_state(st)
resolved = resolve_ta(CONFIG, system_name, rf_src, blocks)
emb = ta_emblc(resolved)
if emb:
emblcs, count = emb
st['TA_EMB'] = emblcs
st['TA_COUNT'] = count
st['TA_PHASE'] = 0
# First B-E supercycle carries the normal group LC; TA on the next one.
st['TA_ON'] = False
return resolved
def clear_embed_state(st):
for k in ('TA_EMB', 'TA_PHASE', 'TA_ON', 'TA_COUNT'):
st.pop(k, None)
def rewrite_embed_lc(dmrbits, st, dtype_vseq, emb_key):
"""Return dmrbits with the embedded LC rewritten on bursts B-E (dtype 1-4),
alternating one supercycle of group LC and one supercycle of TA blocks.
Only call this when st has 'TA_EMB' set; otherwise use the legacy path.
"""
ta_emb = st.get('TA_EMB')
if ta_emb is not None and st.get('TA_ON'):
phase = st.get('TA_PHASE', 0)
count = st.get('TA_COUNT', DMRA_BLOCK_COUNT)
frag = ta_emb[phase][dtype_vseq]
if dtype_vseq == 4:
st['TA_ON'] = False
st['TA_PHASE'] = (phase + 1) % max(1, count)
else:
frag = st[emb_key][dtype_vseq]
if dtype_vseq == 4 and ta_emb is not None:
st['TA_ON'] = True
return dmrbits[0:116] + frag + dmrbits[148:264]
Loading…
Cancel
Save

Powered by TurnKey Linux.