You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SkywarnPlus/SkywarnPlus.py

708 lines
23 KiB

#!/usr/bin/env python3
"""
SkywarnPlus v0.1.0 by Mason Nelson (N5LSN/WRKF394)
==================================================
SkywarnPlus is a utility that retrieves severe weather alerts from the National
Weather Service and integrates these alerts with an Asterisk/app_rpt based
radio repeater controller.
This utility is designed to be highly configurable, allowing users to specify
particular counties for which to check for alerts, the types of alerts to include
or block, and how these alerts are integrated into their radio repeater system.
This includes features such as automatic voice alerts and a tail message feature
for constant updates. All alerts are sorted by severity and cover a broad range
of weather conditions such as hurricane warnings, thunderstorms, heat waves, etc.
Configurable through a .ini file, SkywarnPlus serves as a comprehensive and
flexible tool for those who need to stay informed about weather conditions
and disseminate this information through their radio repeater system.
"""
import os
import json
import logging
import requests
import configparser
import shutil
import fnmatch
import subprocess
import time
from datetime import datetime, timezone
from dateutil import parser
from pydub import AudioSegment
# Configuration file handling
baseDir = os.path.dirname(os.path.realpath(__file__))
configPath = os.path.join(baseDir, "config.ini")
config = configparser.ConfigParser()
# Read through a context manager so the file handle is closed after parsing.
with open(configPath, "r") as _config_file:
    config.read_file(_config_file)

# Fetch values from configuration file
master_enable = config["SKYWARNPLUS"].getboolean("Enable", fallback=False)
if not master_enable:
    print("SkywarnPlus is disabled in config.ini, exiting...")
    # Equivalent to exit() (status 0) without depending on the site builtin.
    raise SystemExit

tmp_dir = config["DEV"].get("TmpDir", fallback="/tmp/SkywarnPlus")
sounds_path = config["Alerting"].get("SoundsPath", fallback="./SOUNDS")
if sounds_path == "./SOUNDS":
    # The default is relative to this script, not to the working directory.
    sounds_path = os.path.join(baseDir, "SOUNDS")

# Surrounding whitespace and blank entries are dropped so that a value such as
# "TXC039, TXC201" does not produce a code with a leading space.
countyCodes = [
    code.strip()
    for code in config["Alerting"]["CountyCodes"].split(",")
    if code.strip()
]

# If temporary directory doesn't exist, create it
os.makedirs(tmp_dir, exist_ok=True)

# Lists of blocked events (fnmatch patterns). A missing key is treated as an
# empty list instead of crashing on None.split().
global_blocked_events = [
    pattern.strip()
    for pattern in config["Blocking"].get("GlobalBlockedEvents", fallback="").split(",")
    if pattern.strip()
]
sayalert_blocked_events = [
    pattern.strip()
    for pattern in config["Blocking"].get("SayAlertBlockedEvents", fallback="").split(",")
    if pattern.strip()
]
tailmessage_blocked_events = [
    pattern.strip()
    for pattern in config["Blocking"]
    .get("TailmessageBlockedEvents", fallback="")
    .split(",")
    if pattern.strip()
]

# Maximum number of alerts to process
max_alerts = config["Alerting"].getint("MaxAlerts", fallback=99)

# Configuration for tailmessage
tailmessage_config = config["Tailmessage"]
# Flag to enable/disable tailmessage
enable_tailmessage = tailmessage_config.getboolean("Enable", fallback=False)
# Path to tailmessage file
tailmessage_file = tailmessage_config.get(
    "TailmessagePath", fallback="./SOUNDS/wx-tail.wav"
)
if tailmessage_file == "./SOUNDS/wx-tail.wav":
    # As with sounds_path, the default is resolved relative to this script.
    tailmessage_file = os.path.join(baseDir, "SOUNDS/wx-tail.wav")
