Progress v0.2.4

pull/24/head
Mason10198 3 years ago
parent 374e3122a3
commit 628c2b619f

@ -17,6 +17,8 @@ import sys
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from ruamel.yaml import YAML from ruamel.yaml import YAML
# Use ruamel.yaml instead of PyYAML to preserve comments in the config file
yaml = YAML() yaml = YAML()
# Define a function to change the CT # Define a function to change the CT

@ -0,0 +1,206 @@
import os
import sys
import requests
import json
from ruamel.yaml import YAML
import urllib.parse
import subprocess
import wave
import contextlib
import re
# Use ruamel.yaml instead of PyYAML
yaml = YAML()
# Directories and Paths
baseDir = os.path.dirname(os.path.realpath(__file__))
configPath = os.path.join(baseDir, "config.yaml")
# Open and read configuration file
with open(configPath, "r") as config_file:
config = yaml.load(config_file)
# Define tmp_dir
tmp_dir = config.get("DEV", {}).get("TmpDir", "/tmp/SkywarnPlus")
# Path to the data file
data_file = os.path.join(tmp_dir, "data.json")
# Enable debugging
debug = True
def debug_print(*args, **kwargs):
"""
Print debug information if debugging is enabled.
"""
if debug:
print(*args, **kwargs)
def load_state():
"""
Load the state from the state file if it exists, else return an initial state.
Returns:
dict: A dictionary containing data.
"""
if os.path.exists(data_file):
with open(data_file, "r") as file:
state = json.load(file)
return state
else:
return {
"ct": None,
"id": None,
"alertscript_alerts": [],
"last_alerts": [],
"last_sayalert": [],
"last_descriptions": {},
}
import re
def modify_description(description):
"""
Modify the description to make it more suitable for conversion to audio.
Args:
description (str): The description text.
Returns:
str: The modified description text.
"""
# Remove newline characters and replace multiple spaces with a single space
description = description.replace('\n', ' ')
description = re.sub(r'\s+', ' ', description)
# Replace some common weather abbreviations and symbols
abbreviations = {
"mph": "miles per hour",
"knots": "nautical miles per hour",
"Nm": "nautical miles",
"nm": "nautical miles",
"PM": "P.M.",
"AM": "A.M.",
"ft.": "feet",
"in.": "inches",
"m": "meter",
"km": "kilometer",
"mi": "mile",
"%": "percent",
"N": "north",
"S": "south",
"E": "east",
"W": "west",
"NE": "northeast",
"NW": "northwest",
"SE": "southeast",
"SW": "southwest",
"F": "Fahrenheit",
"C": "Celsius",
"UV": "ultraviolet",
"gusts up to": "gusts of up to",
"hrs": "hours",
"hr": "hour",
"min": "minute",
"sec": "second",
"sq": "square",
"w/": "with",
"c/o": "care of",
"blw": "below",
"abv": "above",
"avg": "average",
"fr": "from",
"to": "to",
"till": "until",
"b/w": "between",
"btwn": "between",
"N/A": "not available",
"&": "and",
"+": "plus",
"e.g.": "for example",
"i.e.": "that is",
"est.": "estimated",
"...": " dot dot dot ", # or replace with " pause "
"\n\n": " pause ", # or replace with a silence duration
}
for abbr, full in abbreviations.items():
description = description.replace(abbr, full)
# Space out numerical sequences for better pronunciation
description = re.sub(r"(\d)", r"\1 ", description)
# Reform time mentions to a standard format
description = re.sub(r"(\d{1,2})(\d{2}) (A\.M\.|P\.M\.)", r"\1:\2 \3", description)
return description.strip()
def convert_to_audio(api_key, text):
"""
Convert the given text to audio using the Voice RSS Text-to-Speech API.
Args:
api_key (str): The API key.
text (str): The text to convert.
Returns:
str: The path to the audio file.
"""
base_url = 'http://api.voicerss.org/'
params = {
'key': api_key,
'hl': 'en-us',
'src': urllib.parse.quote(text),
'c': 'WAV',
'f': '8khz_8bit_mono'
}
response = requests.get(base_url, params=params)
response.raise_for_status()
audio_file_path = os.path.join(tmp_dir, "description.wav")
with open(audio_file_path, 'wb') as file:
file.write(response.content)
return audio_file_path
def main(index):
state = load_state()
alerts = state["last_alerts"]
descriptions = state["last_descriptions"]
api_key = config["SkyDescribe"]["APIKey"]
try:
alert = alerts[index][0]
description = descriptions[alert]
except IndexError:
print("No alert at index {}".format(index))
description = "No alert description found at index {}".format(index)
# Modify the description
debug_print("Original description:", description)
description = modify_description(description)
debug_print("Modified description:", description)
# Convert description to audio
audio_file = convert_to_audio(api_key, description)
# Check the length of audio file
with contextlib.closing(wave.open(audio_file,'r')) as f:
frames = f.getnframes()
rate = f.getframerate()
duration = frames / float(rate)
debug_print("Length of the audio file in seconds: ", duration)
# Play the corresponding audio message on all nodes
nodes = config["Asterisk"]["Nodes"]
for node in nodes:
command = "/usr/sbin/asterisk -rx 'rpt localplay {} {}'".format(
node, audio_file.rsplit('.', 1)[0]
)
debug_print("Running command:", command)
subprocess.run(command, shell=True)
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: SkyDescribe.py <alert index>")
sys.exit(1)
main(int(sys.argv[1]))

