# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
# FreeDMR/tests/test_freedmr_dmr_codec.py
#
# 266 lines
# 9.7 KiB

import unittest
from freedmr_dmr_codec import (
EmbeddedLCError,
FullLCError,
LC_SERVICE_OPTIONS_HBLINK_LEGACY,
LC_SERVICE_OPTIONS_NORMAL,
build_group_voice_lc,
SlotTypeError,
decode_embedded_lc,
decode_full_lc,
decode_slot_type,
embedded_lc_fragment_from_payload,
encode_embedded_lc,
encode_full_lc,
encode_full_lc_parity,
encode_slot_type,
payload_with_embedded_lc_fragment,
rs129_parity,
voice,
voice_head_term,
)
# Nine-byte link control value used as the input of the group voice tests.
GROUP_VOICE_LC = bytes.fromhex("000000000c302f9be5")

# Expected ``encode_full_lc(GROUP_VOICE_LC).to01()`` output (196 bits).
HEADER_LC_BITS = (
    "000010111100001000000100110101100001111000001100001010111001100000001000"
    "010100000111010001100001000000000000001011110100100001110110011100000000"
    "0000000000000010111000000000111111001000010110100111"
)

# Expected ``encode_full_lc(GROUP_VOICE_LC, terminator=True).to01()`` output
# (196 bits).
TERMINATOR_LC_BITS = (
    "000010111010110100000100000000100001111010111100001010111110000000001000"
    "001000000111010011000001000100010100001011000111000001110001000100000000"
    "1100010000000010011000000000111001011000010110010100"
)

# Expected ``to01()`` rendering of the four fragments returned by
# ``encode_embedded_lc(GROUP_VOICE_LC)``, in order (32 bits each).
EMBEDDED_LC_BITS = (
    "00001111000011110000011000000110",
    "00010111000100010000000000000110",
    "00001100000000110011010100011011",
    "00010111001101100010001000100010",
)

# 33-byte payload fed to ``voice_head_term`` in the decode test.
VOICE_HEAD_TERM_PAYLOAD = bytes.fromhex(
    "2b6004101f842dd00df07d41046dff57d75df5de30152e2070b20f803f88c695e2"
)

