"""Black-box UDP tests built on the ``tests.harness.udp_blackbox`` harness.

Every test calls ``require_udp_integration_enabled()`` first (presumably this
skips or aborts the test when UDP integration testing is not enabled --
confirm in the harness), opens a ``UdpBlackBoxScenario``, creates repeater
endpoints and/or an FBP peer from it, injects packets, and asserts on what the
other endpoints capture or on what the process under test logs.
"""

import socket
import tempfile
import time
import unittest
from pathlib import Path

from freedmr_dmr_codec import encode_embedded_lc, payload_with_embedded_lc_fragment

from tests.harness.deterministic import (
    HBPF_DATA_SYNC,
    HBPF_SLT_VHEAD,
    HBPF_SLT_VTERM,
    HBPF_VOICE,
    PacketSpec,
    bytes_3,
    bytes_4,
)
from tests.harness.udp_blackbox import (
    ImpairmentProfiles,
    RecordedPacketFixture,
    StreamProfile,
    UdpBlackBoxScenario,
    require_udp_integration_enabled,
)


def embedded_lc_payloads(lc):
    """Return one payload per embedded-LC fragment of ``lc``.

    ``encode_embedded_lc(lc)`` supplies the fragments; each one is combined
    with the same 33-byte filler payload (all bytes 0x55) through
    ``payload_with_embedded_lc_fragment``. The result is a tuple, in the
    order the fragments were produced.
    """
    payload = b"\x55" * 33
    return tuple(
        payload_with_embedded_lc_fragment(payload, fragment)
        for fragment in encode_embedded_lc(lc)
    )