# Warning and announcement strings.
# Each entry is paired by position with the entry at the same index in WA:
# sayAlert() and buildTailmessage() look an alert name up with WS.index() and
# use WA[index] to build the "SWP<nn>.wav" file name under SOUNDS/ALERTS.
WS = [
    # Weather alert event names
    "Hurricane Force Wind Warning",
    "Severe Thunderstorm Warning",
    "Severe Thunderstorm Watch",
    "Winter Weather Advisory",
    "Tropical Storm Warning",
    "Special Marine Warning",
    "Freezing Rain Advisory",
    "Special Weather Statement",
    "Excessive Heat Warning",
    "Coastal Flood Advisory",
    "Coastal Flood Warning",
    "Winter Storm Warning",
    "Tropical Storm Watch",
    "Thunderstorm Warning",
    "Small Craft Advisory",
    "Extreme Wind Warning",
    "Excessive Heat Watch",
    "Wind Chill Advisory",
    "Storm Surge Warning",
    "River Flood Warning",
    "Flash Flood Warning",
    "Coastal Flood Watch",
    "Winter Storm Watch",
    "Wind Chill Warning",
    "Thunderstorm Watch",
    "Fire Weather Watch",
    "Dense Fog Advisory",
    "Storm Surge Watch",
    "River Flood Watch",
    "Ice Storm Warning",
    "Hurricane Warning",
    "High Wind Warning",
    "Flash Flood Watch",
    "Red Flag Warning",
    "Blizzard Warning",
    "Tornado Warning",
    "Hurricane Watch",
    "High Wind Watch",
    "Frost Advisory",
    "Freeze Warning",
    "Wind Advisory",
    "Tornado Watch",
    "Storm Warning",
    "Heat Advisory",
    "Flood Warning",
    "Gale Warning",
    "Freeze Watch",
    "Flood Watch",
    "Flood Advisory",
    "Hurricane Local Statement",
    "Beach Hazards Statement",
    "Air Quality Alert",
    "Severe Weather Statement",
    "Winter Storm Advisory",
    "Tropical Storm Advisory",
    "Blizzard Watch",
    "Dust Storm Warning",
    "High Surf Advisory",
    "Heat Watch",
    # NOTE(review): "Freeze Watch" already appears earlier in this list.
    # WS.index() returns the first occurrence, so this second entry (and the
    # WA number at its position) is never selected by a name lookup.
    "Freeze Watch",
    "Dense Smoke Advisory",
    "Avalanche Warning",
    # Non-alert entries: status announcements and sound effects
    "SkywarnPlus Enabled",
    "SkywarnPlus Disabled",
    "SayAlert Enabled",
    "SayAlert Disabled",
    "SayAllClear Enabled",
    "SayAllClear Disabled",
    "Tailmessage Enabled",
    "Tailmessage Disabled",
    "CourtesyTone Enabled",
    "CourtesyTone Disabled",
    "Tic Sound Effect",
    "All Clear Message",
    "Updated Weather Information Message",
    "Error Sound Effect",
    "Word Space Silence",
]
# Two-digit sound file numbers as strings: "01" through "62", then "85"
# through "99" (63-84 are not used). The numbers are generated rather than
# spelled out one per line; the resulting list is identical.
WA = [
    "{:02d}".format(number)
    for number in list(range(1, 63)) + list(range(85, 100))
]
# Cleanup flag for testing: when the DEV option CLEANSLATE is exactly "True",
# the temporary directory is removed and recreated empty.
CLEANSLATE = config["DEV"].get("CLEANSLATE")
if CLEANSLATE == "True":
    shutil.rmtree(tmp_dir)
    os.mkdir(tmp_dir)

# Configure logging
log_config = config["Logging"]
enable_debug = log_config.getboolean("Debug", fallback=False)
# The log file defaults to SkywarnPlus.log inside the temporary directory.
_default_log_path = "{}/SkywarnPlus.log".format(tmp_dir)
log_file = log_config.get("LogPath", fallback=_default_log_path)

logger = logging.getLogger(__name__)
if enable_debug:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

# One formatter object is shared by the console and file handlers.
c_format = f_format = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler(log_file)
for _handler in (c_handler, f_handler):
    _handler.setFormatter(c_format)
for _handler in (c_handler, f_handler):
    logger.addHandler(_handler)