# 33-byte payload fed to ``voice`` in the burst decode test.
VOICE_BURST_PAYLOAD = bytes.fromhex(
    "b9e881526173002a6bb9e881526134e0f060691173002a6bb9e881526173002a6a"
)
class FreeDmrDmrCodecTest(unittest.TestCase):
    """Fixture and round-trip tests for the ``freedmr_dmr_codec`` helpers."""

    def test_full_lc_header_encode_matches_current_codec_fixture(self):
        """Default full-LC encoding of the group voice LC equals the fixture."""
        header_bits = encode_full_lc(GROUP_VOICE_LC).to01()
        self.assertEqual(header_bits, HEADER_LC_BITS)

    def test_full_lc_terminator_encode_matches_current_codec_fixture(self):
        """Full-LC encoding with ``terminator=True`` equals the fixture."""
        terminator_bits = encode_full_lc(GROUP_VOICE_LC, terminator=True).to01()
        self.assertEqual(terminator_bits, TERMINATOR_LC_BITS)

    def test_embedded_lc_group_voice_encode_matches_current_codec(self):
        """Each embedded LC fragment renders to the matching fixture string."""
        fragments = encode_embedded_lc(GROUP_VOICE_LC)
        rendered = tuple(piece.to01() for piece in fragments)
        self.assertEqual(rendered, EMBEDDED_LC_BITS)

    def test_current_runtime_compatibility_function_names(self):
        """The older function names produce the same bits as the fixtures."""
        import freedmr_dmr_codec as dmr_codec

        header_bits = dmr_codec.encode_header_lc(GROUP_VOICE_LC).to01()
        self.assertEqual(header_bits, HEADER_LC_BITS)

        terminator_bits = dmr_codec.encode_terminator_lc(GROUP_VOICE_LC).to01()
        self.assertEqual(terminator_bits, TERMINATOR_LC_BITS)

        # ``encode_emblc`` results are looked up by the keys 1..4 here.
        legacy_fragments = tuple(
            dmr_codec.encode_emblc(GROUP_VOICE_LC)[key].to01()
            for key in range(1, 5)
        )
        self.assertEqual(legacy_fragments, EMBEDDED_LC_BITS)

    def test_voice_head_term_decode_matches_current_codec(self):
        """``voice_head_term`` yields the expected LC, CC, DTYPE and SYNC."""
        expected_sync = "110111111111010101111101011101011101111101011101"
        fields = voice_head_term(VOICE_HEAD_TERM_PAYLOAD)
        self.assertEqual(fields["LC"], bytes.fromhex("001020000c302f9be5"))
        self.assertEqual(fields["CC"], b"\x01")
        self.assertEqual(fields["DTYPE"], b"\x01")
        self.assertEqual(fields["SYNC"].to01(), expected_sync)

    def test_voice_burst_decode_matches_current_codec(self):
        """``voice`` yields the expected AMBE frames, CC, LCSS and EMBED."""
        # The first two expected frames are identical; the third differs
        # only in its final bit.
        repeated_frame = (
            "101110011110100010000001010100100110000101110011000000000010101001101011"
        )
        expected_frames = [
            repeated_frame,
            repeated_frame,
            "101110011110100010000001010100100110000101110011000000000010101001101010",
        ]
        fields = voice(VOICE_BURST_PAYLOAD)
        actual_frames = [frame.to01() for frame in fields["AMBE"]]
        self.assertEqual(actual_frames, expected_frames)
        self.assertEqual(fields["CC"], b"\x01")
        self.assertEqual(fields["LCSS"], b"\x01")
        self.assertEqual(fields["EMBED"].to01(), "01001110000011110000011000000110")

    def test_full_lc_decode_classifies_group_voice(self):
        """A decoded group voice LC exposes its data, FLCO, flags and IDs."""
        result = decode_full_lc(encode_full_lc(GROUP_VOICE_LC))
        self.assertEqual(result.data, GROUP_VOICE_LC)
        self.assertEqual(result.flco, 0x00)
        self.assertTrue(result.is_group_call)
        self.assertFalse(result.is_unit_call)
        self.assertEqual(result.target_id, 3120)
        self.assertEqual(result.source_id, 3120101)

    def test_build_group_voice_lc_defaults_to_normal_service_options(self):
        """Without ``service_options`` the built LC decodes as NORMAL."""
        built = build_group_voice_lc(bytes.fromhex("00005b"), bytes.fromhex("2f9be5"))
        self.assertEqual(built, bytes.fromhex("00000000005b2f9be5"))
        round_tripped = decode_full_lc(encode_full_lc(built))
        self.assertEqual(round_tripped.service_options, LC_SERVICE_OPTIONS_NORMAL)

    def test_build_group_voice_lc_can_represent_legacy_hblink_options_explicitly(self):
        """Passing the legacy HBlink options is reflected in the built LC."""
        built = build_group_voice_lc(
            bytes.fromhex("00005b"),
            bytes.fromhex("2f9be5"),
            service_options=LC_SERVICE_OPTIONS_HBLINK_LEGACY,
        )
        self.assertEqual(built, bytes.fromhex("00002000005b2f9be5"))
        round_tripped = decode_full_lc(encode_full_lc(built))
        self.assertEqual(
            round_tripped.service_options,
            LC_SERVICE_OPTIONS_HBLINK_LEGACY,
        )

    def test_full_lc_decode_classifies_unit_voice(self):
        """An LC whose first byte is 0x03 decodes as a unit call."""
        unit_lc = bytes.fromhex("030000000c302f9be5")
        result = decode_full_lc(encode_full_lc(unit_lc))
        self.assertEqual(result.flco, 0x03)
        self.assertFalse(result.is_group_call)
        self.assertTrue(result.is_unit_call)

    def test_full_lc_rs129_parity_matches_header_and_terminator_masks(self):
        """Raw, header and terminator parity bytes match the fixtures."""
        sample_lc = bytes.fromhex("001020000c302f9be5")
        self.assertEqual(rs129_parity(sample_lc), bytes.fromhex("4c42cc"))
        self.assertEqual(encode_full_lc_parity(sample_lc), bytes.fromhex("dad45a"))
        self.assertEqual(
            encode_full_lc_parity(sample_lc, terminator=True),
            bytes.fromhex("d5db55"),
        )

    def test_full_lc_requires_nine_byte_lc(self):
        """An eight-byte LC is rejected by ``encode_full_lc``."""
        too_short = b"\x00" * 8
        with self.assertRaises(FullLCError):
            encode_full_lc(too_short)

    def test_slot_type_encode_matches_current_codec_fixtures(self):
        """``encode_slot_type(1, data_type)`` matches each fixture string."""
        expected_by_data_type = (
            (0x1, "00010001101110001100"),
            (0x2, "00010010101001011001"),
            (0x3, "00010011001010110010"),
            (0x6, "00010110000011001110"),
        )
        for data_type, expected_bits in expected_by_data_type:
            self.assertEqual(encode_slot_type(1, data_type).to01(), expected_bits)

    def test_slot_type_decode_corrects_three_bits(self):
        """Three flipped bits are repaired and reported via ``corrected``."""
        codeword = encode_slot_type(1, 0x2)
        for position in (0, 7, 19):
            codeword[position] = not codeword[position]
        result = decode_slot_type(codeword)
        self.assertEqual(result.color_code, 1)
        self.assertEqual(result.data_type, 0x2)
        self.assertEqual(result.name, "VOICE_LC_TERM")
        self.assertEqual(result.corrected, 3)

    def test_slot_type_decode_rejects_uncorrectable_error(self):
        """Flipping the first four bits makes ``decode_slot_type`` raise."""
        codeword = encode_slot_type(1, 0x2)
        for position in range(4):
            codeword[position] = not codeword[position]
        with self.assertRaises(SlotTypeError):
            decode_slot_type(codeword)

    def test_slot_type_rejects_values_outside_four_bits(self):
        """A value of 16 in either argument is rejected by the encoder."""
        for first_arg, second_arg in ((16, 0x1), (1, 16)):
            with self.assertRaises(SlotTypeError):
                encode_slot_type(first_arg, second_arg)

    def test_embedded_lc_round_trips_talker_alias_header(self):
        """An LC starting with 0x04 survives encode/decode with no fixes."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        result = decode_embedded_lc(encode_embedded_lc(alias_lc))
        self.assertEqual(result.data, alias_lc)
        self.assertEqual(result.flco, 0x04)
        self.assertEqual(result.corrected, 0)

    def test_embedded_lc_round_trips_gps_info(self):
        """An LC starting with 0x08 survives an encode/decode round trip."""
        gps_lc = bytes.fromhex("080007fcfae048b57b")
        result = decode_embedded_lc(encode_embedded_lc(gps_lc))
        self.assertEqual(result.data, gps_lc)
        self.assertEqual(result.flco, 0x08)

    def test_encoded_talker_alias_fragments_match_mmdvmhost_layout_fixture(self):
        """Fragments written into a 0x55-filled payload match the fixtures."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        filler = b"\x55" * 33
        expected_hex = [
            "555555555555555555555555555550517092855555555555555555555555555555",
            "555555555555555555555555555550382441d55555555555555555555555555555",
            "5555555555555555555555555555522717b8e55555555555555555555555555555",
            "5555555555555555555555555555522b73ae255555555555555555555555555555",
        ]
        actual_hex = [
            payload_with_embedded_lc_fragment(filler, piece).hex()
            for piece in encode_embedded_lc(alias_lc)
        ]
        self.assertEqual(actual_hex, expected_hex)

    def test_payload_fragment_helpers_extract_and_apply_embedded_lc(self):
        """A written fragment reads back; bytes outside 14..18 are untouched."""
        gps_lc = bytes.fromhex("080007fcfae048b57b")
        first_fragment = encode_embedded_lc(gps_lc)[0]
        before = b"\x55" * 33
        after = payload_with_embedded_lc_fragment(before, first_fragment)
        read_back = embedded_lc_fragment_from_payload(after)
        self.assertEqual(read_back, first_fragment)
        self.assertEqual(after[:14], before[:14])
        self.assertEqual(after[19:], before[19:])

    def test_embedded_lc_decode_corrects_single_bit_error(self):
        """One flipped bit in the first fragment is repaired and counted."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        pieces = list(encode_embedded_lc(alias_lc))
        # Work on a copy so the encoder's own fragment object is not mutated.
        damaged = pieces[0].copy()
        damaged[0] = not damaged[0]
        pieces[0] = damaged
        result = decode_embedded_lc(pieces)
        self.assertEqual(result.data, alias_lc)
        self.assertEqual(result.corrected, 1)

    def test_embedded_lc_decode_rejects_uncorrectable_error(self):
        """Flipping bits 0 and 8 of the first fragment makes decoding raise."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        pieces = list(encode_embedded_lc(alias_lc))
        # Work on a copy so the encoder's own fragment object is not mutated.
        damaged = pieces[0].copy()
        for position in (0, 8):
            damaged[position] = not damaged[position]
        pieces[0] = damaged
        with self.assertRaises(EmbeddedLCError):
            decode_embedded_lc(pieces)

    def test_embedded_lc_requires_nine_byte_lc(self):
        """An eight-byte LC is rejected by ``encode_embedded_lc``."""
        too_short = b"\x00" * 8
        with self.assertRaises(EmbeddedLCError):
            encode_embedded_lc(too_short)
if __name__ == "__main__":
    # Run the whole suite when this file is executed directly.
    unittest.main()

# Powered by TurnKey Linux.