You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
266 lines
9.7 KiB
266 lines
9.7 KiB
import unittest
|
|
|
|
from freedmr_dmr_codec import (
|
|
EmbeddedLCError,
|
|
FullLCError,
|
|
LC_SERVICE_OPTIONS_HBLINK_LEGACY,
|
|
LC_SERVICE_OPTIONS_NORMAL,
|
|
build_group_voice_lc,
|
|
SlotTypeError,
|
|
decode_embedded_lc,
|
|
decode_full_lc,
|
|
decode_slot_type,
|
|
embedded_lc_fragment_from_payload,
|
|
encode_embedded_lc,
|
|
encode_full_lc,
|
|
encode_full_lc_parity,
|
|
encode_slot_type,
|
|
payload_with_embedded_lc_fragment,
|
|
rs129_parity,
|
|
voice,
|
|
voice_head_term,
|
|
)
|
|
|
|
|
|
GROUP_VOICE_LC = bytes.fromhex("000000000c302f9be5")
|
|
HEADER_LC_BITS = (
|
|
"000010111100001000000100110101100001111000001100001010111001100000001000"
|
|
"010100000111010001100001000000000000001011110100100001110110011100000000"
|
|
"0000000000000010111000000000111111001000010110100111"
|
|
)
|
|
TERMINATOR_LC_BITS = (
|
|
"000010111010110100000100000000100001111010111100001010111110000000001000"
|
|
"001000000111010011000001000100010100001011000111000001110001000100000000"
|
|
"1100010000000010011000000000111001011000010110010100"
|
|
)
|
|
EMBEDDED_LC_BITS = (
|
|
"00001111000011110000011000000110",
|
|
"00010111000100010000000000000110",
|
|
"00001100000000110011010100011011",
|
|
"00010111001101100010001000100010",
|
|
)
|
|
VOICE_HEAD_TERM_PAYLOAD = bytes.fromhex(
|
|
"2b6004101f842dd00df07d41046dff57d75df5de30152e2070b20f803f88c695e2"
|
|
)
|
|
VOICE_BURST_PAYLOAD = bytes.fromhex(
|
|
"b9e881526173002a6bb9e881526134e0f060691173002a6bb9e881526173002a6a"
|
|
)
|
|
|
|
|
|
class FreeDmrDmrCodecTest(unittest.TestCase):
|
|
def test_full_lc_header_encode_matches_current_codec_fixture(self):
|
|
lc = GROUP_VOICE_LC
|
|
|
|
encoded = encode_full_lc(lc)
|
|
|
|
self.assertEqual(encoded.to01(), HEADER_LC_BITS)
|
|
|
|
def test_full_lc_terminator_encode_matches_current_codec_fixture(self):
|
|
lc = GROUP_VOICE_LC
|
|
|
|
encoded = encode_full_lc(lc, terminator=True)
|
|
|
|
self.assertEqual(encoded.to01(), TERMINATOR_LC_BITS)
|
|
|
|
def test_embedded_lc_group_voice_encode_matches_current_codec(self):
|
|
lc = GROUP_VOICE_LC
|
|
|
|
encoded = encode_embedded_lc(lc)
|
|
|
|
self.assertEqual(tuple(fragment.to01() for fragment in encoded), EMBEDDED_LC_BITS)
|
|
|
|
def test_current_runtime_compatibility_function_names(self):
|
|
import freedmr_dmr_codec as dmr_codec
|
|
|
|
lc = GROUP_VOICE_LC
|
|
|
|
self.assertEqual(dmr_codec.encode_header_lc(lc).to01(), HEADER_LC_BITS)
|
|
self.assertEqual(dmr_codec.encode_terminator_lc(lc).to01(), TERMINATOR_LC_BITS)
|
|
self.assertEqual(
|
|
tuple(dmr_codec.encode_emblc(lc)[index].to01() for index in (1, 2, 3, 4)),
|
|
EMBEDDED_LC_BITS,
|
|
)
|
|
|
|
def test_voice_head_term_decode_matches_current_codec(self):
|
|
decoded = voice_head_term(VOICE_HEAD_TERM_PAYLOAD)
|
|
|
|
self.assertEqual(decoded["LC"], bytes.fromhex("001020000c302f9be5"))
|
|
self.assertEqual(decoded["CC"], b"\x01")
|
|
self.assertEqual(decoded["DTYPE"], b"\x01")
|
|
self.assertEqual(decoded["SYNC"].to01(), "110111111111010101111101011101011101111101011101")
|
|
|
|
def test_voice_burst_decode_matches_current_codec(self):
|
|
decoded = voice(VOICE_BURST_PAYLOAD)
|
|
|
|
self.assertEqual(
|
|
[ambe.to01() for ambe in decoded["AMBE"]],
|
|
[
|
|
"101110011110100010000001010100100110000101110011000000000010101001101011",
|
|
"101110011110100010000001010100100110000101110011000000000010101001101011",
|
|
"101110011110100010000001010100100110000101110011000000000010101001101010",
|
|
],
|
|
)
|
|
self.assertEqual(decoded["CC"], b"\x01")
|
|
self.assertEqual(decoded["LCSS"], b"\x01")
|
|
self.assertEqual(decoded["EMBED"].to01(), "01001110000011110000011000000110")
|
|
|
|
def test_full_lc_decode_classifies_group_voice(self):
|
|
lc = GROUP_VOICE_LC
|
|
|
|
decoded = decode_full_lc(encode_full_lc(lc))
|
|
|
|
self.assertEqual(decoded.data, lc)
|
|
self.assertEqual(decoded.flco, 0x00)
|
|
self.assertTrue(decoded.is_group_call)
|
|
self.assertFalse(decoded.is_unit_call)
|
|
self.assertEqual(decoded.target_id, 3120)
|
|
self.assertEqual(decoded.source_id, 3120101)
|
|
|
|
def test_build_group_voice_lc_defaults_to_normal_service_options(self):
|
|
lc = build_group_voice_lc(bytes.fromhex("00005b"), bytes.fromhex("2f9be5"))
|
|
|
|
self.assertEqual(lc, bytes.fromhex("00000000005b2f9be5"))
|
|
self.assertEqual(decode_full_lc(encode_full_lc(lc)).service_options, LC_SERVICE_OPTIONS_NORMAL)
|
|
|
|
def test_build_group_voice_lc_can_represent_legacy_hblink_options_explicitly(self):
|
|
lc = build_group_voice_lc(
|
|
bytes.fromhex("00005b"),
|
|
bytes.fromhex("2f9be5"),
|
|
service_options=LC_SERVICE_OPTIONS_HBLINK_LEGACY,
|
|
)
|
|
|
|
self.assertEqual(lc, bytes.fromhex("00002000005b2f9be5"))
|
|
self.assertEqual(decode_full_lc(encode_full_lc(lc)).service_options, LC_SERVICE_OPTIONS_HBLINK_LEGACY)
|
|
|
|
def test_full_lc_decode_classifies_unit_voice(self):
|
|
lc = bytes.fromhex("030000000c302f9be5")
|
|
|
|
decoded = decode_full_lc(encode_full_lc(lc))
|
|
|
|
self.assertEqual(decoded.flco, 0x03)
|
|
self.assertFalse(decoded.is_group_call)
|
|
self.assertTrue(decoded.is_unit_call)
|
|
|
|
def test_full_lc_rs129_parity_matches_header_and_terminator_masks(self):
|
|
lc = bytes.fromhex("001020000c302f9be5")
|
|
|
|
self.assertEqual(rs129_parity(lc), bytes.fromhex("4c42cc"))
|
|
self.assertEqual(encode_full_lc_parity(lc), bytes.fromhex("dad45a"))
|
|
self.assertEqual(encode_full_lc_parity(lc, terminator=True), bytes.fromhex("d5db55"))
|
|
|
|
def test_full_lc_requires_nine_byte_lc(self):
|
|
with self.assertRaises(FullLCError):
|
|
encode_full_lc(b"\x00" * 8)
|
|
|
|
def test_slot_type_encode_matches_current_codec_fixtures(self):
|
|
self.assertEqual(encode_slot_type(1, 0x1).to01(), "00010001101110001100")
|
|
self.assertEqual(encode_slot_type(1, 0x2).to01(), "00010010101001011001")
|
|
self.assertEqual(encode_slot_type(1, 0x3).to01(), "00010011001010110010")
|
|
self.assertEqual(encode_slot_type(1, 0x6).to01(), "00010110000011001110")
|
|
|
|
def test_slot_type_decode_corrects_three_bits(self):
|
|
bits = encode_slot_type(1, 0x2)
|
|
bits[0] = not bits[0]
|
|
bits[7] = not bits[7]
|
|
bits[19] = not bits[19]
|
|
|
|
decoded = decode_slot_type(bits)
|
|
|
|
self.assertEqual(decoded.color_code, 1)
|
|
self.assertEqual(decoded.data_type, 0x2)
|
|
self.assertEqual(decoded.name, "VOICE_LC_TERM")
|
|
self.assertEqual(decoded.corrected, 3)
|
|
|
|
def test_slot_type_decode_rejects_uncorrectable_error(self):
|
|
bits = encode_slot_type(1, 0x2)
|
|
for index in (0, 1, 2, 3):
|
|
bits[index] = not bits[index]
|
|
|
|
with self.assertRaises(SlotTypeError):
|
|
decode_slot_type(bits)
|
|
|
|
def test_slot_type_rejects_values_outside_four_bits(self):
|
|
with self.assertRaises(SlotTypeError):
|
|
encode_slot_type(16, 0x1)
|
|
with self.assertRaises(SlotTypeError):
|
|
encode_slot_type(1, 16)
|
|
|
|
def test_embedded_lc_round_trips_talker_alias_header(self):
|
|
lc = bytes.fromhex("04004c43414c4c3132")
|
|
|
|
encoded = encode_embedded_lc(lc)
|
|
decoded = decode_embedded_lc(encoded)
|
|
|
|
self.assertEqual(decoded.data, lc)
|
|
self.assertEqual(decoded.flco, 0x04)
|
|
self.assertEqual(decoded.corrected, 0)
|
|
|
|
def test_embedded_lc_round_trips_gps_info(self):
|
|
lc = bytes.fromhex("080007fcfae048b57b")
|
|
|
|
encoded = encode_embedded_lc(lc)
|
|
decoded = decode_embedded_lc(encoded)
|
|
|
|
self.assertEqual(decoded.data, lc)
|
|
self.assertEqual(decoded.flco, 0x08)
|
|
|
|
def test_encoded_talker_alias_fragments_match_mmdvmhost_layout_fixture(self):
|
|
lc = bytes.fromhex("04004c43414c4c3132")
|
|
payload = b"\x55" * 33
|
|
|
|
encoded_payloads = [
|
|
payload_with_embedded_lc_fragment(payload, fragment).hex()
|
|
for fragment in encode_embedded_lc(lc)
|
|
]
|
|
|
|
self.assertEqual(
|
|
encoded_payloads,
|
|
[
|
|
"555555555555555555555555555550517092855555555555555555555555555555",
|
|
"555555555555555555555555555550382441d55555555555555555555555555555",
|
|
"5555555555555555555555555555522717b8e55555555555555555555555555555",
|
|
"5555555555555555555555555555522b73ae255555555555555555555555555555",
|
|
],
|
|
)
|
|
|
|
def test_payload_fragment_helpers_extract_and_apply_embedded_lc(self):
|
|
lc = bytes.fromhex("080007fcfae048b57b")
|
|
fragment = encode_embedded_lc(lc)[0]
|
|
original = b"\x55" * 33
|
|
|
|
updated = payload_with_embedded_lc_fragment(original, fragment)
|
|
extracted = embedded_lc_fragment_from_payload(updated)
|
|
|
|
self.assertEqual(extracted, fragment)
|
|
self.assertEqual(updated[:14], original[:14])
|
|
self.assertEqual(updated[19:], original[19:])
|
|
|
|
def test_embedded_lc_decode_corrects_single_bit_error(self):
|
|
lc = bytes.fromhex("04004c43414c4c3132")
|
|
fragments = list(encode_embedded_lc(lc))
|
|
fragments[0] = fragments[0].copy()
|
|
fragments[0][0] = not fragments[0][0]
|
|
|
|
decoded = decode_embedded_lc(fragments)
|
|
|
|
self.assertEqual(decoded.data, lc)
|
|
self.assertEqual(decoded.corrected, 1)
|
|
|
|
def test_embedded_lc_decode_rejects_uncorrectable_error(self):
|
|
lc = bytes.fromhex("04004c43414c4c3132")
|
|
fragments = list(encode_embedded_lc(lc))
|
|
fragments[0] = fragments[0].copy()
|
|
fragments[0][0] = not fragments[0][0]
|
|
fragments[0][8] = not fragments[0][8]
|
|
|
|
with self.assertRaises(EmbeddedLCError):
|
|
decode_embedded_lc(fragments)
|
|
|
|
def test_embedded_lc_requires_nine_byte_lc(self):
|
|
with self.assertRaises(EmbeddedLCError):
|
|
encode_embedded_lc(b"\x00" * 8)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|