Fix: apply DMR 3-way interleave to AMBE frames from DV3000

DV3000 outputs raw 72-bit AMBE+2 frames but DMR voice bursts
require 3 frames interleaved into 216-bit payloads.
Interleave: burst_payload[i*3+j] = frame[j].bit[i] (ETSI TS 102 361-1)
Also uses proper AMBE silence frame for padding incomplete triplets.
pull/26/head
Joaquin Madrid Belando 3 weeks ago
parent 7807c17ea9
commit 12948e81ef

@ -41,6 +41,7 @@ import struct
import subprocess
import wave
import logging
from bitarray import bitarray
logger = logging.getLogger(__name__)
@ -71,6 +72,46 @@ DV3K_RATEP_DMR = bytes([
DV3K_PRODID_REQ = bytes([0x61, 0x00, 0x01, 0x00, 0x30])
AMBE_SILENCE = bytes([0xAC, 0xAA, 0x40, 0x20, 0x00, 0x44, 0x40, 0x80, 0x80])
def _interleave_ambe_to_dmr(frames_data):
'''
Convert raw AMBE+2 frames (9 bytes/72 bits each, from DV3000) to DMR burst format.
DMR voice bursts carry 3 AMBE frames per burst (216 bits = 2 x 108 bits).
The 3 frames are interleaved: burst_payload[i*3 + j] = frame[j].bit[i]
where i = 0..71 (bit index within frame), j = 0..2 (frame index).
This matches ETSI TS 102 361-1 voice burst payload format.
The output is compatible with readAMBE.readSingleFile() which splits
the data into 108-bit pairs for mk_voice.pkt_gen().
'''
result = bitarray(endian='big')
for idx in range(0, len(frames_data), 3):
f0 = frames_data[idx] if idx < len(frames_data) else AMBE_SILENCE
f1 = frames_data[idx + 1] if idx + 1 < len(frames_data) else AMBE_SILENCE
f2 = frames_data[idx + 2] if idx + 2 < len(frames_data) else AMBE_SILENCE
b0 = bitarray(endian='big')
b0.frombytes(f0)
b1 = bitarray(endian='big')
b1.frombytes(f1)
b2 = bitarray(endian='big')
b2.frombytes(f2)
burst = bitarray(216, endian='big')
burst.setall(False)
for i in range(72):
burst[i * 3 + 0] = b0[i]
burst[i * 3 + 1] = b1[i]
burst[i * 3 + 2] = b2[i]
result.extend(burst)
return result.tobytes()
def _get_tts_lang(announcement_language):
if announcement_language in _LANG_MAP:
@ -278,9 +319,14 @@ def _encode_ambe_ambeserver(wav_path, ambe_path, host, port):
return False
try:
interleaved = _interleave_ambe_to_dmr(_ambe_frames)
with open(ambe_path, 'wb') as f:
for frame in _ambe_frames:
f.write(frame)
f.write(interleaved)
_bursts = len(_ambe_frames) // 3
if len(_ambe_frames) % 3:
_bursts += 1
logger.info('(TTS-AMBESERVER) Entrelazado DMR: %d frames -> %d bursts (%d bytes)',
len(_ambe_frames), _bursts, len(interleaved))
except Exception as e:
logger.error('(TTS-AMBESERVER) Error escribiendo archivo AMBE: %s', e)
return False

Loading…
Cancel
Save

Powered by TurnKey Linux.