"""Unit tests comparing ``freedmr_dmr_codec`` output against stored fixtures."""

import unittest

from freedmr_dmr_codec import (
    EmbeddedLCError,
    FullLCError,
    SlotTypeError,
    LC_SERVICE_OPTIONS_HBLINK_LEGACY,
    LC_SERVICE_OPTIONS_NORMAL,
    build_group_voice_lc,
    decode_embedded_lc,
    decode_full_lc,
    decode_slot_type,
    embedded_lc_fragment_from_payload,
    encode_embedded_lc,
    encode_full_lc,
    encode_full_lc_parity,
    encode_slot_type,
    payload_with_embedded_lc_fragment,
    rs129_parity,
    voice,
    voice_head_term,
)


# Nine-byte link-control word shared by the header/terminator/embedded tests.
GROUP_VOICE_LC = bytes.fromhex("000000000c302f9be5")

# Expected bit string for encode_full_lc(GROUP_VOICE_LC).
HEADER_LC_BITS = (
    "000010111100001000000100110101100001111000001100001010111001100000001000"
    "010100000111010001100001000000000000001011110100100001110110011100000000"
    "0000000000000010111000000000111111001000010110100111"
)

# Expected bit string for encode_full_lc(GROUP_VOICE_LC, terminator=True).
TERMINATOR_LC_BITS = (
    "000010111010110100000100000000100001111010111100001010111110000000001000"
    "001000000111010011000001000100010100001011000111000001110001000100000000"
    "1100010000000010011000000000111001011000010110010100"
)

# Expected bit strings, in order, for encode_embedded_lc(GROUP_VOICE_LC).
EMBEDDED_LC_BITS = (
    "00001111000011110000011000000110",
    "00010111000100010000000000000110",
    "00001100000000110011010100011011",
    "00010111001101100010001000100010",
)

# 33-byte payloads fed to voice_head_term() and voice() respectively.
VOICE_HEAD_TERM_PAYLOAD = bytes.fromhex(
    "2b6004101f842dd00df07d41046dff57d75df5de30152e2070b20f803f88c695e2"
)
VOICE_BURST_PAYLOAD = bytes.fromhex(
    "b9e881526173002a6bb9e881526134e0f060691173002a6bb9e881526173002a6a"
)


def _flip_bits(bits, *positions):
    """Invert, in place, the bit at each of ``positions`` in ``bits``."""
    for position in positions:
        bits[position] = not bits[position]


