You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1391 lines
49 KiB
1391 lines
49 KiB
import socket
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from freedmr_dmr_codec import encode_embedded_lc, payload_with_embedded_lc_fragment
|
|
|
|
from tests.harness.deterministic import (
|
|
HBPF_DATA_SYNC,
|
|
HBPF_SLT_VHEAD,
|
|
HBPF_SLT_VTERM,
|
|
HBPF_VOICE,
|
|
PacketSpec,
|
|
bytes_3,
|
|
bytes_4,
|
|
)
|
|
from tests.harness.udp_blackbox import (
|
|
ImpairmentProfiles,
|
|
RecordedPacketFixture,
|
|
StreamProfile,
|
|
UdpBlackBoxScenario,
|
|
require_udp_integration_enabled,
|
|
)
|
|
|
|
|
|
def embedded_lc_payloads(lc):
|
|
payload = b"\x55" * 33
|
|
return tuple(
|
|
payload_with_embedded_lc_fragment(payload, fragment)
|
|
for fragment in encode_embedded_lc(lc)
|
|
)
|
|
|
|
|
|
class UdpBlackBoxHarnessTest(unittest.TestCase):
|
|
TA_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("04004c43414c4c3132"))
|
|
GPS_EMB_LC_PAYLOADS = embedded_lc_payloads(bytes.fromhex("080007fcfae048b57b"))
|
|
|
|
def private_call(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
|
|
return PacketSpec(
|
|
peer_id=peer_id,
|
|
dst_id=dst_id,
|
|
slot=slot,
|
|
stream_id=stream_id,
|
|
call_type="unit",
|
|
frame_type=HBPF_VOICE,
|
|
dtype_vseq=0,
|
|
)
|
|
|
|
def private_call_terminator(self, dst_id, peer_id, slot=1, stream_id=0x01020304):
|
|
return PacketSpec(
|
|
peer_id=peer_id,
|
|
dst_id=dst_id,
|
|
slot=slot,
|
|
stream_id=stream_id,
|
|
call_type="unit",
|
|
frame_type=HBPF_DATA_SYNC,
|
|
dtype_vseq=HBPF_SLT_VTERM,
|
|
)
|
|
|
|
def test_two_registered_repeaters_observe_static_tg_route(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2))
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRD")
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(captured.fields["slot"], 2)
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020304))
|
|
|
|
def test_global_use_acl_false_does_not_apply_deny_acl(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(
|
|
global_use_acl=False,
|
|
global_sub_acl="DENY:3120001",
|
|
) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
|
|
def test_hbp_data_sync_control_payload_is_preserved(self):
|
|
require_udp_integration_enabled()
|
|
|
|
payload = bytes(range(33))
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
frame_type=HBPF_DATA_SYNC,
|
|
dtype_vseq=7,
|
|
payload=payload,
|
|
)
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["frame_type"], HBPF_DATA_SYNC)
|
|
self.assertEqual(captured.fields["dtype_vseq"], 7)
|
|
self.assertEqual(captured.fields["dmr_payload"], payload)
|
|
|
|
def test_hbp_same_tg_voice_embedded_lc_payload_is_preserved(self):
|
|
require_udp_integration_enabled()
|
|
|
|
payload = bytes((index * 7) % 256 for index in range(33))
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
frame_type=HBPF_VOICE,
|
|
dtype_vseq=1,
|
|
payload=payload,
|
|
)
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["frame_type"], HBPF_VOICE)
|
|
self.assertEqual(captured.fields["dtype_vseq"], 1)
|
|
self.assertEqual(captured.fields["dmr_payload"], payload)
|
|
|
|
def test_hbp_in_call_talker_alias_is_logged(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
for index, payload in enumerate(self.TA_EMB_LC_PAYLOADS, start=1):
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020330,
|
|
seq=index,
|
|
frame_type=HBPF_VOICE,
|
|
dtype_vseq=index,
|
|
payload=payload,
|
|
)
|
|
)
|
|
output = scenario.process.wait_for_log("*IN-CALL TA*", timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertIn("TEXT: 'CALL12'", output)
|
|
self.assertIn("LC: 04004c43414c4c3132", output)
|
|
|
|
def test_hbp_in_call_gps_is_logged(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
for index, payload in enumerate(self.GPS_EMB_LC_PAYLOADS, start=1):
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020331,
|
|
seq=index,
|
|
frame_type=HBPF_VOICE,
|
|
dtype_vseq=index,
|
|
payload=payload,
|
|
)
|
|
)
|
|
output = scenario.process.wait_for_log("*IN-CALL GPS*", timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertIn("LAT: 51.123451", output)
|
|
self.assertIn("LON: -2.123451", output)
|
|
self.assertIn("LC: 080007fcfae048b57b", output)
|
|
|
|
def test_hbp_malformed_short_dmrd_is_ignored_and_later_packet_routes(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send(b"DMRD" + b"\x00" * 10)
|
|
master_a.send_dmr(
|
|
PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
|
|
def test_hbp_voice_sequence_wrap_routes_forward_progress(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020304
|
|
packets = [
|
|
PacketSpec(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id, seq=254),
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=255,
|
|
dtype_vseq=1,
|
|
),
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=2,
|
|
dtype_vseq=2,
|
|
),
|
|
]
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
for packet in packets:
|
|
master_a.send_dmr(packet)
|
|
|
|
captured = [master_b.recv(timeout=2.0) for _ in packets]
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual([packet.fields["seq"] for packet in captured], [254, 255, 2])
|
|
self.assertEqual(captured[-1].fields["stream_id"], bytes_4(stream_id))
|
|
|
|
def test_hbp_link_impairment_discards_late_out_of_order_packet(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020308
|
|
packets = [
|
|
PacketSpec(peer_id=1001, dst_id=91, slot=2, stream_id=stream_id, seq=0),
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=1,
|
|
dtype_vseq=1,
|
|
),
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=2,
|
|
dtype_vseq=2,
|
|
),
|
|
]
|
|
impairment = ImpairmentProfiles.provider_vxlan_reorder()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_stream(
|
|
packets,
|
|
cadence_seconds=0.03,
|
|
impairment=impairment,
|
|
)
|
|
captured = [master_b.recv(timeout=2.0) for _ in range(2)]
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual([packet.fields["seq"] for packet in captured], [0, 2])
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_hbp_duplicate_sequence_zero_is_dropped(self):
|
|
require_udp_integration_enabled()
|
|
|
|
first = PacketSpec(peer_id=1001, dst_id=91, slot=2, seq=0, payload=b"\x11" * 33)
|
|
duplicate = PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
seq=0,
|
|
payload=b"\x22" * 33,
|
|
)
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(first)
|
|
captured = master_b.recv(timeout=2.0)
|
|
master_a.send_dmr(duplicate)
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["seq"], 0)
|
|
self.assertEqual(captured.fields["dmr_payload"], b"\x11" * 33)
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_hbp_recorded_fixture_replay_routes_preserved_packet(self):
|
|
require_udp_integration_enabled()
|
|
|
|
packet = PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x0102031C,
|
|
payload=b"\x33" * 33,
|
|
)
|
|
with tempfile.TemporaryDirectory(prefix="freedmr-fixture-") as tempdir:
|
|
fixture_path = Path(tempdir) / "hbp.hex"
|
|
fixture_path.write_text(
|
|
"# recorded HBP packet\n" + packet.data().hex() + "\n",
|
|
encoding="ascii",
|
|
)
|
|
fixture = RecordedPacketFixture.from_file(fixture_path)
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.replay_fixture(fixture)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(0x0102031C))
|
|
self.assertEqual(captured.fields["dmr_payload"], b"\x33" * 33)
|
|
|
|
def test_hbp_burst_loss_profile_routes_later_forward_progress(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x0102031D
|
|
profile = StreamProfile.voice_over(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
voice_bursts=4,
|
|
)
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_stream(
|
|
profile.packets,
|
|
cadence_seconds=profile.cadence_seconds,
|
|
impairment=ImpairmentProfiles.burst_loss(1, 2),
|
|
)
|
|
captured = [master_b.recv(timeout=2.0) for _ in range(2)]
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual([packet.fields["seq"] for packet in captured], [0, 3])
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_hbp_duplicate_udp_profile_drops_duplicate_and_routes_next_packet(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x0102031E
|
|
profile = StreamProfile.voice_over(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
voice_bursts=2,
|
|
)
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_stream(
|
|
profile.packets,
|
|
cadence_seconds=profile.cadence_seconds,
|
|
impairment=ImpairmentProfiles.duplicate_udp(0),
|
|
)
|
|
captured = [master_b.recv(timeout=2.0) for _ in range(2)]
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual([packet.fields["seq"] for packet in captured], [0, 1])
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_hbp_voice_terminator_suppresses_late_same_stream_packet(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020304
|
|
header = PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=0,
|
|
frame_type=HBPF_DATA_SYNC,
|
|
dtype_vseq=HBPF_SLT_VHEAD,
|
|
)
|
|
terminator = PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=1,
|
|
frame_type=HBPF_DATA_SYNC,
|
|
dtype_vseq=HBPF_SLT_VTERM,
|
|
)
|
|
late_voice = PacketSpec(
|
|
peer_id=1001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
seq=2,
|
|
frame_type=HBPF_VOICE,
|
|
dtype_vseq=1,
|
|
)
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(header)
|
|
master_a.send_dmr(terminator)
|
|
captured = [master_b.recv(timeout=2.0) for _ in range(2)]
|
|
master_a.send_dmr(late_voice)
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(
|
|
[packet.fields["dtype_vseq"] for packet in captured],
|
|
[HBPF_SLT_VHEAD, HBPF_SLT_VTERM],
|
|
)
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_dial_a_tg_reserved_control_emits_local_tg9_ts2_prompt(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
|
|
master_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
|
|
|
|
captured = master_a.recv(timeout=4.0)
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRD")
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(5000))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(9))
|
|
self.assertEqual(captured.fields["slot"], 2)
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_dial_a_tg_slot_1_routes_local_tg9_to_fbp_reflector_tg(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(
|
|
ts2_static="",
|
|
fbp_systems={"OBP-1": 3001},
|
|
) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(self.private_call(235, peer_id=1001, slot=1))
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=9,
|
|
slot=1,
|
|
stream_id=0x01020321,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(235))
|
|
self.assertEqual(captured.fields["slot"], 1)
|
|
|
|
def test_dynamic_tg_routing_disabled_does_not_create_unknown_hbp_route(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(ts2_static="", dynamic_tg_routing=False) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=12345,
|
|
slot=2,
|
|
stream_id=0x01020322,
|
|
)
|
|
)
|
|
leaked = master_b.drain(seconds=0.4)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_real_hbp_voice_interrupts_generated_prompt_and_routes(self):
|
|
require_udp_integration_enabled()
|
|
|
|
real_stream = 0x0102030B
|
|
with UdpBlackBoxScenario() as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
|
|
master_a.send_dmr(self.private_call(4001, peer_id=1001, slot=1))
|
|
master_a.send_dmr(self.private_call_terminator(4001, peer_id=1001, slot=1))
|
|
prompt = master_a.recv(timeout=4.0)
|
|
|
|
real_voice = StreamProfile.voice_over(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=real_stream,
|
|
voice_bursts=1,
|
|
)
|
|
master_a.send_stream(real_voice.packets)
|
|
routed = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(prompt.fields["rf_src"], bytes_3(5000))
|
|
self.assertEqual(prompt.fields["dst_id"], bytes_3(9))
|
|
self.assertEqual(prompt.fields["slot"], 2)
|
|
self.assertEqual(routed.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(routed.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(routed.fields["stream_id"], bytes_4(real_stream))
|
|
|
|
def test_hbp_static_tg_routes_to_fbp_v5_peer(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(peer_id=1001, rf_src=3120001, dst_id=91, slot=2)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRE")
|
|
self.assertEqual(captured.fields["fbp_version"], 5)
|
|
self.assertEqual(captured.fields["peer_id"], bytes_4(9990))
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(captured.fields["slot"], 1)
|
|
|
|
def test_fbp_enhanced_keepalive_gates_hbp_to_fbp_forwarding(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020312,
|
|
)
|
|
)
|
|
missing_before_keepalive = [
|
|
capture
|
|
for capture in fbp_peer.drain(seconds=0.4)
|
|
if capture.packet[:4] == b"DMRE"
|
|
]
|
|
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020313,
|
|
)
|
|
)
|
|
captured_after_keepalive = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(missing_before_keepalive, [])
|
|
self.assertEqual(captured_after_keepalive.fields["stream_id"], bytes_4(0x01020313))
|
|
|
|
def test_fbp_bcve_downgrade_does_not_change_outbound_packet_version(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve(version=4)
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020317,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.fields["fbp_version"], 5)
|
|
self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))
|
|
|
|
def test_fbp_bcve_unsupported_version_does_not_change_outbound_packet_version(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve(version=6)
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020318,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.fields["fbp_version"], 5)
|
|
self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))
|
|
|
|
def test_fbp_invalid_bcve_does_not_change_outbound_packet_version(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_invalid_bcve(version=4)
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020319,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.fields["fbp_version"], 5)
|
|
self.assertEqual(captured.fields["source_rptr"], bytes_4(1001))
|
|
|
|
def test_fbp_v4_packet_routes_to_hbp_using_v4_metadata_layout(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x0102031A,
|
|
),
|
|
fbp_version=4,
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRD")
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(captured.fields["slot"], 2)
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(0x0102031A))
|
|
|
|
@unittest.expectedFailure
|
|
def test_fbp_unsupported_embedded_packet_version_is_rejected_without_hbp_leak(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x0102031F,
|
|
),
|
|
fbp_version=6,
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_b.close()
|
|
|
|
def test_fbp_v4_packet_downgrades_session_to_v4_layout_for_compatibility(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x01020320,
|
|
),
|
|
fbp_version=4,
|
|
source_server=9991,
|
|
)
|
|
master_b.recv(timeout=2.0)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120002,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020321,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRE")
|
|
self.assertEqual(len(captured.packet), 85)
|
|
self.assertEqual(captured.fields["source_rptr"], b"\x00\x00\x00\x00")
|
|
|
|
@unittest.expectedFailure
|
|
def test_fbp_configured_proto_v4_outbound_packet_carries_v4_version_byte(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(
|
|
fbp_systems={"OBP-1": 3001},
|
|
fbp_proto_versions={"OBP-1": 4},
|
|
) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve(version=4)
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120002,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020322,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(len(captured.packet), 85)
|
|
self.assertEqual(captured.fields["fbp_version"], 4)
|
|
|
|
def test_obp_v1_packet_on_fbp_link_is_rejected_with_bcve_without_hbp_leak(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
|
|
fbp_peer.send_obp_v1(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x0102031B,
|
|
)
|
|
)
|
|
version_response = fbp_peer.recv_opcode(b"BCVE", timeout=2.0)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(version_response.packet[:4], b"BCVE")
|
|
self.assertEqual(version_response.packet[4], 5)
|
|
|
|
def test_fbp_invalid_source_quench_does_not_suppress_hbp_to_fbp_stream(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020314
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
fbp_peer.send_invalid_bcsq(91, stream_id)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
)
|
|
)
|
|
captured = fbp_peer.recv_dmre(timeout=2.0)
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(stream_id))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
|
|
def test_fbp_stun_blocks_hbp_to_fbp_and_fbp_to_hbp_traffic(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_bcst()
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=0x01020315,
|
|
)
|
|
)
|
|
leaked_to_fbp = [
|
|
capture
|
|
for capture in fbp_peer.drain(seconds=0.4)
|
|
if capture.packet[:4] == b"DMRE"
|
|
]
|
|
master_b.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x01020316,
|
|
),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
self.assertEqual(leaked_to_fbp, [])
|
|
|
|
def test_fbp_trunk_routes_clean_stream_while_another_stream_is_reordered(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_a = 0x0102030C
|
|
stream_b = 0x0102030D
|
|
stream_a_profile = StreamProfile.voice_over(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_a,
|
|
voice_bursts=3,
|
|
)
|
|
stream_b_profile = StreamProfile.voice_over(
|
|
peer_id=1002,
|
|
rf_src=3120002,
|
|
dst_id=235,
|
|
slot=2,
|
|
stream_id=stream_b,
|
|
voice_bursts=2,
|
|
)
|
|
|
|
with UdpBlackBoxScenario(
|
|
ts2_static="91,235",
|
|
fbp_systems={"OBP-1": 3001},
|
|
) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
master_a.send_dmr(stream_a_profile.packets[0])
|
|
time.sleep(stream_a_profile.cadence_seconds)
|
|
master_b.send_dmr(stream_b_profile.packets[0])
|
|
time.sleep(stream_a_profile.cadence_seconds)
|
|
master_a.send_dmr(stream_a_profile.packets[2])
|
|
time.sleep(stream_a_profile.cadence_seconds)
|
|
master_b.send_dmr(stream_b_profile.packets[1])
|
|
time.sleep(stream_a_profile.cadence_seconds)
|
|
master_a.send_dmr(stream_a_profile.packets[1])
|
|
|
|
captured = [fbp_peer.recv_dmre(timeout=2.0) for _ in range(4)]
|
|
leaked = [
|
|
capture
|
|
for capture in fbp_peer.drain(seconds=0.4)
|
|
if capture.packet[:4] == b"DMRE"
|
|
]
|
|
finally:
|
|
master_a.close()
|
|
master_b.close()
|
|
|
|
seqs_by_stream = {}
|
|
for capture in captured:
|
|
seqs_by_stream.setdefault(capture.fields["stream_id"], []).append(
|
|
capture.fields["seq"]
|
|
)
|
|
|
|
self.assertEqual(seqs_by_stream[bytes_4(stream_a)], [0, 2])
|
|
self.assertEqual(seqs_by_stream[bytes_4(stream_b)], [0, 1])
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_fbp_static_tg_routes_to_hbp_repeater(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=0x01020305,
|
|
),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.packet[:4], b"DMRD")
|
|
self.assertEqual(captured.fields["rf_src"], bytes_3(3120001))
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(captured.fields["slot"], 2)
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020305))
|
|
|
|
def test_fbp_source_quench_suppresses_hbp_to_fbp_stream(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020306
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_a = scenario.repeater("MASTER-A", 1001)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_a.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
fbp_peer.send_bcsq(91, stream_id)
|
|
|
|
master_a.send_dmr(
|
|
PacketSpec(
|
|
peer_id=1001,
|
|
rf_src=3120001,
|
|
dst_id=91,
|
|
slot=2,
|
|
stream_id=stream_id,
|
|
)
|
|
)
|
|
leaked = [
|
|
capture
|
|
for capture in fbp_peer.drain(seconds=0.5)
|
|
if capture.packet[:4] == b"DMRE"
|
|
]
|
|
finally:
|
|
master_a.close()
|
|
|
|
self.assertEqual(leaked, [])
|
|
|
|
def test_fbp_rejects_packet_with_wrong_network_id(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(dst_id=91, slot=1, stream_id=0x01020307),
|
|
network_id=3002,
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_b.close()
|
|
|
|
def test_fbp_rejects_bad_hash_without_leaking_to_hbp(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x0102030E),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
corrupt_hash=True,
|
|
)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
scenario.process.wait_for_log("FreeBridge HMAC failed", timeout=2.0)
|
|
finally:
|
|
master_b.close()
|
|
|
|
def test_fbp_stale_timestamp_is_source_quenched_without_hbp_leak(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x0102030F
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=stream_id,
|
|
),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
timestamp_ns=1,
|
|
)
|
|
quench = fbp_peer.recv_opcode(b"BCSQ", timeout=2.0)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(quench.packet[4:7], bytes_3(91))
|
|
self.assertEqual(quench.packet[7:11], bytes_4(stream_id))
|
|
|
|
def test_fbp_max_hops_is_source_quenched_without_hbp_leak(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_id = 0x01020310
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=stream_id,
|
|
),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
hops=10,
|
|
)
|
|
quench = fbp_peer.recv_opcode(b"BCSQ", timeout=2.0)
|
|
with self.assertRaises(socket.timeout):
|
|
master_b.recv(timeout=0.5)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(quench.packet[4:7], bytes_3(91))
|
|
self.assertEqual(quench.packet[7:11], bytes_4(stream_id))
|
|
|
|
def test_fbp_malformed_short_dmre_is_ignored_and_later_packet_routes(self):
|
|
require_udp_integration_enabled()
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send(b"DMRE" + b"\x00" * 10)
|
|
scenario.process.wait_for_log("FreeBridge packet too short", timeout=2.0)
|
|
fbp_peer.send_fbp(
|
|
PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=0x01020311),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
captured = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual(captured.fields["dst_id"], bytes_3(91))
|
|
self.assertEqual(captured.fields["stream_id"], bytes_4(0x01020311))
|
|
|
|
def test_fbp_link_impairment_does_not_buffer_or_replay_late_packets(self):
|
|
require_udp_integration_enabled()
|
|
|
|
stream_a = 0x01020309
|
|
stream_b = 0x0102030A
|
|
impaired_packets = [
|
|
PacketSpec(peer_id=3001, dst_id=91, slot=1, stream_id=stream_a, seq=0),
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=stream_a,
|
|
seq=1,
|
|
dtype_vseq=1,
|
|
),
|
|
PacketSpec(
|
|
peer_id=3001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=stream_a,
|
|
seq=2,
|
|
dtype_vseq=2,
|
|
),
|
|
]
|
|
following_packet = PacketSpec(
|
|
peer_id=3001,
|
|
dst_id=91,
|
|
slot=1,
|
|
stream_id=stream_b,
|
|
seq=0,
|
|
rf_src=3120001,
|
|
)
|
|
|
|
with UdpBlackBoxScenario(fbp_systems={"OBP-1": 3001}) as scenario:
|
|
master_b = scenario.repeater("MASTER-B", 1002)
|
|
fbp_peer = scenario.fbp_peer("OBP-1")
|
|
try:
|
|
master_b.login()
|
|
fbp_peer.send_bcka()
|
|
fbp_peer.send_bcve()
|
|
fbp_peer.drain(seconds=0.2)
|
|
|
|
fbp_peer.send_fbp_stream(
|
|
impaired_packets,
|
|
cadence_seconds=0.03,
|
|
impairment=ImpairmentProfiles.provider_vxlan_reorder(),
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
impaired_captures = [master_b.recv(timeout=2.0) for _ in range(2)]
|
|
leaked = master_b.drain(seconds=0.4)
|
|
|
|
fbp_peer.send_fbp(
|
|
following_packet,
|
|
source_server=9991,
|
|
source_rptr=1001,
|
|
)
|
|
following_capture = master_b.recv(timeout=2.0)
|
|
finally:
|
|
master_b.close()
|
|
|
|
self.assertEqual([packet.fields["seq"] for packet in impaired_captures], [0, 2])
|
|
self.assertEqual(leaked, [])
|
|
self.assertEqual(following_capture.fields["stream_id"], bytes_4(stream_b))
|
|
self.assertEqual(following_capture.fields["seq"], 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|