Talker Alias: fix embedded-LC FEC bug and enrich inject profiles

* talker_alias.py: replace dmr_utils3.bptc.encode_emblc with a spec-correct
  in-module _encode_emblc. The library version has a transcription bug in burst
  D (uses _binlc[24] twice instead of _binlc[25]) that flips one bit of byte 2
  of every embedded LC. Harmless for the group voice LC (that bit lands in
  Service Options) but for Talker Alias byte 2 is the header (format/length) of
  block 0 and the first text char of blocks 1-3, so it corrupts the alias and
  consumes the embedded-LC error-correction budget. Validated bit-for-bit
  against the known-good real DMR bursts shipped in dmr_utils3.decode.

* talker_alias.py: add load_ta_profiles() and expose {city}/{state}/{country}
  to TALKER_ALIAS_FORMAT.

* bridge_master.py: build _TA_PROFILES from the full subscriber file (callsign +
  fname/surname/city/...) with callsign-only fallback, so inject-mode templates
  like "{callsign} {fname} {city}" work (previously only {callsign} was
  populated, leaving {fname} empty).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
pull/37/head
Adan C31AG 6 hours ago committed by pyopower
parent 9be63326c4
commit bb17826a99

@ -839,7 +839,17 @@ def threadAlias():
def setAlias(_peer_ids,_subscriber_ids, _talkgroup_ids, _local_subscriber_ids, _server_ids, _checksums):
peer_ids, subscriber_ids, talkgroup_ids,local_subscriber_ids,server_ids,checksums = _peer_ids, _subscriber_ids, _talkgroup_ids, _local_subscriber_ids,_server_ids,_checksums
# Build Talker Alias subscriber profiles (id -> {callsign}) for inject mode
# Build Talker Alias subscriber profiles for inject mode. Prefer full
# profiles (callsign + fname/surname/city/...) parsed from the subscriber
# file so templates like '{callsign} {fname} {city}' work; fall back to a
# callsign-only map from the in-memory id->callsign dictionary.
try:
_ta_path = CONFIG['ALIASES']['PATH'] + CONFIG['ALIASES']['SUBSCRIBER_FILE']
_ta_profiles = ta.load_ta_profiles(_ta_path)
if not _ta_profiles:
_ta_profiles = {rid: {'callsign': cs} for rid, cs in (_subscriber_ids or {}).items()}
CONFIG['_TA_PROFILES'] = _ta_profiles
except Exception:
try:
CONFIG['_TA_PROFILES'] = {rid: {'callsign': cs} for rid, cs in (_subscriber_ids or {}).items()}
except Exception:

