diff --git a/SkywarnPlus.py b/SkywarnPlus.py index b2ca2b8..ef8a6e3 100644 --- a/SkywarnPlus.py +++ b/SkywarnPlus.py @@ -330,9 +330,11 @@ def save_state(state): def get_alerts(countyCodes): """ - Retrieve severe weather alerts for specified county codes. + This function retrieves severe weather alerts for specified county codes. """ - # Mapping for severity for API response and the 'words' severity + + # Mapping dictionaries are defined for converting the severity level from the + # API's terminology to a numeric scale and from common alert language to the same numeric scale. severity_mapping_api = { "Extreme": 4, "Severe": 3, @@ -342,12 +344,17 @@ def get_alerts(countyCodes): } severity_mapping_words = {"Warning": 4, "Watch": 3, "Advisory": 2, "Statement": 1} + # 'alerts' will store the alert data we retrieve. + # 'seen_alerts' is used to keep track of the alerts we've already processed. alerts = OrderedDict() - seen_alerts = set() # Store seen alerts + seen_alerts = set() + + # We retrieve the current time and log it for debugging purposes. current_time = datetime.now(timezone.utc) LOGGER.debug("getAlerts: Current time: %s", current_time) - # Check if injection is enabled + # The script allows for alert injection for testing purposes. + # If injection is enabled in the configuration, we process these injected alerts first. if config.get("DEV", {}).get("INJECT", False): LOGGER.debug("getAlerts: DEV Alert Injection Enabled") injected_alerts = config["DEV"].get("INJECTALERTS", []) @@ -365,11 +372,14 @@ def get_alerts(countyCodes): end_time_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), ) + # We limit the number of alerts to the maximum defined constant. alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS]) + # If injected alerts are used, we return them here and don't proceed with the function. return alerts - # Determine whether to use 'effective' or 'onset' time + # Configuration specifies whether to use 'effective' or 'onset' time for alerts. + # Depending on the configuration, we set the appropriate keys for start and end time. timetype_mode = config.get("Alerting", {}).get("TimeType", "onset") if timetype_mode == "effective": LOGGER.debug("getAlerts: Using effective time for alerting") @@ -386,20 +396,21 @@ def get_alerts(countyCodes): time_type_start = "onset" time_type_end = "ends" + # Loop over each county code and retrieve alerts from the API. for countyCode in countyCodes: url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode) try: + # If we can get a successful response from the API, we process the alerts from the response. response = requests.get(url) - response.raise_for_status() # will raise an exception if the status code is not 200 - + response.raise_for_status() LOGGER.debug( "getAlerts: Checking for alerts in %s at URL: %s", countyCode, url ) alert_data = response.json() for feature in alert_data["features"]: + # Extract start and end times. If end time is missing, use 'expires' time. start = feature["properties"].get(time_type_start) end = feature["properties"].get(time_type_end) - # if end is null, use effective time if not end: end = feature["properties"].get("expires") LOGGER.debug( @@ -409,20 +420,26 @@ def get_alerts(countyCodes): end, ) if start and end: + # If both start and end times are available, convert them to datetime objects. start_time = parser.isoparse(start) end_time = parser.isoparse(end) - # Convert alert times to UTC + + # Convert alert times to UTC. start_time_utc = start_time.astimezone(timezone.utc) end_time_utc = end_time.astimezone(timezone.utc) event = feature["properties"]["event"] + + # If the current time is within the alert's active period, we process it further. if start_time_utc <= current_time < end_time_utc: description = feature["properties"].get("description", "") severity = feature["properties"].get("severity", None) - # Check if alert has already been seen + + # If the alert has already been processed, we skip it. if event in seen_alerts: continue - # Initialize a flag to check if the event is globally blocked + # We check if the event is in a list of globally blocked events. + # If it is, we skip it. is_blocked = False for global_blocked_event in GLOBAL_BLOCKED_EVENTS: if fnmatch.fnmatch(event, global_blocked_event): @@ -432,22 +449,25 @@ def get_alerts(countyCodes): ) is_blocked = True break - - # Skip to the next feature if the event is globally blocked if is_blocked: continue + # Determine severity from event name or API's severity value. if severity is None: last_word = event.split()[-1] severity = severity_mapping_words.get(last_word, 0) else: severity = severity_mapping_api.get(severity, 0) + + # Add the alert to the 'alerts' dictionary and add the event name to 'seen_alerts'. alerts[(event)] = ( severity, description, end_time_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), ) seen_alerts.add(event) + + # If the current time is not within the alert's active period, we skip it. else: time_difference = time_until(start_time_utc, current_time) LOGGER.debug( @@ -462,52 +482,21 @@ def get_alerts(countyCodes): end_time_utc, ) + # If we cannot get a successful response from the API, we load stored alert data instead. except requests.exceptions.RequestException as e: LOGGER.error("Failed to retrieve alerts for %s. Reason: %s", countyCode, e) LOGGER.info("API unreachable. Using stored data instead.") - - # Load alerts from data.json if os.path.isfile(DATA_FILE): with open(DATA_FILE) as f: data = json.load(f) - stored_alerts = data.get("last_alerts", []) + alerts = data.get("alerts", {}) - # Filter alerts by end_time_utc - current_time_str = datetime.now(timezone.utc).strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" - ) - LOGGER.debug("Current time: %s", current_time_str) - alerts = {} - for event, alert in stored_alerts: - end_time_str = alert[2] - if parser.parse(end_time_str) >= parser.parse(current_time_str): - LOGGER.debug( - "getAlerts: Keeping %s because it does not expire until %s", - event, - end_time_str, - ) - alerts[event] = alert - else: - LOGGER.debug( - "getAlerts: Removing %s because it expired at %s", - event, - end_time_str, - ) - else: - LOGGER.error("No stored data available.") - - alerts = OrderedDict( - sorted( - alerts.items(), - key=lambda item: ( - item[1][0], # Severity - severity_mapping_words.get(item[0].split()[-1], 0), # Words severity - ), - reverse=True, - ) - ) + # If the number of alerts exceeds the maximum defined constant, we truncate the alerts. + alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS]) - alerts = OrderedDict(list(alerts.items())[:MAX_ALERTS]) + # We store the alerts in a file for future use if the API is unreachable. + with open(DATA_FILE, 'w') as f: + json.dump({"alerts": dict(alerts)}, f) return alerts