#!/usr/bin/env python3 """ SkywarnPlus by Mason Nelson (N5LSN/WRKF394) ================================================== SkywarnPlus is a utility that retrieves severe weather alerts from the National Weather Service and integrates these alerts with an Asterisk/app_rpt based radio repeater controller. This utility is designed to be highly configurable, allowing users to specify particular counties for which to check for alerts, the types of alerts to include or block, and how these alerts are integrated into their radio repeater system. This includes features such as automatic voice alerts and a tail message feature for constant updates. All alerts are sorted by severity and cover a broad range of weather conditions such as hurricane warnings, thunderstorms, heat waves, etc. Configurable through a .ini file, SkywarnPlus serves as a comprehensive and flexible tool for those who need to stay informed about weather conditions and disseminate this information through their radio repeater system. """ import os import json import logging import requests import configparser import shutil import fnmatch import subprocess import time from datetime import datetime, timezone from dateutil import parser from pydub import AudioSegment # Configuration file handling baseDir = os.path.dirname(os.path.realpath(__file__)) configPath = os.path.join(baseDir, "config.ini") config = configparser.ConfigParser() config.read_file(open(configPath, "r")) # Fetch values from configuration file master_enable = config["SKYWARNPLUS"].getboolean("Enable", fallback=False) if not master_enable: print("SkywarnPlus is disabled in config.ini, exiting...") exit() tmp_dir = config["DEV"].get("TmpDir", fallback="/tmp/SkywarnPlus") sounds_path = config["Alerting"].get("SoundsPath", fallback="./SOUNDS") if sounds_path == "./SOUNDS": sounds_path = os.path.join(baseDir, "SOUNDS") countyCodes = config["Alerting"]["CountyCodes"].split(",") # If temporary directory doesn't exist, create it if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) # List of blocked events global_blocked_events = config["Blocking"].get("GlobalBlockedEvents").split(",") sayalert_blocked_events = config["Blocking"].get("SayAlertBlockedEvents").split(",") tailmessage_blocked_events = ( config["Blocking"].get("TailmessageBlockedEvents").split(",") ) # Maximum number of alerts to process max_alerts = config["Alerting"].getint("MaxAlerts", fallback=99) # Configuration for tailmessage tailmessage_config = config["Tailmessage"] # Flag to enable/disable tailmessage enable_tailmessage = tailmessage_config.getboolean("Enable", fallback=False) # Path to tailmessage file tailmessage_file = tailmessage_config.get( "TailmessagePath", fallback="./SOUNDS/wx-tail.wav" ) if tailmessage_file == "./SOUNDS/wx-tail.wav": tailmessage_file = os.path.join(baseDir, "SOUNDS/wx-tail.wav") # Warning and announcement strings WS = [ "Hurricane Force Wind Warning", "Severe Thunderstorm Warning", "Severe Thunderstorm Watch", "Winter Weather Advisory", "Tropical Storm Warning", "Special Marine Warning", "Freezing Rain Advisory", "Special Weather Statement", "Excessive Heat Warning", "Coastal Flood Advisory", "Coastal Flood Warning", "Winter Storm Warning", "Tropical Storm Watch", "Thunderstorm Warning", "Small Craft Advisory", "Extreme Wind Warning", "Excessive Heat Watch", "Wind Chill Advisory", "Storm Surge Warning", "River Flood Warning", "Flash Flood Warning", "Coastal Flood Watch", "Winter Storm Watch", "Wind Chill Warning", "Thunderstorm Watch", "Fire Weather Watch", "Dense Fog Advisory", "Storm Surge Watch", "River Flood Watch", "Ice Storm Warning", "Hurricane Warning", "High Wind Warning", "Flash Flood Watch", "Red Flag Warning", "Blizzard Warning", "Tornado Warning", "Hurricane Watch", "High Wind Watch", "Frost Advisory", "Freeze Warning", "Wind Advisory", "Tornado Watch", "Storm Warning", "Heat Advisory", "Flood Warning", "Gale Warning", "Freeze Watch", "Flood Watch", "Flood Advisory", "Hurricane Local Statement", "Beach Hazards Statement", "Air Quality Alert", "Severe Weather Statement", "Winter Storm Advisory", "Tropical Storm Advisory", "Blizzard Watch", "Dust Storm Warning", "High Surf Advisory", "Heat Watch", "Freeze Watch", "Dense Smoke Advisory", "Avalanche Warning", "SkywarnPlus Enabled", "SkywarnPlus Disabled", "SayAlert Enabled", "SayAlert Disabled", "SayAllClear Enabled", "SayAllClear Disabled", "Tailmessage Enabled", "Tailmessage Disabled", "CourtesyTone Enabled", "CourtesyTone Disabled", "Tic Sound Effect", "All Clear Message", "Updated Weather Information Message", "Error Sound Effect", "Word Space Silence", ] WA = [ "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61", "62", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", ] # Cleanup flag for testing CLEANSLATE = config["DEV"].get("CLEANSLATE") if CLEANSLATE == "True": shutil.rmtree(tmp_dir) os.mkdir(tmp_dir) # Configure logging log_config = config["Logging"] enable_debug = log_config.getboolean("Debug", fallback=False) log_file = log_config.get("LogPath", fallback="{}/SkywarnPlus.log".format(tmp_dir)) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG if enable_debug else logging.INFO) c_handler = logging.StreamHandler() f_handler = logging.FileHandler(log_file) c_format = f_format = logging.Formatter("%(asctime)s %(levelname)s %(message)s") c_handler.setFormatter(c_format) f_handler.setFormatter(f_format) logger.addHandler(c_handler) logger.addHandler(f_handler) # Debugging stuff logger.debug("Base directory: {}".format(baseDir)) logger.debug("Temporary directory: {}".format(tmp_dir)) logger.debug("Sounds path: {}".format(sounds_path)) logger.debug("Tailmessage path: {}".format(tailmessage_file)) logger.debug("Global Blocked events: {}".format(global_blocked_events)) logger.debug("SayAlert Blocked events: {}".format(sayalert_blocked_events)) logger.debug("Tailmessage Blocked events: {}".format(tailmessage_blocked_events)) def getAlerts(countyCodes): """ Retrieve severe weather alerts for specified county codes. Args: countyCodes (list): List of county codes. Returns: alerts (list): List of active weather alerts. """ # Severity mappings severity_mapping_api = {"Extreme": 4, "Severe": 3, "Moderate": 2, "Minor": 1, "Unknown": 0} severity_mapping_words = {"Warning": 4, "Watch": 3, "Advisory": 2, "Statement": 1} if config.getboolean("DEV", "INJECT", fallback=False): logger.debug("DEV Alert Injection Enabled") alerts = [ alert.strip() for alert in config["DEV"].get("INJECTALERTS").split(",") ] logger.debug("Injecting alerts: {}".format(alerts)) return alerts alerts = [] current_time = datetime.now(timezone.utc) logger.debug("Checking for alerts in {}".format(countyCodes)) for countyCode in countyCodes: logger.debug("Checking for alerts in {}".format(countyCode)) url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode) logger.debug("Requesting {}".format(url)) response = requests.get(url) logger.debug("Response: {}\n\n".format(response.text)) if response.status_code == 200: alert_data = response.json() for feature in alert_data["features"]: effective = feature["properties"].get("effective") expires = feature["properties"].get("expires") if effective and expires: effective_time = parser.isoparse(effective) expires_time = parser.isoparse(expires) if effective_time <= current_time < expires_time: event = feature["properties"]["event"] for global_blocked_event in global_blocked_events: if fnmatch.fnmatch(event, global_blocked_event): logger.debug( "Globally Blocking {} as per configuration".format( event ) ) break else: severity = feature["properties"].get("severity") if severity is None: # Determine severity from the last word of the event if not provided last_word = event.split()[-1] severity = severity_mapping_words.get(last_word, 0) else: severity = severity_mapping_api.get(severity, 0) alerts.append((event, severity)) # Add event to list as a tuple else: logger.error( "Failed to retrieve alerts for {}, HTTP status code {}, response: {}".format( countyCode, response.status_code, response.text ) ) # Eliminate duplicates in a way that preserves order alerts = [x for i, x in enumerate(alerts) if alerts.index(x) == i] # Sort by both API-provided severity and 'words' severity alerts.sort( key=lambda x: ( x[1], # API-provided severity severity_mapping_words.get(x[0].split()[-1], 0) # 'words' severity ), reverse=True ) logger.debug("Sorted alerts: (alert), (severity)") for alert in alerts: logger.debug(alert) # Only keep the events (not the severities) alerts = [alert[0] for alert in alerts[:max_alerts]] # Only keep the first 'max_alerts' alerts return alerts def sayAlert(alerts): """ Generate and broadcast severe weather alert sounds on Asterisk. Args: alerts (list): List of active weather alerts. """ alert_file = "{}/alert.wav".format(sounds_path) combined_sound = AudioSegment.from_wav( os.path.join(sounds_path, "ALERTS", "SWP97.wav") ) sound_effect = AudioSegment.from_wav( os.path.join(sounds_path, "ALERTS", "SWP95.wav") ) alert_count = 0 # Counter for alerts added to combined_sound for alert in alerts: # Check if alert matches any pattern in the SayAlertBlockedEvents list if any(fnmatch.fnmatch(alert, blocked_event) for blocked_event in sayalert_blocked_events): logger.debug("SayAlert blocking {} as per configuration".format(alert)) continue try: index = WS.index(alert) audio_file = AudioSegment.from_wav( os.path.join(sounds_path, "ALERTS", "SWP{}.wav".format(WA[index])) ) combined_sound += sound_effect + audio_file logger.debug("Added {} (SWP{}.wav) to alert sound".format(alert, WA[index])) alert_count += 1 # Increment the counter except ValueError: logger.error("Alert not found: {}".format(alert)) except FileNotFoundError: logger.error( "Audio file not found: {}/ALERTS/SWP{}.wav".format( sounds_path, WA[index] ) ) if alert_count == 0: # Check the counter instead of combined_sound.empty() logger.debug("SayAlert: All alerts were blocked, not broadcasting any alerts.") else: logger.debug("Exporting alert sound to {}".format(alert_file)) converted_combined_sound = convert_audio(combined_sound) converted_combined_sound.export(alert_file, format="wav") logger.debug("Replacing tailmessage with silence") silence = AudioSegment.silent(duration=100) converted_silence = convert_audio(silence) converted_silence.export(tailmessage_file, format="wav") node_numbers = config["Asterisk"]["Nodes"].split(",") for node_number in node_numbers: logger.info("Broadcasting alert on node {}".format(node_number)) command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format( node_number.strip(), os.path.splitext(os.path.abspath(alert_file))[0] ) subprocess.run(command, shell=True) # This keeps Asterisk from playing the tailmessage immediately after the alert logger.info("Waiting 30 seconds for Asterisk to make announcement...") time.sleep(30) def sayAllClear(): """ Generate and broadcast 'all clear' message on Asterisk. """ alert_clear = os.path.join(sounds_path, "ALERTS", "SWP96.wav") node_numbers = config["Asterisk"]["Nodes"].split(",") for node_number in node_numbers: logger.info("Broadcasting all clear message on node {}".format(node_number)) command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format( node_number.strip(), os.path.splitext(os.path.abspath(alert_clear))[0] ) subprocess.run(command, shell=True) def buildTailmessage(alerts): """ Build a tailmessage, which is a short message appended to the end of a transmission to update on the weather conditions. Args: alerts (list): List of active weather alerts. """ if not alerts: logger.debug("No alerts, creating silent tailmessage") silence = AudioSegment.silent(duration=100) converted_silence = convert_audio(silence) converted_silence.export(tailmessage_file, format="wav") return combined_sound = AudioSegment.empty() sound_effect = AudioSegment.from_wav( os.path.join(sounds_path, "ALERTS", "SWP95.wav") ) for alert in alerts: # Check if alert matches any pattern in the TailmessageBlockedEvents list if any(fnmatch.fnmatch(alert, blocked_event) for blocked_event in tailmessage_blocked_events): logger.debug("Alert blocked by TailmessageBlockedEvents: {}".format(alert)) continue try: index = WS.index(alert) audio_file = AudioSegment.from_wav( os.path.join(sounds_path, "ALERTS", "SWP{}.wav".format(WA[index])) ) combined_sound += sound_effect + audio_file logger.debug("Added {} (SWP{}.wav) to tailmessage".format(alert, WA[index])) except ValueError: logger.error("Alert not found: {}".format(alert)) except FileNotFoundError: logger.error( "Audio file not found: {}/ALERTS/SWP{}.wav".format( sounds_path, WA[index] ) ) if combined_sound.empty(): logger.debug( "BuildTailmessage: All alerts were blocked, creating silent tailmessage" ) combined_sound = AudioSegment.silent(duration=100) logger.debug("Exporting tailmessage to {}".format(tailmessage_file)) converted_combined_sound = convert_audio(combined_sound) converted_combined_sound.export(tailmessage_file, format="wav") def changeCT(ct): """ Change the current Courtesy Tone (CT) to the one specified. The function first checks if the specified CT is already in use, and if it is, it returns without making any changes. If the CT needs to be changed, it replaces the current CT files with the new ones and updates the state file. If no CT is specified, the function logs an error message and returns. Args: ct (str): The name of the new CT to use. This should be one of the CTs specified in the config file. Returns: bool: True if the CT was changed, False otherwise. """ tone_dir = config["CourtesyTones"].get("ToneDir", fallback="./SOUNDS/TONES") if tone_dir == "./SOUNDS/TONES": tone_dir = os.path.join(sounds_path, "TONES") local_ct = config["CourtesyTones"].get("LocalCT") link_ct = config["CourtesyTones"].get("LinkCT") wx_ct = config["CourtesyTones"].get("WXCT") rpt_local_ct = config["CourtesyTones"].get("RptLocalCT") rpt_link_ct = config["CourtesyTones"].get("RptLinkCT") ct_state_file = os.path.join(tmp_dir, "ct_state.txt") logger.debug("Tone directory: {}".format(tone_dir)) logger.debug("CT argument: {}".format(ct)) if not ct: logger.error("ChangeCT called with no CT specified") return current_ct = None if os.path.exists(ct_state_file): with open(ct_state_file, "r") as file: current_ct = file.read().strip() logger.debug("Current CT: {}".format(current_ct)) if ct == current_ct: logger.debug("Courtesy tones are already {}, no changes made.".format(ct)) return False if ct == "NORMAL": logger.info("Changing to NORMAL courtesy tones") src_file = tone_dir + "/" + local_ct dest_file = tone_dir + "/" + rpt_local_ct logger.debug("Copying {} to {}".format(src_file, dest_file)) shutil.copyfile(src_file, dest_file) src_file = tone_dir + "/" + link_ct dest_file = tone_dir + "/" + rpt_link_ct logger.debug("Copying {} to {}".format(src_file, dest_file)) shutil.copyfile(src_file, dest_file) else: logger.info("Changing to {} courtesy tone".format(ct)) src_file = tone_dir + "/" + wx_ct dest_file = tone_dir + "/" + rpt_local_ct logger.debug("Copying {} to {}".format(src_file, dest_file)) shutil.copyfile(src_file, dest_file) src_file = tone_dir + "/" + wx_ct dest_file = tone_dir + "/" + rpt_link_ct logger.debug("Copying {} to {}".format(src_file, dest_file)) shutil.copyfile(src_file, dest_file) with open(ct_state_file, "w") as file: file.write(ct) return True def send_pushover_notification(message, title=None, priority=0): """ Send a push notification via Pushover service. The function constructs the payload for the request, including the user key, API token, message, title, and priority. The payload is then sent to the Pushover API endpoint. If the request fails, an error message is logged. Args: message (str): The content of the push notification. title (str, optional): The title of the push notification. Defaults to None. priority (int, optional): The priority of the push notification. Defaults to 0. Returns: None """ pushover_config = config["Pushover"] user_key = pushover_config.get("UserKey") token = pushover_config.get("APIToken") # Remove newline from the end of the message message = message.rstrip("\n") url = "https://api.pushover.net/1/messages.json" payload = { "token": token, "user": user_key, "message": message, "title": title, "priority": priority, } response = requests.post(url, data=payload) if response.status_code != 200: logger.error("Failed to send Pushover notification: {}".format(response.text)) def convert_audio(audio): """ Convert audio file to 8000Hz mono for compatibility with Asterisk. Args: audio (AudioSegment): Audio file to be converted. Returns: AudioSegment: Converted audio file. """ return audio.set_frame_rate(8000).set_channels(1) def main(): """ Main function of the script, that fetches and processes severe weather alerts, then integrates these alerts into an Asterisk/app_rpt based radio repeater system. """ say_alert_enabled = config["Alerting"].getboolean("SayAlert", fallback=False) say_all_clear_enabled = config["Alerting"].getboolean("SayAllClear", fallback=False) alerts = getAlerts(countyCodes) tmp_file = "{}/alerts.json".format(tmp_dir) if os.path.exists(tmp_file): with open(tmp_file, "r") as file: old_alerts = json.load(file) else: old_alerts = ["init"] logger.info("No previous alerts file found, starting fresh.") if old_alerts != alerts: with open(tmp_file, "w") as file: json.dump(alerts, file) ct_alerts = [ alert.strip() for alert in config["CourtesyTones"].get("CTAlerts").split(",") ] enable_ct_auto_change = config["CourtesyTones"].getboolean( "Enable", fallback=False ) pushover_enabled = config["Pushover"].getboolean("Enable", fallback=False) pushover_debug = config["Pushover"].getboolean("Debug", fallback=False) if len(alerts) == 0: logger.info("No alerts found") pushover_message = "Alerts Cleared\n" if not os.path.exists(tmp_file): with open(tmp_file, "w") as file: json.dump([], file) if say_all_clear_enabled: sayAllClear() else: logger.info("Alerts found: {}".format(alerts)) if say_alert_enabled: sayAlert(alerts) pushover_message = "\n".join(alerts) + "\n" if enable_ct_auto_change: logger.debug( "CT auto change is enabled, alerts that require a CT change: {}".format( ct_alerts ) ) # Check if any alert matches ct_alerts if set(alerts).intersection(ct_alerts): for alert in alerts: if alert in ct_alerts: logger.debug("Alert {} requires a CT change".format(alert)) if changeCT("WX"): # If the CT was actually changed if pushover_debug: pushover_message += "Changed courtesy tones to WX\n" break else: # No alerts require a CT change, revert back to normal logger.debug("No alerts require a CT change, reverting to normal.") if changeCT("NORMAL"): # If the CT was actually changed if pushover_debug: pushover_message += "Changed courtesy tones to NORMAL\n" else: logger.debug("CT auto change is not enabled") if enable_tailmessage: buildTailmessage(alerts) if pushover_debug: if not alerts: pushover_message += "WX tailmessage removed\n" else: pushover_message += "Built WX tailmessage\n" if pushover_enabled: pushover_message = pushover_message.rstrip("\n") logger.debug("Sending pushover notification: {}".format(pushover_message)) send_pushover_notification(pushover_message, title="Alerts Changed") else: logger.debug("No change in alerts") if __name__ == "__main__": main()