@ -162,23 +162,58 @@ def talker_alias_lc_bytes(block_id, payload7):
return bytes([FLCO_TALKER_ALIAS_HEADER + block, 0x00]) + payload
def _encode_emblc(_lc):
"""Spec-correct embedded-LC encoder (BPTC 128/72 + 5-bit checksum).
dmr_utils3.bptc.encode_emblc has a transcription bug: in burst D it uses
_binlc[24] twice instead of _binlc[25], flipping one bit of byte 2 of every
embedded LC. Harmless for the group voice LC (the bit lands in Service
Options) but for Talker Alias byte 2 is the TA header (format/length) of
block 0 and the first text char of blocks 1-3, so it corrupts the alias.
The column-major interleave is: out bit (burst b, subrow j, row r) =
_binlc[(b*4 + j) + 16*r], for b,j in 0..3 and r in 0..7.
"""
from dmr_utils3 import hamming, crc
from bitarray import bitarray
_csum = crc.csum5(_lc)
_binlc = bitarray(endian="big")
_binlc.frombytes(_lc)
_binlc.insert(32, _csum[0])
_binlc.insert(43, _csum[1])
_binlc.insert(54, _csum[2])
_binlc.insert(65, _csum[3])
_binlc.insert(76, _csum[4])
for index in range(0, 112, 16):
for hindex, hbit in zip(range(index + 11, index + 16),
hamming.enc_16114(_binlc[index:index + 11])):
_binlc.insert(hindex, hbit)
for index in range(0, 16):
_binlc.insert(index + 112,
_binlc[index] ^ _binlc[index + 16] ^ _binlc[index + 32]
^ _binlc[index + 48] ^ _binlc[index + 64] ^ _binlc[index + 80]
^ _binlc[index + 96])
_idx = [(b * 4 + j) + 16 * r for b in range(4) for j in range(4) for r in range(8)]
out = bitarray(128, endian="big")
for k, i in enumerate(_idx):
out[k] = _binlc[i]
return {1: out[0:32], 2: out[32:64], 3: out[64:96], 4: out[96:128]}
def encode_talker_alias_emblc(text):
"""Embedded-LC dicts for TA blocks 0..N-1 plus block count N (1-4)."""
from dmr_utils3 import bptc
encoded = encode_utf8(text)
blocks = blocks_from_buffer(encoded)
count = required_ta_block_count(encoded)
emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, blocks[i])) for i in range(count)]
emblcs = [_encode_emblc(talker_alias_lc_bytes(i, blocks[i])) for i in range(count)]
return emblcs, count
def encode_talker_alias_emblc_from_blocks(blocks):
"""Build embedded TA LC dicts from buffered (passthrough) DMRA payloads."""
from dmr_utils3 import bptc
buf = buffer_from_blocks(blocks)
buf_blocks = blocks_from_buffer(buf)
count = required_ta_block_count(buf)
emblcs = [bptc.encode_emblc(talker_alias_lc_bytes(i, buf_blocks[i])) for i in range(count)]
emblcs = [_encode_emblc(talker_alias_lc_bytes(i, buf_blocks[i])) for i in range(count)]
return emblcs, count
@ -186,6 +221,33 @@ def encode_talker_alias_emblc_from_blocks(blocks):
# Policy: settings, text formatting, inject vs passthrough
# ---------------------------------------------------------------------------
def load_ta_profiles(json_path):
"""Build {id: {callsign, fname, surname, city, state, country}} from a
radioid-style subscriber_ids.json. Lets inject-mode templates use name/QTH.
Returns {} on any error (caller can fall back to a callsign-only map)."""
import json
out = {}
try:
with open(json_path, "r", encoding="latin-1") as fh:
data = json.load(fh)
for rec in data.get("results", []):
try:
rid = int(rec["id"])
except (KeyError, ValueError, TypeError):
continue
out[rid] = {
"callsign": (rec.get("callsign") or "").strip(),
"fname": (rec.get("fname") or "").strip(),
"surname": (rec.get("surname") or "").strip(),
"city": (rec.get("city") or "").strip(),
"state": (rec.get("state") or "").strip(),
"country": (rec.get("country") or "").strip(),
}
except Exception:
return {}
return out
def ta_settings(CONFIG, system_name=None):
"""Effective Talker Alias settings (GLOBAL with optional per-system override)."""
g = CONFIG.get('GLOBAL', {})
@ -218,11 +280,16 @@ def format_ta_text(CONFIG, rf_src):
callsign = profile.get('callsign') or ''
fname = profile.get('fname') or ''
surname = profile.get('surname') or ''
city = profile.get('city') or ''
state = profile.get('state') or ''
country = profile.get('country') or ''
if profile.get('talker_alias'):
text = str(profile['talker_alias'])
else:
try:
text = template.format(callsign=callsign, fname=fname, surname=surname, id=rid)
text = template.format(callsign=callsign, fname=fname,
surname=surname, city=city, state=state,
country=country, id=rid)
except (KeyError, ValueError, IndexError):
text = callsign or 'DMR ID:{}'.format(rid)
text = ' '.join(text.split())

Loading…
Cancel
Save

Powered by TurnKey Linux.