# Debugging stuff: dump the effective configuration at DEBUG level
for _template, _value in (
    ("Base directory: {}", baseDir),
    ("Temporary directory: {}", tmp_dir),
    ("Sounds path: {}", sounds_path),
    ("Tailmessage path: {}", tailmessage_file),
    ("Global Blocked events: {}", global_blocked_events),
    ("SayAlert Blocked events: {}", sayalert_blocked_events),
    ("Tailmessage Blocked events: {}", tailmessage_blocked_events),
):
    logger.debug(_template.format(_value))
def getAlerts(countyCodes, timeout=10):
    """
    Retrieve severe weather alerts for specified county codes.

    When the DEV option INJECT is enabled, no request is made and the
    comma-separated DEV option INJECTALERTS is returned instead.

    Otherwise each county code is queried at
    https://api.weather.gov/alerts/active and only features whose
    "effective"/"expires" window contains the current UTC time are kept.
    Events matching a GlobalBlockedEvents pattern are dropped. The result is
    sorted from most to least severe and truncated to max_alerts.

    Args:
        countyCodes (list): List of county codes.
        timeout (float): Seconds to wait for each HTTP request before giving
            up, so an unresponsive server cannot hang the script.

    Returns:
        alerts (list): List of active weather alert event names (str),
        without duplicates.

    Raises:
        requests.exceptions.RequestException: If a request cannot be completed
            (including a timeout). It is deliberately not swallowed here: an
            empty result would be indistinguishable from "no active alerts".
    """
    # Severity mappings
    severity_mapping_api = {
        "Extreme": 4,
        "Severe": 3,
        "Moderate": 2,
        "Minor": 1,
        "Unknown": 0,
    }
    severity_mapping_words = {"Warning": 4, "Watch": 3, "Advisory": 2, "Statement": 1}

    def word_severity(event):
        # Rank an event by its last word ("... Warning", "... Watch", ...).
        words = event.split()
        return severity_mapping_words.get(words[-1], 0) if words else 0

    if config.getboolean("DEV", "INJECT", fallback=False):
        logger.debug("DEV Alert Injection Enabled")
        injected = config["DEV"].get("INJECTALERTS", fallback="")
        alerts = [alert.strip() for alert in injected.split(",") if alert.strip()]
        logger.debug("Injecting alerts: {}".format(alerts))
        return alerts

    alerts = []
    current_time = datetime.now(timezone.utc)
    logger.debug("Checking for alerts in {}".format(countyCodes))
    for countyCode in countyCodes:
        # Tolerate "A, B" style lists and stray commas in the configuration.
        countyCode = countyCode.strip()
        if not countyCode:
            continue
        logger.debug("Checking for alerts in {}".format(countyCode))
        url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode)
        logger.debug("Requesting {}".format(url))
        response = requests.get(url, timeout=timeout)
        logger.debug("Response: {}\n\n".format(response.text))
        if response.status_code != 200:
            # NOTE(review): a failed county is only logged, so its alerts are
            # silently missing from the result - confirm this is intended.
            logger.error(
                "Failed to retrieve alerts for {}, HTTP status code {}, response: {}".format(
                    countyCode, response.status_code, response.text
                )
            )
            continue

        for feature in response.json().get("features", []):
            properties = feature["properties"]
            effective = properties.get("effective")
            expires = properties.get("expires")
            if not (effective and expires):
                continue
            effective_time = parser.isoparse(effective)
            expires_time = parser.isoparse(expires)
            if not effective_time <= current_time < expires_time:
                continue

            event = properties["event"]
            if any(
                fnmatch.fnmatch(event, global_blocked_event)
                for global_blocked_event in global_blocked_events
            ):
                logger.debug(
                    "Globally Blocking {} as per configuration".format(event)
                )
                continue

            severity = properties.get("severity")
            if severity is None:
                # Determine severity from the last word of the event if not provided
                severity = word_severity(event)
            else:
                severity = severity_mapping_api.get(severity, 0)
            alerts.append((event, severity))  # Add event to list as a tuple

    # Sort by both API-provided severity and 'words' severity. The sort is
    # stable, so equally ranked alerts keep the order they were received in.
    alerts.sort(
        key=lambda x: (
            x[1],  # API-provided severity
            word_severity(x[0]),  # 'words' severity
        ),
        reverse=True,
    )

    # Eliminate duplicates in a way that preserves order. Deduplicating on the
    # event name keeps one entry per event even if two counties report it with
    # different severities; the first (highest ranked) one is kept.
    seen_events = set()
    unique_alerts = []
    for alert in alerts:
        if alert[0] not in seen_events:
            seen_events.add(alert[0])
            unique_alerts.append(alert)
    alerts = unique_alerts

    logger.debug("Sorted alerts: (alert), (severity)")
    for alert in alerts:
        logger.debug(alert)

    # Only keep the events (not the severities), and only the first
    # 'max_alerts' of them
    return [alert[0] for alert in alerts[:max_alerts]]