class FreeDmrDmrCodecTest(unittest.TestCase):
    """Checks the codec's encode/decode helpers against known-good values."""

    # ------------------------------------------------------------------
    # Full LC / embedded LC encoding against stored bit fixtures
    # ------------------------------------------------------------------

    def test_full_lc_header_encode_matches_current_codec_fixture(self):
        """Header encoding of GROUP_VOICE_LC equals HEADER_LC_BITS."""
        header_bits = encode_full_lc(GROUP_VOICE_LC).to01()
        self.assertEqual(header_bits, HEADER_LC_BITS)

    def test_full_lc_terminator_encode_matches_current_codec_fixture(self):
        """Terminator encoding of GROUP_VOICE_LC equals TERMINATOR_LC_BITS."""
        terminator_bits = encode_full_lc(GROUP_VOICE_LC, terminator=True).to01()
        self.assertEqual(terminator_bits, TERMINATOR_LC_BITS)

    def test_embedded_lc_group_voice_encode_matches_current_codec(self):
        """Embedded encoding of GROUP_VOICE_LC equals EMBEDDED_LC_BITS."""
        pieces = encode_embedded_lc(GROUP_VOICE_LC)
        rendered = tuple(piece.to01() for piece in pieces)
        self.assertEqual(rendered, EMBEDDED_LC_BITS)

    def test_current_runtime_compatibility_function_names(self):
        """The module-level compatibility names yield the same fixtures."""
        # Imported as a module so the names are looked up as attributes.
        import freedmr_dmr_codec as dmr_codec

        link_control = GROUP_VOICE_LC

        header_bits = dmr_codec.encode_header_lc(link_control).to01()
        self.assertEqual(header_bits, HEADER_LC_BITS)

        terminator_bits = dmr_codec.encode_terminator_lc(link_control).to01()
        self.assertEqual(terminator_bits, TERMINATOR_LC_BITS)

        # encode_emblc() results are indexed with keys 1 through 4 here.
        embedded_bits = tuple(
            dmr_codec.encode_emblc(link_control)[key].to01()
            for key in (1, 2, 3, 4)
        )
        self.assertEqual(embedded_bits, EMBEDDED_LC_BITS)

    # ------------------------------------------------------------------
    # Payload decoding
    # ------------------------------------------------------------------

    def test_voice_head_term_decode_matches_current_codec(self):
        """voice_head_term() returns the expected LC, CC, DTYPE and SYNC."""
        fields = voice_head_term(VOICE_HEAD_TERM_PAYLOAD)

        self.assertEqual(fields["LC"], bytes.fromhex("001020000c302f9be5"))
        self.assertEqual(fields["CC"], b"\x01")
        self.assertEqual(fields["DTYPE"], b"\x01")
        self.assertEqual(
            fields["SYNC"].to01(),
            "110111111111010101111101011101011101111101011101",
        )

    def test_voice_burst_decode_matches_current_codec(self):
        """voice() returns the expected AMBE frames, CC, LCSS and EMBED."""
        fields = voice(VOICE_BURST_PAYLOAD)

        expected_ambe = [
            "101110011110100010000001010100100110000101110011000000000010101001101011",
            "101110011110100010000001010100100110000101110011000000000010101001101011",
            "101110011110100010000001010100100110000101110011000000000010101001101010",
        ]
        actual_ambe = [frame.to01() for frame in fields["AMBE"]]
        self.assertEqual(actual_ambe, expected_ambe)

        self.assertEqual(fields["CC"], b"\x01")
        self.assertEqual(fields["LCSS"], b"\x01")
        self.assertEqual(
            fields["EMBED"].to01(),
            "01001110000011110000011000000110",
        )

    # ------------------------------------------------------------------
    # Full LC decode / build
    # ------------------------------------------------------------------

    def test_full_lc_decode_classifies_group_voice(self):
        """An encode/decode round trip of GROUP_VOICE_LC reports a group call."""
        result = decode_full_lc(encode_full_lc(GROUP_VOICE_LC))

        self.assertEqual(result.data, GROUP_VOICE_LC)
        self.assertEqual(result.flco, 0x00)
        self.assertTrue(result.is_group_call)
        self.assertFalse(result.is_unit_call)
        self.assertEqual(result.target_id, 3120)
        self.assertEqual(result.source_id, 3120101)

    def test_build_group_voice_lc_defaults_to_normal_service_options(self):
        """Without ``service_options`` the built LC decodes as NORMAL."""
        target = bytes.fromhex("00005b")
        source = bytes.fromhex("2f9be5")

        built = build_group_voice_lc(target, source)

        self.assertEqual(built, bytes.fromhex("00000000005b2f9be5"))
        round_tripped = decode_full_lc(encode_full_lc(built))
        self.assertEqual(round_tripped.service_options, LC_SERVICE_OPTIONS_NORMAL)

    def test_build_group_voice_lc_can_represent_legacy_hblink_options_explicitly(self):
        """Passing the HBLINK_LEGACY options is reflected in bytes and decode."""
        target = bytes.fromhex("00005b")
        source = bytes.fromhex("2f9be5")

        built = build_group_voice_lc(
            target,
            source,
            service_options=LC_SERVICE_OPTIONS_HBLINK_LEGACY,
        )

        self.assertEqual(built, bytes.fromhex("00002000005b2f9be5"))
        round_tripped = decode_full_lc(encode_full_lc(built))
        self.assertEqual(
            round_tripped.service_options,
            LC_SERVICE_OPTIONS_HBLINK_LEGACY,
        )

    def test_full_lc_decode_classifies_unit_voice(self):
        """An LC whose first byte is 0x03 decodes as a unit call."""
        unit_lc = bytes.fromhex("030000000c302f9be5")

        result = decode_full_lc(encode_full_lc(unit_lc))

        self.assertEqual(result.flco, 0x03)
        self.assertFalse(result.is_group_call)
        self.assertTrue(result.is_unit_call)

    def test_full_lc_rs129_parity_matches_header_and_terminator_masks(self):
        """Raw, header and terminator parity bytes match the stored values."""
        link_control = bytes.fromhex("001020000c302f9be5")

        raw_parity = rs129_parity(link_control)
        header_parity = encode_full_lc_parity(link_control)
        terminator_parity = encode_full_lc_parity(link_control, terminator=True)

        self.assertEqual(raw_parity, bytes.fromhex("4c42cc"))
        self.assertEqual(header_parity, bytes.fromhex("dad45a"))
        self.assertEqual(terminator_parity, bytes.fromhex("d5db55"))

    def test_full_lc_requires_nine_byte_lc(self):
        """An eight-byte LC is rejected with FullLCError."""
        self.assertRaises(FullLCError, encode_full_lc, b"\x00" * 8)

    # ------------------------------------------------------------------
    # Slot type
    # ------------------------------------------------------------------

    def test_slot_type_encode_matches_current_codec_fixtures(self):
        """encode_slot_type(1, data_type) matches each stored bit string."""
        fixtures = (
            (0x1, "00010001101110001100"),
            (0x2, "00010010101001011001"),
            (0x3, "00010011001010110010"),
            (0x6, "00010110000011001110"),
        )
        for data_type, expected_bits in fixtures:
            self.assertEqual(encode_slot_type(1, data_type).to01(), expected_bits)

    def test_slot_type_decode_corrects_three_bits(self):
        """Three flipped bits are repaired and reported via ``corrected``."""
        damaged = encode_slot_type(1, 0x2)
        _flip_bits(damaged, 0, 7, 19)

        result = decode_slot_type(damaged)

        self.assertEqual(result.color_code, 1)
        self.assertEqual(result.data_type, 0x2)
        self.assertEqual(result.name, "VOICE_LC_TERM")
        self.assertEqual(result.corrected, 3)

    def test_slot_type_decode_rejects_uncorrectable_error(self):
        """Four flipped bits make decode_slot_type raise SlotTypeError."""
        damaged = encode_slot_type(1, 0x2)
        _flip_bits(damaged, *range(4))

        self.assertRaises(SlotTypeError, decode_slot_type, damaged)

    def test_slot_type_rejects_values_outside_four_bits(self):
        """A value of 16 for either argument raises SlotTypeError."""
        for color_code, data_type in ((16, 0x1), (1, 16)):
            with self.assertRaises(SlotTypeError):
                encode_slot_type(color_code, data_type)

    # ------------------------------------------------------------------
    # Embedded LC
    # ------------------------------------------------------------------

    def test_embedded_lc_round_trips_talker_alias_header(self):
        """An LC starting with 0x04 survives embedded encode/decode intact."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")

        result = decode_embedded_lc(encode_embedded_lc(alias_lc))

        self.assertEqual(result.data, alias_lc)
        self.assertEqual(result.flco, 0x04)
        self.assertEqual(result.corrected, 0)

    def test_embedded_lc_round_trips_gps_info(self):
        """An LC starting with 0x08 survives embedded encode/decode intact."""
        gps_lc = bytes.fromhex("080007fcfae048b57b")

        result = decode_embedded_lc(encode_embedded_lc(gps_lc))

        self.assertEqual(result.data, gps_lc)
        self.assertEqual(result.flco, 0x08)

    def test_encoded_talker_alias_fragments_match_mmdvmhost_layout_fixture(self):
        """Each fragment written into a 0x55-filled payload matches the fixture."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        filler = b"\x55" * 33

        expected_hex = [
            "555555555555555555555555555550517092855555555555555555555555555555",
            "555555555555555555555555555550382441d55555555555555555555555555555",
            "5555555555555555555555555555522717b8e55555555555555555555555555555",
            "5555555555555555555555555555522b73ae255555555555555555555555555555",
        ]

        actual_hex = []
        for piece in encode_embedded_lc(alias_lc):
            actual_hex.append(payload_with_embedded_lc_fragment(filler, piece).hex())

        self.assertEqual(actual_hex, expected_hex)

    def test_payload_fragment_helpers_extract_and_apply_embedded_lc(self):
        """A fragment written into a payload can be read back unchanged."""
        gps_lc = bytes.fromhex("080007fcfae048b57b")
        first_piece = encode_embedded_lc(gps_lc)[0]
        before = b"\x55" * 33

        after = payload_with_embedded_lc_fragment(before, first_piece)

        self.assertEqual(embedded_lc_fragment_from_payload(after), first_piece)
        # Bytes before index 14 and from index 19 onward must be untouched.
        self.assertEqual(after[:14], before[:14])
        self.assertEqual(after[19:], before[19:])

    def test_embedded_lc_decode_corrects_single_bit_error(self):
        """One flipped bit in the first fragment is corrected and counted."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        pieces = list(encode_embedded_lc(alias_lc))

        # Work on a copy so the encoder's returned fragment is not mutated.
        damaged = pieces[0].copy()
        _flip_bits(damaged, 0)
        pieces[0] = damaged

        result = decode_embedded_lc(pieces)

        self.assertEqual(result.data, alias_lc)
        self.assertEqual(result.corrected, 1)

    def test_embedded_lc_decode_rejects_uncorrectable_error(self):
        """Flipping bits 0 and 8 of the first fragment raises EmbeddedLCError."""
        alias_lc = bytes.fromhex("04004c43414c4c3132")
        pieces = list(encode_embedded_lc(alias_lc))

        # Work on a copy so the encoder's returned fragment is not mutated.
        damaged = pieces[0].copy()
        _flip_bits(damaged, 0, 8)
        pieces[0] = damaged

        self.assertRaises(EmbeddedLCError, decode_embedded_lc, pieces)

    def test_embedded_lc_requires_nine_byte_lc(self):
        """An eight-byte LC is rejected with EmbeddedLCError."""
        self.assertRaises(EmbeddedLCError, encode_embedded_lc, b"\x00" * 8)


if __name__ == "__main__":
    unittest.main()