# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# FreeDMR/tests/test_udp_blackbox_harness.py
#
# 1391 lines
# 49 KiB

import socket
import tempfile
import time
import unittest
from pathlib import Path
from freedmr_dmr_codec import encode_embedded_lc, payload_with_embedded_lc_fragment
from tests.harness.deterministic import (
HBPF_DATA_SYNC,
HBPF_SLT_VHEAD,
HBPF_SLT_VTERM,
HBPF_VOICE,
PacketSpec,
bytes_3,
bytes_4,
)
from tests.harness.udp_blackbox import (
ImpairmentProfiles,
RecordedPacketFixture,
StreamProfile,
UdpBlackBoxScenario,
require_udp_integration_enabled,
)
def embedded_lc_payloads(lc):
    """Return one payload per embedded-LC fragment of *lc*, as a tuple.

    Each fragment yielded by ``encode_embedded_lc(lc)`` is combined with the
    same 33-byte ``0x55`` filler via ``payload_with_embedded_lc_fragment``.
    """
    filler = b"\x55" * 33
    bursts = []
    for piece in encode_embedded_lc(lc):
        bursts.append(payload_with_embedded_lc_fragment(filler, piece))
    return tuple(bursts)
class UdpBlackBoxHarnessTest(unittest.TestCase):
    # Black-box UDP tests. Every test first calls
    # require_udp_integration_enabled(), then opens a UdpBlackBoxScenario,
    # drives it through repeater and/or FBP peer endpoints, and finally checks
    # the captured packets or the process log output.
    #
    # Documentation is kept in comments rather than docstrings so that
    # unittest's per-test description output stays as it was.

    # Payload tuples for the two embedded-LC byte strings whose hex form the
    # talker-alias and GPS log tests below look for in the log output.
    TA_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("04004c43414c4c3132"))
    GPS_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("080007fcfae048b57b"))

    @staticmethod
    def _handshake(bridge):
        # Send BCKA then BCVE (default arguments) from the FBP peer and throw
        # away whatever arrives during the following 0.2 s drain.
        bridge.send_bcka()
        bridge.send_bcve()
        bridge.drain(seconds=0.2)

    @staticmethod
    def _dmre_only(captures):
        # Keep only the captures whose packet starts with the b"DMRE" opcode.
        return [item for item in captures if item.packet[:4] == b"DMRE"]

    def private_call(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
        # Build a "unit" call PacketSpec carrying an HBPF_VOICE frame with
        # dtype_vseq 0.
        return PacketSpec(
            call_type="unit",
            frame_type=HBPF_VOICE,
            dtype_vseq=0,
            peer_id=peer_id,
            dst_id=dst_id,
            slot=slot,
            stream_id=stream_id,
        )

    def private_call_terminator(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
        # Build a "unit" call PacketSpec carrying an HBPF_DATA_SYNC frame with
        # dtype_vseq HBPF_SLT_VTERM.
        return PacketSpec(
            call_type="unit",
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VTERM,
            peer_id=peer_id,
            dst_id=dst_id,
            slot=slot,
            stream_id=stream_id,
        )

    def test_two_registered_repeaters_observe_static_tg_route(self):
        # Repeater 1001 sends a TG 91 / slot 2 packet; repeater 1002 must
        # receive a DMRD packet with matching dst_id, slot and stream id.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                spec = PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                rptr_a.send_dmr(spec)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.packet[:4], b"DMRD")
        self.assertEqual(received.fields["dst_id"], bytes_3(91))
        self.assertEqual(received.fields["slot"], 2)
        self.assertEqual(received.fields["stream_id"], bytes_4(0x01020304))

    def test_global_use_acl_false_does_not_apply_deny_acl(self):
        # With global_use_acl=False and a "DENY:3120001" subscriber ACL, a
        # packet from rf_src 3120001 must still reach repeater 1002.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            global_use_acl=False,
            global_sub_acl="DENY:3120001",
        ) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                spec = PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                rptr_a.send_dmr(spec)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))

    def test_hbp_data_sync_control_payload_is_preserved(self):
        # An HBPF_DATA_SYNC frame with dtype_vseq 7 must arrive with the same
        # frame type, dtype_vseq and 33-byte payload.
        require_udp_integration_enabled()
        payload = bytes(range(33))
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    frame_type=HBPF_DATA_SYNC,
                    dtype_vseq=7,
                    payload=payload,
                )
                rptr_a.send_dmr(spec)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.fields["frame_type"], HBPF_DATA_SYNC)
        self.assertEqual(received.fields["dtype_vseq"], 7)
        self.assertEqual(received.fields["dmr_payload"], payload)

    def test_hbp_same_tg_voice_embedded_lc_payload_is_preserved(self):
        # An HBPF_VOICE frame with dtype_vseq 1 must arrive with the same
        # frame type, dtype_vseq and 33-byte payload.
        require_udp_integration_enabled()
        payload = bytes([(7 * position) % 256 for position in range(33)])
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    frame_type=HBPF_VOICE,
                    dtype_vseq=1,
                    payload=payload,
                )
                rptr_a.send_dmr(spec)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.fields["frame_type"], HBPF_VOICE)
        self.assertEqual(received.fields["dtype_vseq"], 1)
        self.assertEqual(received.fields["dmr_payload"], payload)

    def test_hbp_in_call_talker_alias_is_logged(self):
        # Send every TA_EMB_LC_PAYLOADS entry as a voice burst (seq and
        # dtype_vseq counting from 1), then wait for the "*IN-CALL TA*" log
        # entry and check its text and LC hex.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                for burst_no, burst in enumerate(self.TA_EMB_LC_PAYLOADS, start=1):
                    spec = PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020330,
                        seq=burst_no,
                        frame_type=HBPF_VOICE,
                        dtype_vseq=burst_no,
                        payload=burst,
                    )
                    rptr_a.send_dmr(spec)
                log_text = bench.process.wait_for_log("*IN-CALL TA*", timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertIn("TEXT: 'CALL12'", log_text)
        self.assertIn("LC: 04004c43414c4c3132", log_text)

    def test_hbp_in_call_gps_is_logged(self):
        # Send every GPS_EMB_LC_PAYLOADS entry as a voice burst (seq and
        # dtype_vseq counting from 1), then wait for the "*IN-CALL GPS*" log
        # entry and check latitude, longitude and LC hex.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                for burst_no, burst in enumerate(self.GPS_EMB_LC_PAYLOADS, start=1):
                    spec = PacketSpec(
                        peer_id=1001,
                        rf_src=3120001,
                        dst_id=91,
                        slot=2,
                        stream_id=0x01020331,
                        seq=burst_no,
                        frame_type=HBPF_VOICE,
                        dtype_vseq=burst_no,
                        payload=burst,
                    )
                    rptr_a.send_dmr(spec)
                log_text = bench.process.wait_for_log("*IN-CALL GPS*", timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertIn("LAT: 51.123451", log_text)
        self.assertIn("LON: -2.123451", log_text)
        self.assertIn("LC: 080007fcfae048b57b", log_text)

    def test_hbp_malformed_short_dmrd_is_ignored_and_later_packet_routes(self):
        # A 14-byte b"DMRD" datagram is sent first; the well-formed packet
        # that follows must still be delivered to repeater 1002.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send(b"DMRD" + b"\x00" * 10)
                spec = PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                rptr_a.send_dmr(spec)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))

    def test_hbp_voice_sequence_wrap_routes_forward_progress(self):
        # Packets with seq 254, 255 and 2 on one stream must all be delivered,
        # in that order.
        require_udp_integration_enabled()
        stream_id = 0x01020304
        common = dict(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id)
        packets = [
            PacketSpec(seq=254, **common),
            PacketSpec(seq=255, dtype_vseq=1, **common),
            PacketSpec(seq=2, dtype_vseq=2, **common),
        ]
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                for spec in packets:
                    rptr_a.send_dmr(spec)
                received = [rptr_b.recv(timeout=2.0) for _ in packets]
            finally:
                rptr_a.close()
                rptr_b.close()
        seen_seqs = [item.fields["seq"] for item in received]
        self.assertEqual(seen_seqs, [254, 255, 2])
        self.assertEqual(received[-1].fields["stream_id"], bytes_4(stream_id))

    def test_hbp_link_impairment_discards_late_out_of_order_packet(self):
        # Three packets (seq 0..2) are sent through the provider_vxlan_reorder
        # impairment; only seq 0 and 2 may arrive and nothing else may follow.
        require_udp_integration_enabled()
        stream_id = 0x01020308
        common = dict(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id)
        packets = [
            PacketSpec(seq=0, **common),
            PacketSpec(seq=1, dtype_vseq=1, **common),
            PacketSpec(seq=2, dtype_vseq=2, **common),
        ]
        impairment = ImpairmentProfiles.provider_vxlan_reorder()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_stream(
                    packets,
                    cadence_seconds=0.03,
                    impairment=impairment,
                )
                received = [rptr_b.recv(timeout=2.0) for _ in range(2)]
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        seen_seqs = [item.fields["seq"] for item in received]
        self.assertEqual(seen_seqs, [0, 2])
        self.assertEqual(strays, [])

    def test_hbp_duplicate_sequence_zero_is_dropped(self):
        # Two seq-0 packets with different payloads: the first is delivered,
        # the second must not produce any further packet at repeater 1002.
        require_udp_integration_enabled()
        original = PacketSpec(peer_id=1001, dst_id=91, slot=2, seq=0, payload=b"\x11" * 33)
        repeat = PacketSpec(peer_id=1001, dst_id=91, slot=2, seq=0, payload=b"\x22" * 33)
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_dmr(original)
                received = rptr_b.recv(timeout=2.0)
                rptr_a.send_dmr(repeat)
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.fields["seq"], 0)
        self.assertEqual(received.fields["dmr_payload"], b"\x11" * 33)
        self.assertEqual(strays, [])

    def test_hbp_recorded_fixture_replay_routes_preserved_packet(self):
        # Write one packet as hex text to a temporary file, load it through
        # RecordedPacketFixture.from_file, replay it from repeater 1001 and
        # check the stream id and payload seen by repeater 1002.
        require_udp_integration_enabled()
        recorded = PacketSpec(
            peer_id=1001,
            rf_src=3120001,
            dst_id=91,
            slot=2,
            stream_id=0x0102031C,
            payload=b"\x33" * 33,
        )
        with tempfile.TemporaryDirectory(prefix="freedmr-fixture-") as tempdir:
            fixture_path = Path(tempdir) / "hbp.hex"
            hex_text = "# recorded HBP packet\n" + recorded.data().hex() + "\n"
            fixture_path.write_text(hex_text, encoding="ascii")
            fixture = RecordedPacketFixture.from_file(fixture_path)
            # The file stays on disk for the whole replay (assumes from_file
            # may read lazily -- TODO confirm).
            with UdpBlackBoxScenario() as bench:
                rptr_a = bench.repeater("MASTER-A", 1001)
                rptr_b = bench.repeater("MASTER-B", 1002)
                try:
                    rptr_a.login()
                    rptr_b.login()
                    rptr_a.replay_fixture(fixture)
                    received = rptr_b.recv(timeout=2.0)
                finally:
                    rptr_a.close()
                    rptr_b.close()
        self.assertEqual(received.fields["stream_id"], bytes_4(0x0102031C))
        self.assertEqual(received.fields["dmr_payload"], b"\x33" * 33)

    def test_hbp_burst_loss_profile_routes_later_forward_progress(self):
        # A 4-burst voice_over profile is sent through burst_loss(1, 2); only
        # seq 0 and 3 may be delivered and nothing else may follow.
        require_udp_integration_enabled()
        stream_id = 0x0102031D
        profile = StreamProfile.voice_over(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            voice_bursts=4,
        )
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_stream(
                    profile.packets,
                    cadence_seconds=profile.cadence_seconds,
                    impairment=ImpairmentProfiles.burst_loss(1, 2),
                )
                received = [rptr_b.recv(timeout=2.0) for _ in range(2)]
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        seen_seqs = [item.fields["seq"] for item in received]
        self.assertEqual(seen_seqs, [0, 3])
        self.assertEqual(strays, [])

    def test_hbp_duplicate_udp_profile_drops_duplicate_and_routes_next_packet(self):
        # A 2-burst voice_over profile is sent through duplicate_udp(0); seq 0
        # and 1 must each be delivered once and nothing else may follow.
        require_udp_integration_enabled()
        stream_id = 0x0102031E
        profile = StreamProfile.voice_over(
            peer_id=1001,
            dst_id=91,
            slot=2,
            stream_id=stream_id,
            voice_bursts=2,
        )
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_stream(
                    profile.packets,
                    cadence_seconds=profile.cadence_seconds,
                    impairment=ImpairmentProfiles.duplicate_udp(0),
                )
                received = [rptr_b.recv(timeout=2.0) for _ in range(2)]
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        seen_seqs = [item.fields["seq"] for item in received]
        self.assertEqual(seen_seqs, [0, 1])
        self.assertEqual(strays, [])

    def test_hbp_voice_terminator_suppresses_late_same_stream_packet(self):
        # Header and terminator are delivered; a voice frame sent afterwards
        # on the same stream id must not reach repeater 1002.
        require_udp_integration_enabled()
        stream_id = 0x01020304
        common = dict(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id)
        header = PacketSpec(
            seq=0,
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VHEAD,
            **common,
        )
        terminator = PacketSpec(
            seq=1,
            frame_type=HBPF_DATA_SYNC,
            dtype_vseq=HBPF_SLT_VTERM,
            **common,
        )
        late_voice = PacketSpec(
            seq=2,
            frame_type=HBPF_VOICE,
            dtype_vseq=1,
            **common,
        )
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_dmr(header)
                rptr_a.send_dmr(terminator)
                received = [rptr_b.recv(timeout=2.0) for _ in range(2)]
                rptr_a.send_dmr(late_voice)
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        seen_types = [item.fields["dtype_vseq"] for item in received]
        self.assertEqual(seen_types, [HBPF_SLT_VHEAD, HBPF_SLT_VTERM])
        self.assertEqual(strays, [])

    def test_dial_a_tg_reserved_control_emits_local_tg9_ts2_prompt(self):
        # After a unit call (voice + terminator) to 4001 on slot 1, the
        # sending repeater must receive a DMRD packet from rf_src 5000 to
        # TG 9 on slot 2, and the other repeater must receive nothing.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
                rptr_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
                prompt = rptr_a.recv(timeout=4.0)
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(prompt.packet[:4], b"DMRD")
        self.assertEqual(prompt.fields["rf_src"], bytes_3(5000))
        self.assertEqual(prompt.fields["dst_id"], bytes_3(9))
        self.assertEqual(prompt.fields["slot"], 2)
        self.assertEqual(strays, [])

    def test_dial_a_tg_slot_1_routes_local_tg9_to_fbp_reflector_tg(self):
        # With no static TS2 talkgroups: after a unit-call voice frame to 235
        # on slot 1, a TG 9 / slot 1 packet must reach the FBP peer as
        # dst_id 235 on slot 1.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            ts2_static="",
            fbp_systems={"OBP-1": 3001},
        ) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                self._handshake(bridge)
                rptr_a.send_dmr(self.private_call(235, peer_id=1001, slot=1))
                local_tg9 = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=9,
                    slot=1,
                    stream_id=0x01020321,
                )
                rptr_a.send_dmr(local_tg9)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.fields["dst_id"], bytes_3(235))
        self.assertEqual(received.fields["slot"], 1)

    def test_dynamic_tg_routing_disabled_does_not_create_unknown_hbp_route(self):
        # With no static TS2 talkgroups and dynamic_tg_routing=False, a packet
        # to TG 12345 must not reach repeater 1002.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(ts2_static="", dynamic_tg_routing=False) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=12345,
                    slot=2,
                    stream_id=0x01020322,
                )
                rptr_a.send_dmr(spec)
                strays = rptr_b.drain(seconds=0.4)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(strays, [])

    def test_real_hbp_voice_interrupts_generated_prompt_and_routes(self):
        # Trigger the prompt with a unit call to 4001, read the first prompt
        # packet, then send a one-burst voice stream to TG 91 and check that
        # repeater 1002 receives it.
        require_udp_integration_enabled()
        real_stream = 0x0102030B
        with UdpBlackBoxScenario() as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            try:
                rptr_a.login()
                rptr_b.login()
                rptr_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
                rptr_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
                prompt = rptr_a.recv(timeout=4.0)
                real_voice = StreamProfile.voice_over(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=real_stream,
                    voice_bursts=1,
                )
                rptr_a.send_stream(real_voice.packets)
                routed = rptr_b.recv(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(prompt.fields["rf_src"], bytes_3(5000))
        self.assertEqual(prompt.fields["dst_id"], bytes_3(9))
        self.assertEqual(prompt.fields["slot"], 2)
        self.assertEqual(routed.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(routed.fields["dst_id"], bytes_3(91))
        self.assertEqual(routed.fields["stream_id"], bytes_4(real_stream))

    def test_hbp_static_tg_routes_to_fbp_v5_peer(self):
        # A TG 91 / slot 2 packet from repeater 1001 must reach the FBP peer
        # as a DMRE packet with fbp_version 5, peer_id 9990 and slot 1.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                self._handshake(bridge)
                spec = PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.packet[:4], b"DMRE")
        self.assertEqual(received.fields["fbp_version"], 5)
        self.assertEqual(received.fields["peer_id"], bytes_4(9990))
        self.assertEqual(received.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))
        self.assertEqual(received.fields["slot"], 1)

    def test_fbp_enhanced_keepalive_gates_hbp_to_fbp_forwarding(self):
        # Before the peer sends BCKA/BCVE no DMRE may arrive for the first
        # stream; after it does, the second stream must be forwarded.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                bridge.drain(seconds=0.2)
                early = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020312,
                )
                rptr_a.send_dmr(early)
                missing_before_keepalive = self._dmre_only(bridge.drain(seconds=0.4))
                self._handshake(bridge)
                later = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020313,
                )
                rptr_a.send_dmr(later)
                captured_after_keepalive = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(missing_before_keepalive, [])
        self.assertEqual(
            captured_after_keepalive.fields["stream_id"],
            bytes_4(0x01020313),
        )

    def test_fbp_bcve_downgrade_does_not_change_outbound_packet_version(self):
        # The peer announces BCVE version 4; the DMRE it then receives must
        # still report fbp_version 5 and source_rptr 1001.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                bridge.send_bcka()
                bridge.send_bcve(version=4)
                bridge.drain(seconds=0.2)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020317,
                )
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.fields["fbp_version"], 5)
        self.assertEqual(received.fields["source_rptr"], bytes_4(1001))

    def test_fbp_bcve_unsupported_version_does_not_change_outbound_packet_version(self):
        # The peer announces BCVE version 6; the DMRE it then receives must
        # still report fbp_version 5 and source_rptr 1001.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                bridge.send_bcka()
                bridge.send_bcve(version=6)
                bridge.drain(seconds=0.2)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020318,
                )
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.fields["fbp_version"], 5)
        self.assertEqual(received.fields["source_rptr"], bytes_4(1001))

    def test_fbp_invalid_bcve_does_not_change_outbound_packet_version(self):
        # The peer sends send_invalid_bcve(version=4); the DMRE it then
        # receives must still report fbp_version 5 and source_rptr 1001.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                bridge.send_bcka()
                bridge.send_invalid_bcve(version=4)
                bridge.drain(seconds=0.2)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020319,
                )
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.fields["fbp_version"], 5)
        self.assertEqual(received.fields["source_rptr"], bytes_4(1001))

    def test_fbp_v4_packet_routes_to_hbp_using_v4_metadata_layout(self):
        # An FBP packet sent with fbp_version=4 on slot 1 must reach repeater
        # 1002 as a DMRD packet on slot 2 with the same source, TG and stream.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x0102031A,
                )
                bridge.send_fbp(
                    spec,
                    fbp_version=4,
                    source_server=9991,
                    source_rptr=1001,
                )
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_b.close()
        self.assertEqual(received.packet[:4], b"DMRD")
        self.assertEqual(received.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))
        self.assertEqual(received.fields["slot"], 2)
        self.assertEqual(received.fields["stream_id"], bytes_4(0x0102031A))

    @unittest.expectedFailure
    def test_fbp_unsupported_embedded_packet_version_is_rejected_without_hbp_leak(self):
        # Marked as an expected failure. An FBP packet sent with fbp_version=6
        # should make repeater 1002's recv time out.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x0102031F,
                )
                bridge.send_fbp(
                    spec,
                    fbp_version=6,
                    source_server=9991,
                    source_rptr=1001,
                )
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_b.close()

    def test_fbp_v4_packet_downgrades_session_to_v4_layout_for_compatibility(self):
        # After the peer sends one fbp_version=4 packet, the next DMRE sent
        # to it must be 85 bytes long with an all-zero source_rptr field.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                rptr_b.login()
                self._handshake(bridge)
                inbound = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x01020320,
                )
                bridge.send_fbp(inbound, fbp_version=4, source_server=9991)
                # Consume the routed copy; its contents are not checked here.
                rptr_b.recv(timeout=2.0)
                outbound = PacketSpec(
                    peer_id=1001,
                    rf_src=3120002,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020321,
                )
                rptr_a.send_dmr(outbound)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(received.packet[:4], b"DMRE")
        self.assertEqual(len(received.packet), 85)
        self.assertEqual(received.fields["source_rptr"], b"\x00\x00\x00\x00")

    @unittest.expectedFailure
    def test_fbp_configured_proto_v4_outbound_packet_carries_v4_version_byte(self):
        # Marked as an expected failure. With fbp_proto_versions {"OBP-1": 4}
        # the DMRE sent to the peer should be 85 bytes with fbp_version 4.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(
            fbp_systems={"OBP-1": 3001},
            fbp_proto_versions={"OBP-1": 4},
        ) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                bridge.send_bcka()
                bridge.send_bcve(version=4)
                bridge.drain(seconds=0.2)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120002,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020322,
                )
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(len(received.packet), 85)
        self.assertEqual(received.fields["fbp_version"], 4)

    def test_obp_v1_packet_on_fbp_link_is_rejected_with_bcve_without_hbp_leak(self):
        # A packet sent with send_obp_v1 must be answered with a BCVE whose
        # byte 4 is 5, and repeater 1002's recv must time out.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                spec = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x0102031B,
                )
                bridge.send_obp_v1(spec)
                version_response = bridge.recv_opcode(b"BCVE", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_b.close()
        self.assertEqual(version_response.packet[:4], b"BCVE")
        self.assertEqual(version_response.packet[4], 5)

    def test_fbp_invalid_source_quench_does_not_suppress_hbp_to_fbp_stream(self):
        # After send_invalid_bcsq(91, stream_id), a packet on that stream
        # must still be forwarded to the FBP peer.
        require_udp_integration_enabled()
        stream_id = 0x01020314
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                self._handshake(bridge)
                bridge.send_invalid_bcsq(91, stream_id)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=stream_id,
                )
                rptr_a.send_dmr(spec)
                received = bridge.recv_dmre(timeout=2.0)
            finally:
                rptr_a.close()
        self.assertEqual(received.fields["stream_id"], bytes_4(stream_id))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))

    def test_fbp_stun_blocks_hbp_to_fbp_and_fbp_to_hbp_traffic(self):
        # After the peer sends BCST: a repeater packet must not produce a DMRE
        # at the peer, and a packet from the peer must not reach repeater 1002.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                rptr_b.login()
                self._handshake(bridge)
                bridge.send_bcst()
                outbound = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=0x01020315,
                )
                rptr_a.send_dmr(outbound)
                leaked_to_fbp = self._dmre_only(bridge.drain(seconds=0.4))
                # Empty repeater 1002's queue before the inbound check.
                rptr_b.drain(seconds=0.2)
                inbound = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x01020316,
                )
                bridge.send_fbp(inbound, source_server=9991, source_rptr=1001)
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_a.close()
                rptr_b.close()
        self.assertEqual(leaked_to_fbp, [])

    def test_fbp_trunk_routes_clean_stream_while_another_stream_is_reordered(self):
        # Two repeaters interleave their streams towards the FBP peer.
        # Stream A is sent as packets 0, 2, 1 and stream B as 0, 1; the peer
        # must see seq [0, 2] for A, [0, 1] for B and no further DMRE.
        require_udp_integration_enabled()
        stream_a = 0x0102030C
        stream_b = 0x0102030D
        profile_a = StreamProfile.voice_over(
            peer_id=1001,
            rf_src=3120001,
            dst_id=91,
            slot=2,
            stream_id=stream_a,
            voice_bursts=3,
        )
        profile_b = StreamProfile.voice_over(
            peer_id=1002,
            rf_src=3120002,
            dst_id=235,
            slot=2,
            stream_id=stream_b,
            voice_bursts=2,
        )
        with UdpBlackBoxScenario(
            ts2_static="91,235",
            fbp_systems={"OBP-1": 3001},
        ) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                rptr_b.login()
                self._handshake(bridge)
                schedule = (
                    (rptr_a, profile_a.packets[0]),
                    (rptr_b, profile_b.packets[0]),
                    (rptr_a, profile_a.packets[2]),
                    (rptr_b, profile_b.packets[1]),
                    (rptr_a, profile_a.packets[1]),
                )
                for position, (sender, packet) in enumerate(schedule):
                    # Stream A's cadence paces every gap between sends.
                    if position:
                        time.sleep(profile_a.cadence_seconds)
                    sender.send_dmr(packet)
                received = [bridge.recv_dmre(timeout=2.0) for _ in range(4)]
                strays = self._dmre_only(bridge.drain(seconds=0.4))
            finally:
                rptr_a.close()
                rptr_b.close()
        seqs_by_stream = {}
        for item in received:
            key = item.fields["stream_id"]
            if key not in seqs_by_stream:
                seqs_by_stream[key] = []
            seqs_by_stream[key].append(item.fields["seq"])
        self.assertEqual(seqs_by_stream[bytes_4(stream_a)], [0, 2])
        self.assertEqual(seqs_by_stream[bytes_4(stream_b)], [0, 1])
        self.assertEqual(strays, [])

    def test_fbp_static_tg_routes_to_hbp_repeater(self):
        # An FBP packet for TG 91 on slot 1 must reach repeater 1002 as a
        # DMRD packet on slot 2 with the same source, TG and stream id.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(
                    peer_id=3001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=1,
                    stream_id=0x01020305,
                )
                bridge.send_fbp(spec, source_server=9991, source_rptr=1001)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_b.close()
        self.assertEqual(received.packet[:4], b"DMRD")
        self.assertEqual(received.fields["rf_src"], bytes_3(3120001))
        self.assertEqual(received.fields["dst_id"], bytes_3(91))
        self.assertEqual(received.fields["slot"], 2)
        self.assertEqual(received.fields["stream_id"], bytes_4(0x01020305))

    def test_fbp_source_quench_suppresses_hbp_to_fbp_stream(self):
        # After send_bcsq(91, stream_id), a packet on that stream must not
        # produce any DMRE at the FBP peer.
        require_udp_integration_enabled()
        stream_id = 0x01020306
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_a = bench.repeater("MASTER-A", 1001)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_a.login()
                self._handshake(bridge)
                bridge.send_bcsq(91, stream_id)
                spec = PacketSpec(
                    peer_id=1001,
                    rf_src=3120001,
                    dst_id=91,
                    slot=2,
                    stream_id=stream_id,
                )
                rptr_a.send_dmr(spec)
                strays = self._dmre_only(bridge.drain(seconds=0.5))
            finally:
                rptr_a.close()
        self.assertEqual(strays, [])

    def test_fbp_rejects_packet_with_wrong_network_id(self):
        # An FBP packet sent with network_id=3002 (the scenario configures
        # 3001 for "OBP-1") must not reach repeater 1002.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(dst_id=91, slot=1, stream_id=0x01020307)
                bridge.send_fbp(
                    spec,
                    network_id=3002,
                    source_server=9991,
                    source_rptr=1001,
                )
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_b.close()

    def test_fbp_rejects_bad_hash_without_leaking_to_hbp(self):
        # An FBP packet sent with corrupt_hash=True must not reach repeater
        # 1002, and "FreeBridge HMAC failed" must appear in the log.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x0102030E)
                bridge.send_fbp(
                    spec,
                    source_server=9991,
                    source_rptr=1001,
                    corrupt_hash=True,
                )
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
                bench.process.wait_for_log("FreeBridge HMAC failed", timeout=2.0)
            finally:
                rptr_b.close()

    def test_fbp_stale_timestamp_is_source_quenched_without_hbp_leak(self):
        # An FBP packet sent with timestamp_ns=1 must be answered with a BCSQ
        # naming TG 91 and the stream id, and must not reach repeater 1002.
        require_udp_integration_enabled()
        stream_id = 0x0102030F
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(
                    peer_id=3001,
                    dst_id=91,
                    slot=1,
                    stream_id=stream_id,
                )
                bridge.send_fbp(
                    spec,
                    source_server=9991,
                    source_rptr=1001,
                    timestamp_ns=1,
                )
                quench = bridge.recv_opcode(b"BCSQ", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_b.close()
        self.assertEqual(quench.packet[4:7], bytes_3(91))
        self.assertEqual(quench.packet[7:11], bytes_4(stream_id))

    def test_fbp_max_hops_is_source_quenched_without_hbp_leak(self):
        # An FBP packet sent with hops=10 must be answered with a BCSQ naming
        # TG 91 and the stream id, and must not reach repeater 1002.
        require_udp_integration_enabled()
        stream_id = 0x01020310
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                spec = PacketSpec(
                    peer_id=3001,
                    dst_id=91,
                    slot=1,
                    stream_id=stream_id,
                )
                bridge.send_fbp(
                    spec,
                    source_server=9991,
                    source_rptr=1001,
                    hops=10,
                )
                quench = bridge.recv_opcode(b"BCSQ", timeout=2.0)
                with self.assertRaises(socket.timeout):
                    rptr_b.recv(timeout=0.5)
            finally:
                rptr_b.close()
        self.assertEqual(quench.packet[4:7], bytes_3(91))
        self.assertEqual(quench.packet[7:11], bytes_4(stream_id))

    def test_fbp_malformed_short_dmre_is_ignored_and_later_packet_routes(self):
        # A 14-byte b"DMRE" datagram must be logged as "FreeBridge packet too
        # short"; the well-formed packet sent afterwards must still be routed.
        require_udp_integration_enabled()
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                bridge.send(b"DMRE" + b"\x00" * 10)
                bench.process.wait_for_log("FreeBridge packet too short", timeout=2.0)
                spec = PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x01020311)
                bridge.send_fbp(spec, source_server=9991, source_rptr=1001)
                received = rptr_b.recv(timeout=2.0)
            finally:
                rptr_b.close()
        self.assertEqual(received.fields["dst_id"], bytes_3(91))
        self.assertEqual(received.fields["stream_id"], bytes_4(0x01020311))

    def test_fbp_link_impairment_does_not_buffer_or_replay_late_packets(self):
        # Stream A (seq 0..2) goes through provider_vxlan_reorder from the FBP
        # peer: repeater 1002 must see only seq 0 and 2, nothing afterwards,
        # and the first packet of stream B sent later must still arrive.
        require_udp_integration_enabled()
        stream_a = 0x01020309
        stream_b = 0x0102030A
        common = dict(peer_id=3001, dst_id=91, slot=1, stream_id=stream_a)
        impaired_packets = [
            PacketSpec(seq=0, **common),
            PacketSpec(seq=1, dtype_vseq=1, **common),
            PacketSpec(seq=2, dtype_vseq=2, **common),
        ]
        following_packet = PacketSpec(
            peer_id=3001,
            dst_id=91,
            slot=1,
            stream_id=stream_b,
            seq=0,
            rf_src=3120001,
        )
        with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as bench:
            rptr_b = bench.repeater("MASTER-B", 1002)
            bridge = bench.fbp_peer("OBP-1")
            try:
                rptr_b.login()
                self._handshake(bridge)
                bridge.send_fbp_stream(
                    impaired_packets,
                    cadence_seconds=0.03,
                    impairment=ImpairmentProfiles.provider_vxlan_reorder(),
                    source_server=9991,
                    source_rptr=1001,
                )
                impaired_captures = [rptr_b.recv(timeout=2.0) for _ in range(2)]
                strays = rptr_b.drain(seconds=0.4)
                bridge.send_fbp(
                    following_packet,
                    source_server=9991,
                    source_rptr=1001,
                )
                following_capture = rptr_b.recv(timeout=2.0)
            finally:
                rptr_b.close()
        seen_seqs = [item.fields["seq"] for item in impaired_captures]
        self.assertEqual(seen_seqs, [0, 2])
        self.assertEqual(strays, [])
        self.assertEqual(following_capture.fields["stream_id"], bytes_4(stream_b))
        self.assertEqual(following_capture.fields["seq"], 0)
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()

# Powered by TurnKey Linux.