class UdpBlackBoxHarnessTest(unittest.TestCase):
    """Scenario tests that exchange packets with the process under test.

    Repeater endpoints are always closed in a ``finally`` clause so a failing
    send/receive does not leave them open.

    NOTE(review): endpoints obtained from ``scenario.fbp_peer(...)`` are never
    closed explicitly in this class; presumably the scenario context manager
    owns them -- confirm in the harness.
    """

    # Payloads carrying the embedded LC 04004c43414c4c3132; the talker-alias
    # test expects this same hex string (and the text 'CALL12') in the log.
    TA_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("04004c43414c4c3132"))
    # Payloads carrying the embedded LC 080007fcfae048b57b; the GPS test
    # expects this same hex string plus a LAT/LON pair in the log.
    GPS_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("080007fcfae048b57b"))

    def private_call(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
        """Build a ``PacketSpec`` for a "unit" call voice frame.

        The spec uses ``call_type="unit"``, ``frame_type=HBPF_VOICE`` and
        ``dtype_vseq=0``; the remaining fields come from the arguments.
        """
        return PacketSpec(
            peer_id=peer_id,
            dst_id=dst_id,
            slot=slot,
            stream_id=stream_id,
            call_type="unit",
            frame_type=HBPF_VOICE,
            dtype_vseq=0,
        )

    def private_call_terminator(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
        """Build the terminator ``PacketSpec`` matching ``private_call``.

        Same addressing arguments and defaults as ``private_call``, but with
        ``frame_type=HBPF_DATA_SYNC`` and ``dtype_vseq=HBPF_SLT_VTERM``.
        """
        return PacketSpec(
            peer_id=peer_id,
            dst_id=dst_id,
            slot=slot,
            stream_id=stream_id,
            call_type="unit",
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VTERM,
        )

    def test_two_registered_repeaters_observe_static_tg_route(self):
        """A packet for dst 91 / slot 2 from MASTER-A is captured by MASTER-B."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2))
                captured = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.packet[:4], b"DMRD")
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))
        self.assertEqual(captured.fields["slot"], 2)
        # No stream_id was passed above, so 0x01020304 is presumably the
        # PacketSpec default -- confirm in tests.harness.deterministic.
        self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020304))

    def test_global_use_acl_false_does_not_apply_deny_acl(self):
        """With ``global_use_acl=False`` a DENY entry for the sender is not applied.

        The scenario is configured with ``global_sub_acl="DENY:3120001"`` and
        the packet's ``rf_src`` is 3120001; MASTER-B must still capture it.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            global_use_acl=False,
            global_sub_acl="DENY:3120001",
        ) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(
                    PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))

    def test_hbp_data_sync_control_payload_is_preserved(self):
        """A DATA_SYNC frame (dtype_vseq 7) arrives with its payload unchanged."""
        require_udp_integration_enabled()
        # 33 distinct byte values so any alteration of the payload is visible.
        payload = bytes(range(33))
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        frame_type=HBPF_DATA_SYNC,
                        dtype_vseq=7,
                        payload=payload,
                    )
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.fields["frame_type"], HBPF_DATA_SYNC)
        self.assertEqual(captured.fields["dtype_vseq"], 7)
        self.assertEqual(captured.fields["dmr_payload"], payload)

    def test_hbp_same_tg_voice_embedded_lc_payload_is_preserved(self):
        """A VOICE frame (dtype_vseq 1) arrives with its payload unchanged."""
        require_udp_integration_enabled()
        payload = bytes((index * 7) % 256 for index in range(33))
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        frame_type=HBPF_VOICE,
                        dtype_vseq=1,
                        payload=payload,
                    )
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.fields["frame_type"], HBPF_VOICE)
        self.assertEqual(captured.fields["dtype_vseq"], 1)
        self.assertEqual(captured.fields["dmr_payload"], payload)

    def test_hbp_in_call_talker_alias_is_logged(self):
        """Sending the TA embedded-LC payloads produces an ``IN-CALL TA`` log entry.

        One VOICE frame is sent per payload, with ``seq`` and ``dtype_vseq``
        both counting up from 1; the matched log output must contain the
        decoded text and the LC hex.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                for index, payload in enumerate(self.TA_EMB_LC_PAYLOADS, start=1):
                    master_a.send_dmr(
                        PacketSpec(
                            peer_id=1001,
                            rf_src=3120001,
                            dst_id=91,
                            slot=2,
                            stream_id=0x01020330,
                            seq=index,
                            frame_type=HBPF_VOICE,
                            dtype_vseq=index,
                            payload=payload,
                        )
                    )
                output = scenario.process.wait_for_log("*IN-CALL TA*", timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertIn("TEXT: 'CALL12'", output)
        self.assertIn("LC: 04004c43414c4c3132", output)

    def test_hbp_in_call_gps_is_logged(self):
        """Sending the GPS embedded-LC payloads produces an ``IN-CALL GPS`` log entry.

        Mirrors the talker-alias test; the matched log output must contain
        the expected LAT/LON strings and the LC hex.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                for index, payload in enumerate(self.GPS_EMB_LC_PAYLOADS, start=1):
                    master_a.send_dmr(
                        PacketSpec(
                            peer_id=1001,
                            rf_src=3120001,
                            dst_id=91,
                            slot=2,
                            stream_id=0x01020331,
                            seq=index,
                            frame_type=HBPF_VOICE,
                            dtype_vseq=index,
                            payload=payload,
                        )
                    )
                output = scenario.process.wait_for_log("*IN-CALL GPS*", timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertIn("LAT: 51.123451", output)
        self.assertIn("LON: -2.123451", output)
        self.assertIn("LC: 080007fcfae048b57b", output)

    def test_hbp_malformed_short_dmrd_is_ignored_and_later_packet_routes(self):
        """A 14-byte ``DMRD`` datagram does not stop the next valid packet routing."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                # Raw send: the "DMRD" tag followed by only ten zero bytes.
                master_a.send(b"DMRD" + b"\x00" * 10)
                master_a.send_dmr(
                    PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))

    def test_hbp_voice_sequence_wrap_routes_forward_progress(self):
        """Packets with seq 254, 255 and then 2 on one stream are all captured in order."""
        require_udp_integration_enabled()
        stream_id = 0x01020304
        packets = [
            PacketSpec(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id, seq=254),
            PacketSpec(
                peer_id=1001,
                dst_id=91,
                slot=2,
                stream_id=stream_id,
                seq=255,
                dtype_vseq=1,
            ),
            PacketSpec(
                peer_id=1001,
                dst_id=91,
                slot=2,
                stream_id=stream_id,
                seq=2,
                dtype_vseq=2,
            ),
        ]
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                for packet in packets:
                    master_a.send_dmr(packet)
                # One receive per packet sent.
                captured = [master_b.recv(timeout=2.0) for _ in packets]
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual([packet.fields["seq"] for packet in captured], [254, 255, 2])
        self.assertEqual(captured[-1].fields["stream_id"], bytes_4(stream_id))

    def test_hbp_link_impairment_discards_late_out_of_order_packet(self):
        """Under the reorder impairment only seq 0 and 2 reach MASTER-B.

        Three packets (seq 0, 1, 2) are sent through
        ``ImpairmentProfiles.provider_vxlan_reorder()``; two must be captured
        and nothing further may arrive during the 0.4 s drain.
        """
        require_udp_integration_enabled()
        stream_id = 0x01020308
        packets = [
            PacketSpec(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id, seq=0),
            PacketSpec(
                peer_id=1001,
                dst_id=91,
                slot=2,
                stream_id=stream_id,
                seq=1,
                dtype_vseq=1,
            ),
            PacketSpec(
                peer_id=1001,
                dst_id=91,
                slot=2,
                stream_id=stream_id,
                seq=2,
                dtype_vseq=2,
            ),
        ]
        impairment = ImpairmentProfiles.provider_vxlan_reorder()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_stream(
                    packets,
                    cadence_seconds=0.03,
                    impairment=impairment,
                )
                captured = [master_b.recv(timeout=2.0) for _ in range(2)]
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual([packet.fields["seq"] for packet in captured], [0, 2])
        self.assertEqual(leaked, [])

    def test_hbp_duplicate_sequence_zero_is_dropped(self):
        """A second seq-0 packet with a different payload is not forwarded."""
        require_udp_integration_enabled()
        # Distinct payloads let the assertions tell which packet got through.
        first = PacketSpec(peer_id=1001, dst_id=91, slot=2, seq=0, payload=b"\x11" * 33)
        duplicate = PacketSpec(
            peer_id=1001,
            dst_id=91,
            slot=2,
            seq=0,
            payload=b"\x22" * 33,
        )
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(first)
                captured = master_b.recv(timeout=2.0)
                master_a.send_dmr(duplicate)
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.fields["seq"], 0)
        self.assertEqual(captured.fields["dmr_payload"], b"\x11" * 33)
        self.assertEqual(leaked, [])

    def test_hbp_recorded_fixture_replay_routes_preserved_packet(self):
        """A packet replayed from a hex fixture file is routed to MASTER-B.

        The fixture file holds one comment line and the hex of
        ``packet.data()``; it is loaded with ``RecordedPacketFixture.from_file``
        and replayed through MASTER-A.
        """
        require_udp_integration_enabled()
        packet = PacketSpec(
            peer_id=1001,
            rf_src=3120001,
            dst_id=91,
            slot=2,
            stream_id=0x0102031C,
            payload=b"\x33" * 33,
        )
        with tempfile.TemporaryDirectory(prefix="freedmr-fixture-") as tempdir:
            fixture_path = Path(tempdir) / "hbp.hex"
            fixture_path.write_text(
                "# recorded HBP packet\n" + packet.data().hex() + "\n",
                encoding="ascii",
            )
            fixture = RecordedPacketFixture.from_file(fixture_path)
            # NOTE(review): the scenario runs while the temporary directory
            # still exists; whether from_file reads the file eagerly is not
            # visible from here.
            with UdpBlackBoxScenario() as scenario:
                master_a = scenario.repeater("MASTER-A", 1001)
                master_b = scenario.repeater("MASTER-B", 1002)
                try:
                    master_a.login()
                    master_b.login()
                    master_a.replay_fixture(fixture)
                    captured = master_b.recv(timeout=2.0)
                finally:
                    master_a.close()
                    master_b.close()

        self.assertEqual(captured.fields["stream_id"], bytes_4(0x0102031C))
        self.assertEqual(captured.fields["dmr_payload"], b"\x33" * 33)

    def test_hbp_burst_loss_profile_routes_later_forward_progress(self):
        """With ``burst_loss(1, 2)`` only seq 0 and seq 3 are captured."""
        require_udp_integration_enabled()
        stream_id = 0x0102031D
        profile = StreamProfile.voice_over(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            voice_bursts=4,
        )
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_stream(
                    profile.packets,
                    cadence_seconds=profile.cadence_seconds,
                    impairment=ImpairmentProfiles.burst_loss(1, 2),
                )
                captured = [master_b.recv(timeout=2.0) for _ in range(2)]
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual([packet.fields["seq"] for packet in captured], [0, 3])
        self.assertEqual(leaked, [])

    def test_hbp_duplicate_udp_profile_drops_duplicate_and_routes_next_packet(self):
        """With ``duplicate_udp(0)`` exactly seq 0 and seq 1 are captured, once each."""
        require_udp_integration_enabled()
        stream_id = 0x0102031E
        profile = StreamProfile.voice_over(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            voice_bursts=2,
        )
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_stream(
                    profile.packets,
                    cadence_seconds=profile.cadence_seconds,
                    impairment=ImpairmentProfiles.duplicate_udp(0),
                )
                captured = [master_b.recv(timeout=2.0) for _ in range(2)]
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual([packet.fields["seq"] for packet in captured], [0, 1])
        self.assertEqual(leaked, [])

    def test_hbp_voice_terminator_suppresses_late_same_stream_packet(self):
        """A voice frame sent after the terminator on the same stream is not forwarded.

        The header and terminator must both be captured; the later VOICE
        frame (seq 2) must not show up during the 0.4 s drain.
        """
        require_udp_integration_enabled()
        stream_id = 0x01020304
        header = PacketSpec(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            seq=0,
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VHEAD,
        )
        terminator = PacketSpec(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            seq=1,
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VTERM,
        )
        late_voice = PacketSpec(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            seq=2,
            frame_type=HBPF_VOICE,
            dtype_vseq=1,
        )
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(header)
                master_a.send_dmr(terminator)
                captured = [master_b.recv(timeout=2.0) for _ in range(2)]
                master_a.send_dmr(late_voice)
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(
            [packet.fields["dtype_vseq"] for packet in captured],
            [HBPF_SLT_VHEAD, HBPF_SLT_VTERM],
        )
        self.assertEqual(leaked, [])

    def test_dial_a_tg_reserved_control_emits_local_tg9_ts2_prompt(self):
        """A unit call to 4001 makes the sender receive a packet from 5000 to dst 9 on slot 2.

        The reply is read on MASTER-A itself (4 s timeout); MASTER-B must
        capture nothing.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
                master_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
                captured = master_a.recv(timeout=4.0)
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.packet[:4], b"DMRD")
        self.assertEqual(captured.fields["rf_src"], bytes_3(5000))
        self.assertEqual(captured.fields["dst_id"], bytes_3(9))
        self.assertEqual(captured.fields["slot"], 2)
        self.assertEqual(leaked, [])

    def test_dial_a_tg_slot_1_routes_local_tg9_to_fbp_reflector_tg(self):
        """After a unit call to 235, slot-1 traffic to dst 9 reaches the FBP peer as dst 235."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            ts2_static="",
            fbp_systems={"OBP-1": 3001},
        ) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                # Discard whatever the peer receives right after the handshake
                # so the later recv_dmre only sees traffic from this test.
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(self.private_call(235, peer_id=1001, slot=1))
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=9,
                        slot=1,
                        stream_id=0x01020321,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.fields["dst_id"], bytes_3(235))
        self.assertEqual(captured.fields["slot"], 1)

    def test_dynamic_tg_routing_disabled_does_not_create_unknown_hbp_route(self):
        """With ``dynamic_tg_routing=False`` a packet for dst 12345 is not forwarded."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(ts2_static="", dynamic_tg_routing=False) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=12345,
                        slot=2,
                        stream_id=0x01020322,
                    )
                )
                leaked = master_b.drain(seconds=0.4)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(leaked, [])

    def test_real_hbp_voice_interrupts_generated_prompt_and_routes(self):
        """A real stream sent after the prompt starts is still routed to MASTER-B.

        The prompt (from 5000 to dst 9, slot 2) is first read on MASTER-A;
        then a one-burst voice stream for dst 91 is sent and must be
        captured on MASTER-B with its own stream id.
        """
        require_udp_integration_enabled()
        real_stream = 0x0102030B
        with UdpBlackBoxScenario() as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            try:
                master_a.login()
                master_b.login()
                master_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
                master_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
                prompt = master_a.recv(timeout=4.0)
                real_voice = StreamProfile.voice_over(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=real_stream,
                    voice_bursts=1,
                )
                master_a.send_stream(real_voice.packets)
                routed = master_b.recv(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(prompt.fields["rf_src"], bytes_3(5000))
        self.assertEqual(prompt.fields["dst_id"], bytes_3(9))
        self.assertEqual(prompt.fields["slot"], 2)
        self.assertEqual(routed.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(routed.fields["dst_id"], bytes_3(91))
        self.assertEqual(routed.fields["stream_id"], bytes_4(real_stream))

    def test_hbp_static_tg_routes_to_fbp_v5_peer(self):
        """A repeater packet for dst 91 reaches the FBP peer as a version-5 ``DMRE``."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.packet[:4], b"DMRE")
        self.assertEqual(captured.fields["fbp_version"], 5)
        # 9990 is presumably the scenario's own configured network/server id
        # -- confirm in the harness.
        self.assertEqual(captured.fields["peer_id"], bytes_4(9990))
        self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))
        # Sent on slot 2, but the captured DMRE packet is expected on slot 1.
        self.assertEqual(captured.fields["slot"], 1)

    def test_fbp_enhanced_keepalive_gates_hbp_to_fbp_forwarding(self):
        """No ``DMRE`` reaches the FBP peer until it has sent BCKA and BCVE."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020312,
                    )
                )
                # Only DMRE packets count as forwarded traffic; anything else
                # the peer receives is ignored here.
                missing_before_keepalive = [
                    capture
                    for capture in fbp_peer.drain(seconds=0.4)
                    if capture.packet[:4] == b"DMRE"
                ]
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020313,
                    )
                )
                captured_after_keepalive = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(missing_before_keepalive, [])
        self.assertEqual(captured_after_keepalive.fields["stream_id"], bytes_4(0x01020313))

    def test_fbp_bcve_downgrade_does_not_change_outbound_packet_version(self):
        """After the peer sends BCVE with version 4, outbound ``DMRE`` still reports version 5."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve(version=4)
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020317,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.fields["fbp_version"], 5)
        self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))

    def test_fbp_bcve_unsupported_version_does_not_change_outbound_packet_version(self):
        """After the peer sends BCVE with version 6, outbound ``DMRE`` still reports version 5."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve(version=6)
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020318,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.fields["fbp_version"], 5)
        self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))

    def test_fbp_invalid_bcve_does_not_change_outbound_packet_version(self):
        """After ``send_invalid_bcve(version=4)``, outbound ``DMRE`` still reports version 5."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_invalid_bcve(version=4)
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020319,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.fields["fbp_version"], 5)
        self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))

    def test_fbp_v4_packet_routes_to_hbp_using_v4_metadata_layout(self):
        """An FBP packet sent with ``fbp_version=4`` is routed to MASTER-B as ``DMRD``."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x0102031A,
                    ),
                    fbp_version=4,
                    source_server=9991,
                    source_rptr=1001,
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_b.close()

        self.assertEqual(captured.packet[:4], b"DMRD")
        self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))
        # Sent on slot 1, but the repeater is expected to see slot 2.
        self.assertEqual(captured.fields["slot"], 2)
        self.assertEqual(captured.fields["stream_id"], bytes_4(0x0102031A))

    # expectedFailure: unittest reports a failure of this test as an expected
    # failure, and a pass as an unexpected success.
    @unittest.expectedFailure
    def test_fbp_unsupported_embedded_packet_version_is_rejected_without_hbp_leak(self):
        """An FBP packet sent with ``fbp_version=6`` must not reach MASTER-B.

        The check is that ``master_b.recv`` raises ``socket.timeout`` within
        0.5 s.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x0102031F,
                    ),
                    fbp_version=6,
                    source_server=9991,
                    source_rptr=1001,
                )
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_b.close()

    def test_fbp_v4_packet_downgrades_session_to_v4_layout_for_compatibility(self):
        """After receiving a version-4 FBP packet, outbound ``DMRE`` is 85 bytes long.

        The inbound v4 packet is first confirmed to reach MASTER-B; the
        following repeater packet must then reach the FBP peer as an 85-byte
        ``DMRE`` whose ``source_rptr`` field is four zero bytes.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x01020320,
                    ),
                    fbp_version=4,
                    source_server=9991,
                )
                # Result unused: this recv only waits until the v4 packet has
                # been processed and routed.
                master_b.recv(timeout=2.0)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120002,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020321,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(captured.packet[:4], b"DMRE")
        self.assertEqual(len(captured.packet), 85)
        self.assertEqual(captured.fields["source_rptr"], b"\x00\x00\x00\x00")

    # expectedFailure: unittest reports a failure of this test as an expected
    # failure, and a pass as an unexpected success.
    @unittest.expectedFailure
    def test_fbp_configured_proto_v4_outbound_packet_carries_v4_version_byte(self):
        """With ``fbp_proto_versions={"OBP-1": 4}`` outbound ``DMRE`` should report version 4."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            fbp_systems={"OBP-1": 3001},
            fbp_proto_versions={"OBP-1": 4},
        ) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve(version=4)
                fbp_peer.drain(seconds=0.2)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120002,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020322,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(len(captured.packet), 85)
        self.assertEqual(captured.fields["fbp_version"], 4)

    def test_obp_v1_packet_on_fbp_link_is_rejected_with_bcve_without_hbp_leak(self):
        """A packet sent via ``send_obp_v1`` is answered with BCVE and not routed.

        The peer must receive a ``BCVE`` whose byte 4 is 5, and MASTER-B must
        time out waiting for traffic.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_obp_v1(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x0102031B,
                    )
                )
                version_response = fbp_peer.recv_opcode(b"BCVE", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_b.close()

        self.assertEqual(version_response.packet[:4], b"BCVE")
        self.assertEqual(version_response.packet[4], 5)

    def test_fbp_invalid_source_quench_does_not_suppress_hbp_to_fbp_stream(self):
        """After ``send_invalid_bcsq`` the named stream is still forwarded to the peer."""
        require_udp_integration_enabled()
        stream_id = 0x01020314
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_invalid_bcsq(91, stream_id)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=stream_id,
                    )
                )
                captured = fbp_peer.recv_dmre(timeout=2.0)
            finally:
                master_a.close()

        self.assertEqual(captured.fields["stream_id"], bytes_4(stream_id))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))

    def test_fbp_stun_blocks_hbp_to_fbp_and_fbp_to_hbp_traffic(self):
        """After ``send_bcst`` no traffic crosses the FBP link in either direction.

        Repeater traffic must not appear at the peer as ``DMRE``, and an FBP
        packet from the peer must not reach MASTER-B.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_bcst()
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020315,
                    )
                )
                leaked_to_fbp = [
                    capture
                    for capture in fbp_peer.drain(seconds=0.4)
                    if capture.packet[:4] == b"DMRE"
                ]
                # Clear anything MASTER-B already holds (it may have received
                # the repeater packet above) before testing the FBP direction.
                master_b.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x01020316,
                    ),
                    source_server=9991,
                    source_rptr=1001,
                )
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_a.close()
                master_b.close()

        self.assertEqual(leaked_to_fbp, [])

    def test_fbp_trunk_routes_clean_stream_while_another_stream_is_reordered(self):
        """Two interleaved streams are tracked independently at the FBP peer.

        Stream A (from MASTER-A) is sent as packets 0, 2, 1; stream B (from
        MASTER-B) is sent in order as packets 0, 1. The peer must capture
        seq [0, 2] for A and [0, 1] for B, with no further ``DMRE`` afterwards.
        """
        require_udp_integration_enabled()
        stream_a = 0x0102030C
        stream_b = 0x0102030D
        stream_a_profile = StreamProfile.voice_over(
            peer_id=1001,
            rf_src=3120001,
            dst_id=91,
            slot=2,
            stream_id=stream_a,
            voice_bursts=3,
        )
        stream_b_profile = StreamProfile.voice_over(
            peer_id=1002,
            rf_src=3120002,
            dst_id=235,
            slot=2,
            stream_id=stream_b,
            voice_bursts=2,
        )
        with UdpBlackBoxScenario(
            ts2_static="91,235",
            fbp_systems={"OBP-1": 3001},
        ) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                # Manual interleave; stream A's cadence is used for every gap.
                master_a.send_dmr(stream_a_profile.packets[0])
                time.sleep(stream_a_profile.cadence_seconds)
                master_b.send_dmr(stream_b_profile.packets[0])
                time.sleep(stream_a_profile.cadence_seconds)
                # Stream A deliberately skips ahead to packet 2 ...
                master_a.send_dmr(stream_a_profile.packets[2])
                time.sleep(stream_a_profile.cadence_seconds)
                master_b.send_dmr(stream_b_profile.packets[1])
                time.sleep(stream_a_profile.cadence_seconds)
                # ... and only then sends the now-late packet 1.
                master_a.send_dmr(stream_a_profile.packets[1])
                captured = [fbp_peer.recv_dmre(timeout=2.0) for _ in range(4)]
                leaked = [
                    capture
                    for capture in fbp_peer.drain(seconds=0.4)
                    if capture.packet[:4] == b"DMRE"
                ]
            finally:
                master_a.close()
                master_b.close()

        # Group captured sequence numbers per stream id, keeping arrival order.
        seqs_by_stream = {}
        for capture in captured:
            seqs_by_stream.setdefault(capture.fields["stream_id"], []).append(
                capture.fields["seq"]
            )
        self.assertEqual(seqs_by_stream[bytes_4(stream_a)], [0, 2])
        self.assertEqual(seqs_by_stream[bytes_4(stream_b)], [0, 1])
        self.assertEqual(leaked, [])

    def test_fbp_static_tg_routes_to_hbp_repeater(self):
        """An FBP packet for dst 91 is routed to MASTER-B as ``DMRD`` on slot 2."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=1,
                        stream_id=0x01020305,
                    ),
                    source_server=9991,
                    source_rptr=1001,
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_b.close()

        self.assertEqual(captured.packet[:4], b"DMRD")
        self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(captured.fields["dst_id"], bytes_3(91))
        self.assertEqual(captured.fields["slot"], 2)
        self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020305))

    def test_fbp_source_quench_suppresses_hbp_to_fbp_stream(self):
        """After ``send_bcsq(91, stream_id)`` that stream is not forwarded to the peer."""
        require_udp_integration_enabled()
        stream_id = 0x01020306
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_a = scenario.repeater("MASTER-A", 1001)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_a.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_bcsq(91, stream_id)
                master_a.send_dmr(
                    PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=stream_id,
                    )
                )
                leaked = [
                    capture
                    for capture in fbp_peer.drain(seconds=0.5)
                    if capture.packet[:4] == b"DMRE"
                ]
            finally:
                master_a.close()

        self.assertEqual(leaked, [])

    def test_fbp_rejects_packet_with_wrong_network_id(self):
        """An FBP packet sent with ``network_id=3002`` does not reach MASTER-B.

        The scenario registers "OBP-1" with 3001, so 3002 does not match.
        """
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(dst_id=91, slot=1, stream_id=0x01020307),
                    network_id=3002,
                    source_server=9991,
                    source_rptr=1001,
                )
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_b.close()

    def test_fbp_rejects_bad_hash_without_leaking_to_hbp(self):
        """An FBP packet sent with ``corrupt_hash=True`` is not routed and is logged."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x0102030E),
                    source_server=9991,
                    source_rptr=1001,
                    corrupt_hash=True,
                )
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
                # Return value unused; presumably wait_for_log raises if the
                # message never appears -- confirm in the harness.
                scenario.process.wait_for_log("FreeBridge HMAC failed", timeout=2.0)
            finally:
                master_b.close()

    def test_fbp_stale_timestamp_is_source_quenched_without_hbp_leak(self):
        """An FBP packet with ``timestamp_ns=1`` is answered with BCSQ and not routed.

        The ``BCSQ`` reply must carry dst 91 in bytes 4-6 and the stream id
        in bytes 7-10.
        """
        require_udp_integration_enabled()
        stream_id = 0x0102030F
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        dst_id=91,
                        slot=1,
                        stream_id=stream_id,
                    ),
                    source_server=9991,
                    source_rptr=1001,
                    timestamp_ns=1,
                )
                quench = fbp_peer.recv_opcode(b"BCSQ", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_b.close()

        self.assertEqual(quench.packet[4:7], bytes_3(91))
        self.assertEqual(quench.packet[7:11], bytes_4(stream_id))

    def test_fbp_max_hops_is_source_quenched_without_hbp_leak(self):
        """An FBP packet with ``hops=10`` is answered with BCSQ and not routed.

        The ``BCSQ`` reply must carry dst 91 in bytes 4-6 and the stream id
        in bytes 7-10.
        """
        require_udp_integration_enabled()
        stream_id = 0x01020310
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp(
                    PacketSpec(
                        peer_id=3001,
                        dst_id=91,
                        slot=1,
                        stream_id=stream_id,
                    ),
                    source_server=9991,
                    source_rptr=1001,
                    hops=10,
                )
                quench = fbp_peer.recv_opcode(b"BCSQ", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    master_b.recv(timeout=0.5)
            finally:
                master_b.close()

        self.assertEqual(quench.packet[4:7], bytes_3(91))
        self.assertEqual(quench.packet[7:11], bytes_4(stream_id))

    def test_fbp_malformed_short_dmre_is_ignored_and_later_packet_routes(self):
        """A 14-byte ``DMRE`` datagram is logged and the next valid packet still routes."""
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                # Raw send: the "DMRE" tag followed by only ten zero bytes.
                fbp_peer.send(b"DMRE" + b"\x00" * 10)
                # Wait for the log line before sending the valid packet, so
                # the short datagram has been handled first.
                scenario.process.wait_for_log("FreeBridge packet too short", timeout=2.0)
                fbp_peer.send_fbp(
                    PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x01020311),
                    source_server=9991,
                    source_rptr=1001,
                )
                captured = master_b.recv(timeout=2.0)
            finally:
                master_b.close()

        self.assertEqual(captured.fields["dst_id"], bytes_3(91))
        self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020311))

    def test_fbp_link_impairment_does_not_buffer_or_replay_late_packets(self):
        """A reordered FBP stream yields seq [0, 2] and does not disturb the next stream.

        After the impaired stream A, nothing more may arrive during the
        0.4 s drain; a fresh packet on stream B must then be captured with
        seq 0.
        """
        require_udp_integration_enabled()
        stream_a = 0x01020309
        stream_b = 0x0102030A
        impaired_packets = [
            PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=stream_a, seq=0),
            PacketSpec(
                peer_id=3001,
                dst_id=91,
                slot=1,
                stream_id=stream_a,
                seq=1,
                dtype_vseq=1,
            ),
            PacketSpec(
                peer_id=3001,
                dst_id=91,
                slot=1,
                stream_id=stream_a,
                seq=2,
                dtype_vseq=2,
            ),
        ]
        following_packet = PacketSpec(
            peer_id=3001,
            dst_id=91,
            slot=1,
            stream_id=stream_b,
            seq=0,
            rf_src=3120001,
        )
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
            master_b = scenario.repeater("MASTER-B", 1002)
            fbp_peer = scenario.fbp_peer("OBP-1")
            try:
                master_b.login()
                fbp_peer.send_bcka()
                fbp_peer.send_bcve()
                fbp_peer.drain(seconds=0.2)
                fbp_peer.send_fbp_stream(
                    impaired_packets,
                    cadence_seconds=0.03,
                    impairment=ImpairmentProfiles.provider_vxlan_reorder(),
                    source_server=9991,
                    source_rptr=1001,
                )
                impaired_captures = [master_b.recv(timeout=2.0) for _ in range(2)]
                leaked = master_b.drain(seconds=0.4)
                fbp_peer.send_fbp(
                    following_packet,
                    source_server=9991,
                    source_rptr=1001,
                )
                following_capture = master_b.recv(timeout=2.0)
            finally:
                master_b.close()

        self.assertEqual([packet.fields["seq"] for packet in impaired_captures], [0, 2])
        self.assertEqual(leaked, [])
        self.assertEqual(following_capture.fields["stream_id"], bytes_4(stream_b))
        self.assertEqual(following_capture.fields["seq"], 0)


if __name__ == "__main__":
    unittest.main()