@ -28,10 +28,14 @@ import shutil
import fnmatch import fnmatch
import subprocess import subprocess
import time import time
import yaml
from datetime import datetime, timezone from datetime import datetime, timezone
from dateutil import parser from dateutil import parser
from pydub import AudioSegment from pydub import AudioSegment
from ruamel.yaml import YAML
from collections import OrderedDict
# Use ruamel.yaml instead of PyYAML
yaml = YAML()
# Directories and Paths # Directories and Paths
baseDir = os.path.dirname(os.path.realpath(__file__)) baseDir = os.path.dirname(os.path.realpath(__file__))
@ -39,7 +43,7 @@ configPath = os.path.join(baseDir, "config.yaml")
# Open and read configuration file # Open and read configuration file
with open(configPath, "r") as config_file: with open(configPath, "r") as config_file:
config = yaml.safe_load(config_file) config = yaml.load(config_file)
# Check if SkywarnPlus is enabled # Check if SkywarnPlus is enabled
master_enable = config.get("SKYWARNPLUS", {}).get("Enable", False) master_enable = config.get("SKYWARNPLUS", {}).get("Enable", False)
@ -270,13 +274,18 @@ def load_state():
Load the state from the state file if it exists, else return an initial state. Load the state from the state file if it exists, else return an initial state.
Returns: Returns:
dict: A dictionary containing data. OrderedDict: A dictionary containing data.
""" """
if os.path.exists(data_file): if os.path.exists(data_file):
with open(data_file, "r") as file: with open(data_file, "r") as file:
state = json.load(file) state = json.load(file)
state["alertscript_alerts"] = state["alertscript_alerts"] state["alertscript_alerts"] = state.get("alertscript_alerts", [])
state["last_alerts"] = state["last_alerts"] last_alerts = state.get("last_alerts", [])
last_alerts = [
(tuple(x[0]), x[1]) if isinstance(x[0], list) else x
for x in last_alerts
]
state["last_alerts"] = OrderedDict(last_alerts)
state["last_sayalert"] = state.get("last_sayalert", []) state["last_sayalert"] = state.get("last_sayalert", [])
return state return state
else: else:
@ -284,7 +293,7 @@ def load_state():
"ct": None, "ct": None,
"id": None, "id": None,
"alertscript_alerts": [], "alertscript_alerts": [],
"last_alerts": [], "last_alerts": OrderedDict(),
"last_sayalert": [], "last_sayalert": [],
} }
@ -294,13 +303,13 @@ def save_state(state):
Save the state to the state file. Save the state to the state file.
Args: Args:
state (dict): A dictionary containing data. state (OrderedDict): A dictionary containing data.
""" """
state["alertscript_alerts"] = list(state["alertscript_alerts"]) state["alertscript_alerts"] = list(state["alertscript_alerts"])
state["last_alerts"] = list(state["last_alerts"]) state["last_alerts"] = list(state["last_alerts"].items())
state["last_sayalert"] = list(state["last_sayalert"]) state["last_sayalert"] = list(state["last_sayalert"])
with open(data_file, "w") as file: with open(data_file, "w") as file:
json.dump(state, file) json.dump(state, file, ensure_ascii=False, indent=4)
def getAlerts(countyCodes): def getAlerts(countyCodes):
@ -312,7 +321,7 @@ def getAlerts(countyCodes):
Returns: Returns:
alerts (list): List of active weather alerts. alerts (list): List of active weather alerts.
In case of alert injection from the config, return the injected alerts. descriptions (dict): Dictionary of alert descriptions.
""" """
# Mapping for severity for API response and the 'words' severity # Mapping for severity for API response and the 'words' severity
severity_mapping_api = { severity_mapping_api = {
@ -327,15 +336,20 @@ def getAlerts(countyCodes):
# Inject alerts if DEV INJECT is enabled in the config # Inject alerts if DEV INJECT is enabled in the config
if config.get("DEV", {}).get("INJECT", False): if config.get("DEV", {}).get("INJECT", False):
logger.debug("getAlerts: DEV Alert Injection Enabled") logger.debug("getAlerts: DEV Alert Injection Enabled")
alerts = [alert.strip() for alert in config["DEV"].get("INJECTALERTS", [])] injected_alerts = [
logger.debug("getAlerts: Injecting alerts: %s", alerts) alert.strip() for alert in config["DEV"].get("INJECTALERTS", [])
]
logger.debug("getAlerts: Injecting alerts: %s", injected_alerts)
alerts = OrderedDict((alert, "Injected manually") for alert in injected_alerts)
return alerts return alerts
alerts = [] alerts = OrderedDict()
seen_alerts = set() # Store seen alerts
current_time = datetime.now(timezone.utc) current_time = datetime.now(timezone.utc)
for countyCode in countyCodes: for countyCode in countyCodes:
url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode) # url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode)
url = "https://api.weather.gov/alerts/active"
logger.debug("getAlerts: Checking for alerts in %s at URL: %s", countyCode, url) logger.debug("getAlerts: Checking for alerts in %s at URL: %s", countyCode, url)
response = requests.get(url) response = requests.get(url)
@ -349,6 +363,9 @@ def getAlerts(countyCodes):
expires_time = parser.isoparse(expires) expires_time = parser.isoparse(expires)
if effective_time <= current_time < expires_time: if effective_time <= current_time < expires_time:
event = feature["properties"]["event"] event = feature["properties"]["event"]
# Check if alert has already been seen
if event in seen_alerts:
continue
for global_blocked_event in global_blocked_events: for global_blocked_event in global_blocked_events:
if fnmatch.fnmatch(event, global_blocked_event): if fnmatch.fnmatch(event, global_blocked_event):
logger.debug( logger.debug(
@ -358,14 +375,14 @@ def getAlerts(countyCodes):
break break
else: else:
severity = feature["properties"].get("severity", None) severity = feature["properties"].get("severity", None)
description = feature["properties"].get("description", "")
if severity is None: if severity is None:
last_word = event.split()[-1] last_word = event.split()[-1]
severity = severity_mapping_words.get(last_word, 0) severity = severity_mapping_words.get(last_word, 0)
else: else:
severity = severity_mapping_api.get(severity, 0) severity = severity_mapping_api.get(severity, 0)
alerts.append( alerts[(event, severity)] = description
(event, severity) seen_alerts.add(event)
) # Add event to list as a tuple
else: else:
logger.error( logger.error(
"Failed to retrieve alerts for %s, HTTP status code %s, response: %s", "Failed to retrieve alerts for %s, HTTP status code %s, response: %s",
@ -374,21 +391,18 @@ def getAlerts(countyCodes):
response.text, response.text,
) )
alerts = list(dict.fromkeys(alerts)) alerts = OrderedDict(
sorted(
alerts.sort( alerts.items(),
key=lambda x: ( key=lambda item: (
x[1], # API-provided severity item[0][1], # API-provided severity
severity_mapping_words.get(x[0].split()[-1], 0), # 'words' severity severity_mapping_words.get(item[0][0].split()[-1], 0), # Words severity
), ),
reverse=True, reverse=True,
)
) )
logger.debug("getAlerts: Sorted alerts - (alert), (severity)") alerts = OrderedDict(list(alerts.items())[:max_alerts])
for alert in alerts:
logger.debug(alert)
alerts = [alert[0] for alert in alerts[:max_alerts]]
return alerts return alerts
@ -398,7 +412,7 @@ def sayAlert(alerts):
Generate and broadcast severe weather alert sounds on Asterisk. Generate and broadcast severe weather alert sounds on Asterisk.
Args: Args:
alerts (list): List of active weather alerts. alerts (OrderedDict): OrderedDict of active weather alerts and their descriptions.
""" """
# Define the path of the alert file # Define the path of the alert file
state = load_state() state = load_state()
@ -867,14 +881,6 @@ def main():
The main function that orchestrates the entire process of fetching and The main function that orchestrates the entire process of fetching and
processing severe weather alerts, then integrating these alerts into processing severe weather alerts, then integrating these alerts into
an Asterisk/app_rpt based radio repeater system. an Asterisk/app_rpt based radio repeater system.
Key Steps:
1. Fetch the configuration from the local setup.
2. Get the new alerts based on the provided county codes.
3. Compare the new alerts with the previously stored alerts.
4. If there's a change, store the new alerts and process them accordingly.
5. Check each alert against a set of specified alert types and perform actions accordingly.
6. Send notifications if enabled.
""" """
# Fetch configurations # Fetch configurations
say_alert_enabled = config["Alerting"].get("SayAlert", False) say_alert_enabled = config["Alerting"].get("SayAlert", False)
@ -891,9 +897,10 @@ def main():
alerts = getAlerts(countyCodes) alerts = getAlerts(countyCodes)
# If new alerts differ from old ones, process new alerts # If new alerts differ from old ones, process new alerts
logger.debug("Last alerts: %s", last_alerts)
logger.debug("New alerts: %s", alerts) if last_alerts.keys() != alerts.keys():
if last_alerts != alerts: new_alerts = [x for x in alerts.keys() if x not in last_alerts.keys()]
logger.info("New alerts: %s", new_alerts)
state["last_alerts"] = alerts state["last_alerts"] = alerts
save_state(state) save_state(state)
@ -908,7 +915,9 @@ def main():
# Initialize pushover message # Initialize pushover message
pushover_message = ( pushover_message = (
"Alerts Cleared\n" if len(alerts) == 0 else "\n".join(alerts) + "\n" "Alerts Cleared\n"
if len(alerts) == 0
else "\n".join(str(key) for key in alerts.keys()) + "\n"
) )
# Check if Courtesy Tones (CT) or ID needs to be changed # Check if Courtesy Tones (CT) or ID needs to be changed
@ -935,7 +944,6 @@ def main():
if say_all_clear_enabled: if say_all_clear_enabled:
sayAllClear() sayAllClear()
else: else:
logger.info("Alerts found: %s", alerts)
if alertscript_enabled: if alertscript_enabled:
alertScript(alerts) alertScript(alerts)
if say_alert_enabled: if say_alert_enabled:

@ -22,7 +22,7 @@ Asterisk:
# - 1998 # - 1998
# - 1999 # - 1999
Nodes: Nodes:
- YOUR_NODE_NUMBER_HERE - 1999
################################################################################################################################ ################################################################################################################################
@ -35,7 +35,7 @@ Alerting:
# - ARC121 # - ARC121
# - ARC021 # - ARC021
CountyCodes: CountyCodes:
- YOUR_COUNTY_CODE_HERE - ARC125
# Enable instant voice announcement when new weather alerts are issued. # Enable instant voice announcement when new weather alerts are issued.
# Set to 'True' for enabling or 'False' for disabling. # Set to 'True' for enabling or 'False' for disabling.
# Example: SayAlert: true # Example: SayAlert: true
@ -156,6 +156,12 @@ IDChange:
################################################################################################################################ ################################################################################################################################
SkyDescribe:
Enable: false
APIKey:
################################################################################################################################
AlertScript: AlertScript:
# Completely enable/disable AlertScript # Completely enable/disable AlertScript
Enable: false Enable: false
@ -239,7 +245,7 @@ Pushover:
Logging: Logging:
# Enable verbose logging # Enable verbose logging
Debug: false Debug: true
# Specify an alternative log file path. # Specify an alternative log file path.
# LogPath: # LogPath:
@ -249,7 +255,7 @@ DEV:
# Delete cached data on startup # Delete cached data on startup
CLEANSLATE: false CLEANSLATE: false
# Specify the TMP directory. # Specify the TMP directory.
TmpDir: /tmp/SkywarnPlus TmpDir: C:\Users\Mason\Desktop\SWP_TMP
# Enable test alert injection instead of calling the NWS API by setting 'INJECT' to 'True'. # Enable test alert injection instead of calling the NWS API by setting 'INJECT' to 'True'.
INJECT: false INJECT: false
# List the test alerts to inject. Use a case-sensitive list. One alert per line. # List the test alerts to inject. Use a case-sensitive list. One alert per line.

Loading…
Cancel
Save

Powered by TurnKey Linux.