diff --git a/README.md b/README.md index 95a0425..fa501ac 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ To use the `SkyControl.py` script, you need to call it with two parameters: - SayAllClear - TailMessage - CourtesyTone + - IDChange - AlertScript 2. The new value for the setting (either 'true' or 'false' or 'toggle'). diff --git a/SkyControl.py b/SkyControl.py index b687943..5b21cb8 100644 --- a/SkyControl.py +++ b/SkyControl.py @@ -1,6 +1,6 @@ #!/usr/bin/python3 # SkyControl.py -# A Control Script for SkywarnPlus v0.2.0 +# A Control Script for SkywarnPlus v0.2.2 # by Mason Nelson (N5LSN/WRKF394) # # This script allows you to change the value of specific keys in the SkywarnPlus config.yaml file. @@ -18,9 +18,12 @@ import yaml import subprocess from pathlib import Path + # Define a function to change the CT def changeCT(ct): - tone_dir = config["CourtesyTones"].get("ToneDir", os.path.join(str(SCRIPT_DIR), "SOUNDS/TONES")) + tone_dir = config["CourtesyTones"].get( + "ToneDir", os.path.join(str(SCRIPT_DIR), "SOUNDS/TONES") + ) ct1 = config["CourtesyTones"]["Tones"]["CT1"] ct2 = config["CourtesyTones"]["Tones"]["CT2"] wx_ct = config["CourtesyTones"]["Tones"]["WXCT"] @@ -49,6 +52,7 @@ def changeCT(ct): print("Invalid CT value. Please provide either 'wx' or 'normal'.") sys.exit(1) + # Define a function to change the ID def changeID(id): id_dir = config["IDChange"].get("IDDir", os.path.join(SCRIPT_DIR, "ID")) @@ -70,16 +74,65 @@ def changeID(id): print("Invalid ID value. Please provide either 'wx' or 'normal'.") sys.exit(1) + # Define valid keys and corresponding audio files VALID_KEYS = { - "enable": {"key": "Enable", "section": "SKYWARNPLUS", "true_file": "SWP85.wav", "false_file": "SWP86.wav"}, - "sayalert": {"key": "SayAlert", "section": "Alerting", "true_file": "SWP87.wav", "false_file": "SWP88.wav"}, - "sayallclear": {"key": "SayAllClear", "section": "Alerting", "true_file": "SWP89.wav", "false_file": "SWP90.wav"}, - "tailmessage": {"key": "Enable", "section": "Tailmessage", "true_file": "SWP91.wav", "false_file": "SWP92.wav"}, - "courtesytone": {"key": "Enable", "section": "CourtesyTones", "true_file": "SWP93.wav", "false_file": "SWP94.wav"}, - "alertscript": {"key": "Enable", "section": "AlertScript", "true_file": "SWP81.wav", "false_file": "SWP82.wav"}, - "changect": {"key": "", "section": "", "true_file": "SWP79.wav", "false_file": "SWP80.wav", "available_values": ['wx', 'normal']}, - "changeid": {"key": "", "section": "", "true_file": "SWP77.wav", "false_file": "SWP78.wav", "available_values": ['WX', 'NORMAL']}, + "enable": { + "key": "Enable", + "section": "SKYWARNPLUS", + "true_file": "SWP85.wav", + "false_file": "SWP86.wav", + }, + "sayalert": { + "key": "SayAlert", + "section": "Alerting", + "true_file": "SWP87.wav", + "false_file": "SWP88.wav", + }, + "sayallclear": { + "key": "SayAllClear", + "section": "Alerting", + "true_file": "SWP89.wav", + "false_file": "SWP90.wav", + }, + "tailmessage": { + "key": "Enable", + "section": "Tailmessage", + "true_file": "SWP91.wav", + "false_file": "SWP92.wav", + }, + "courtesytone": { + "key": "Enable", + "section": "CourtesyTones", + "true_file": "SWP93.wav", + "false_file": "SWP94.wav", + }, + "idchange": { + "key": "Enable", + "section": "IDChange", + "true_file": "SWP83.wav", + "false_file": "SWP84.wav", + }, + "alertscript": { + "key": "Enable", + "section": "AlertScript", + "true_file": "SWP81.wav", + "false_file": "SWP82.wav", + }, + "changect": { + "key": "", + "section": "", + "true_file": "SWP79.wav", + "false_file": "SWP80.wav", + "available_values": ["wx", "normal"], + }, + "changeid": { + "key": "", + "section": "", + "true_file": "SWP77.wav", + "false_file": "SWP78.wav", + "available_values": ["WX", "NORMAL"], + }, } # Get the directory of the script @@ -102,45 +155,59 @@ if key not in VALID_KEYS: print("The provided key does not match any configurable item.") sys.exit(1) -# Validate the provided value +# Validate the provided value if key in ["changect", "changeid"]: if value not in VALID_KEYS[key]["available_values"]: - print("Invalid value for {}. Please provide either {} or {}".format(key, VALID_KEYS[key]['available_values'][0], VALID_KEYS[key]['available_values'][1])) + print( + "Invalid value for {}. Please provide either {} or {}".format( + key, + VALID_KEYS[key]["available_values"][0], + VALID_KEYS[key]["available_values"][1], + ) + ) sys.exit(1) else: - if value not in ['true', 'false', 'toggle']: + if value not in ["true", "false", "toggle"]: print("Invalid value. Please provide either 'true' or 'false' or 'toggle'.") sys.exit(1) # Load the config file -with open(str(CONFIG_FILE), 'r') as f: +with open(str(CONFIG_FILE), "r") as f: config = yaml.safe_load(f) -if key == 'changect': +if key == "changect": value = changeCT(value) -elif key == 'changeid': +elif key == "changeid": value = changeID(value) else: # Convert the input value to boolean if not 'toggle' - if value != 'toggle': - value = value.lower() == 'true' + if value != "toggle": + value = value.lower() == "true" # Check if toggle is required - if value == 'toggle': - current_value = config[VALID_KEYS[key]['section']][VALID_KEYS[key]['key']] + if value == "toggle": + current_value = config[VALID_KEYS[key]["section"]][VALID_KEYS[key]["key"]] value = not current_value # Update the key in the config - config[VALID_KEYS[key]['section']][VALID_KEYS[key]['key']] = value + config[VALID_KEYS[key]["section"]][VALID_KEYS[key]["key"]] = value # Save the updated config back to the file - with open(str(CONFIG_FILE), 'w') as f: + with open(str(CONFIG_FILE), "w") as f: yaml.dump(config, f) # Get the correct audio file based on the new value -audio_file = VALID_KEYS[key]['true_file'] if value else VALID_KEYS[key]['false_file'] +audio_file = VALID_KEYS[key]["true_file"] if value else VALID_KEYS[key]["false_file"] # Play the corresponding audio message on all nodes -nodes = config['Asterisk']['Nodes'] +nodes = config["Asterisk"]["Nodes"] for node in nodes: - subprocess.run(['/usr/sbin/asterisk', '-rx', 'rpt localplay {} {}/SOUNDS/ALERTS/{}'.format(node, SCRIPT_DIR, audio_file.rsplit(".", 1)[0])]) \ No newline at end of file + subprocess.run( + [ + "/usr/sbin/asterisk", + "-rx", + "rpt localplay {} {}/SOUNDS/ALERTS/{}".format( + node, SCRIPT_DIR, audio_file.rsplit(".", 1)[0] + ), + ] + ) diff --git a/SkywarnPlus.py b/SkywarnPlus.py index c256019..b689eb4 100644 --- a/SkywarnPlus.py +++ b/SkywarnPlus.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 """ -SkywarnPlus v0.2.0 by Mason Nelson (N5LSN/WRKF394) +SkywarnPlus v0.2.2 by Mason Nelson (N5LSN/WRKF394) ================================================== SkywarnPlus is a utility that retrieves severe weather alerts from the National Weather Service and integrates these alerts with an Asterisk/app_rpt based @@ -49,7 +49,9 @@ if not master_enable: # Define tmp_dir and sounds_path tmp_dir = config.get("DEV", {}).get("TmpDir", "/tmp/SkywarnPlus") -sounds_path = config.get("Alerting", {}).get("SoundsPath", os.path.join(baseDir, "SOUNDS")) +sounds_path = config.get("Alerting", {}).get( + "SoundsPath", os.path.join(baseDir, "SOUNDS") +) # Define countyCodes countyCodes = config.get("Alerting", {}).get("CountyCodes", []) @@ -61,18 +63,14 @@ else: print("Error: tmp_dir is not set.") # Define Blocked events -global_blocked_events = ( - config.get("Blocking", {}).get("GlobalBlockedEvents", []) -) +global_blocked_events = config.get("Blocking", {}).get("GlobalBlockedEvents", []) if global_blocked_events is None: global_blocked_events = [] -sayalert_blocked_events = ( - config.get("Blocking", {}).get("SayAlertBlockedEvents", []) -) +sayalert_blocked_events = config.get("Blocking", {}).get("SayAlertBlockedEvents", []) if sayalert_blocked_events is None: sayalert_blocked_events = [] -tailmessage_blocked_events = ( - config.get("Blocking", {}).get("TailmessageBlockedEvents", []) +tailmessage_blocked_events = config.get("Blocking", {}).get( + "TailmessageBlockedEvents", [] ) if tailmessage_blocked_events is None: tailmessage_blocked_events = [] @@ -267,20 +265,22 @@ def load_state(): Load the state from the state file if it exists, else return an initial state. Returns: - dict: A dictionary containing courtesy tone (ct), identifier (id) and alerts. + dict: A dictionary containing data. """ if os.path.exists(data_file): with open(data_file, "r") as file: state = json.load(file) - # state["alertscript_alerts"] = set(state["alertscript_alerts"]) - # state["last_alerts"] = set(state["last_alerts"]) + state["alertscript_alerts"] = state["alertscript_alerts"] + state["last_alerts"] = state["last_alerts"] + state["last_sayalert"] = state.get("last_sayalert", []) return state else: return { "ct": None, "id": None, - "alertscript_alerts": set(), - "last_alerts": set(), + "alertscript_alerts": [], + "last_alerts": [], + "last_sayalert": [], } @@ -289,10 +289,11 @@ def save_state(state): Save the state to the state file. Args: - state (dict): A dictionary containing courtesy tone (ct), identifier (id) and alerts. + state (dict): A dictionary containing data. """ state["alertscript_alerts"] = list(state["alertscript_alerts"]) state["last_alerts"] = list(state["last_alerts"]) + state["last_sayalert"] = list(state["last_sayalert"]) with open(data_file, "w") as file: json.dump(state, file) @@ -321,9 +322,7 @@ def getAlerts(countyCodes): # Inject alerts if DEV INJECT is enabled in the config if config.get("DEV", {}).get("INJECT", False): logger.debug("getAlerts: DEV Alert Injection Enabled") - alerts = [ - alert.strip() for alert in config["DEV"].get("INJECTALERTS", []) - ] + alerts = [alert.strip() for alert in config["DEV"].get("INJECTALERTS", [])] logger.debug("getAlerts: Injecting alerts: %s", alerts) return alerts @@ -397,6 +396,28 @@ def sayAlert(alerts): alerts (list): List of active weather alerts. """ # Define the path of the alert file + state = load_state() + + filtered_alerts = [] + for alert in alerts: + if any( + fnmatch.fnmatch(alert, blocked_event) + for blocked_event in sayalert_blocked_events + ): + logger.debug("sayAlert: blocking %s as per configuration", alert) + continue + filtered_alerts.append(alert) + + # Check if the filtered alerts are the same as the last run + if filtered_alerts == state["last_sayalert"]: + logger.debug( + "sayAlert: The filtered alerts are the same as the last run. Not broadcasting." + ) + return + + state["last_sayalert"] = filtered_alerts + save_state(state) + alert_file = "{}/alert.wav".format(sounds_path) combined_sound = AudioSegment.from_wav( @@ -407,15 +428,7 @@ def sayAlert(alerts): ) alert_count = 0 - - for alert in alerts: - if any( - fnmatch.fnmatch(alert, blocked_event) - for blocked_event in sayalert_blocked_events - ): - logger.debug("sayAlert: blocking %s as per configuration", alert) - continue - + for alert in filtered_alerts: try: index = WS.index(alert) audio_file = AudioSegment.from_wav( @@ -437,32 +450,38 @@ def sayAlert(alerts): if alert_count == 0: logger.debug("sayAlert: All alerts were blocked, not broadcasting any alerts.") - else: - logger.debug("sayAlert: Exporting alert sound to %s", alert_file) - converted_combined_sound = convertAudio(combined_sound) - converted_combined_sound.export(alert_file, format="wav") + return - logger.debug("sayAlert: Replacing tailmessage with silence") - silence = AudioSegment.silent(duration=100) - converted_silence = convertAudio(silence) - converted_silence.export(tailmessage_file, format="wav") + logger.debug("sayAlert: Exporting alert sound to %s", alert_file) + converted_combined_sound = convertAudio(combined_sound) + converted_combined_sound.export(alert_file, format="wav") - node_numbers = config.get("Asterisk", {}).get("Nodes", []) - for node_number in node_numbers: - logger.info("Broadcasting alert on node %s", node_number) - command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format( - node_number, os.path.splitext(os.path.abspath(alert_file))[0] - ) - subprocess.run(command, shell=True) + logger.debug("sayAlert: Replacing tailmessage with silence") + silence = AudioSegment.silent(duration=100) + converted_silence = convertAudio(silence) + converted_silence.export(tailmessage_file, format="wav") - logger.info("Waiting 30 seconds for Asterisk to make announcement...") - time.sleep(30) + node_numbers = config.get("Asterisk", {}).get("Nodes", []) + for node_number in node_numbers: + logger.info("Broadcasting alert on node %s", node_number) + command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format( + node_number, os.path.splitext(os.path.abspath(alert_file))[0] + ) + subprocess.run(command, shell=True) + + logger.info("Waiting 30 seconds for Asterisk to make announcement...") + time.sleep(30) def sayAllClear(): """ Generate and broadcast 'all clear' message on Asterisk. """ + # Empty the last_sayalert list so that the next run will broadcast alerts + state = load_state() + state["last_sayalert"] = [] + save_state(state) + alert_clear = os.path.join(sounds_path, "ALERTS", "SWP96.wav") node_numbers = config.get("Asterisk", {}).get("Nodes", []) @@ -553,7 +572,9 @@ def changeCT(ct): """ state = load_state() current_ct = state["ct"] - tone_dir = config["CourtesyTones"].get("ToneDir", os.path.join(sounds_path, "TONES")) + tone_dir = config["CourtesyTones"].get( + "ToneDir", os.path.join(sounds_path, "TONES") + ) ct1 = config["CourtesyTones"]["Tones"]["CT1"] ct2 = config["CourtesyTones"]["Tones"]["CT2"] wx_ct = config["CourtesyTones"]["Tones"]["WXCT"] @@ -809,17 +830,20 @@ def change_and_log_CT_or_ID( alert_type, specified_alerts, ) + # Check if any alert matches specified_alerts - if set(alerts).intersection(specified_alerts): - for alert in alerts: - if alert in specified_alerts: - logger.debug("Alert %s requires a %s change", alert, alert_type) - if ( - changeCT("WX") if alert_type == "CT" else changeID("WX") - ): # If the CT/ID was actually changed - if pushover_debug: - pushover_message += "Changed {} to WX\n".format(alert_type) - break + # Here we replace set intersection with a list comprehension + intersecting_alerts = [alert for alert in alerts if alert in specified_alerts] + + if intersecting_alerts: + for alert in intersecting_alerts: + logger.debug("Alert %s requires a %s change", alert, alert_type) + if ( + changeCT("WX") if alert_type == "CT" else changeID("WX") + ): # If the CT/ID was actually changed + if pushover_debug: + pushover_message += "Changed {} to WX\n".format(alert_type) + break else: # No alerts require a CT/ID change, revert back to normal logger.debug( "No alerts require a %s change, reverting to normal.", alert_type @@ -862,6 +886,8 @@ def main(): alerts = getAlerts(countyCodes) # If new alerts differ from old ones, process new alerts + logger.debug("Last alerts: %s", last_alerts) + logger.debug("New alerts: %s", alerts) if last_alerts != alerts: state["last_alerts"] = alerts save_state(state) diff --git a/config.yaml b/config.yaml index 85eec4c..c302958 100644 --- a/config.yaml +++ b/config.yaml @@ -1,4 +1,4 @@ -# SkywarnPlus v0.2.0 Configuration File +# SkywarnPlus v0.2.2 Configuration File # Author: Mason Nelson (N5LSN/WRKF394) # Please edit this file according to your specific requirements. #