def sayAlert(alerts):
    """
    Generate and broadcast severe weather alert sounds on Asterisk.

    Builds <sounds_path>/alert.wav from SWP97.wav (listed in WS/WA as
    "Updated Weather Information Message") followed, for every alert not
    matched by SayAlertBlockedEvents, by SWP95.wav ("Tic Sound Effect") and
    the alert's own SWP<nn>.wav. If at least one alert was added, the file is
    exported, the tailmessage file is overwritten with 100 ms of silence, the
    alert is played on every configured node via "rpt localplay", and the
    function then sleeps for 30 seconds.

    Args:
        alerts (list): List of active weather alerts.
    """
    alert_file = "{}/alert.wav".format(sounds_path)
    combined_sound = AudioSegment.from_wav(
        os.path.join(sounds_path, "ALERTS", "SWP97.wav")
    )
    sound_effect = AudioSegment.from_wav(
        os.path.join(sounds_path, "ALERTS", "SWP95.wav")
    )
    alert_count = 0  # Counter for alerts added to combined_sound

    for alert in alerts:
        # Check if alert matches any pattern in the SayAlertBlockedEvents list
        if any(
            fnmatch.fnmatch(alert, blocked_event)
            for blocked_event in sayalert_blocked_events
        ):
            logger.debug("SayAlert blocking {} as per configuration".format(alert))
            continue

        # Each try covers only the call that can raise the handled exception,
        # so an unrelated error is never reported as "Alert not found".
        try:
            index = WS.index(alert)
        except ValueError:
            logger.error("Alert not found: {}".format(alert))
            continue
        try:
            audio_file = AudioSegment.from_wav(
                os.path.join(sounds_path, "ALERTS", "SWP{}.wav".format(WA[index]))
            )
        except FileNotFoundError:
            logger.error(
                "Audio file not found: {}/ALERTS/SWP{}.wav".format(
                    sounds_path, WA[index]
                )
            )
            continue

        combined_sound += sound_effect + audio_file
        logger.debug("Added {} (SWP{}.wav) to alert sound".format(alert, WA[index]))
        alert_count += 1  # Increment the counter

    if alert_count == 0:  # Check the counter instead of combined_sound.empty()
        logger.debug("SayAlert: All alerts were blocked, not broadcasting any alerts.")
        return

    logger.debug("Exporting alert sound to {}".format(alert_file))
    converted_combined_sound = convert_audio(combined_sound)
    converted_combined_sound.export(alert_file, format="wav")

    logger.debug("Replacing tailmessage with silence")
    silence = AudioSegment.silent(duration=100)
    converted_silence = convert_audio(silence)
    converted_silence.export(tailmessage_file, format="wav")

    # "rpt localplay" is given the absolute path without the ".wav" extension.
    playback_path = os.path.splitext(os.path.abspath(alert_file))[0]
    for node_number in config["Asterisk"]["Nodes"].split(","):
        node_number = node_number.strip()
        if not node_number:
            continue  # Ignore blank entries such as a trailing comma
        logger.info("Broadcasting alert on node {}".format(node_number))
        # An argument list (no shell) keeps configuration values and paths
        # from being interpreted by a shell.
        command = [
            "/usr/sbin/asterisk",
            "-rx",
            "rpt localplay {} {}".format(node_number, playback_path),
        ]
        subprocess.run(command)

    # This keeps Asterisk from playing the tailmessage immediately after the alert
    logger.info("Waiting 30 seconds for Asterisk to make announcement...")
    time.sleep(30)
