Talker Alias: update hblink.py

pull/36/head
Joaquin Madrid Belando 2 days ago
parent 5d3a4af82d
commit aa80f20318

@ -47,6 +47,7 @@ import config
from const import *
from utils import mk_id_dict, try_download,load_json,blake2bsum
from dmr_utils3.utils import int_id, bytes_4
import talker_alias as ta
# Imports for the reporting server
import pickle
@ -690,6 +691,9 @@ class HBSYSTEM(DatagramProtocol):
self._report = _report
self._config = self._CONFIG['SYSTEMS'][self._system]
self._laststrid = {1: b'', 2: b''}
# DMR Talker Alias: per-stream inbound DMRA buffers and stream lookup
self._dmra_by_stream = {}
self._dmra_rf_stream = {}
# Define shortcuts and generic function names based on the type of system we are
@ -730,6 +734,7 @@ class HBSYSTEM(DatagramProtocol):
# Aliased in __init__ to maintenance_loop if system is a master
def master_maintenance_loop(self):
logger.trace('(%s) Master maintenance loop started', self._system)
self.trim_dmra_streams()
remove_list = deque()
for peer in self._peers:
_this_peer = self._peers[peer]
@ -813,6 +818,80 @@ class HBSYSTEM(DatagramProtocol):
# KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!!
#logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet))
# ---- DMR Talker Alias (TA) support ------------------------------------
def note_dmrd_stream(self, _peer_id, _rf_src, _stream_id):
# Associate the latest stream with its RF source so buffered DMRA
# (Talker Alias) packets can be matched to the right call.
try:
self._dmra_rf_stream[(_peer_id, _rf_src)] = (_stream_id, time())
except Exception:
pass
def store_dmra_packet(self, _data, _sockaddr=None):
try:
parsed = ta.parse_dmra_packet(_data)
if not parsed:
return
_rf_src, _block_id, _payload = parsed
# Resolve the originating peer from the socket address when possible,
# so the alias is matched to the right peer's stream (not just RID).
_peer_id = None
if _sockaddr is not None and hasattr(self, '_peers'):
for _pid, _pinfo in self._peers.items():
if _pinfo.get('SOCKADDR') == _sockaddr:
_peer_id = _pid
break
# Find the most recent stream from this RF source (peer-scoped if known)
_stream_id = None
_best_t = -1
for (_pid, _src), (_sid, _t) in self._dmra_rf_stream.items():
if _src == _rf_src and (_peer_id is None or _pid == _peer_id) and _t > _best_t:
_stream_id = _sid
_best_t = _t
if _stream_id is None:
_stream_id = _rf_src # fallback key until a DMRD arrives
entry = self._dmra_by_stream.setdefault(_stream_id, {'BLOCKS': {}, 'TIME': time(), 'RFS': _rf_src})
entry['BLOCKS'][_block_id] = _payload
entry['TIME'] = time()
entry['RFS'] = _rf_src
except Exception:
logger.exception('(%s) Talker Alias store failed', self._system)
def get_dmra_blocks(self, _stream_id):
entry = self._dmra_by_stream.get(_stream_id)
if entry:
return entry['BLOCKS']
return None
def clear_dmra_stream(self, _stream_id):
self._dmra_by_stream.pop(_stream_id, None)
def trim_dmra_streams(self, _max_age=180):
now = time()
for _sid in [k for k, v in self._dmra_by_stream.items() if now - v.get('TIME', 0) > _max_age]:
self._dmra_by_stream.pop(_sid, None)
for _key in [k for k, v in self._dmra_rf_stream.items() if now - v[1] > _max_age]:
self._dmra_rf_stream.pop(_key, None)
def send_dmra_to_peers(self, _packets, _exclude=None):
if not hasattr(self, '_peers'):
return
for _peer in self._peers:
if _exclude is not None and _peer == _exclude:
continue
for _pkt in _packets:
self.transport.write(_pkt, self._peers[_peer]['SOCKADDR'])
def send_dmra_system(self, _packets):
try:
if self._config['MODE'] == 'MASTER':
self.send_dmra_to_peers(_packets)
else:
for _pkt in _packets:
self.transport.write(_pkt, self._config['MASTER_SOCKADDR'])
except Exception:
logger.exception('(%s) Talker Alias send failed', self._system)
def send_xlxmaster(self, radio, xlx, mastersock):
radio3 = int.from_bytes(radio, 'big').to_bytes(3, 'big')
radio4 = int.from_bytes(radio, 'big').to_bytes(4, 'big')
@ -958,6 +1037,20 @@ class HBSYSTEM(DatagramProtocol):
self.transport.write(b''.join(pkt), self._peers[_peer]['SOCKADDR'])
#logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to peer: %s (%s) [Stream ID: %s]', self._system, _slot, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id), int_id(_dst_id), self._peers[_peer]['CALLSIGN'], int_id(_peer), int_id(_stream_id))
# DMR Talker Alias handling (opt-in via GLOBAL TALKER_ALIAS)
if ta.ta_enabled(self._CONFIG, self._system):
self.note_dmrd_stream(_peer_id, _rf_src, _stream_id)
if self._config['REPEAT'] == True and _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD:
try:
_resolved = ta.resolve_ta(self._CONFIG, self._system, _rf_src, self.get_dmra_blocks(_stream_id))
_ta_pkts = ta.ta_dmra_packets(_rf_src, _resolved)
if _ta_pkts:
self.send_dmra_to_peers(_ta_pkts, _exclude=_peer_id)
except Exception:
logger.exception('(%s) Talker Alias local repeat failed', self._system)
if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VTERM:
self.clear_dmra_stream(_stream_id)
# Userland actions -- typically this is the function you subclass for an application
self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data)
@ -1138,6 +1231,8 @@ class HBSYSTEM(DatagramProtocol):
elif _command == DMRA:
_peer_id = _data[4:8]
logger.debug('(%s) Peer has sent Talker Alias packet %s', self._system, _data)
if ta.ta_enabled(self._CONFIG, self._system):
self.store_dmra_packet(_data, _sockaddr)
elif _command == PRIN:
logger.info('(%s) *ProxyInfo* Connection from IP:Port: %s', self._system, _data.decode('utf8')[4:])

Loading…
Cancel
Save

Powered by TurnKey Linux.