@ -1,7 +1,7 @@
#!/usr/bin/python3
"""
SkywarnPlus v0 . 2.6 by Mason Nelson ( N5LSN / WRKF394 )
SkywarnPlus v0 . 3.0 by Mason Nelson ( N5LSN / WRKF394 )
== == == == == == == == == == == == == == == == == == == == == == == == ==
SkywarnPlus is a utility that retrieves severe weather alerts from the National
Weather Service and integrates these alerts with an Asterisk / app_rpt based
@ -276,9 +276,6 @@ logger.debug("Tailmessage Blocked events: %s", tailmessage_blocked_events)
def load_state ( ) :
"""
Load the state from the state file if it exists , else return an initial state .
Returns :
OrderedDict : A dictionary containing data .
"""
if os . path . exists ( data_file ) :
with open ( data_file , " r " ) as file :
@ -307,9 +304,6 @@ def load_state():
def save_state ( state ) :
"""
Save the state to the state file .
Args :
state ( OrderedDict ) : A dictionary containing data .
"""
state [ " alertscript_alerts " ] = list ( state [ " alertscript_alerts " ] )
state [ " last_alerts " ] = list ( state [ " last_alerts " ] . items ( ) )
@ -322,13 +316,6 @@ def save_state(state):
def getAlerts ( countyCodes ) :
"""
Retrieve severe weather alerts for specified county codes .
Args :
countyCodes ( list ) : List of county codes .
Returns :
alerts ( list ) : List of active weather alerts .
descriptions ( dict ) : Dictionary of alert descriptions .
"""
# Mapping for severity for API response and the 'words' severity
severity_mapping_api = {
@ -350,17 +337,24 @@ def getAlerts(countyCodes):
logger . debug ( " getAlerts: DEV Alert Injection Enabled " )
injected_alerts = config [ " DEV " ] . get ( " INJECTALERTS " , [ ] )
logger . debug ( " getAlerts: Injecting alerts: %s " , injected_alerts )
if injected_alerts is None :
injected_alerts = [ ]
for event in injected_alerts :
last_word = event . split ( ) [ - 1 ]
severity = severity_mapping_words . get ( last_word , 0 )
alerts [ ( event , severity ) ] = " Manually injected. "
description = " This alert was manually injected as a test. "
end_time_utc = current_time + timedelta ( hours = 1 )
alerts [ ( event ) ] = (
severity ,
description ,
end_time_utc . strftime ( " % Y- % m- %d T % H: % M: % S. %f Z " ) ,
)
alerts = OrderedDict ( list ( alerts . items ( ) ) [ : max_alerts ] )
return alerts
# Determing whether to use 'effective' or 'onset' time
# Determin e whether to use 'effective' or 'onset' time
timetype_mode = config . get ( " Alerting " , { } ) . get ( " TimeType " , " onset " )
if timetype_mode == " effective " :
logger . debug ( " getAlerts: Using effective time for alerting " )
@ -373,10 +367,13 @@ def getAlerts(countyCodes):
for countyCode in countyCodes :
url = " https://api.weather.gov/alerts/active?zone= {} " . format ( countyCode )
logger . debug ( " getAlerts: Checking for alerts in %s at URL: %s " , countyCode , url )
response = requests . get ( url )
try :
response = requests . get ( url )
response . raise_for_status ( ) # will raise an exception if the status code is not 200
if response . status_code == 200 :
logger . debug (
" getAlerts: Checking for alerts in %s at URL: %s " , countyCode , url
)
alert_data = response . json ( )
for feature in alert_data [ " features " ] :
start = feature [ " properties " ] . get ( time_type_start )
@ -415,7 +412,11 @@ def getAlerts(countyCodes):
severity = severity_mapping_words . get ( last_word , 0 )
else :
severity = severity_mapping_api . get ( severity , 0 )
alerts [ ( event , severity ) ] = description
alerts [ ( event ) ] = (
severity ,
description ,
end_time_utc . strftime ( " % Y- % m- %d T % H: % M: % S. %f Z " ) ,
)
seen_alerts . add ( event )
else :
time_difference = time_until ( start_time_utc , current_time )
@ -431,20 +432,46 @@ def getAlerts(countyCodes):
end_time_utc ,
)
else :
logger . error (
" Failed to retrieve alerts for %s , HTTP status code %s , response: %s " ,
countyCode ,
response . status_code ,
response . text ,
)
except requests . exceptions . RequestException as e :
logger . error ( " Failed to retrieve alerts for %s . Reason: %s " , countyCode , e )
logger . info ( " API unreachable. Using stored data instead. " )
# Load alerts from data.json
if os . path . isfile ( data_file ) :
with open ( data_file ) as f :
data = json . load ( f )
stored_alerts = data . get ( " last_alerts " , [ ] )
# Filter alerts by end_time_utc
current_time_str = datetime . now ( timezone . utc ) . strftime (
" % Y- % m- %d T % H: % M: % S. %f Z "
)
logger . debug ( " Current time: %s " , current_time_str )
alerts = { }
for event , alert in stored_alerts :
end_time_str = alert [ 2 ]
if parser . parse ( end_time_str ) > = parser . parse ( current_time_str ) :
logger . debug (
" getAlerts: Keeping %s because it does not expire until %s " ,
event ,
end_time_str ,
)
alerts [ event ] = alert
else :
logger . debug (
" getAlerts: Removing %s because it expired at %s " ,
event ,
end_time_str ,
)
else :
logger . error ( " No stored data available. " )
alerts = OrderedDict (
sorted (
alerts . items ( ) ,
key = lambda item : (
item [ 0 ] [ 1 ] , # API-provided severity
severity_mapping_words . get ( item [ 0 ] [ 0 ] . split ( ) [ - 1 ] , 0 ) , # Words severity
item [ 1] [ 0 ] , # S everity
severity_mapping_words . get ( item [ 0 ] . split ( ) [ - 1 ] , 0 ) , # Words severity
) ,
reverse = True ,
)
@ -459,15 +486,6 @@ def time_until(start_time_utc, current_time):
"""
Calculate the time difference between two datetime objects and returns it
as a formatted string .
Args :
start_time_utc ( datetime ) : The start time of the alert in UTC .
current_time ( datetime ) : The current time in UTC .
Returns :
str : Formatted string displaying the time difference . If the alert has
not started yet , it returns a positive time difference . If the
alert has already started , it returns a negative time difference .
"""
delta = start_time_utc - current_time
sign = " - " if delta < timedelta ( 0 ) else " "
@ -482,15 +500,12 @@ def time_until(start_time_utc, current_time):
def sayAlert ( alerts ) :
"""
Generate and broadcast severe weather alert sounds on Asterisk .
Args :
alerts ( OrderedDict ) : OrderedDict of active weather alerts and their descriptions .
"""
# Define the path of the alert file
state = load_state ( )
# Extract only the alert names from the OrderedDict keys
alert_names = [ alert [ 0 ] for alert in alerts . keys ( ) ]
alert_names = [ alert for alert in alerts . keys ( ) ]
filtered_alerts = [ ]
for alert in alert_names :
@ -579,7 +594,7 @@ def sayAlert(alerts):
wait_time = duration + 5
logger . info (
logger . debug (
" Waiting %s seconds for Asterisk to make announcement to avoid doubling alerts with tailmessage... " ,
wait_time ,
)
@ -610,19 +625,16 @@ def buildTailmessage(alerts):
"""
Build a tailmessage , which is a short message appended to the end of a
transmission to update on the weather conditions .
Args :
alerts ( list ) : List of active weather alerts .
"""
# Extract only the alert names from the OrderedDict keys
alert_names = [ alert [ 0 ] for alert in alerts . keys ( ) ]
alert_names = [ alert for alert in alerts . keys ( ) ]
# Get the suffix config
tailmessage_suffix = config . get ( " Tailmessage " , { } ) . get ( " TailmessageSuffix " , None )
if not alerts :
logger . info ( " buildTailMessage: No alerts, creating silent tailmessage " )
logger . debug ( " buildTailMessage: No alerts, creating silent tailmessage " )
silence = AudioSegment . silent ( duration = 100 )
converted_silence = convertAudio ( silence )
converted_silence . export ( tailmessage_file , format = " wav " )
@ -686,18 +698,8 @@ def buildTailmessage(alerts):
def changeCT ( ct ) :
"""
Change the current Courtesy Tone ( CT ) to the one specified .
This function first checks if the specified CT is already in use . If so , it does not make any changes .
If the CT needs to be changed , it replaces the current CT files with the new ones and updates the state file .
Args :
ct ( str ) : The name of the new CT to use . This should be one of the CTs specified in the config file .
Returns :
bool : True if the CT was changed , False otherwise .
Raises :
FileNotFoundError : If the specified CT files are not found .
"""
state = load_state ( )
current_ct = state [ " ct " ]
@ -764,18 +766,8 @@ def changeCT(ct):
def changeID ( id ) :
"""
Change the current Identifier ( ID ) to the one specified .
This function first checks if the specified ID is already in use . If so , it does not make any changes .
If the ID needs to be changed , it replaces the current ID files with the new ones and updates the state file .
Args :
id ( str ) : The name of the new ID to use . This should be one of the IDs specified in the config file .
Returns :
bool : True if the ID was changed , False otherwise .
Raises :
FileNotFoundError : If the specified ID files are not found .
"""
state = load_state ( )
current_id = state [ " id " ]
@ -825,24 +817,30 @@ def alertScript(alerts):
"""
This function reads a list of alerts , then performs actions based
on the alert triggers defined in the global configuration file .
: param alerts : List of alerts to process
: type alerts : list [ str ]
"""
# Load the saved state
state = load_state ( )
processed_alerts = state [ " alertscript_alerts " ]
active_alerts = state . get ( " active_alerts " , [ ] ) # Load active alerts from state
processed_alerts = set (
state [ " alertscript_alerts " ]
) # Change this to a set for easier processing
active_alerts = set (
state . get ( " active_alerts " , [ ] )
) # Load active alerts from state, also as a set
# Extract only the alert names from the OrderedDict keys
alert_names = [ alert [ 0 ] for alert in alerts . keys ( ) ]
alert_names = set ( [ alert for alert in alerts . keys ( ) ] ) # This should also be a set
# New alerts are those that are in the current alerts but were not active before
new_alerts = list ( set ( alert_names ) - set ( active_alerts ) )
new_alerts = alert_names - active_alerts
# Alerts to be cleared are those that were previously active but are no longer present
cleared_alerts = active_alerts - alert_names
# Update the active alerts in the state
state [ " active_alerts " ] = alert_names
state [ " active_alerts " ] = list (
alert_names
) # Convert back to list for JSON serialization
# Fetch AlertScript configuration from global_config
alertScript_config = config . get ( " AlertScript " , { } )
@ -854,9 +852,6 @@ def alertScript(alerts):
mappings = [ ]
logger . debug ( " Mappings: %s " , mappings )
# A set to hold alerts that are processed in this run
currently_processed_alerts = set ( )
# Iterate over each mapping
for mapping in mappings :
logger . debug ( " Processing mapping: %s " , mapping )
@ -888,7 +883,7 @@ def alertScript(alerts):
# Execute commands based on the Type of mapping
for alert in matched_alerts :
currently_ processed_alerts. add ( alert )
processed_alerts. add ( alert )
if mapping . get ( " Type " ) == " BASH " :
logger . debug ( ' Mapping type is " BASH " ' )
@ -905,25 +900,21 @@ def alertScript(alerts):
)
subprocess . run ( dtmf_cmd , shell = True )
# Clear alerts that are no longer active
processed_alerts - = cleared_alerts
# Update the state with the alerts processed in this run
state [ " alertscript_alerts " ] = list ( currently_processed_alerts )
state [ " alertscript_alerts " ] = list (
processed_alerts
) # Convert back to list for JSON serialization
save_state ( state )
def sendPushover ( message , title = None , priority = 0 ) :
"""
Send a push notification via the Pushover service .
This function constructs the payload for the request , including the user key , API token , message , title , and priority .
The payload is then sent to the Pushover API endpoint . If the request fails , an error message is logged .
Args :
message ( str ) : The content of the push notification .
title ( str , optional ) : The title of the push notification . Defaults to None .
priority ( int , optional ) : The priority of the push notification . Defaults to 0.
Raises :
requests . exceptions . HTTPError : If an error occurs while sending the notification .
"""
pushover_config = config [ " Pushover " ]
user_key = pushover_config . get ( " UserKey " )
@ -950,12 +941,6 @@ def sendPushover(message, title=None, priority=0):
def convertAudio ( audio ) :
"""
Convert audio file to 8000 Hz mono for compatibility with Asterisk .
Args :
audio ( AudioSegment ) : Audio file to be converted .
Returns :
AudioSegment : Converted audio file .
"""
return audio . set_frame_rate ( 8000 ) . set_channels ( 1 )
@ -970,14 +955,6 @@ def change_and_log_CT_or_ID(
) :
"""
Check whether the CT or ID needs to be changed , performs the change , and logs the process .
Args :
alerts ( list ) : The new alerts that have been fetched .
specified_alerts ( list ) : The alerts that require a change in CT or ID .
auto_change_enabled ( bool ) : Whether auto change is enabled for CT or ID .
alert_type ( str ) : " CT " for Courtesy Tones and " ID " for Identifiers .
pushover_debug ( bool ) : Whether to include debug information in pushover notifications .
pushover_message ( str ) : The current pushover message to which any updates will be added .
"""
if auto_change_enabled :
logger . debug (
@ -988,7 +965,7 @@ def change_and_log_CT_or_ID(
)
# Extract only the alert names from the OrderedDict keys
alert_names = [ alert [ 0 ] for alert in alerts . keys ( ) ]
alert_names = [ alert for alert in alerts . keys ( ) ]
# Check if any alert matches specified_alerts
# Here we replace set intersection with a list comprehension
@ -1021,9 +998,6 @@ def change_and_log_CT_or_ID(
def supermon_back_compat ( alerts ) :
"""
Write alerts to a file for backward compatibility with supermon .
Args :
alerts ( OrderedDict ) : The alerts to write .
"""
# Ensure the target directory exists
@ -1058,9 +1032,23 @@ def main():
alerts = getAlerts ( countyCodes )
# If new alerts differ from old ones, process new alerts
if [ alert [ 0 ] for alert in last_alerts . keys ( ) ] != [
alert [ 0 ] for alert in alerts . keys ( )
] :
added_alerts = [
alert for alert in alerts . keys ( ) if alert not in last_alerts . keys ( )
]
removed_alerts = [
alert for alert in last_alerts . keys ( ) if alert not in alerts . keys ( )
]
if added_alerts :
logger . info ( " Alerts added: %s " , " , " . join ( alert for alert in added_alerts ) )
if removed_alerts :
logger . info (
" Alerts removed: %s " , " , " . join ( alert for alert in removed_alerts )
)
if last_alerts . keys ( ) != alerts . keys ( ) :
new_alerts = [ x for x in alerts . keys ( ) if x not in last_alerts . keys ( ) ]
state [ " last_alerts " ] = alerts
save_state ( state )
@ -1081,7 +1069,7 @@ def main():
pushover_message = (
" Alerts Cleared \n "
if not alerts
else " \n " . join ( alert [ 0 ] for alert in alert s. keys ( ) ) + " \n "
else " \n " . join ( alert s. keys ( ) ) + " \n "
)
# Check if Courtesy Tones (CT) or ID needs to be changed
@ -1102,17 +1090,17 @@ def main():
pushover_message ,
)
if alertscript_enabled :
alertScript ( alerts )
# Check if alerts need to be communicated
if len ( alerts ) == 0 :
logger . info ( " Alerts cleared " )
if say_all_clear_enabled :
sayAllClear ( )
else :
logger . info ( " New alerts: %s " , new_alerts )
if say_alert_enabled :
sayAlert ( alerts )
if alertscript_enabled :
alertScript ( alerts )
# Check if tailmessage needs to be built
enable_tailmessage = config . get ( " Tailmessage " , { } ) . get ( " Enable " , False )