def sayAllClear():
    """
    Generate and broadcast 'all clear' message on Asterisk.

    Plays <sounds_path>/ALERTS/SWP96.wav on every node listed in the
    [Asterisk] "Nodes" option using "rpt localplay".
    """
    alert_clear = os.path.join(sounds_path, "ALERTS", "SWP96.wav")
    # "rpt localplay" is given the absolute path without the ".wav" extension.
    playback_path = os.path.splitext(os.path.abspath(alert_clear))[0]

    for node_number in config["Asterisk"]["Nodes"].split(","):
        node_number = node_number.strip()
        if not node_number:
            continue  # Ignore blank entries such as a trailing comma
        logger.info("Broadcasting all clear message on node {}".format(node_number))
        # An argument list (no shell) keeps configuration values and paths
        # from being interpreted by a shell.
        command = [
            "/usr/sbin/asterisk",
            "-rx",
            "rpt localplay {} {}".format(node_number, playback_path),
        ]
        subprocess.run(command)
def buildTailmessage(alerts):
    """
    Build a tailmessage, which is a short message appended to the end of a
    transmission to update on the weather conditions.

    The tailmessage file is always (re)written: with 100 ms of silence when
    there are no alerts or when no alert audio could be added, otherwise with
    SWP95.wav followed by the alert's SWP<nn>.wav for each alert that is not
    matched by TailmessageBlockedEvents.

    Args:
        alerts (list): List of active weather alerts.
    """
    if not alerts:
        logger.debug("No alerts, creating silent tailmessage")
        silence = AudioSegment.silent(duration=100)
        converted_silence = convert_audio(silence)
        converted_silence.export(tailmessage_file, format="wav")
        return

    combined_sound = AudioSegment.empty()
    sound_effect = AudioSegment.from_wav(
        os.path.join(sounds_path, "ALERTS", "SWP95.wav")
    )
    alert_count = 0  # Counter for alerts added to combined_sound

    for alert in alerts:
        # Check if alert matches any pattern in the TailmessageBlockedEvents list
        if any(
            fnmatch.fnmatch(alert, blocked_event)
            for blocked_event in tailmessage_blocked_events
        ):
            logger.debug("Alert blocked by TailmessageBlockedEvents: {}".format(alert))
            continue

        # Each try covers only the call that can raise the handled exception.
        try:
            index = WS.index(alert)
        except ValueError:
            logger.error("Alert not found: {}".format(alert))
            continue
        try:
            audio_file = AudioSegment.from_wav(
                os.path.join(sounds_path, "ALERTS", "SWP{}.wav".format(WA[index]))
            )
        except FileNotFoundError:
            logger.error(
                "Audio file not found: {}/ALERTS/SWP{}.wav".format(
                    sounds_path, WA[index]
                )
            )
            continue

        combined_sound += sound_effect + audio_file
        logger.debug("Added {} (SWP{}.wav) to tailmessage".format(alert, WA[index]))
        alert_count += 1

    # Check the counter, as sayAlert() does. AudioSegment.empty() builds a new
    # zero-length segment rather than testing for emptiness, and a zero-length
    # segment is falsy, so "if combined_sound.empty():" could never be true.
    if alert_count == 0:
        logger.debug(
            "BuildTailmessage: All alerts were blocked, creating silent tailmessage"
        )
        combined_sound = AudioSegment.silent(duration=100)

    logger.debug("Exporting tailmessage to {}".format(tailmessage_file))
    converted_combined_sound = convert_audio(combined_sound)
    converted_combined_sound.export(tailmessage_file, format="wav")
