from hashlib import sha1 from hmac import new as hmac_new import os import tempfile import unittest import config as freedmr_config from tests.harness.deterministic import ( HBPF_DATA_SYNC, HBPF_SLT_VHEAD, HBPF_SLT_VTERM, HBPF_VOICE, DeterministicScenario, PacketSpec, add_openbridge_system, active_bridge, bytes_3, bytes_4, minimal_config, ) class PacketSpecTest(unittest.TestCase): def test_packet_spec_builds_parseable_dmrd_payload(self): packet = PacketSpec( peer_id=1234, rf_src=3120001, dst_id=91, slot=2, stream_id=0x01020304, seq=7, ) data = packet.data() self.assertEqual(data[:4], b"DMRD") self.assertEqual(data[4], 7) self.assertEqual(data[5:8], bytes_3(3120001)) self.assertEqual(data[8:11], bytes_3(91)) self.assertEqual(data[11:15], bytes_4(1234)) self.assertEqual(data[16:20], bytes_4(0x01020304)) self.assertEqual(len(data), 55) class DeterministicHarnessTest(unittest.TestCase): def test_config_global_use_acl_false_is_boolean_and_stale_time_is_seconds(self): config_text = """ [GLOBAL] USE_ACL: False [ALIASES] TRY_DOWNLOAD: False STALE_DAYS: 1 [MASTER-A] MODE: MASTER ENABLED: True DEFAULT_REFLECTOR: 0 """ fd, path = tempfile.mkstemp(suffix=".cfg") try: with os.fdopen(fd, "w", encoding="utf-8") as handle: handle.write(config_text) parsed = freedmr_config.build_config(path) finally: os.unlink(path) self.assertIs(parsed["GLOBAL"]["USE_ACL"], False) self.assertEqual(parsed["ALIASES"]["STALE_TIME"], 86400) def test_set_alias_updates_bridge_master_globals_and_shared_hblink_config(self): with DeterministicScenario() as scenario: bm = scenario.bm bm.setAlias( {1001: "Peer"}, {3120001: "Subscriber"}, {91: "Talkgroup"}, {3120002: "Local"}, {"9990": "Server"}, {"peer_ids": "checksum"}, ) self.assertEqual(bm.peer_ids, {1001: "Peer"}) self.assertEqual(bm.subscriber_ids, {3120001: "Subscriber"}) self.assertEqual(bm.talkgroup_ids, {91: "Talkgroup"}) self.assertEqual(bm.local_subscriber_ids, {3120002: "Local"}) self.assertEqual(bm.server_ids, {"9990": "Server"}) self.assertEqual(bm.checksums, {"peer_ids": "checksum"}) self.assertEqual(scenario.config["_PEER_IDS"], bm.peer_ids) self.assertEqual(scenario.config["_SUB_IDS"], bm.subscriber_ids) self.assertEqual( scenario.config["_LOCAL_SUBSCRIBER_IDS"], bm.local_subscriber_ids, ) self.assertEqual(scenario.config["_SERVER_IDS"], bm.server_ids) self.assertEqual(scenario.config["CHECKSUMS"], bm.checksums) def test_bridge_reset_tolerates_missing_options_after_disconnect(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(bridges=bridges) as scenario: scenario.config["SYSTEMS"]["MASTER-A"].pop("OPTIONS", None) scenario.config["SYSTEMS"]["MASTER-A"]["_reset"] = True scenario.bm.bridge_reset() self.assertFalse(scenario.config["SYSTEMS"]["MASTER-A"]["_reset"]) self.assertNotIn( "_reloadoptions", scenario.config["SYSTEMS"]["MASTER-A"], ) def private_call(self, dst_id, slot=1, stream_id=0x01020304): return PacketSpec( dst_id=dst_id, slot=slot, stream_id=stream_id, call_type="unit", frame_type=HBPF_VOICE, dtype_vseq=0, ) def private_call_terminator(self, dst_id, slot=1, stream_id=0x01020304): return PacketSpec( dst_id=dst_id, slot=slot, stream_id=stream_id, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, ) def reflector_bridge(self, bridge_name, active=True): tg = int(bridge_name[1:]) return { bridge_name: [ { "SYSTEM": "MASTER-A", "TS": 2, "TGID": bytes_3(9), "ACTIVE": active, "TIMEOUT": 60, "TO_TYPE": "ON", "OFF": [], "ON": [bytes_3(tg)], "RESET": [], "TIMER": 0, }, { "SYSTEM": "MASTER-B", "TS": 2, "TGID": bytes_3(9), "ACTIVE": False, "TIMEOUT": 60, "TO_TYPE": "ON", "OFF": [], "ON": [bytes_3(tg)], "RESET": [], "TIMER": 0, }, ] } def run_ident_with_override(self, override_ident_tg): config = minimal_config(("MASTER-A",)) config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"] = True config["SYSTEMS"]["MASTER-A"]["OVERRIDE_IDENT_TG"] = override_ident_tg with DeterministicScenario(config=config) as scenario: scenario.register_peer("MASTER-A", callsign=b"TEST ") scenario.clock.advance(31) scenario.bm.words["en_GB"].update( { "this-is": b"this-is", "freedmr": b"freedmr", "T": b"T", "E": b"E", "S": b"S", } ) destinations = [] old_pkt_gen = scenario.bm.pkt_gen old_sleep = scenario.bm.sleep try: scenario.bm.pkt_gen = ( lambda source_id, dst_id, peer_id, slot, say: destinations.append(dst_id) or iter([PacketSpec(dst_id=dst_id, slot=2, stream_id=0x01020309).data()]) ) scenario.bm.sleep = lambda seconds: None scenario.bm.ident() finally: scenario.bm.pkt_gen = old_pkt_gen scenario.bm.sleep = old_sleep return destinations def test_hbp_group_packet_routes_to_active_bridge_target(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(bridges=bridges) as scenario: packet = PacketSpec(dst_id=91, slot=2, seq=0, dtype_vseq=0) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dst_id"], bytes_3(91)) self.assertEqual(captured[0].fields["slot"], 2) self.assertEqual(captured[0].fields["stream_id"], bytes_4(0x01020304)) self.assertEqual(scenario.capture.for_system("MASTER-A"), []) def test_hbp_group_packet_rewrites_only_slot_for_cross_slot_route(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 1), ("MASTER-B", 2), ), ) with DeterministicScenario(bridges=bridges) as scenario: packet = PacketSpec(dst_id=91, slot=1, seq=3, dtype_vseq=0) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["seq"], 3) self.assertEqual(captured[0].fields["rf_src"], bytes_3(3120001)) self.assertEqual(captured[0].fields["dst_id"], bytes_3(91)) self.assertEqual(captured[0].fields["peer_id"], bytes_4(1001)) self.assertEqual(captured[0].fields["slot"], 2) self.assertEqual(captured[0].fields["stream_id"], bytes_4(0x01020304)) self.assertEqual(captured[0].packet[:15], packet.data()[:15]) self.assertEqual(captured[0].packet[16:], packet.data()[16:]) self.assertNotEqual(captured[0].packet[15], packet.data()[15]) self.assertEqual(scenario.capture.for_system("MASTER-A"), []) def test_startup_static_tgs_reject_reserved_control_targets(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["TS1_STATIC"] = "8,16777215,A93,91" config["SYSTEMS"]["MASTER-A"]["TS2_STATIC"] = "7,16777216,B94,92" with DeterministicScenario(config=config) as scenario: scenario.bm.make_static_tgs() self.assertNotIn("8", scenario.bridge_state) self.assertNotIn("7", scenario.bridge_state) self.assertNotIn("16777215", scenario.bridge_state) self.assertNotIn("16777216", scenario.bridge_state) self.assertNotIn("A93", scenario.bridge_state) self.assertNotIn("B94", scenario.bridge_state) self.assertIn("91", scenario.bridge_state) self.assertIn("92", scenario.bridge_state) ts1_entry = next( entry for entry in scenario.bridge_state["91"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 1 ) ts2_entry = next( entry for entry in scenario.bridge_state["92"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 2 ) self.assertTrue(ts1_entry["ACTIVE"]) self.assertTrue(ts2_entry["ACTIVE"]) def test_options_static_ts1_rejects_reserved_control_targets(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "TS1=8,16777215,A93,91;TS2=7,16777216,B94,92;DIAL=0;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() self.assertNotIn("8", scenario.bridge_state) self.assertNotIn("7", scenario.bridge_state) self.assertNotIn("16777215", scenario.bridge_state) self.assertNotIn("16777216", scenario.bridge_state) self.assertNotIn("A93", scenario.bridge_state) self.assertNotIn("B94", scenario.bridge_state) self.assertIn("91", scenario.bridge_state) self.assertIn("92", scenario.bridge_state) ts1_entry = next( entry for entry in scenario.bridge_state["91"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 1 ) ts2_entry = next( entry for entry in scenario.bridge_state["92"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 2 ) self.assertTrue(ts1_entry["ACTIVE"]) self.assertTrue(ts2_entry["ACTIVE"]) def test_options_static_tg_whitespace_is_normalized(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "TS1=91, 92;TS2=93, 94;DIAL=0;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() self.assertIn("91", scenario.bridge_state) self.assertIn("92", scenario.bridge_state) self.assertIn("93", scenario.bridge_state) self.assertIn("94", scenario.bridge_state) def test_options_invalid_ident_tg_does_not_block_valid_dial(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["OVERRIDE_IDENT_TG"] = 9 config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "IDENTTG=A;DIAL=91;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() master_entry = next( entry for entry in scenario.bridge_state["#91"] if entry["SYSTEM"] == "MASTER-A" ) self.assertTrue(master_entry["ACTIVE"]) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 91) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["OVERRIDE_IDENT_TG"], 9) def test_options_invalid_voice_does_not_block_valid_static_tg(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"] = True config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "VOICE=A;TS1=91;DIAL=0;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() ts1_entry = next( entry for entry in scenario.bridge_state["91"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 1 ) self.assertTrue(ts1_entry["ACTIVE"]) self.assertTrue(config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"]) def test_options_invalid_single_does_not_block_valid_static_tg(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["SINGLE_MODE"] = True config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "SINGLE=A;TS2=92;DIAL=0;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() ts2_entry = next( entry for entry in scenario.bridge_state["92"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 2 ) self.assertTrue(ts2_entry["ACTIVE"]) self.assertTrue(config["SYSTEMS"]["MASTER-A"]["SINGLE_MODE"]) def test_options_boolean_fields_reject_values_other_than_zero_or_one(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"] = False config["SYSTEMS"]["MASTER-A"]["SINGLE_MODE"] = True config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "VOICE=2;SINGLE=2;DIAL=0;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() self.assertFalse(config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"]) self.assertTrue(config["SYSTEMS"]["MASTER-A"]["SINGLE_MODE"]) def test_options_empty_dial_disables_default_reflector(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 91 config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "DIAL=;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.make_default_reflectors() scenario.bm.options_config() master_entry = next( entry for entry in scenario.bridge_state["#91"] if entry["SYSTEM"] == "MASTER-A" ) self.assertFalse(master_entry["ACTIVE"]) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_options_invalid_timer_does_not_block_valid_static_tgs(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_UA_TIMER"] = 3 config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "TIMER=A;TS1=91;TS2=92;DIAL=0" with DeterministicScenario(config=config) as scenario: with self.assertLogs(scenario.bm.logger, level="DEBUG") as logs: scenario.bm.options_config() ts1_entry = next( entry for entry in scenario.bridge_state["91"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 1 ) ts2_entry = next( entry for entry in scenario.bridge_state["92"] if entry["SYSTEM"] == "MASTER-A" and entry["TS"] == 2 ) self.assertTrue(ts1_entry["ACTIVE"]) self.assertTrue(ts2_entry["ACTIVE"]) self.assertEqual(ts1_entry["TIMEOUT"], 180) self.assertEqual(ts2_entry["TIMEOUT"], 180) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_UA_TIMER"], 3) self.assertIn("DEFAULT_UA_TIMER is not an integer", "\n".join(logs.output)) def test_ident_override_accepts_valid_string_tg(self): destinations = self.run_ident_with_override("91") self.assertEqual(destinations, [bytes_3(91)]) def test_ident_override_empty_or_false_uses_all_call(self): self.assertEqual(self.run_ident_with_override(""), [bytes_3(16777215)]) self.assertEqual(self.run_ident_with_override(False), [bytes_3(16777215)]) self.assertEqual(self.run_ident_with_override(0), [bytes_3(16777215)]) def test_ident_override_rejects_control_tg(self): with self.assertLogs("bridge_master", level="WARNING") as logs: destinations = self.run_ident_with_override(9) self.assertEqual(destinations, [bytes_3(16777215)]) self.assertIn("invalid OVERRIDE_IDENT_TG 9", "\n".join(logs.output)) def test_ident_override_rejects_malformed_tg(self): with self.assertLogs("bridge_master", level="WARNING") as logs: destinations = self.run_ident_with_override("A91") self.assertEqual(destinations, [bytes_3(16777215)]) self.assertIn("invalid OVERRIDE_IDENT_TG A91", "\n".join(logs.output)) def test_ident_override_rejects_all_call(self): with self.assertLogs("bridge_master", level="WARNING") as logs: destinations = self.run_ident_with_override(16777215) self.assertEqual(destinations, [bytes_3(16777215)]) self.assertIn("invalid OVERRIDE_IDENT_TG 16777215", "\n".join(logs.output)) def test_ident_cancellation_does_not_block_later_ident(self): config = minimal_config(("MASTER-A",)) config["SYSTEMS"]["MASTER-A"]["VOICE_IDENT"] = True with DeterministicScenario(config=config) as scenario: scenario.register_peer("MASTER-A", callsign=b"TEST ") scenario.bm.words["en_GB"].update( { "this-is": b"this-is", "freedmr": b"freedmr", "T": b"T", "E": b"E", "S": b"S", } ) original_pkt_gen = scenario.bm.pkt_gen original_send_voice_packet = scenario.bm.sendVoicePacket original_sleep = scenario.bm.sleep stream_id = 0x01020309 first_ident = [ PacketSpec(dst_id=16777215, slot=2, stream_id=stream_id, seq=1).data(), PacketSpec(dst_id=16777215, slot=2, stream_id=stream_id, seq=2).data(), ] second_ident = [ PacketSpec(dst_id=16777215, slot=2, stream_id=stream_id + 1, seq=1).data() ] generated = [iter(first_ident), iter(second_ident)] sent_packets = [] def send_then_cancel(system, pkt, source_id, dest_id, slot): original_send_voice_packet(system, pkt, source_id, dest_id, slot) sent_packets.append(pkt) if len(sent_packets) == 1: scenario.bm._cancelGeneratedVoice(slot) try: scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: generated.pop(0) scenario.bm.sendVoicePacket = send_then_cancel scenario.bm.sleep = lambda seconds: None scenario.clock.advance(31) scenario.bm.ident() slot = scenario.systems["MASTER-A"].STATUS[2] self.assertTrue(slot["TX_PROMPT_CANCEL"]) self.assertFalse(slot["TX_PROMPT_ACTIVE"]) scenario.clock.advance(31) scenario.bm.ident() finally: scenario.bm.pkt_gen = original_pkt_gen scenario.bm.sendVoicePacket = original_send_voice_packet scenario.bm.sleep = original_sleep captured = scenario.capture.for_system("MASTER-A") self.assertEqual(len(sent_packets), 2) self.assertEqual(len(captured), 2) self.assertEqual(captured[0].fields["stream_id"], bytes_4(stream_id)) self.assertEqual(captured[1].fields["stream_id"], bytes_4(stream_id + 1)) def test_dial_a_tg_private_call_on_slot_1_activates_ts2_reflector(self): with DeterministicScenario() as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) bridge = scenario.bridge_state["#235"] source_entry = [entry for entry in bridge if entry["SYSTEM"] == "MASTER-A"][0] self.assertEqual(source_entry["TS"], 2) self.assertEqual(source_entry["TGID"], bytes_3(9)) self.assertTrue(source_entry["ACTIVE"]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_private_call_on_slot_1_retunes_active_ts2_reflector(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(236, slot=1)) old_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] new_entry = [ entry for entry in scenario.bridge_state["#236"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertEqual(old_entry["TS"], 2) self.assertFalse(old_entry["ACTIVE"]) self.assertEqual(new_entry["TS"], 2) self.assertTrue(new_entry["ACTIVE"]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_disconnect_on_slot_1_deactivates_active_ts2_reflector(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(4000, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertEqual(source_entry["TS"], 2) self.assertFalse(source_entry["ACTIVE"]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_disconnect_is_scoped_to_receiving_master(self): bridges = { "#235": [ { "SYSTEM": "MASTER-A", "TS": 2, "TGID": bytes_3(9), "ACTIVE": True, "TIMEOUT": 60, "TO_TYPE": "ON", "OFF": [bytes_3(4000)], "ON": [bytes_3(235)], "RESET": [], "TIMER": 111, }, { "SYSTEM": "MASTER-B", "TS": 2, "TGID": bytes_3(9), "ACTIVE": True, "TIMEOUT": 60, "TO_TYPE": "ON", "OFF": [bytes_3(4000)], "ON": [bytes_3(235)], "RESET": [], "TIMER": 222, }, ] } with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(4000, slot=1)) master_a_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] master_b_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-B" ][0] self.assertFalse(master_a_entry["ACTIVE"]) self.assertEqual(master_a_entry["TIMER"], scenario.clock.now) self.assertTrue(master_b_entry["ACTIVE"]) self.assertEqual(master_b_entry["TIMER"], 222) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_reserved_target_on_slot_1_does_not_create_reflector(self): for target in (5, 6, 7): with self.subTest(target=target): with DeterministicScenario() as scenario: scenario.inject_hbp("MASTER-A", self.private_call(target, slot=1)) self.assertNotIn("#{}".format(target), scenario.bridge_state) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_reserved_target_on_slot_1_does_not_retune_active_reflector(self): for target in (5, 6, 7): with self.subTest(target=target): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(target, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#{}".format(target), scenario.bridge_state) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_local_tg_9_on_slot_1_does_not_create_or_retune_reflector(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(9, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#9", scenario.bridge_state) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_allstar_target_8_reports_busy_when_disabled(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.CONFIG["ALLSTAR"]["ENABLED"] = False scenario.bm.words["en_GB"].update( { "busy": b"busy", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(8, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(8, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertFalse(scenario.bm.systems["MASTER-A"].STATUS[1]["_allStarMode"]) self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#8", scenario.bridge_state) self.assertTrue(spoken) self.assertIn(b"busy", spoken[-1]) self.assertNotIn(b"linkedto", spoken[-1]) self.assertEqual(scenario.reactor.later, []) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_allstar_target_8_enters_allstar_mode_when_enabled(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.CONFIG["ALLSTAR"]["ENABLED"] = True scenario.bm.words["en_GB"].update( { "all-star-link-mode": b"all-star-link-mode", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(8, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(8, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(scenario.bm.systems["MASTER-A"].STATUS[1]["_allStarMode"]) self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#8", scenario.bridge_state) self.assertTrue(spoken) self.assertIn(b"all-star-link-mode", spoken[-1]) self.assertNotIn(b"linkedto", spoken[-1]) self.assertEqual(len(scenario.reactor.later), 1) self.assertEqual(scenario.reactor.later[0][0], 30) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_reserved_control_range_reports_busy_without_retune(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "busy": b"busy", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(4001, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(4001, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#4001", scenario.bridge_state) self.assertTrue(spoken) self.assertIn(b"busy", spoken[-1]) self.assertNotIn(b"linkedto", spoken[-1]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_echo_target_9990_is_linkable(self): with DeterministicScenario() as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "0": b"0", "9": b"9", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(9990, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(9990, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#9990"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertEqual(source_entry["TS"], 2) self.assertEqual(source_entry["TGID"], bytes_3(9)) self.assertTrue(source_entry["ACTIVE"]) self.assertEqual(source_entry["TIMEOUT"], 60) self.assertTrue(spoken) self.assertIn(b"linkedto", spoken[-1]) self.assertEqual(spoken[-1].count(b"9"), 3) self.assertIn(b"0", spoken[-1]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_policy_max_999999_is_linkable(self): with DeterministicScenario() as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "9": b"9", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(999999, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(999999, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#999999"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertEqual(source_entry["TS"], 2) self.assertTrue(source_entry["ACTIVE"]) self.assertTrue(spoken) self.assertIn(b"linkedto", spoken[-1]) self.assertEqual(spoken[-1].count(b"9"), 6) self.assertEqual(scenario.capture.packets, []) def test_startup_default_reflector_rejects_reserved_control_targets(self): for default_reflector in (6, 7, 8): with self.subTest(default_reflector=default_reflector): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = default_reflector with DeterministicScenario(config=config) as scenario: with self.assertLogs(scenario.bm.logger, level="WARNING"): scenario.bm.make_default_reflectors() self.assertNotIn(f"#{default_reflector}", scenario.bridge_state) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_startup_default_reflector_accepts_linkable_target(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 91 with DeterministicScenario(config=config) as scenario: scenario.bm.make_default_reflectors() master_entry = next( entry for entry in scenario.bridge_state["#91"] if entry["SYSTEM"] == "MASTER-A" ) self.assertTrue(master_entry["ACTIVE"]) self.assertEqual(master_entry["TS"], 2) self.assertEqual(master_entry["ON"], [bytes_3(91)]) def test_startup_default_reflector_accepts_policy_max_999999(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 999999 with DeterministicScenario(config=config) as scenario: scenario.bm.make_default_reflectors() master_entry = next( entry for entry in scenario.bridge_state["#999999"] if entry["SYSTEM"] == "MASTER-A" ) self.assertTrue(master_entry["ACTIVE"]) self.assertEqual(master_entry["TS"], 2) self.assertEqual(master_entry["ON"], [bytes_3(999999)]) def test_startup_default_reflector_rejects_above_policy_max(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 1000000 with DeterministicScenario(config=config) as scenario: with self.assertLogs(scenario.bm.logger, level="WARNING"): scenario.bm.make_default_reflectors() self.assertNotIn("#1000000", scenario.bridge_state) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_startup_default_reflector_logs_invalid_value(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 1000000 with DeterministicScenario(config=config) as scenario: with self.assertLogs(scenario.bm.logger, level="WARNING") as logs: scenario.bm.make_default_reflectors() self.assertIn("MASTER-A default dial-a-tg 1000000 is invalid", "\n".join(logs.output)) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_options_default_reflector_rejects_above_policy_max(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "DIAL=1000000;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.options_config() self.assertNotIn("#1000000", scenario.bridge_state) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_options_invalid_default_reflector_disables_active_default(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"] = 91 config["SYSTEMS"]["MASTER-A"]["OPTIONS"] = "DIAL=1000000;TIMER=1" with DeterministicScenario(config=config) as scenario: scenario.bm.make_default_reflectors() active_entry = next( entry for entry in scenario.bridge_state["#91"] if entry["SYSTEM"] == "MASTER-A" ) self.assertTrue(active_entry["ACTIVE"]) scenario.bm.options_config() disabled_entry = next( entry for entry in scenario.bridge_state["#91"] if entry["SYSTEM"] == "MASTER-A" ) self.assertFalse(disabled_entry["ACTIVE"]) self.assertEqual(config["SYSTEMS"]["MASTER-A"]["DEFAULT_REFLECTOR"], 0) def test_dial_a_tg_above_policy_max_reports_busy_without_retune(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "busy": b"busy", "linkedto": b"linkedto", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(1000000, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(1000000, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(source_entry["ACTIVE"]) self.assertNotIn("#1000000", scenario.bridge_state) self.assertTrue(spoken) self.assertIn(b"busy", spoken[-1]) self.assertNotIn(b"linkedto", spoken[-1]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_linkable_target_creates_active_fbp_target(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) bridge = scenario.bridge_state["#235"] source_entry = [entry for entry in bridge if entry["SYSTEM"] == "MASTER-A"][0] fbp_entry = [entry for entry in bridge if entry["SYSTEM"] == "OBP-1"][0] self.assertTrue(source_entry["ACTIVE"]) self.assertEqual(source_entry["TS"], 2) self.assertEqual(source_entry["TGID"], bytes_3(9)) self.assertTrue(fbp_entry["ACTIVE"]) self.assertEqual(fbp_entry["TS"], 1) self.assertEqual(fbp_entry["TGID"], bytes_3(235)) self.assertEqual(fbp_entry["TO_TYPE"], "NONE") self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_bcsq_uses_reflector_tg_for_fbp_target(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) stream_id = bytes_4(0x01020344) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) config["SYSTEMS"]["OBP-1"]["_bcsq"] = {bytes_3(235): stream_id} scenario.inject_hbp( "MASTER-A", PacketSpec(dst_id=9, slot=2, stream_id=stream_id), ) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_dial_a_tg_echo_target_9990_does_not_create_fbp_target(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(9990, slot=1)) bridge = scenario.bridge_state["#9990"] systems = [entry["SYSTEM"] for entry in bridge] self.assertIn("MASTER-A", systems) self.assertIn("MASTER-B", systems) self.assertNotIn("OBP-1", systems) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_information_service_schedules_file_and_silence_prompt(self): with DeterministicScenario() as scenario: generated = [] scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: generated.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(9991, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(9991, slot=1)) scheduled = [call[0] for call in scenario.reactor.thread_calls] self.assertNotIn("#9991", scenario.bridge_state) self.assertIn(scenario.bm.playFileOnRequest, scheduled) self.assertIn(scenario.bm.sendSpeech, scheduled) self.assertEqual(generated, [[b""]]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_private_call_timeout_does_not_emit_group_voice_end(self): config = minimal_config(("MASTER-A",)) config["REPORTS"]["REPORT"] = True with DeterministicScenario(config=config) as scenario: scenario.clock.advance(100) scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertFalse( any( event.startswith(b"GROUP VOICE,END,RX") for event in scenario.reports["MASTER-A"].events ) ) def test_send_speech_tracks_stream_on_router_system_not_global_system(self): config = minimal_config(("MASTER-A", "MASTER-B")) packet = PacketSpec(dst_id=9, slot=2, stream_id=0x01020309).data() stream_id = bytes_4(0x01020309) old_sleep = None old_global_system = None had_global_system = False with DeterministicScenario(config=config) as scenario: old_sleep = scenario.bm.sleep had_global_system = hasattr(scenario.bm, "system") if had_global_system: old_global_system = scenario.bm.system try: scenario.bm.sleep = lambda seconds: None scenario.bm.system = "MASTER-B" scenario.bm.sendSpeech(scenario.systems["MASTER-A"], iter([packet])) finally: scenario.bm.sleep = old_sleep if had_global_system: scenario.bm.system = old_global_system else: delattr(scenario.bm, "system") self.assertIn(stream_id, scenario.systems["MASTER-A"].STATUS) self.assertNotIn(stream_id, scenario.systems["MASTER-B"].STATUS) captured = scenario.capture.for_system("MASTER-A") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dst_id"], bytes_3(9)) self.assertEqual(captured[0].fields["slot"], 2) def test_generated_voice_first_packet_records_prompt_activity(self): config = minimal_config(("MASTER-A",)) packet = PacketSpec(dst_id=9, slot=2, stream_id=0x01020309).data() stream_id = bytes_4(0x01020309) with DeterministicScenario(config=config) as scenario: slot = scenario.systems["MASTER-A"].STATUS[2] scenario.bm.sendVoicePacket( scenario.systems["MASTER-A"], packet, bytes_3(5000), bytes_3(9), slot, ) captured = scenario.capture.for_system("MASTER-A") self.assertEqual(len(captured), 1) self.assertTrue(slot["TX_PROMPT_ACTIVE"]) self.assertFalse(slot["TX_PROMPT_CANCEL"]) self.assertEqual(slot["TX_PROMPT_STREAM_ID"], stream_id) self.assertEqual(slot["TX_PROMPT_TGID"], bytes_3(9)) self.assertEqual(slot["TX_PROMPT_RFS"], bytes_3(5000)) self.assertEqual(slot["TX_PROMPT_TIME"], scenario.clock.time()) def test_send_speech_stops_when_generated_prompt_is_cancelled(self): config = minimal_config(("MASTER-A",)) packets = iter( [ PacketSpec(dst_id=9, slot=2, stream_id=0x01020309, seq=1).data(), PacketSpec(dst_id=9, slot=2, stream_id=0x01020309, seq=2).data(), ] ) with DeterministicScenario(config=config) as scenario: original_send_voice_packet = scenario.bm.sendVoicePacket original_sleep = scenario.bm.sleep def send_then_cancel(system, pkt, source_id, dest_id, slot): original_send_voice_packet(system, pkt, source_id, dest_id, slot) scenario.bm._cancelGeneratedVoice(slot) try: scenario.bm.sendVoicePacket = send_then_cancel scenario.bm.sleep = lambda seconds: None scenario.bm.sendSpeech(scenario.systems["MASTER-A"], packets) finally: scenario.bm.sendVoicePacket = original_send_voice_packet scenario.bm.sleep = original_sleep captured = scenario.capture.for_system("MASTER-A") slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["seq"], 1) self.assertTrue(slot["TX_PROMPT_CANCEL"]) self.assertFalse(slot["TX_PROMPT_ACTIVE"]) def test_real_hbp_voice_cancels_generated_prompt_and_keeps_late_entry_lc_rewrite(self): config = minimal_config(("MASTER-A", "MASTER-B")) bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) prompt_packet = PacketSpec(dst_id=9, slot=2, stream_id=0x01020309).data() real_packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020304, seq=7, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=1, payload=b"\xff" * 33, ) with DeterministicScenario(config=config, bridges=bridges) as scenario: target_slot = scenario.systems["MASTER-B"].STATUS[2] scenario.bm.sendVoicePacket( scenario.systems["MASTER-B"], prompt_packet, bytes_3(5000), bytes_3(9), target_slot, ) scenario.inject_hbp("MASTER-A", real_packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 2) self.assertEqual(captured[0].fields["dst_id"], bytes_3(9)) self.assertEqual(captured[1].fields["dst_id"], bytes_3(91)) self.assertEqual(captured[1].fields["dtype_vseq"], 1) self.assertNotEqual( captured[1].fields["dmr_payload"], real_packet.data()[20:53], ) self.assertTrue(target_slot["TX_PROMPT_CANCEL"]) self.assertFalse(target_slot["TX_PROMPT_ACTIVE"]) def test_dial_a_tg_retune_keeps_created_fbp_targets_active(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call(236, slot=1, stream_id=0x01020305)) old_master_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] old_fbp_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "OBP-1" ][0] new_master_entry = [ entry for entry in scenario.bridge_state["#236"] if entry["SYSTEM"] == "MASTER-A" ][0] new_fbp_entry = [ entry for entry in scenario.bridge_state["#236"] if entry["SYSTEM"] == "OBP-1" ][0] self.assertFalse(old_master_entry["ACTIVE"]) self.assertTrue(new_master_entry["ACTIVE"]) self.assertTrue(old_fbp_entry["ACTIVE"]) self.assertTrue(new_fbp_entry["ACTIVE"]) self.assertEqual(old_fbp_entry["TO_TYPE"], "NONE") self.assertEqual(new_fbp_entry["TO_TYPE"], "NONE") self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_disconnect_keeps_created_fbp_targets_active(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call(4000, slot=1, stream_id=0x01020305)) master_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] fbp_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "OBP-1" ][0] self.assertFalse(master_entry["ACTIVE"]) self.assertTrue(fbp_entry["ACTIVE"]) self.assertEqual(fbp_entry["TO_TYPE"], "NONE") self.assertEqual(scenario.capture.packets, []) def test_rule_timer_loop_removes_disconnected_fbp_only_reflector(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: scenario.inject_hbp("MASTER-A", self.private_call(235, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call(4000, slot=1, stream_id=0x01020305)) self.assertIn("#235", scenario.bridge_state) scenario.bm.rule_timer_loop() self.assertNotIn("#235", scenario.bridge_state) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_status_on_slot_1_reports_active_ts2_reflector(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "2": b"2", "3": b"3", "5": b"5", "linkedto": b"linkedto", "notlinked": b"notlinked", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(5000, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(5000, slot=1)) source_entry = [ entry for entry in scenario.bridge_state["#235"] if entry["SYSTEM"] == "MASTER-A" ][0] self.assertTrue(source_entry["ACTIVE"]) self.assertEqual(source_entry["TS"], 2) self.assertTrue(spoken) self.assertIn(b"linkedto", spoken[-1]) self.assertIn(b"2", spoken[-1]) self.assertIn(b"3", spoken[-1]) self.assertIn(b"5", spoken[-1]) self.assertNotIn(b"notlinked", spoken[-1]) self.assertEqual(scenario.capture.packets, []) def test_dial_a_tg_status_reports_only_one_active_reflector(self): bridges = {} bridges.update(self.reflector_bridge("#235", active=True)) bridges.update(self.reflector_bridge("#236", active=True)) with DeterministicScenario(bridges=bridges) as scenario: spoken = [] scenario.bm.words["en_GB"].update( { "2": b"2", "3": b"3", "5": b"5", "6": b"6", "linkedto": b"linkedto", "notlinked": b"notlinked", "silence": b"silence", "to": b"to", } ) scenario.bm.pkt_gen = lambda source_id, dst_id, peer_id, slot, say: spoken.append(list(say)) or iter(()) scenario.inject_hbp("MASTER-A", self.private_call(5000, slot=1)) scenario.inject_hbp("MASTER-A", self.private_call_terminator(5000, slot=1)) self.assertTrue(spoken) self.assertEqual(spoken[-1].count(b"linkedto"), 1) self.assertIn(b"5", spoken[-1]) self.assertNotIn(b"6", spoken[-1]) self.assertNotIn(b"notlinked", spoken[-1]) self.assertEqual(scenario.capture.packets, []) def test_remove_bridge_system_keeps_reset_system_identity(self): config = minimal_config(("MASTER-A", "MASTER-B", "MASTER-C")) bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ("MASTER-C", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: original_b = dict(scenario.bridge_state["91"][1]) original_c = dict(scenario.bridge_state["91"][2]) scenario.bm.remove_bridge_system("MASTER-A") entries = scenario.bridge_state["91"] self.assertEqual([entry["SYSTEM"] for entry in entries], ["MASTER-A", "MASTER-B", "MASTER-C"]) self.assertFalse(entries[0]["ACTIVE"]) self.assertEqual(entries[0]["TS"], 2) self.assertEqual(entries[0]["TGID"], bytes_3(91)) self.assertEqual(entries[0]["ON"], [bytes_3(91)]) self.assertEqual(entries[1], original_b) self.assertEqual(entries[2], original_c) def test_remove_bridge_system_preserves_reflector_activation_trigger(self): config = minimal_config(("MASTER-A", "MASTER-B")) add_openbridge_system(config, "OBP-1", network_id=3001) bridges = { "#235": [ { "SYSTEM": "MASTER-A", "TS": 2, "TGID": bytes_3(9), "ACTIVE": True, "TIMEOUT": 60, "TO_TYPE": "OFF", "OFF": [], "ON": [bytes_3(235)], "RESET": [bytes_3(5000)], "TIMER": 123, }, { "SYSTEM": "MASTER-B", "TS": 2, "TGID": bytes_3(9), "ACTIVE": True, "TIMEOUT": 60, "TO_TYPE": "ON", "OFF": [], "ON": [bytes_3(235)], "RESET": [], "TIMER": 456, }, { "SYSTEM": "OBP-1", "TS": 1, "TGID": bytes_3(235), "ACTIVE": True, "TIMEOUT": "", "TO_TYPE": "NONE", "OFF": [], "ON": [], "RESET": [], "TIMER": 789, }, ] } with DeterministicScenario(config=config, bridges=bridges) as scenario: original_master_b = dict(scenario.bridge_state["#235"][1]) original_obp = dict(scenario.bridge_state["#235"][2]) scenario.bm.remove_bridge_system("MASTER-A") entries = scenario.bridge_state["#235"] self.assertEqual([entry["SYSTEM"] for entry in entries], ["MASTER-A", "MASTER-B", "OBP-1"]) self.assertFalse(entries[0]["ACTIVE"]) self.assertEqual(entries[0]["TO_TYPE"], "ON") self.assertEqual(entries[0]["TGID"], bytes_3(9)) self.assertEqual(entries[0]["ON"], [bytes_3(235)]) self.assertEqual(entries[0]["RESET"], [bytes_3(5000)]) self.assertEqual(entries[1], original_master_b) self.assertEqual(entries[2], original_obp) def test_hbp_reset_guard_allows_packets_when_lifecycle_flags_false(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["_reset"] = False config["SYSTEMS"]["MASTER-A"]["_reloadoptions"] = False config["SYSTEMS"]["MASTER-A"]["_resetlog"] = False bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", PacketSpec(dst_id=91, slot=2)) self.assertEqual(len(scenario.capture.for_system("MASTER-B")), 1) self.assertFalse(config["SYSTEMS"]["MASTER-A"]["_resetlog"]) def test_hbp_reset_guard_drops_packets_and_logs_once_while_reset_active(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["_reset"] = True config["SYSTEMS"]["MASTER-A"]["_reloadoptions"] = False config["SYSTEMS"]["MASTER-A"]["_resetlog"] = False bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: with self.assertLogs(scenario.bm.logger, level="INFO") as logs: scenario.inject_hbp("MASTER-A", PacketSpec(dst_id=91, slot=2)) scenario.inject_hbp("MASTER-A", PacketSpec(dst_id=91, slot=2, stream_id=0x01020305)) self.assertEqual(scenario.capture.for_system("MASTER-B"), []) self.assertTrue(config["SYSTEMS"]["MASTER-A"]["_resetlog"]) self.assertEqual("\n".join(logs.output).count("disallow transmission"), 1) def test_hbp_reset_guard_drops_packets_and_logs_once_while_reload_active(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["SYSTEMS"]["MASTER-A"]["_reset"] = False config["SYSTEMS"]["MASTER-A"]["_reloadoptions"] = True config["SYSTEMS"]["MASTER-A"]["_resetlog"] = False bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: with self.assertLogs(scenario.bm.logger, level="INFO") as logs: scenario.inject_hbp("MASTER-A", PacketSpec(dst_id=91, slot=2)) scenario.inject_hbp("MASTER-A", PacketSpec(dst_id=91, slot=2, stream_id=0x01020305)) self.assertEqual(scenario.capture.for_system("MASTER-B"), []) self.assertTrue(config["SYSTEMS"]["MASTER-A"]["_resetlog"]) self.assertEqual("\n".join(logs.output).count("disallow transmission"), 1) def test_hbp_unit_data_to_obp_reports_on_target_system(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=1000001, slot=2, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("OBP-1") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dst_id"], bytes_3(1000001)) self.assertEqual(len(scenario.reports["OBP-1"].events), 1) self.assertIn(b"UNIT DATA,DATA,TX,OBP-1", scenario.reports["OBP-1"].events[0]) self.assertFalse( any( event.startswith(b"UNIT DATA,DATA,TX,OBP-1") for event in scenario.reports["MASTER-A"].events ) ) def test_hbp_unit_data_to_obp_preserves_ber_rssi_metadata(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=1000001, slot=2, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ber=b"B", rssi=b"R", ) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("OBP-1") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].ber, b"B") self.assertEqual(captured[0].rssi, b"R") self.assertEqual(captured[0].fields["ber"], b"") self.assertEqual(captured[0].fields["rssi"], b"") def test_hbp_unit_data_to_enhanced_obp_without_keepalive_is_dropped(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["SYSTEMS"]["OBP-1"]["ENHANCED_OBP"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=1000001, slot=2, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_hbp_unit_data_to_enhanced_obp_with_stale_keepalive_is_dropped(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["SYSTEMS"]["OBP-1"]["ENHANCED_OBP"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: config["SYSTEMS"]["OBP-1"]["_bcka"] = scenario.clock.time() - 61 packet = PacketSpec( dst_id=1000001, slot=2, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_hbp_unit_data_to_enhanced_obp_with_recent_keepalive_is_forwarded(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["SYSTEMS"]["OBP-1"]["ENHANCED_OBP"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: config["SYSTEMS"]["OBP-1"]["_bcka"] = scenario.clock.time() - 5 packet = PacketSpec( dst_id=1000001, slot=2, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) self.assertEqual(len(scenario.capture.for_system("OBP-1")), 1) def test_obp_unit_data_to_enhanced_obp_without_keepalive_is_dropped(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) config["SYSTEMS"]["OBP-2"]["ENHANCED_OBP"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ("OBP-2", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=1000001, slot=1, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_obp("OBP-1", packet) self.assertEqual(scenario.capture.for_system("OBP-2"), []) def test_obp_unit_data_to_fbp_preserves_source_repeater_metadata(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) with DeterministicScenario(config=config) as scenario: packet = PacketSpec( dst_id=1000001, slot=1, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ber=b"B", rssi=b"R", ) scenario.systems["OBP-1"].dmrd_received( *packet.decoded_obp_args( hops=b"\x05", source_server=7654321, source_rptr=1234567, ) ) captured = scenario.capture.for_system("OBP-2") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].hops, b"\x05") self.assertEqual(captured[0].source_server, bytes_4(7654321)) self.assertEqual(captured[0].source_rptr, bytes_4(1234567)) self.assertEqual(captured[0].ber, b"B") self.assertEqual(captured[0].rssi, b"R") def test_obp_parser_discards_dmre_shorter_than_version_byte(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: with self.assertLogs("hblink", level="WARNING") as logs: scenario.inject_datagram("OBP-1", b"DMRE") self.assertIn("packet too short", "\n".join(logs.output)) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_obp_parser_discards_truncated_dmre_after_version_byte(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: packet = b"DMRE" + (b"\x00" * 51) + b"\x05" + (b"\x00" * 10) with self.assertLogs("hblink", level="WARNING") as logs: scenario.inject_datagram("OBP-1", packet) self.assertIn("v5 packet too short", "\n".join(logs.output)) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_hbp_master_parser_discards_truncated_dmrd_from_connected_peer(self): config = minimal_config(("MASTER-A",)) with DeterministicScenario(config=config) as scenario: sockaddr = ("127.0.0.1", 50000) scenario.register_peer("MASTER-A", peer_id=1001, sockaddr=sockaddr) packet = b"DMRD" + b"\x01" + bytes_3(3120001) + bytes_3(123) + bytes_4(1001) with self.assertLogs("hblink", level="WARNING") as logs: scenario.inject_datagram("MASTER-A", packet, sockaddr=sockaddr) self.assertIn("DMRD packet too short", "\n".join(logs.output)) self.assertEqual(scenario.capture.for_system("MASTER-A"), []) def test_obp_bridge_control_bcst_sets_global_stun_flag(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) config["SYSTEMS"]["OBP-1"]["ENHANCED_OBP"] = True with DeterministicScenario(config=config) as scenario: packet = b"BCST" + hmac_new( config["SYSTEMS"]["OBP-1"]["PASSPHRASE"], b"BCST", sha1, ).digest() scenario.inject_datagram("OBP-1", packet) self.assertTrue(config["STUN"]) self.assertNotIn("_STUN", config["SYSTEMS"]["OBP-1"]) def test_hbp_group_data_reports_as_data_without_voice_timeout_events(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( b"DATA HEADER,DATA,RX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertIn( b"DATA HEADER,DATA,TX,MASTER-B,16909060,1001,3120001,2,123", scenario.reports["MASTER-B"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["MASTER-A"].events + scenario.reports["MASTER-B"].events ) ) ) def test_hbp_group_rate_drop_handles_zero_duration(self): config = minimal_config(("MASTER-A",)) with DeterministicScenario(config=config) as scenario: packet = PacketSpec( dst_id=123, slot=2, stream_id=0x01020304, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) for _ in range(19): scenario.inject_hbp("MASTER-A", packet) self.assertEqual( scenario.systems["MASTER-A"].STATUS[2]["RX_STREAM_ID"], bytes_4(0x01020304), ) def test_obp_group_data_reports_as_data_without_voice_timeout_events(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("OBP-1", 1), ("MASTER-A", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) with self.assertLogs(scenario.bm.logger, level="INFO") as logs: scenario.inject_obp("OBP-1", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn("*DATA HEADER*", "\n".join(logs.output)) self.assertNotIn("*CALL START*", "\n".join(logs.output)) self.assertIn( b"DATA HEADER,DATA,RX,OBP-1,16909060,1001,3120001,1,123", scenario.reports["OBP-1"].events, ) self.assertIn( b"DATA HEADER,DATA,TX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["OBP-1"].events + scenario.reports["MASTER-A"].events ) ) ) def test_hbp_group_data_continuation_reports_as_data(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=8, ) scenario.inject_hbp("MASTER-A", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( b"VCSBK 3/4 DATA BLOCK,DATA,RX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertIn( b"VCSBK 3/4 DATA BLOCK,DATA,TX,MASTER-B,16909060,1001,3120001,2,123", scenario.reports["MASTER-B"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["MASTER-A"].events + scenario.reports["MASTER-B"].events ) ) ) def test_obp_group_data_continuation_reports_as_data(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("OBP-1", 1), ("MASTER-A", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=7, ) scenario.inject_obp("OBP-1", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( b"VCSBK 1/2 DATA BLOCK,DATA,RX,OBP-1,16909060,1001,3120001,1,123", scenario.reports["OBP-1"].events, ) self.assertIn( b"VCSBK 1/2 DATA BLOCK,DATA,TX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["OBP-1"].events + scenario.reports["MASTER-A"].events ) ) ) def test_hbp_to_hbp_data_sync_control_payload_is_not_emb_lc_rewritten(self): config = minimal_config(("MASTER-A", "MASTER-B")) bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) payload = bytes(range(33)) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=3, payload=payload, ) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dmr_payload"], payload) def test_hbp_to_obp_data_sync_control_payload_is_not_emb_lc_rewritten(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) payload = bytes(range(33)) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=3, payload=payload, ) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("OBP-1") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dmr_payload"], payload) def test_obp_to_hbp_data_sync_control_payload_is_not_emb_lc_rewritten(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) bridges = active_bridge( "123", 123, ( ("OBP-1", 1), ("MASTER-A", 2), ), ) payload = bytes(range(33)) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=1, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=3, payload=payload, ) scenario.inject_obp("OBP-1", packet) captured = scenario.capture.for_system("MASTER-A") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dmr_payload"], payload) def test_obp_to_obp_data_sync_control_payload_is_not_emb_lc_rewritten(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) bridges = active_bridge( "123", 123, ( ("OBP-1", 1), ("OBP-2", 1), ), ) payload = bytes(range(33)) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=1, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=3, payload=payload, ) scenario.inject_obp("OBP-1", packet) captured = scenario.capture.for_system("OBP-2") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dmr_payload"], payload) def test_hbp_vcsbk_data_reports_specific_rx_event_without_generic_duplicate(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=7, ) scenario.inject_hbp("MASTER-A", packet) self.assertIn( b"VCSBK 1/2 DATA BLOCK,DATA,RX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertIn( b"VCSBK 1/2 DATA BLOCK,DATA,TX,MASTER-B,16909060,1001,3120001,2,123", scenario.reports["MASTER-B"].events, ) self.assertFalse( any( event.startswith(b"OTHER DATA,DATA,RX") for event in scenario.reports["MASTER-A"].events ) ) def test_hbp_vcsbk_unknown_type_reports_other_data_rx(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=5, ) scenario.inject_hbp("MASTER-A", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( b"OTHER DATA,DATA,RX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertIn( b"OTHER DATA,DATA,TX,MASTER-B,16909060,1001,3120001,2,123", scenario.reports["MASTER-B"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["MASTER-A"].events + scenario.reports["MASTER-B"].events ) ) ) def test_obp_vcsbk_unknown_type_reports_other_data_without_voice_events(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("OBP-1", 1), ("MASTER-A", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=1, call_type="vcsbk", frame_type=HBPF_DATA_SYNC, dtype_vseq=5, ) scenario.inject_obp("OBP-1", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( b"OTHER DATA,DATA,RX,OBP-1,16909060,1001,3120001,1,123", scenario.reports["OBP-1"].events, ) self.assertIn( b"OTHER DATA,DATA,TX,MASTER-A,16909060,1001,3120001,2,123", scenario.reports["MASTER-A"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE") for event in ( scenario.reports["OBP-1"].events + scenario.reports["MASTER-A"].events ) ) ) def test_obp_group_loop_control_logs_packet_rate_not_duration(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) with DeterministicScenario(config=config) as scenario: packet = PacketSpec( dst_id=123, slot=1, stream_id=0x01020304, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=0, ) scenario.inject_obp("OBP-1", packet) scenario.inject_obp("OBP-2", packet) scenario.clock.advance(2) with self.assertLogs(scenario.bm.logger, level="DEBUG") as logs: scenario.inject_obp("OBP-2", packet) self.assertIn("PACKET RATE 0.50/s", "\n".join(logs.output)) self.assertNotIn("PACKET RATE 2.00/s", "\n".join(logs.output)) def test_obp_group_rate_drop_uses_elapsed_duration(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) with DeterministicScenario(config=config) as scenario: rate_drops = [] scenario.systems["OBP-1"].proxy_BadPeer = lambda: rate_drops.append(True) stream_id = 0x01020304 first_packet = PacketSpec( dst_id=123, slot=1, stream_id=stream_id, seq=0, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=0, payload=bytes([0]) * 33, ) scenario.inject_obp("OBP-1", first_packet) scenario.clock.advance(0.5) with self.assertLogs(scenario.bm.logger, level="WARNING") as logs: for seq in range(1, 20): packet = PacketSpec( dst_id=123, slot=1, stream_id=stream_id, seq=seq, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=0, payload=bytes([seq]) * 33, ) scenario.inject_obp("OBP-1", packet) self.assertEqual(rate_drops, [True]) self.assertIn("*PacketControl* RATE DROP!", "\n".join(logs.output)) def test_obp_voice_terminator_marks_stream_finished_when_reports_disabled(self): config = minimal_config(("MASTER-B",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = False bridges = active_bridge( "91", 91, ( ("OBP-1", 1), ("MASTER-B", 2), ), ) stream_id = 0x01020304 with DeterministicScenario(config=config, bridges=bridges) as scenario: header = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) terminator = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) late_packet = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=3, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=3, payload=b"\x02" * 33, ) scenario.inject_obp("OBP-1", header) scenario.inject_obp("OBP-1", terminator) before_late_packet = len(scenario.capture.for_system("MASTER-B")) scenario.inject_obp("OBP-1", late_packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(before_late_packet, 2) self.assertEqual(len(captured), 2) self.assertIn( "_fin", scenario.systems["OBP-1"].STATUS[bytes_4(stream_id)], ) def test_hbp_to_obp_terminator_marks_target_finished_without_timeout_report(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) stream_id = 0x01020304 with DeterministicScenario(config=config, bridges=bridges) as scenario: header = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) terminator = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) scenario.inject_hbp("MASTER-A", header) scenario.inject_hbp("MASTER-A", terminator) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( "_fin", scenario.systems["OBP-1"].STATUS[bytes_4(stream_id)], ) self.assertIn( b"GROUP VOICE,END,TX,OBP-1,16909060,1001,3120001,1,91,0.00", scenario.reports["OBP-1"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE,END,RX,OBP-1") for event in scenario.reports["OBP-1"].events ) ) def test_obp_to_obp_terminator_marks_target_finished_without_timeout_report(self): config = minimal_config(()) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "91", 91, ( ("OBP-1", 1), ("OBP-2", 1), ), ) stream_id = 0x01020304 with DeterministicScenario(config=config, bridges=bridges) as scenario: header = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) terminator = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) scenario.inject_obp("OBP-1", header) scenario.inject_obp("OBP-1", terminator) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertIn( "_fin", scenario.systems["OBP-2"].STATUS[bytes_4(stream_id)], ) self.assertIn( b"GROUP VOICE,END,TX,OBP-2,16909060,1001,3120001,1,91,0.00", scenario.reports["OBP-2"].events, ) self.assertFalse( any( event.startswith(b"GROUP VOICE,END,RX,OBP-2") for event in scenario.reports["OBP-2"].events ) ) def test_hbp_voice_terminator_suppresses_late_same_stream_voice(self): config = minimal_config(("MASTER-A", "MASTER-B")) bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) stream_id = 0x01020304 with DeterministicScenario(config=config, bridges=bridges) as scenario: header = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) terminator = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) late_packet = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=3, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=3, payload=b"\x02" * 33, ) scenario.inject_hbp("MASTER-A", header) scenario.inject_hbp("MASTER-A", terminator) before_late_packet = len(scenario.capture.for_system("MASTER-B")) scenario.inject_hbp("MASTER-A", late_packet) captured = scenario.capture.for_system("MASTER-B") source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(before_late_packet, 2) self.assertEqual(len(captured), 2) self.assertEqual(source_slot["RX_FINISHED_STREAM_ID"], bytes_4(stream_id)) self.assertEqual(source_slot["RX_TYPE"], HBPF_SLT_VTERM) def test_hbp_new_voice_terminator_after_data_stream_marks_finished(self): config = minimal_config(("MASTER-A", "MASTER-B")) bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) stream_id = 0x01020302 with DeterministicScenario(config=config, bridges=bridges) as scenario: data_header = PacketSpec( dst_id=91, slot=2, stream_id=0x01020301, seq=1, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) terminator = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) late_packet = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=3, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=3, payload=b"\x02" * 33, ) scenario.inject_hbp("MASTER-A", data_header) self.assertTrue(scenario.systems["MASTER-A"].STATUS[2]["RX_DATA_STREAM"]) scenario.inject_hbp("MASTER-A", terminator) before_late_packet = len(scenario.capture.for_system("MASTER-B")) scenario.inject_hbp("MASTER-A", late_packet) captured = scenario.capture.for_system("MASTER-B") source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(before_late_packet, 2) self.assertEqual(len(captured), 2) self.assertFalse(source_slot["RX_DATA_STREAM"]) self.assertEqual(source_slot["RX_FINISHED_STREAM_ID"], bytes_4(stream_id)) self.assertEqual(source_slot["RX_TYPE"], HBPF_SLT_VTERM) def test_hbp_idle_slot_voice_terminator_marks_finished(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) stream_id = 0x01020304 with DeterministicScenario(config=config, bridges=bridges) as scenario: scenario.clock.advance(100) terminator = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=2, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VTERM, payload=b"\x01" * 33, ) late_packet = PacketSpec( dst_id=91, slot=2, stream_id=stream_id, seq=3, call_type="group", frame_type=HBPF_VOICE, dtype_vseq=3, payload=b"\x02" * 33, ) scenario.inject_hbp("MASTER-A", terminator) before_late_packet = len(scenario.capture.for_system("MASTER-B")) scenario.inject_hbp("MASTER-A", late_packet) captured = scenario.capture.for_system("MASTER-B") source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(before_late_packet, 1) self.assertEqual(len(captured), 1) self.assertEqual(source_slot["RX_FINISHED_STREAM_ID"], bytes_4(stream_id)) self.assertEqual(source_slot["RX_TYPE"], HBPF_SLT_VTERM) self.assertTrue( any( event.startswith(b"GROUP VOICE,END,RX,MASTER-A") for event in scenario.reports["MASTER-A"].events ) ) def test_hbp_group_voice_still_reports_voice_lifecycle(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True bridges = active_bridge( "123", 123, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=123, slot=2, call_type="group", dtype_vseq=0, ) scenario.inject_hbp("MASTER-A", packet) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() self.assertTrue( any( event.startswith(b"GROUP VOICE,START,RX,MASTER-A") for event in scenario.reports["MASTER-A"].events ) ) self.assertTrue( any( event.startswith(b"GROUP VOICE,END,RX,MASTER-A") for event in scenario.reports["MASTER-A"].events ) ) self.assertTrue( any( event.startswith(b"GROUP VOICE,START,TX,MASTER-B") for event in scenario.reports["MASTER-B"].events ) ) self.assertTrue( any( event.startswith(b"GROUP VOICE,END,TX,MASTER-B") for event in scenario.reports["MASTER-B"].events ) ) def test_hbp_voice_sequence_wrap_with_loss_routes_forward_progress(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) stream_id = 0x01020304 packets = [ PacketSpec(dst_id=91, slot=2, stream_id=stream_id, seq=254, dtype_vseq=0), PacketSpec(dst_id=91, slot=2, stream_id=stream_id, seq=255, dtype_vseq=1), PacketSpec(dst_id=91, slot=2, stream_id=stream_id, seq=2, dtype_vseq=2), ] with DeterministicScenario(bridges=bridges) as scenario: for packet in packets: scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 3) self.assertEqual(captured[-1].packet[4], 2) source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(source_slot["lastSeq"], 2) self.assertEqual(source_slot["loss"], 1) def test_obp_voice_sequence_wrap_with_loss_routes_forward_progress(self): config = minimal_config() add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) bridges = active_bridge( "91", 91, ( ("OBP-1", 1), ("OBP-2", 1), ), ) stream_id = 0x01020304 packets = [ PacketSpec(dst_id=91, slot=1, stream_id=stream_id, seq=254, dtype_vseq=0), PacketSpec(dst_id=91, slot=1, stream_id=stream_id, seq=255, dtype_vseq=1), PacketSpec(dst_id=91, slot=1, stream_id=stream_id, seq=2, dtype_vseq=2), ] with DeterministicScenario(config=config, bridges=bridges) as scenario: for packet in packets: scenario.inject_obp("OBP-1", packet) captured = scenario.capture.for_system("OBP-2") self.assertEqual(len(captured), 3) self.assertEqual(captured[-1].packet[4], 2) source_status = scenario.systems["OBP-1"].STATUS[bytes_4(stream_id)] self.assertEqual(source_status["lastSeq"], 2) self.assertEqual(source_status["loss"], 1) def test_obp_target_missing_emb_lc_logs_without_crashing(self): config = minimal_config() add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) bridges = active_bridge( "91", 91, ( ("OBP-1", 1), ("OBP-2", 1), ), ) stream_id = 0x01020304 header = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=0, frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) voice = PacketSpec( dst_id=91, slot=1, stream_id=stream_id, seq=1, frame_type=HBPF_VOICE, dtype_vseq=1, ) with DeterministicScenario(config=config, bridges=bridges) as scenario: scenario.inject_obp("OBP-1", header) del scenario.systems["OBP-2"].STATUS[bytes_4(stream_id)]["EMB_LC"] with self.assertLogs(scenario.bm.logger, level="WARNING") as logs: scenario.inject_obp("OBP-1", voice) self.assertIn( "(OBP-1) KeyError - EMB_LC, skipping", "\n".join(logs.output), ) captured = scenario.capture.for_system("OBP-2") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["dtype_vseq"], HBPF_SLT_VHEAD) def test_hbp_voice_sequence_zero_duplicate_is_dropped(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) first_packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020304, seq=0, dtype_vseq=0, ) duplicate_packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020304, seq=0, dtype_vseq=1, payload=b"\x01" * 33, ) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", first_packet) scenario.inject_hbp("MASTER-A", duplicate_packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(source_slot["lastSeq"], 0) self.assertEqual(source_slot["loss"], 1) def test_hbp_new_stream_after_timeout_resets_duplicate_state(self): bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("MASTER-B", 2), ), ) previous_packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020304, seq=200, dtype_vseq=0, ) next_packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020305, seq=1, dtype_vseq=0, ) with DeterministicScenario(bridges=bridges) as scenario: scenario.inject_hbp("MASTER-A", previous_packet) source_slot = scenario.systems["MASTER-A"].STATUS[2] self.assertEqual(source_slot["lastSeq"], 200) scenario.clock.advance(6) scenario.bm.stream_trimmer_loop() scenario.inject_hbp("MASTER-A", next_packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 2) self.assertEqual(captured[-1].fields["stream_id"], bytes_4(0x01020305)) self.assertEqual(source_slot["lastSeq"], 1) self.assertEqual(source_slot["loss"], 0) def test_hbp_group_voice_to_enhanced_obp_without_keepalive_is_dropped(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["SYSTEMS"]["OBP-1"]["ENHANCED_OBP"] = True bridges = active_bridge( "91", 91, ( ("MASTER-A", 2), ("OBP-1", 1), ), ) with DeterministicScenario(config=config, bridges=bridges) as scenario: packet = PacketSpec( dst_id=91, slot=2, stream_id=0x01020304, call_type="group", frame_type=HBPF_DATA_SYNC, dtype_vseq=HBPF_SLT_VHEAD, ) scenario.inject_hbp("MASTER-A", packet) self.assertEqual(scenario.capture.for_system("OBP-1"), []) def test_hbp_unit_data_to_hbp_reports_actual_target_slot(self): config = minimal_config(("MASTER-A", "MASTER-B")) config["REPORTS"]["REPORT"] = True with DeterministicScenario(config=config) as scenario: scenario.clock.advance(10) scenario.bm.SUB_MAP = { bytes_3(1234567): ("MASTER-B", 2, scenario.clock.time()) } packet = PacketSpec( dst_id=1234567, slot=1, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_hbp("MASTER-A", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["slot"], 2) self.assertIn( b"UNIT DATA,DATA,TX,MASTER-B,16909060,1001,3120001,2,1234567", scenario.reports["MASTER-B"].events, ) def test_obp_unit_data_to_hbp_reports_actual_target_slot(self): config = minimal_config(("MASTER-B",)) add_openbridge_system(config, "OBP-1", network_id=3001) config["REPORTS"]["REPORT"] = True with DeterministicScenario(config=config) as scenario: scenario.clock.advance(10) scenario.bm.SUB_MAP = { bytes_3(1234567): ("MASTER-B", 2, scenario.clock.time()) } packet = PacketSpec( dst_id=1234567, slot=1, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_obp("OBP-1", packet) captured = scenario.capture.for_system("MASTER-B") self.assertEqual(len(captured), 1) self.assertEqual(captured[0].fields["slot"], 2) self.assertIn( b"UNIT DATA,DATA,TX,MASTER-B,16909060,1001,3120001,2,1234567", scenario.reports["MASTER-B"].events, ) def test_obp_unit_data_loop_control_handles_zero_duration_packet_rate(self): config = minimal_config(("MASTER-A",)) add_openbridge_system(config, "OBP-1", network_id=3001) add_openbridge_system(config, "OBP-2", network_id=3002) with DeterministicScenario(config=config) as scenario: packet = PacketSpec( dst_id=1000001, slot=1, stream_id=0x01020304, call_type="unit", frame_type=HBPF_DATA_SYNC, dtype_vseq=6, ) scenario.inject_obp("OBP-1", packet) scenario.inject_obp("OBP-2", packet) stream_id = bytes_4(0x01020304) self.assertTrue(scenario.systems["OBP-2"].STATUS[stream_id]["LOOPLOG"]) self.assertEqual(scenario.capture.for_system("MASTER-A"), []) if __name__ == "__main__": unittest.main()