Add more get_alerts comments

pull/48/head
Mason10198 2 years ago
parent 195377284d
commit 56b51019ef

@ -330,9 +330,11 @@ def save_state(state):
def get_alerts(countyCodes):
"""
Retrieve severe weather alerts for specified county codes.
This function retrieves severe weather alerts for specified county codes.
"""
# Mapping for severity for API response and the 'words' severity
# Mapping dictionaries are defined for converting the severity level from the
# API's terminology to a numeric scale and from common alert language to the same numeric scale.
severity_mapping_api = {
"Extreme": 4,
"Severe": 3,
@ -342,12 +344,17 @@ def get_alerts(countyCodes):
}
severity_mapping_words = {"Warning": 4, "Watch": 3, "Advisory": 2, "Statement": 1}
# 'alerts' will store the alert data we retrieve.
# 'seen_alerts' is used to keep track of the alerts we've already processed.
alerts = OrderedDict()
seen_alerts = set() # Store seen alerts
seen_alerts = set()
# We retrieve the current time and log it for debugging purposes.
current_time = datetime.now(timezone.utc)
LOGGER.debug("getAlerts: Current time: %s", current_time)
# Check if injection is enabled
# The script allows for alert injection for testing purposes.
# If injection is enabled in the configuration, we process these injected alerts first.
if config.get("DEV", {}).get("INJECT", False):
LOGGER.debug("getAlerts: DEV Alert Injection Enabled")
injected_alerts = config["DEV"].get("INJECTALERTS", [])
@ -365,11 +372,14 @@ def get_alerts(countyCodes):
end_time_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
)
# We limit the number of alerts to the maximum defined constant.
alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS])
# If injected alerts are used, we return them here and don't proceed with the function.
return alerts
# Determine whether to use 'effective' or 'onset' time
# Configuration specifies whether to use 'effective' or 'onset' time for alerts.
# Depending on the configuration, we set the appropriate keys for start and end time.
timetype_mode = config.get("Alerting", {}).get("TimeType", "onset")
if timetype_mode == "effective":
LOGGER.debug("getAlerts: Using effective time for alerting")
@ -386,20 +396,21 @@ def get_alerts(countyCodes):
time_type_start = "onset"
time_type_end = "ends"
# Loop over each county code and retrieve alerts from the API.
for countyCode in countyCodes:
url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode)
try:
# If we can get a successful response from the API, we process the alerts from the response.
response = requests.get(url)
response.raise_for_status() # will raise an exception if the status code is not 200
response.raise_for_status()
LOGGER.debug(
"getAlerts: Checking for alerts in %s at URL: %s", countyCode, url
)
alert_data = response.json()
for feature in alert_data["features"]:
# Extract start and end times. If end time is missing, use 'expires' time.
start = feature["properties"].get(time_type_start)
end = feature["properties"].get(time_type_end)
# if end is null, use effective time
if not end:
end = feature["properties"].get("expires")
LOGGER.debug(
@ -409,20 +420,26 @@ def get_alerts(countyCodes):
end,
)
if start and end:
# If both start and end times are available, convert them to datetime objects.
start_time = parser.isoparse(start)
end_time = parser.isoparse(end)
# Convert alert times to UTC
# Convert alert times to UTC.
start_time_utc = start_time.astimezone(timezone.utc)
end_time_utc = end_time.astimezone(timezone.utc)
event = feature["properties"]["event"]
# If the current time is within the alert's active period, we process it further.
if start_time_utc <= current_time < end_time_utc:
description = feature["properties"].get("description", "")
severity = feature["properties"].get("severity", None)
# Check if alert has already been seen
# If the alert has already been processed, we skip it.
if event in seen_alerts:
continue
# Initialize a flag to check if the event is globally blocked
# We check if the event is in a list of globally blocked events.
# If it is, we skip it.
is_blocked = False
for global_blocked_event in GLOBAL_BLOCKED_EVENTS:
if fnmatch.fnmatch(event, global_blocked_event):
@ -432,22 +449,25 @@ def get_alerts(countyCodes):
)
is_blocked = True
break
# Skip to the next feature if the event is globally blocked
if is_blocked:
continue
# Determine severity from event name or API's severity value.
if severity is None:
last_word = event.split()[-1]
severity = severity_mapping_words.get(last_word, 0)
else:
severity = severity_mapping_api.get(severity, 0)
# Add the alert to the 'alerts' dictionary and add the event name to 'seen_alerts'.
alerts[(event)] = (
severity,
description,
end_time_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
)
seen_alerts.add(event)
# If the current time is not within the alert's active period, we skip it.
else:
time_difference = time_until(start_time_utc, current_time)
LOGGER.debug(
@ -462,52 +482,21 @@ def get_alerts(countyCodes):
end_time_utc,
)
# If we cannot get a successful response from the API, we load stored alert data instead.
except requests.exceptions.RequestException as e:
LOGGER.error("Failed to retrieve alerts for %s. Reason: %s", countyCode, e)
LOGGER.info("API unreachable. Using stored data instead.")
# Load alerts from data.json
if os.path.isfile(DATA_FILE):
with open(DATA_FILE) as f:
data = json.load(f)
stored_alerts = data.get("last_alerts", [])
alerts = data.get("alerts", {})
# Filter alerts by end_time_utc
current_time_str = datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
LOGGER.debug("Current time: %s", current_time_str)
alerts = {}
for event, alert in stored_alerts:
end_time_str = alert[2]
if parser.parse(end_time_str) >= parser.parse(current_time_str):
LOGGER.debug(
"getAlerts: Keeping %s because it does not expire until %s",
event,
end_time_str,
)
alerts[event] = alert
else:
LOGGER.debug(
"getAlerts: Removing %s because it expired at %s",
event,
end_time_str,
)
else:
LOGGER.error("No stored data available.")
alerts = OrderedDict(
sorted(
alerts.items(),
key=lambda item: (
item[1][0], # Severity
severity_mapping_words.get(item[0].split()[-1], 0), # Words severity
),
reverse=True,
)
)
# If the number of alerts exceeds the maximum defined constant, we truncate the alerts.
alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS])
alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS])
# We store the alerts in a file for future use if the API is unreachable.
with open(DATA_FILE, 'w') as f:
json.dump({"alerts": dict(alerts)}, f)
return alerts

Loading…
Cancel
Save

Powered by TurnKey Linux.