def changeCT(ct):
    """
    Change the current Courtesy Tone (CT) to the one specified.

    The function first checks if the specified CT is already in use (as
    recorded in ct_state.txt in the temporary directory), and if it is, it
    returns without making any changes. If the CT needs to be changed, it
    copies the new tone files over the RptLocalCT and RptLinkCT files and
    updates the state file. If no CT is specified, the function logs an error
    message and returns.

    Args:
        ct (str): The name of the new CT to use. "NORMAL" installs the LocalCT
            and LinkCT files; any other value installs the WXCT file for both.

    Returns:
        bool: True if the CT was changed, False otherwise.
    """
    tone_dir = config["CourtesyTones"].get("ToneDir", fallback="./SOUNDS/TONES")
    if tone_dir == "./SOUNDS/TONES":
        tone_dir = os.path.join(sounds_path, "TONES")
    local_ct = config["CourtesyTones"].get("LocalCT")
    link_ct = config["CourtesyTones"].get("LinkCT")
    wx_ct = config["CourtesyTones"].get("WXCT")
    rpt_local_ct = config["CourtesyTones"].get("RptLocalCT")
    rpt_link_ct = config["CourtesyTones"].get("RptLinkCT")
    ct_state_file = os.path.join(tmp_dir, "ct_state.txt")

    logger.debug("Tone directory: {}".format(tone_dir))
    logger.debug("CT argument: {}".format(ct))

    if not ct:
        logger.error("ChangeCT called with no CT specified")
        # Return False (not None) so the result is always a bool as documented.
        return False

    current_ct = None
    if os.path.exists(ct_state_file):
        with open(ct_state_file, "r") as file:
            current_ct = file.read().strip()
    logger.debug("Current CT: {}".format(current_ct))

    if ct == current_ct:
        logger.debug("Courtesy tones are already {}, no changes made.".format(ct))
        return False

    # (source, destination) file names, relative to tone_dir.
    if ct == "NORMAL":
        copies = [(local_ct, rpt_local_ct), (link_ct, rpt_link_ct)]
        change_message = "Changing to NORMAL courtesy tones"
    else:
        copies = [(wx_ct, rpt_local_ct), (wx_ct, rpt_link_ct)]
        change_message = "Changing to {} courtesy tone".format(ct)

    # A missing option would otherwise surface as a TypeError while building
    # the path; report it and leave the tones and the state file untouched.
    if any(name is None for pair in copies for name in pair):
        logger.error(
            "ChangeCT: courtesy tone file names are missing from the configuration"
        )
        return False

    logger.info(change_message)
    for src_name, dest_name in copies:
        src_file = os.path.join(tone_dir, src_name)
        dest_file = os.path.join(tone_dir, dest_name)
        logger.debug("Copying {} to {}".format(src_file, dest_file))
        shutil.copyfile(src_file, dest_file)

    # Record the active CT only after both copies have succeeded.
    with open(ct_state_file, "w") as file:
        file.write(ct)
    return True
def send_pushover_notification(message, title=None, priority=0, timeout=10):
    """
    Send a push notification via Pushover service.

    The function constructs the payload for the request, including the user
    key, API token, message, title, and priority, and posts it to the Pushover
    API endpoint. If the request cannot be completed or the response status is
    not 200, an error message is logged.

    Args:
        message (str): The content of the push notification.
        title (str, optional): The title of the push notification. Defaults to None.
        priority (int, optional): The priority of the push notification. Defaults to 0.
        timeout (float, optional): Seconds to wait for the Pushover API before
            giving up. Defaults to 10.

    Returns:
        None
    """
    pushover_config = config["Pushover"]
    user_key = pushover_config.get("UserKey")
    token = pushover_config.get("APIToken")

    # Remove newline from the end of the message
    message = message.rstrip("\n")

    url = "https://api.pushover.net/1/messages.json"
    payload = {
        "token": token,
        "user": user_key,
        "message": message,
        "title": title,
        "priority": priority,
    }

    # Without a timeout the script could block indefinitely on a stalled
    # connection. A transport failure is logged, like a non-200 response.
    try:
        response = requests.post(url, data=payload, timeout=timeout)
    except requests.exceptions.RequestException as error:
        logger.error("Failed to send Pushover notification: {}".format(error))
        return

    if response.status_code != 200:
        logger.error("Failed to send Pushover notification: {}".format(response.text))
