@ -1,7 +1,7 @@
#!/usr/bin/python3
"""
SkywarnPlus v0 .2 . 0 by Mason Nelson ( N5LSN / WRKF394 )
SkywarnPlus v0 .2 . 2 by Mason Nelson ( N5LSN / WRKF394 )
== == == == == == == == == == == == == == == == == == == == == == == == ==
SkywarnPlus is a utility that retrieves severe weather alerts from the National
Weather Service and integrates these alerts with an Asterisk / app_rpt based
@ -49,7 +49,9 @@ if not master_enable:
# Define tmp_dir and sounds_path
tmp_dir = config . get ( " DEV " , { } ) . get ( " TmpDir " , " /tmp/SkywarnPlus " )
sounds_path = config . get ( " Alerting " , { } ) . get ( " SoundsPath " , os . path . join ( baseDir , " SOUNDS " ) )
sounds_path = config . get ( " Alerting " , { } ) . get (
" SoundsPath " , os . path . join ( baseDir , " SOUNDS " )
)
# Define countyCodes
countyCodes = config . get ( " Alerting " , { } ) . get ( " CountyCodes " , [ ] )
@ -61,18 +63,14 @@ else:
print ( " Error: tmp_dir is not set. " )
# Define Blocked events
global_blocked_events = (
config . get ( " Blocking " , { } ) . get ( " GlobalBlockedEvents " , [ ] )
)
global_blocked_events = config . get ( " Blocking " , { } ) . get ( " GlobalBlockedEvents " , [ ] )
if global_blocked_events is None :
global_blocked_events = [ ]
sayalert_blocked_events = (
config . get ( " Blocking " , { } ) . get ( " SayAlertBlockedEvents " , [ ] )
)
sayalert_blocked_events = config . get ( " Blocking " , { } ) . get ( " SayAlertBlockedEvents " , [ ] )
if sayalert_blocked_events is None :
sayalert_blocked_events = [ ]
tailmessage_blocked_events = (
config . get ( " Blocking " , { } ) . get ( " TailmessageBlockedEvents " , [ ] )
tailmessage_blocked_events = config . get ( " Blocking " , { } ) . get (
" TailmessageBlockedEvents " , [ ]
)
if tailmessage_blocked_events is None :
tailmessage_blocked_events = [ ]
@ -267,20 +265,22 @@ def load_state():
Load the state from the state file if it exists , else return an initial state .
Returns :
dict : A dictionary containing courtesy tone ( ct ) , identifier ( id ) and alerts .
dict : A dictionary containing data .
"""
if os . path . exists ( data_file ) :
with open ( data_file , " r " ) as file :
state = json . load ( file )
# state["alertscript_alerts"] = set(state["alertscript_alerts"])
# state["last_alerts"] = set(state["last_alerts"])
state [ " alertscript_alerts " ] = state [ " alertscript_alerts " ]
state [ " last_alerts " ] = state [ " last_alerts " ]
state [ " last_sayalert " ] = state . get ( " last_sayalert " , [ ] )
return state
else :
return {
" ct " : None ,
" id " : None ,
" alertscript_alerts " : set ( ) ,
" last_alerts " : set ( ) ,
" alertscript_alerts " : [ ] ,
" last_alerts " : [ ] ,
" last_sayalert " : [ ] ,
}
@ -289,10 +289,11 @@ def save_state(state):
Save the state to the state file .
Args :
state ( dict ) : A dictionary containing courtesy tone ( ct ) , identifier ( id ) and alerts .
state ( dict ) : A dictionary containing data .
"""
state [ " alertscript_alerts " ] = list ( state [ " alertscript_alerts " ] )
state [ " last_alerts " ] = list ( state [ " last_alerts " ] )
state [ " last_sayalert " ] = list ( state [ " last_sayalert " ] )
with open ( data_file , " w " ) as file :
json . dump ( state , file )
@ -321,9 +322,7 @@ def getAlerts(countyCodes):
# Inject alerts if DEV INJECT is enabled in the config
if config . get ( " DEV " , { } ) . get ( " INJECT " , False ) :
logger . debug ( " getAlerts: DEV Alert Injection Enabled " )
alerts = [
alert . strip ( ) for alert in config [ " DEV " ] . get ( " INJECTALERTS " , [ ] )
]
alerts = [ alert . strip ( ) for alert in config [ " DEV " ] . get ( " INJECTALERTS " , [ ] ) ]
logger . debug ( " getAlerts: Injecting alerts: %s " , alerts )
return alerts
@ -397,6 +396,28 @@ def sayAlert(alerts):
alerts ( list ) : List of active weather alerts .
"""
# Define the path of the alert file
state = load_state ( )
filtered_alerts = [ ]
for alert in alerts :
if any (
fnmatch . fnmatch ( alert , blocked_event )
for blocked_event in sayalert_blocked_events
) :
logger . debug ( " sayAlert: blocking %s as per configuration " , alert )
continue
filtered_alerts . append ( alert )
# Check if the filtered alerts are the same as the last run
if filtered_alerts == state [ " last_sayalert " ] :
logger . debug (
" sayAlert: The filtered alerts are the same as the last run. Not broadcasting. "
)
return
state [ " last_sayalert " ] = filtered_alerts
save_state ( state )
alert_file = " {} /alert.wav " . format ( sounds_path )
combined_sound = AudioSegment . from_wav (
@ -407,15 +428,7 @@ def sayAlert(alerts):
)
alert_count = 0
for alert in alerts :
if any (
fnmatch . fnmatch ( alert , blocked_event )
for blocked_event in sayalert_blocked_events
) :
logger . debug ( " sayAlert: blocking %s as per configuration " , alert )
continue
for alert in filtered_alerts :
try :
index = WS . index ( alert )
audio_file = AudioSegment . from_wav (
@ -437,32 +450,38 @@ def sayAlert(alerts):
if alert_count == 0 :
logger . debug ( " sayAlert: All alerts were blocked, not broadcasting any alerts. " )
else :
logger . debug ( " sayAlert: Exporting alert sound to %s " , alert_file )
converted_combined_sound = convertAudio ( combined_sound )
converted_combined_sound . export ( alert_file , format = " wav " )
return
logger . debug ( " sayAlert: Replacing tailmessage with silence " )
silence = AudioSegment . silent ( duration = 100 )
converted_silence = convertAudio ( silence )
converted_silence . export ( tailmessage_file , format = " wav " )
logger . debug ( " sayAlert: Exporting alert sound to %s " , alert_file )
converted_combined_sound = convertAudio ( combined_sound )
converted_combined_sound . export ( alert_file , format = " wav " )
node_numbers = config . get ( " Asterisk " , { } ) . get ( " Nodes " , [ ] )
for node_number in node_numbers :
logger . info ( " Broadcasting alert on node %s " , node_number )
command = ' /usr/sbin/asterisk -rx " rpt localplay {} {} " ' . format (
node_number , os . path . splitext ( os . path . abspath ( alert_file ) ) [ 0 ]
)
subprocess . run ( command , shell = True )
logger . debug ( " sayAlert: Replacing tailmessage with silence " )
silence = AudioSegment . silent ( duration = 100 )
converted_silence = convertAudio ( silence )
converted_silence . export ( tailmessage_file , format = " wav " )
logger . info ( " Waiting 30 seconds for Asterisk to make announcement... " )
time . sleep ( 30 )
node_numbers = config . get ( " Asterisk " , { } ) . get ( " Nodes " , [ ] )
for node_number in node_numbers :
logger . info ( " Broadcasting alert on node %s " , node_number )
command = ' /usr/sbin/asterisk -rx " rpt localplay {} {} " ' . format (
node_number , os . path . splitext ( os . path . abspath ( alert_file ) ) [ 0 ]
)
subprocess . run ( command , shell = True )
logger . info ( " Waiting 30 seconds for Asterisk to make announcement... " )
time . sleep ( 30 )
def sayAllClear ( ) :
"""
Generate and broadcast ' all clear ' message on Asterisk .
"""
# Empty the last_sayalert list so that the next run will broadcast alerts
state = load_state ( )
state [ " last_sayalert " ] = [ ]
save_state ( state )
alert_clear = os . path . join ( sounds_path , " ALERTS " , " SWP96.wav " )
node_numbers = config . get ( " Asterisk " , { } ) . get ( " Nodes " , [ ] )
@ -553,7 +572,9 @@ def changeCT(ct):
"""
state = load_state ( )
current_ct = state [ " ct " ]
tone_dir = config [ " CourtesyTones " ] . get ( " ToneDir " , os . path . join ( sounds_path , " TONES " ) )
tone_dir = config [ " CourtesyTones " ] . get (
" ToneDir " , os . path . join ( sounds_path , " TONES " )
)
ct1 = config [ " CourtesyTones " ] [ " Tones " ] [ " CT1 " ]
ct2 = config [ " CourtesyTones " ] [ " Tones " ] [ " CT2 " ]
wx_ct = config [ " CourtesyTones " ] [ " Tones " ] [ " WXCT " ]
@ -809,17 +830,20 @@ def change_and_log_CT_or_ID(
alert_type ,
specified_alerts ,
)
# Check if any alert matches specified_alerts
if set ( alerts ) . intersection ( specified_alerts ) :
for alert in alerts :
if alert in specified_alerts :
logger . debug ( " Alert %s requires a %s change " , alert , alert_type )
if (
changeCT ( " WX " ) if alert_type == " CT " else changeID ( " WX " )
) : # If the CT/ID was actually changed
if pushover_debug :
pushover_message + = " Changed {} to WX \n " . format ( alert_type )
break
# Here we replace set intersection with a list comprehension
intersecting_alerts = [ alert for alert in alerts if alert in specified_alerts ]
if intersecting_alerts :
for alert in intersecting_alerts :
logger . debug ( " Alert %s requires a %s change " , alert , alert_type )
if (
changeCT ( " WX " ) if alert_type == " CT " else changeID ( " WX " )
) : # If the CT/ID was actually changed
if pushover_debug :
pushover_message + = " Changed {} to WX \n " . format ( alert_type )
break
else : # No alerts require a CT/ID change, revert back to normal
logger . debug (
" No alerts require a %s change, reverting to normal. " , alert_type
@ -862,6 +886,8 @@ def main():
alerts = getAlerts ( countyCodes )
# If new alerts differ from old ones, process new alerts
logger . debug ( " Last alerts: %s " , last_alerts )
logger . debug ( " New alerts: %s " , alerts )
if last_alerts != alerts :
state [ " last_alerts " ] = alerts
save_state ( state )