def convert_audio(audio):
    """
    Resample audio to 8000Hz and reduce it to one channel (mono) for
    compatibility with Asterisk.

    Args:
        audio (AudioSegment): Audio to be converted.

    Returns:
        AudioSegment: The result of set_frame_rate(8000) followed by
        set_channels(1) on the input.
    """
    resampled = audio.set_frame_rate(8000)
    mono = resampled.set_channels(1)
    return mono
def main():
    """
    Main function of the script, that fetches and processes severe weather
    alerts, then integrates these alerts into an Asterisk/app_rpt based radio
    repeater system.

    The current alert list is compared with the list saved in
    <tmp_dir>/alerts.json by the previous run. Nothing happens when they are
    equal. Otherwise the new list is saved and, depending on configuration,
    the alerts (or the all clear message) are broadcast, the courtesy tones
    are switched, the tailmessage is rebuilt and a Pushover notification is
    sent.
    """
    say_alert_enabled = config["Alerting"].getboolean("SayAlert", fallback=False)
    say_all_clear_enabled = config["Alerting"].getboolean("SayAllClear", fallback=False)

    alerts = getAlerts(countyCodes)

    tmp_file = "{}/alerts.json".format(tmp_dir)
    # The "init" sentinel never equals a real alert list, so the first run (or
    # a run after an unreadable state file) is always treated as a change.
    old_alerts = ["init"]
    if os.path.exists(tmp_file):
        try:
            with open(tmp_file, "r") as file:
                old_alerts = json.load(file)
        except ValueError:
            # json.JSONDecodeError is a ValueError; a truncated or corrupt
            # state file should not stop alert processing.
            logger.error(
                "Previous alerts file {} is not valid JSON, starting fresh.".format(
                    tmp_file
                )
            )
    else:
        logger.info("No previous alerts file found, starting fresh.")

    if old_alerts == alerts:
        logger.debug("No change in alerts")
        return

    with open(tmp_file, "w") as file:
        json.dump(alerts, file)

    ct_alerts = [
        alert.strip()
        for alert in config["CourtesyTones"].get("CTAlerts", fallback="").split(",")
        if alert.strip()
    ]
    enable_ct_auto_change = config["CourtesyTones"].getboolean(
        "Enable", fallback=False
    )
    pushover_enabled = config["Pushover"].getboolean("Enable", fallback=False)
    pushover_debug = config["Pushover"].getboolean("Debug", fallback=False)

    if not alerts:
        logger.info("No alerts found")
        pushover_message = "Alerts Cleared\n"
        # The state file was written just above, so no further check for its
        # existence is needed here.
        if say_all_clear_enabled:
            sayAllClear()
    else:
        logger.info("Alerts found: {}".format(alerts))
        if say_alert_enabled:
            sayAlert(alerts)
        pushover_message = "\n".join(alerts) + "\n"

    if enable_ct_auto_change:
        logger.debug(
            "CT auto change is enabled, alerts that require a CT change: {}".format(
                ct_alerts
            )
        )
        # First active alert (in severity order) that is listed in CTAlerts
        ct_alert = next((alert for alert in alerts if alert in ct_alerts), None)
        if ct_alert is not None:
            logger.debug("Alert {} requires a CT change".format(ct_alert))
            # changeCT() is truthy only if the CT was actually changed
            if changeCT("WX") and pushover_debug:
                pushover_message += "Changed courtesy tones to WX\n"
        else:  # No alerts require a CT change, revert back to normal
            logger.debug("No alerts require a CT change, reverting to normal.")
            if changeCT("NORMAL") and pushover_debug:
                pushover_message += "Changed courtesy tones to NORMAL\n"
    else:
        logger.debug("CT auto change is not enabled")

    if enable_tailmessage:
        buildTailmessage(alerts)
        if pushover_debug:
            if not alerts:
                pushover_message += "WX tailmessage removed\n"
            else:
                pushover_message += "Built WX tailmessage\n"

    if pushover_enabled:
        pushover_message = pushover_message.rstrip("\n")
        logger.debug("Sending pushover notification: {}".format(pushover_message))
        send_pushover_notification(pushover_message, title="Alerts Changed")


if __name__ == "__main__":
    main()

Powered by TurnKey Linux.