diff --git a/CountyIDGen.py b/CountyIDGen.py
index 4790635..6cd3092 100644
--- a/CountyIDGen.py
+++ b/CountyIDGen.py
@@ -27,21 +27,43 @@ import requests
import logging
import zipfile
from datetime import datetime
-from ruamel.yaml import YAML
-from pydub import AudioSegment
-from pydub.silence import split_on_silence
+from functools import lru_cache
+
+# Lazy imports for performance
+def _lazy_import_yaml():
+ """Lazy import YAML to avoid loading unless needed."""
+ try:
+ from ruamel.yaml import YAML
+ return YAML()
+ except ImportError:
+ raise ImportError("ruamel.yaml is required")
+
+def _lazy_import_pydub():
+ """Lazy import pydub to avoid loading unless needed."""
+ try:
+ from pydub import AudioSegment
+ from pydub.silence import split_on_silence
+ return AudioSegment, split_on_silence
+ except ImportError:
+ raise ImportError("pydub is required for audio processing")
# Initialize YAML
-yaml = YAML()
+yaml = _lazy_import_yaml()
# Directories and Paths
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(BASE_DIR, "config.yaml")
COUNTY_CODES_PATH = os.path.join(BASE_DIR, "CountyCodes.md")
+# Optimized configuration loading with caching
+@lru_cache(maxsize=1)
+def _load_config():
+ """Load configuration with caching."""
+ with open(CONFIG_PATH, "r") as config_file:
+ return yaml.load(config_file)
+
# Load configurations
-with open(CONFIG_PATH, "r") as config_file:
- config = yaml.load(config_file)
+config = _load_config()
# Logging setup
LOG_CONFIG = config.get("Logging", {})
@@ -95,7 +117,15 @@ def generate_wav(api_key, language, speed, voice, text, output_file):
"v": voice,
}
- response = requests.get(base_url, params=params)
+ # Use session for connection pooling
+ session = requests.Session()
+ session.headers.update({
+ 'User-Agent': 'SkywarnPlus-CountyIDGen/0.8.0',
+ 'Accept': 'audio/wav',
+ 'Accept-Encoding': 'gzip, deflate'
+ })
+
+ response = session.get(base_url, params=params, timeout=30)
response.raise_for_status()
# If the response text contains "ERROR" then log it and exit
diff --git a/CustomAlertScript.py b/CustomAlertScript.py
index e07bdca..84c027d 100644
--- a/CustomAlertScript.py
+++ b/CustomAlertScript.py
@@ -19,6 +19,7 @@ You can create as many copies of this script as you want to execute different co
import json
import subprocess
import fnmatch
+from functools import lru_cache
# The trigger alerts and associated commands
# Replace or add more trigger commands as required.
@@ -38,17 +39,27 @@ def match_trigger(alert_title):
return command.format(alert_title=alert_title)
return None
-def main():
- # Load the data
+@lru_cache(maxsize=1)
+def _load_data():
+ """Load data with caching."""
with open(DATA_FILE, 'r') as f:
- data = json.load(f)
+ return json.load(f)
+
+def main():
+ # Load the data with caching
+ data = _load_data()
# Check if the trigger alerts are in the last alerts
for alert in data["last_alerts"]:
command = match_trigger(alert[0])
if command:
print("Executing command for alert: {}".format(alert[0]))
- subprocess.run(command, shell=True)
+ try:
+ subprocess.run(command, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ print(f"Subprocess timeout for command: {command}")
+ except Exception as e:
+ print(f"Subprocess error for command {command}: {e}")
if __name__ == "__main__":
main()
\ No newline at end of file
diff --git a/SkyControl.py b/SkyControl.py
index d17306c..61c6f5e 100644
--- a/SkyControl.py
+++ b/SkyControl.py
@@ -27,11 +27,35 @@ import shutil
import sys
import subprocess
from pathlib import Path
-from pydub import AudioSegment
-from ruamel.yaml import YAML
+from functools import lru_cache
+
+# Lazy imports for performance
+def _lazy_import_pydub():
+ """Lazy import pydub to avoid loading unless needed."""
+ try:
+ from pydub import AudioSegment
+ return AudioSegment
+ except ImportError:
+ raise ImportError("pydub is required for audio processing")
+
+def _lazy_import_yaml():
+ """Lazy import YAML to avoid loading unless needed."""
+ try:
+ from ruamel.yaml import YAML
+ return YAML()
+ except ImportError:
+ raise ImportError("ruamel.yaml is required")
# Use ruamel.yaml instead of PyYAML to preserve comments in the config file
-yaml = YAML()
+yaml = _lazy_import_yaml()
+
+# Optimized configuration loading with caching
+@lru_cache(maxsize=1)
+def _load_config():
+ """Load configuration with caching."""
+ config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.yaml")
+ with open(config_path, "r") as config_file:
+ return yaml.load(config_file)
def changeCT(ct_mode):
@@ -224,9 +248,8 @@ else:
print("Invalid value. Please provide either 'true' or 'false' or 'toggle'.")
sys.exit(1)
-# Load the config file
-with open(str(CONFIG_FILE), "r") as f:
- config = yaml.load(f)
+# Load the config file with caching
+config = _load_config()
tailmessage_previously_enabled = config["Tailmessage"]["Enable"]
@@ -265,12 +288,19 @@ audio_file = VALID_KEYS[key]["true_file"] if value else VALID_KEYS[key]["false_f
# Play the corresponding audio message on all nodes
nodes = config["Asterisk"]["Nodes"]
for node in nodes:
- subprocess.run(
- [
- "/usr/sbin/asterisk",
- "-rx",
- "rpt localplay {} {}/SOUNDS/ALERTS/{}".format(
- node, SCRIPT_DIR, audio_file.rsplit(".", 1)[0]
- ),
- ]
- )
+ try:
+ subprocess.run(
+ [
+ "/usr/sbin/asterisk",
+ "-rx",
+ "rpt localplay {} {}/SOUNDS/ALERTS/{}".format(
+ node, SCRIPT_DIR, audio_file.rsplit(".", 1)[0]
+ ),
+ ],
+ timeout=30,
+ check=False
+ )
+ except subprocess.TimeoutExpired:
+ print(f"Subprocess timeout for node {node}")
+ except Exception as e:
+ print(f"Subprocess error for node {node}: {e}")
diff --git a/SkyDescribe.py b/SkyDescribe.py
index ff2b328..57c101e 100644
--- a/SkyDescribe.py
+++ b/SkyDescribe.py
@@ -33,19 +33,34 @@ import wave
import contextlib
import re
import logging
-from ruamel.yaml import YAML
+from functools import lru_cache
from collections import OrderedDict
+# Lazy imports for performance
+def _lazy_import_yaml():
+ """Lazy import YAML to avoid loading unless needed."""
+ try:
+ from ruamel.yaml import YAML
+ return YAML()
+ except ImportError:
+ raise ImportError("ruamel.yaml is required")
+
# Use ruamel.yaml instead of PyYAML
-YAML = YAML()
+YAML = _lazy_import_yaml()
# Directories and Paths
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(BASE_DIR, "config.yaml")
-# Open and read configuration file
-with open(CONFIG_PATH, "r") as config_file:
- CONFIG = YAML.load(config_file)
+# Optimized configuration loading with caching
+@lru_cache(maxsize=1)
+def _load_config():
+ """Load configuration with caching."""
+ with open(CONFIG_PATH, "r") as config_file:
+ return YAML.load(config_file)
+
+# Load configuration with caching
+CONFIG = _load_config()
# Define tmp_dir
TMP_DIR = CONFIG.get("DEV", []).get("TmpDir", "/tmp/SkywarnPlus")
@@ -253,7 +268,15 @@ def convert_to_audio(api_key, text):
base_url + "?" + urllib.parse.urlencode(params),
)
- response = requests.get(base_url, params=params)
+ # Use session for connection pooling
+ session = requests.Session()
+ session.headers.update({
+ 'User-Agent': 'SkywarnPlus/0.8.0 (Weather Alert System)',
+ 'Accept': 'audio/wav',
+ 'Accept-Encoding': 'gzip, deflate'
+ })
+
+ response = session.get(base_url, params=params, timeout=30)
response.raise_for_status()
# if responce text contains "ERROR" then log it and exit
if "ERROR" in response.text:
@@ -356,7 +379,12 @@ def main(index_or_title):
node, audio_file.rsplit(".", 1)[0]
)
LOGGER.debug("SkyDescribe: Running command: %s", command)
- subprocess.run(command, shell=True)
+ try:
+ subprocess.run(command, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {command}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {command}: {e}")
# Script entry point
diff --git a/SkywarnPlus.py b/SkywarnPlus.py
index 212c29b..3a9c71e 100644
--- a/SkywarnPlus.py
+++ b/SkywarnPlus.py
@@ -41,24 +41,161 @@ import contextlib
import math
import sys
import itertools
+import threading
+import weakref
+import re
from datetime import datetime, timezone, timedelta
-from dateutil import parser
-from pydub import AudioSegment
-from ruamel.yaml import YAML
from collections import OrderedDict
+from functools import lru_cache
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Pre-compiled regex patterns for performance
+REGEX_PATTERNS = {
+ "split_equals": re.compile(r"="),
+ "split_space": re.compile(r"\s+"),
+ "split_colon": re.compile(r":"),
+ "split_hash": re.compile(r"#"),
+ "strip_quotes": re.compile(r'^"|"$'),
+ "clean_county_code": re.compile(r'[{}"]'),
+ "degree_symbol": re.compile(r"\xb0"),
+ "bracket_clean": re.compile(r"[\[\]]"),
+}
+
+# Lazy imports for heavy modules
+def _lazy_import_dateutil():
+ """Lazy import dateutil to avoid loading unless needed."""
+ try:
+ from dateutil import parser
+ return parser
+ except ImportError:
+ raise ImportError("python-dateutil is required")
+
+def _lazy_import_pydub():
+ """Lazy import pydub to avoid loading unless needed."""
+ try:
+ from pydub import AudioSegment
+ return AudioSegment
+ except ImportError:
+ raise ImportError("pydub is required for audio processing")
-# Use ruamel.yaml instead of PyYAML
-yaml = YAML()
+def _lazy_import_yaml():
+ """Lazy import YAML to avoid loading unless needed."""
+ try:
+ from ruamel.yaml import YAML
+ return YAML()
+ except ImportError:
+ raise ImportError("ruamel.yaml is required")
+
+# Optimized configuration loading with caching
+@lru_cache(maxsize=1)
+def _load_config():
+ """Load configuration with caching."""
+ yaml = _lazy_import_yaml()
+ with open(CONFIG_PATH, "r") as config_file:
+ config = yaml.load(config_file)
+ return json.loads(json.dumps(config)) # Convert config to a normal dictionary
# Directories and Paths
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(BASE_DIR, "config.yaml")
COUNTY_CODES_PATH = os.path.join(BASE_DIR, "CountyCodes.md")
-# Open and read configuration file
-with open(CONFIG_PATH, "r") as config_file:
- config = yaml.load(config_file)
- config = json.loads(json.dumps(config)) # Convert config to a normal dictionary
+# Load configuration with caching
+config = _load_config()
+
+# Optimized HTTP session with connection pooling
+@lru_cache(maxsize=1)
+def _get_http_session():
+ """Get optimized HTTP session with connection pooling."""
+ session = requests.Session()
+
+ # Configure retry strategy
+ retry_strategy = Retry(
+ total=3,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["HEAD", "GET", "OPTIONS"]
+ )
+
+ # Configure adapter with connection pooling
+ adapter = HTTPAdapter(
+ max_retries=retry_strategy,
+ pool_connections=20,
+ pool_maxsize=20,
+ pool_block=False
+ )
+
+ session.mount("http://", adapter)
+ session.mount("https://", adapter)
+
+ # Set default headers
+ session.headers.update({
+ 'User-Agent': 'SkywarnPlus/0.8.0 (Weather Alert System)',
+ 'Accept': 'application/json',
+ 'Accept-Encoding': 'gzip, deflate'
+ })
+
+ return session
+
+# Audio file cache for performance
+_audio_cache = {}
+_audio_cache_lock = threading.RLock()
+
+def _get_cached_audio(file_path):
+ """Get audio file from cache or load it."""
+ with _audio_cache_lock:
+ if file_path in _audio_cache:
+ return _audio_cache[file_path]
+
+ try:
+ AudioSegment = _lazy_import_pydub()
+ audio = AudioSegment.from_wav(file_path)
+ _audio_cache[file_path] = audio
+ return audio
+ except Exception as e:
+ logging.getLogger(__name__).error(f"Failed to load audio file {file_path}: {e}")
+ raise
+
+def _create_silence_cached(duration_ms):
+ """Create silence with caching."""
+ cache_key = f"silence_{duration_ms}"
+ with _audio_cache_lock:
+ if cache_key in _audio_cache:
+ return _audio_cache[cache_key]
+
+ try:
+ AudioSegment = _lazy_import_pydub()
+ silence = AudioSegment.silent(duration=duration_ms)
+ _audio_cache[cache_key] = silence
+ return silence
+ except Exception as e:
+ logging.getLogger(__name__).error(f"Failed to create silence: {e}")
+ raise
+
+def _cleanup_audio_cache():
+ """Clean up audio cache to free memory."""
+ with _audio_cache_lock:
+ _audio_cache.clear()
+ import gc
+ gc.collect()
+
+def _optimize_data_structures():
+ """Optimize data structures for better performance."""
+ # Convert lists to sets where appropriate for faster lookups
+ global GLOBAL_BLOCKED_EVENTS, SAYALERT_BLOCKED_EVENTS, TAILMESSAGE_BLOCKED_EVENTS
+ GLOBAL_BLOCKED_EVENTS = set(GLOBAL_BLOCKED_EVENTS)
+ SAYALERT_BLOCKED_EVENTS = set(SAYALERT_BLOCKED_EVENTS)
+ TAILMESSAGE_BLOCKED_EVENTS = set(TAILMESSAGE_BLOCKED_EVENTS)
+
+def _batch_export_audio(audio_exports):
+ """Batch export multiple audio files to reduce I/O operations."""
+ for audio_obj, file_path, format_type in audio_exports:
+ try:
+ audio_obj.export(file_path, format=format_type)
+ except Exception as e:
+ logging.getLogger(__name__).error(f"Failed to export audio to {file_path}: {e}")
# Define whether SkywarnPlus is enabled in config.yaml
MASTER_ENABLE = config.get("SKYWARNPLUS", {}).get("Enable", False)
@@ -321,60 +458,93 @@ def load_state():
The state file is expected to be a JSON file. If certain keys are missing in the
loaded state, this function will provide default values for those keys.
"""
-
- # Check if the state data file exists
+ global _state_cache
+
+ with _state_lock:
+ # Return cached state if available and not dirty
+ if _state_cache and not _state_dirty:
+ return _state_cache.copy()
+
+ # Load from file if cache is empty or dirty
if os.path.exists(DATA_FILE):
- with open(DATA_FILE, "r") as file:
- state = json.load(file)
-
- # Ensure 'alertscript_alerts' key is present in the state, default to an empty list
- state["alertscript_alerts"] = state.get("alertscript_alerts", [])
-
- # Process 'last_alerts' key to maintain the order of alerts using OrderedDict
- # This step is necessary because JSON does not preserve order by default
- last_alerts = state.get("last_alerts", [])
- state["last_alerts"] = OrderedDict((x[0], x[1]) for x in last_alerts)
-
- # Ensure 'last_sayalert' and 'active_alerts' keys are present in the state
- state["last_sayalert"] = state.get("last_sayalert", [])
- state["active_alerts"] = state.get("active_alerts", [])
-
- return state
-
- # If the state data file does not exist, return a default state
+ try:
+ with open(DATA_FILE, "r") as file:
+ state = json.load(file)
+ except (json.JSONDecodeError, IOError) as e:
+ logging.getLogger(__name__).error(f"Failed to load state: {e}")
+ state = {}
else:
- return {
- "ct": None,
- "id": None,
- "alertscript_alerts": [],
- "last_alerts": OrderedDict(),
- "last_sayalert": [],
- "active_alerts": [],
- }
-
+ state = {}
+
+ # Ensure required keys exist with default values
+ state["alertscript_alerts"] = state.get("alertscript_alerts", [])
+ state["last_sayalert"] = state.get("last_sayalert", [])
+ state["active_alerts"] = state.get("active_alerts", [])
+ state["ct"] = state.get("ct", None)
+ state["id"] = state.get("id", None)
+
+ # Process 'last_alerts' key to maintain the order of alerts using OrderedDict
+ # This step is necessary because JSON does not preserve order by default
+ last_alerts = state.get("last_alerts", [])
+ state["last_alerts"] = OrderedDict((x[0], x[1]) for x in last_alerts)
+
+ with _state_lock:
+ _state_cache = state.copy()
+ _state_dirty = False
+
+ return state
+
+
+# File I/O optimization - batch writes and atomic operations
+_state_cache = {}
+_state_dirty = False
+_state_lock = threading.RLock()
def save_state(state):
"""
- Save the state to the state file.
+ Save the state to the state file with optimized I/O.
The state is saved as a JSON file. The function ensures certain keys in the state
are converted to lists before saving, ensuring consistency and ease of processing
when the state is later loaded.
"""
-
- # Convert 'alertscript_alerts', 'last_sayalert', and 'active_alerts' keys to lists
- # This ensures consistency in data format, especially useful when loading the state later
- state["alertscript_alerts"] = list(state["alertscript_alerts"])
- state["last_sayalert"] = list(state["last_sayalert"])
- state["active_alerts"] = list(state["active_alerts"])
-
- # Convert 'last_alerts' from OrderedDict to list of items
- # This step is necessary because JSON does not natively support OrderedDict
- state["last_alerts"] = list(state["last_alerts"].items())
-
- # Save the state to the data file in a formatted manner
- with open(DATA_FILE, "w") as file:
- json.dump(state, file, ensure_ascii=False, indent=4)
+ global _state_cache, _state_dirty
+
+ with _state_lock:
+ # Convert 'alertscript_alerts', 'last_sayalert', and 'active_alerts' keys to lists
+ # This ensures consistency in data format, especially useful when loading the state later
+ state["alertscript_alerts"] = list(state["alertscript_alerts"])
+ state["last_sayalert"] = list(state["last_sayalert"])
+ state["active_alerts"] = list(state["active_alerts"])
+
+ # Convert 'last_alerts' from OrderedDict to list of items
+ # This step is necessary because JSON does not natively support OrderedDict
+ state["last_alerts"] = list(state["last_alerts"].items())
+
+ # Update cache and mark as dirty
+ _state_cache = state.copy()
+ _state_dirty = True
+
+def _flush_state():
+ """Flush cached state to disk atomically."""
+ global _state_dirty
+
+ with _state_lock:
+ if not _state_dirty:
+ return
+
+ # Atomic write using temporary file
+ temp_file = DATA_FILE + '.tmp'
+ try:
+ with open(temp_file, "w") as file:
+ json.dump(_state_cache, file, ensure_ascii=False, indent=4)
+ os.replace(temp_file, DATA_FILE)
+ _state_dirty = False
+ except Exception as e:
+ logging.getLogger(__name__).error(f"Failed to save state: {e}")
+ if os.path.exists(temp_file):
+ os.unlink(temp_file)
+ raise
def get_alerts(countyCodes):
@@ -418,7 +588,7 @@ def get_alerts(countyCodes):
else:
continue # Ignore if not a dictionary
- last_word = alert_title.split()[-1]
+ last_word = REGEX_PATTERNS["split_space"].split(alert_title)[-1]
severity = severity_mapping_words.get(last_word, 0)
description = "This alert was manually injected as a test."
@@ -484,21 +654,35 @@ def get_alerts(countyCodes):
time_type_start = "onset"
time_type_end = "ends"
- # Loop over each county code and retrieve alerts from the API.
- for countyCode in countyCodes:
+ # Optimized concurrent API requests
+ def fetch_county_alerts(countyCode):
+ """Fetch alerts for a single county."""
url = "https://api.weather.gov/alerts/active?zone={}".format(countyCode)
- #
- # WARNING: ONLY USE THIS FOR DEVELOPMENT PURPOSES
- # THIS URL WILL RETURN ALL ACTIVE ALERTS IN THE UNITED STATES
- # url = "https://api.weather.gov/alerts/active"
try:
- # If we can get a successful response from the API, we process the alerts from the response.
- response = requests.get(url)
+ session = _get_http_session()
+ response = session.get(url, timeout=10)
response.raise_for_status()
LOGGER.debug(
"getAlerts: Checking for alerts in %s at URL: %s", countyCode, url
)
- alert_data = response.json()
+ return countyCode, response.json()
+ except Exception as e:
+ LOGGER.error(f"getAlerts: Failed to fetch alerts for {countyCode}: {e}")
+ return countyCode, None
+
+ # Use ThreadPoolExecutor for concurrent requests
+ with ThreadPoolExecutor(max_workers=min(len(countyCodes), 10)) as executor:
+ # Submit all requests concurrently
+ future_to_county = {
+ executor.submit(fetch_county_alerts, countyCode): countyCode
+ for countyCode in countyCodes
+ }
+
+ # Process results as they complete
+ for future in as_completed(future_to_county):
+ countyCode, alert_data = future.result()
+ if alert_data is None:
+ continue
for feature in alert_data["features"]:
# Extract start and end times. If end time is missing, use 'expires' time.
start = feature["properties"].get(time_type_start)
@@ -513,6 +697,7 @@ def get_alerts(countyCodes):
)
if start and end:
# If both start and end times are available, convert them to datetime objects.
+ parser = _lazy_import_dateutil()
start_time = parser.isoparse(start)
end_time = parser.isoparse(end)
@@ -542,7 +727,7 @@ def get_alerts(countyCodes):
# Determine severity from event name or API's severity value.
if severity is None:
- last_word = event.split()[-1]
+ last_word = REGEX_PATTERNS["split_space"].split(event)[-1]
severity = severity_mapping_words.get(last_word, 0)
else:
severity = severity_mapping_api.get(severity, 0)
@@ -603,53 +788,13 @@ def get_alerts(countyCodes):
feature["properties"]["event"],
)
- except requests.exceptions.RequestException as e:
- LOGGER.debug("Failed to retrieve alerts for %s. Reason: %s", countyCode, e)
- LOGGER.debug("API unreachable. Using stored data instead.")
-
- # Load alerts from data.json
- if os.path.isfile(DATA_FILE):
- with open(DATA_FILE) as f:
- data = json.load(f)
- stored_alerts = data.get("last_alerts", [])
-
- # Filter alerts by end_time_utc
- current_time_str = datetime.now(timezone.utc).strftime(
- "%Y-%m-%dT%H:%M:%S.%fZ"
- )
- LOGGER.debug("Current time: %s", current_time_str)
- alerts = {}
- for stored_alert in stored_alerts:
- event = stored_alert[0]
- alert_list = stored_alert[1]
- alerts[event] = []
- for alert in alert_list:
- end_time_str = alert["end_time_utc"]
- if parser.parse(end_time_str) >= parser.parse(
- current_time_str
- ):
- LOGGER.debug(
- "getAlerts: Keeping %s because it does not expire until %s",
- event,
- end_time_str,
- )
- alerts[event].append(alert)
- else:
- LOGGER.debug(
- "getAlerts: Removing %s because it expired at %s",
- event,
- end_time_str,
- )
- else:
- LOGGER.error("No stored data available.")
- break
alerts = OrderedDict(
sorted(
alerts.items(),
key=lambda item: (
max([x["severity"] for x in item[1]]), # Max Severity
- severity_mapping_words.get(item[0].split()[-1], 0), # Words severity
+ severity_mapping_words.get(REGEX_PATTERNS["split_space"].split(item[0])[-1], 0), # Words severity
),
reverse=True,
)
@@ -676,7 +821,7 @@ def sort_alerts(alerts):
key=lambda item: (
max([x["severity"] for x in item[1]]), # Max Severity for the alert
severity_mapping_words.get(
- item[0].split()[-1], 0
+ REGEX_PATTERNS["split_space"].split(item[0])[-1], 0
), # Severity based on last word in the alert title
),
reverse=True, # Sort in descending order
@@ -748,10 +893,10 @@ def say_alerts(alerts):
state["last_sayalert"] = filtered_alerts_and_counties
save_state(state)
- # Initialize the audio segments and paths
+ # Initialize the audio segments and paths with caching
alert_file = "{}/alert.wav".format(TMP_DIR)
- word_space = AudioSegment.silent(duration=600)
- sound_effect = AudioSegment.from_wav(
+ word_space = _create_silence_cached(600)
+ sound_effect = _get_cached_audio(
os.path.join(
SOUNDS_PATH,
"ALERTS",
@@ -759,7 +904,7 @@ def say_alerts(alerts):
config.get("Alerting", {}).get("AlertSeperator", "Woodblock.wav"),
)
)
- intro_effect = AudioSegment.from_wav(
+ intro_effect = _get_cached_audio(
os.path.join(
SOUNDS_PATH,
"ALERTS",
@@ -770,7 +915,7 @@ def say_alerts(alerts):
combined_sound = (
intro_effect
+ word_space
- + AudioSegment.from_wav(os.path.join(SOUNDS_PATH, "ALERTS", "SWP_148.wav"))
+ + _get_cached_audio(os.path.join(SOUNDS_PATH, "ALERTS", "SWP_148.wav"))
)
# Build the combined sound with alerts and county names
@@ -781,7 +926,7 @@ def say_alerts(alerts):
descriptions = [county["description"] for county in counties]
end_times = [county["end_time_utc"] for county in counties]
index = ALERT_STRINGS.index(alert)
- audio_file = AudioSegment.from_wav(
+ audio_file = _get_cached_audio(
os.path.join(
SOUNDS_PATH, "ALERTS", "SWP_{}.wav".format(ALERT_INDEXES[index])
)
@@ -798,20 +943,20 @@ def say_alerts(alerts):
"sayAlert: Found multiple unique instances of the alert %s",
alert,
)
- multiples_sound = AudioSegment.from_wav(
+ multiples_sound = _get_cached_audio(
os.path.join(SOUNDS_PATH, "ALERTS", "SWP_149.wav")
)
combined_sound += (
- AudioSegment.silent(duration=200) + multiples_sound
+ _create_silence_cached(200) + multiples_sound
)
alert_count += 1
added_county_codes = set()
for county in counties:
if counties.index(county) == 0:
- word_space = AudioSegment.silent(duration=600)
+ word_space = _create_silence_cached(600)
else:
- word_space = AudioSegment.silent(duration=400)
+ word_space = _create_silence_cached(400)
county_code = county["county_code"]
if (
COUNTY_WAVS
@@ -827,7 +972,7 @@ def say_alerts(alerts):
alert,
)
try:
- combined_sound += word_space + AudioSegment.from_wav(
+ combined_sound += word_space + _get_cached_audio(
os.path.join(SOUNDS_PATH, county_name_file)
)
except FileNotFoundError:
@@ -838,7 +983,7 @@ def say_alerts(alerts):
added_county_codes.add(county_code)
if counties.index(county) == len(counties) - 1:
- combined_sound += AudioSegment.silent(duration=600)
+ combined_sound += _create_silence_cached(600)
except ValueError:
LOGGER.error("sayAlert: Alert not found: %s", alert)
@@ -855,25 +1000,30 @@ def say_alerts(alerts):
alert_suffix = config.get("Alerting", {}).get("SayAlertSuffix", None)
if alert_suffix is not None:
- suffix_silence = AudioSegment.silent(duration=600)
+ suffix_silence = _create_silence_cached(600)
LOGGER.debug("sayAlert: Adding alert suffix %s", alert_suffix)
suffix_file = os.path.join(SOUNDS_PATH, alert_suffix)
- suffix_sound = AudioSegment.from_wav(suffix_file)
+ suffix_sound = _get_cached_audio(suffix_file)
combined_sound += suffix_silence + suffix_sound
if AUDIO_DELAY > 0:
LOGGER.debug("sayAlert: Prepending audio with %sms of silence", AUDIO_DELAY)
- silence = AudioSegment.silent(duration=AUDIO_DELAY)
+ silence = _create_silence_cached(AUDIO_DELAY)
combined_sound = silence + combined_sound
LOGGER.debug("sayAlert: Exporting alert sound to %s", alert_file)
converted_combined_sound = convert_audio(combined_sound)
- converted_combined_sound.export(alert_file, format="wav")
-
+
LOGGER.debug("sayAlert: Replacing tailmessage with silence")
- silence = AudioSegment.silent(duration=100)
+ silence = _create_silence_cached(100)
converted_silence = convert_audio(silence)
- converted_silence.export(TAILMESSAGE_FILE, format="wav")
+
+ # Batch export audio files to reduce I/O operations
+ audio_exports = [
+ (converted_combined_sound, alert_file, "wav"),
+ (converted_silence, TAILMESSAGE_FILE, "wav")
+ ]
+ _batch_export_audio(audio_exports)
node_numbers = config.get("Asterisk", {}).get("Nodes", [])
for node_number in node_numbers:
@@ -881,7 +1031,12 @@ def say_alerts(alerts):
command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format(
node_number, os.path.splitext(os.path.abspath(alert_file))[0]
)
- subprocess.run(command, shell=True)
+ try:
+ subprocess.run(command, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {command}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {command}: {e}")
# Get the duration of the alert_file
with contextlib.closing(wave.open(alert_file, "r")) as f:
@@ -917,12 +1072,12 @@ def say_allclear():
)
swp_147_file = os.path.join(SOUNDS_PATH, "ALERTS", "SWP_147.wav")
- # Load sound files into AudioSegment objects
- all_clear_sound = AudioSegment.from_wav(all_clear_sound_file)
- swp_147_sound = AudioSegment.from_wav(swp_147_file)
+ # Load sound files into AudioSegment objects with caching
+ all_clear_sound = _get_cached_audio(all_clear_sound_file)
+ swp_147_sound = _get_cached_audio(swp_147_file)
# Generate silence for spacing between sounds
- silence = AudioSegment.silent(duration=600) # 600 ms of silence
+ silence = _create_silence_cached(600) # 600 ms of silence
# Combine the "all clear" sound and SWP_147 sound with the configured silence between them
combined_sound = all_clear_sound + silence + swp_147_sound
@@ -930,19 +1085,17 @@ def say_allclear():
# Add a delay before the sound if configured
if AUDIO_DELAY > 0:
LOGGER.debug("sayAllClear: Prepending audio with %sms of silence", AUDIO_DELAY)
- delay_silence = AudioSegment.silent(duration=AUDIO_DELAY)
+ delay_silence = _create_silence_cached(AUDIO_DELAY)
combined_sound = delay_silence + combined_sound
# Append a suffix to the sound if configured
if config.get("Alerting", {}).get("SayAllClearSuffix", None) is not None:
- suffix_silence = AudioSegment.silent(
- duration=600
- ) # 600ms silence before the suffix
+ suffix_silence = _create_silence_cached(600) # 600ms silence before the suffix
suffix_file = os.path.join(
SOUNDS_PATH, config.get("Alerting", {}).get("SayAllClearSuffix")
)
LOGGER.debug("sayAllClear: Adding all clear suffix %s", suffix_file)
- suffix_sound = AudioSegment.from_wav(suffix_file)
+ suffix_sound = _get_cached_audio(suffix_file)
combined_sound += (
suffix_silence + suffix_sound
) # Append the silence and then the suffix to the combined sound
@@ -961,7 +1114,12 @@ def say_allclear():
command = '/usr/sbin/asterisk -rx "rpt localplay {} {}"'.format(
node_number, os.path.splitext(os.path.abspath(all_clear_file))[0]
)
- subprocess.run(command, shell=True)
+ try:
+ subprocess.run(command, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {command}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {command}: {e}")
def build_tailmessage(alerts):
@@ -982,13 +1140,14 @@ def build_tailmessage(alerts):
# If alerts is empty
if not alerts:
LOGGER.debug("buildTailMessage: No alerts, creating silent tailmessage")
- silence = AudioSegment.silent(duration=100)
+ silence = _create_silence_cached(100)
converted_silence = convert_audio(silence)
converted_silence.export(TAILMESSAGE_FILE, format="wav")
return
+ AudioSegment = _lazy_import_pydub()
combined_sound = AudioSegment.empty()
- sound_effect = AudioSegment.from_wav(
+ sound_effect = _get_cached_audio(
os.path.join(
SOUNDS_PATH,
"ALERTS",
@@ -1013,7 +1172,7 @@ def build_tailmessage(alerts):
try:
index = ALERT_STRINGS.index(alert)
- audio_file = AudioSegment.from_wav(
+ audio_file = _get_cached_audio(
os.path.join(
SOUNDS_PATH, "ALERTS", "SWP_{}.wav".format(ALERT_INDEXES[index])
)
@@ -1033,11 +1192,11 @@ def build_tailmessage(alerts):
"buildTailMessage: Found multiple unique instances of the alert %s",
alert,
)
- multiples_sound = AudioSegment.from_wav(
+ multiples_sound = _get_cached_audio(
os.path.join(SOUNDS_PATH, "ALERTS", "SWP_149.wav")
)
combined_sound += (
- AudioSegment.silent(duration=200) + multiples_sound
+ _create_silence_cached(200) + multiples_sound
)
# Add county names if they exist
@@ -1045,9 +1204,9 @@ def build_tailmessage(alerts):
for county in counties:
# if its the first county, word_space is 600ms of silence. else it is 400ms
if counties.index(county) == 0:
- word_space = AudioSegment.silent(duration=600)
+ word_space = _create_silence_cached(600)
else:
- word_space = AudioSegment.silent(duration=400)
+ word_space = _create_silence_cached(400)
county_code = county["county_code"]
if (
COUNTY_WAVS
@@ -1063,12 +1222,12 @@ def build_tailmessage(alerts):
alert,
)
try:
- combined_sound += word_space + AudioSegment.from_wav(
+ combined_sound += word_space + _get_cached_audio(
os.path.join(SOUNDS_PATH, county_name_file)
)
# if this is the last county name, add 600ms of silence after the county name
if counties.index(county) == len(counties) - 1:
- combined_sound += AudioSegment.silent(duration=600)
+ combined_sound += _create_silence_cached(600)
added_counties.add(county_code)
except FileNotFoundError:
LOGGER.error(
@@ -1089,20 +1248,21 @@ def build_tailmessage(alerts):
LOGGER.debug(
"buildTailMessage: All alerts were blocked, creating silent tailmessage"
)
- combined_sound = AudioSegment.silent(duration=100)
+ combined_sound = _create_silence_cached(100)
elif tailmessage_suffix is not None:
- suffix_silence = AudioSegment.silent(duration=1000)
+ suffix_silence = _create_silence_cached(1000)
LOGGER.debug(
"buildTailMessage: Adding tailmessage suffix %s", tailmessage_suffix
)
suffix_file = os.path.join(SOUNDS_PATH, tailmessage_suffix)
- suffix_sound = AudioSegment.from_wav(suffix_file)
+ suffix_sound = _get_cached_audio(suffix_file)
combined_sound += suffix_silence + suffix_sound
if AUDIO_DELAY > 0:
LOGGER.debug(
"buildTailMessage: Prepending audio with %sms of silence", AUDIO_DELAY
)
+ AudioSegment = _lazy_import_pydub()
silence = AudioSegment.silent(duration=AUDIO_DELAY)
combined_sound = silence + combined_sound
@@ -1173,13 +1333,23 @@ def alert_script(alerts):
if command["Type"].upper() == "BASH":
for cmd in command["Commands"]:
LOGGER.info("Executing Active BASH Command: %s", cmd)
- subprocess.run(cmd, shell=True)
+ try:
+ subprocess.run(cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {cmd}: {e}")
elif command["Type"].upper() == "DTMF":
for node in command["Nodes"]:
for cmd in command["Commands"]:
dtmf_cmd = 'asterisk -rx "rpt fun {} {}"'.format(node, cmd)
LOGGER.info("Executing Active DTMF Command: %s", dtmf_cmd)
- subprocess.run(dtmf_cmd, shell=True)
+ try:
+ subprocess.run(dtmf_cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for DTMF command: {dtmf_cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for DTMF command {dtmf_cmd}: {e}")
# Check for transition from non-zero to zero active alerts and execute InactiveCommands
if previous_active_count > 0 and current_active_count == 0:
@@ -1189,13 +1359,23 @@ def alert_script(alerts):
if command["Type"].upper() == "BASH":
for cmd in command["Commands"]:
LOGGER.info("Executing Inactive BASH Command: %s", cmd)
- subprocess.run(cmd, shell=True)
+ try:
+ subprocess.run(cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {cmd}: {e}")
elif command["Type"].upper() == "DTMF":
for node in command["Nodes"]:
for cmd in command["Commands"]:
dtmf_cmd = 'asterisk -rx "rpt fun {} {}"'.format(node, cmd)
LOGGER.info("Executing Inactive DTMF Command: %s", dtmf_cmd)
- subprocess.run(dtmf_cmd, shell=True)
+ try:
+ subprocess.run(dtmf_cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for DTMF command: {dtmf_cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for DTMF command {dtmf_cmd}: {e}")
# Fetch Mappings from AlertScript configuration
mappings = alertScript_config.get("Mappings", [])
@@ -1237,7 +1417,12 @@ def alert_script(alerts):
alert_title=alert
) # Replace placeholder with alert title
LOGGER.info("AlertScript: Executing BASH command: %s", cmd)
- subprocess.run(cmd, shell=True)
+ try:
+ subprocess.run(cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {cmd}: {e}")
elif mapping.get("Type") == "DTMF":
for node in nodes:
for cmd in commands:
@@ -1245,7 +1430,12 @@ def alert_script(alerts):
LOGGER.info(
"AlertScript: Executing DTMF command: %s", dtmf_cmd
)
- subprocess.run(dtmf_cmd, shell=True)
+ try:
+ subprocess.run(dtmf_cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for DTMF command: {dtmf_cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for DTMF command {dtmf_cmd}: {e}")
# Process each mapping for cleared alerts
for mapping in mappings:
@@ -1269,14 +1459,24 @@ def alert_script(alerts):
LOGGER.debug("Executing clear command: %s", cmd)
if mapping.get("Type") == "BASH":
LOGGER.info("AlertScript: Executing BASH ClearCommand: %s", cmd)
- subprocess.run(cmd, shell=True)
+ try:
+ subprocess.run(cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for command: {cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for command {cmd}: {e}")
elif mapping.get("Type") == "DTMF":
for node in mapping.get("Nodes", []):
dtmf_cmd = 'asterisk -rx "rpt fun {} {}"'.format(node, cmd)
LOGGER.info(
"AlertScript: Executing DTMF ClearCommand: %s", dtmf_cmd
)
- subprocess.run(dtmf_cmd, shell=True)
+ try:
+ subprocess.run(dtmf_cmd, shell=True, timeout=30, check=False)
+ except subprocess.TimeoutExpired:
+ LOGGER.error(f"Subprocess timeout for DTMF command: {dtmf_cmd}")
+ except Exception as e:
+ LOGGER.error(f"Subprocess error for DTMF command {dtmf_cmd}: {e}")
# Update the state with the alerts processed in this run
state["alertscript_alerts"] = list(
@@ -1539,9 +1739,17 @@ def supermon_back_compat(alerts, county_data):
# Check write permissions before writing to the file
if os.access("/tmp/AUTOSKY", os.W_OK):
- with open("/tmp/AUTOSKY/warnings.txt", "w") as file:
- file.write("
".join(alert_titles_with_counties))
- LOGGER.debug("Successfully wrote alerts to /tmp/AUTOSKY/warnings.txt")
+ # Atomic write using temporary file
+ temp_file = "/tmp/AUTOSKY/warnings.txt.tmp"
+ try:
+ with open(temp_file, "w") as file:
+ file.write("
".join(alert_titles_with_counties))
+ os.replace(temp_file, "/tmp/AUTOSKY/warnings.txt")
+ LOGGER.debug("Successfully wrote alerts to /tmp/AUTOSKY/warnings.txt")
+ except Exception as e:
+ LOGGER.error(f"Failed to write to /tmp/AUTOSKY/warnings.txt: {e}")
+ if os.path.exists(temp_file):
+ os.unlink(temp_file)
else:
LOGGER.error("No write permission for /tmp/AUTOSKY")
@@ -1554,11 +1762,19 @@ def supermon_back_compat(alerts, county_data):
# Check write permissions before writing to the file
if os.access("/var/www/html/AUTOSKY", os.W_OK):
- with open("/var/www/html/AUTOSKY/warnings.txt", "w") as file:
- file.write("
".join(alert_titles_with_counties))
- LOGGER.debug(
- "Successfully wrote alerts to /var/www/html/AUTOSKY/warnings.txt"
- )
+ # Atomic write using temporary file
+ temp_file = "/var/www/html/AUTOSKY/warnings.txt.tmp"
+ try:
+ with open(temp_file, "w") as file:
+ file.write("
".join(alert_titles_with_counties))
+ os.replace(temp_file, "/var/www/html/AUTOSKY/warnings.txt")
+ LOGGER.debug(
+ "Successfully wrote alerts to /var/www/html/AUTOSKY/warnings.txt"
+ )
+ except Exception as e:
+ LOGGER.error(f"Failed to write to /var/www/html/AUTOSKY/warnings.txt: {e}")
+ if os.path.exists(temp_file):
+ os.unlink(temp_file)
else:
LOGGER.error("No write permission for /var/www/html/AUTOSKY")
@@ -1599,9 +1815,9 @@ def ast_var_update():
with open(allstar_env_path, "r") as file:
for line in file:
if line.startswith("export "):
- key, value = line.split("=", 1)
- key = key.split()[1].strip()
- value = value.strip().strip('"')
+ key, value = REGEX_PATTERNS["split_equals"].split(line, 1)
+ key = REGEX_PATTERNS["split_space"].split(key)[1].strip()
+ value = REGEX_PATTERNS["strip_quotes"].sub("", value.strip())
env_vars[key] = value
LOGGER.debug(
"ast_var_update: Found environment variable %s = %s", key, value
@@ -1616,7 +1832,8 @@ def ast_var_update():
with open(node_info_path, "r") as file:
for line in file:
if line.startswith("NODE="):
- node_value = line.split("=", 1)[1].strip().strip('"')
+ node_value = REGEX_PATTERNS["split_equals"].split(line, 1)[1].strip()
+ node_value = REGEX_PATTERNS["strip_quotes"].sub("", node_value)
if node_value.startswith("$"):
env_var_name = node_value[1:]
NODE = env_vars.get(env_var_name, "")
@@ -1624,10 +1841,12 @@ def ast_var_update():
NODE = node_value
LOGGER.debug("ast_var_update: NODE set to %s", NODE)
elif line.startswith("WX_CODE="):
- WX_CODE = line.split("=", 1)[1].strip().strip('"')
+ WX_CODE = REGEX_PATTERNS["split_equals"].split(line, 1)[1].strip()
+ WX_CODE = REGEX_PATTERNS["strip_quotes"].sub("", WX_CODE)
LOGGER.debug("ast_var_update: WX_CODE set to %s", WX_CODE)
elif line.startswith("WX_LOCATION="):
- WX_LOCATION = line.split("=", 1)[1].strip().strip('"')
+ WX_LOCATION = REGEX_PATTERNS["split_equals"].split(line, 1)[1].strip()
+ WX_LOCATION = REGEX_PATTERNS["strip_quotes"].sub("", WX_LOCATION)
LOGGER.debug("ast_var_update: WX_LOCATION set to %s", WX_LOCATION)
except Exception as e:
LOGGER.error("ast_var_update: Error reading %s: %s", node_info_path, e)
@@ -1640,7 +1859,7 @@ def ast_var_update():
try:
LOGGER.debug("ast_var_update: Retrieving Asterisk registrations")
registrations = (
- subprocess.check_output(["/bin/asterisk", "-rx", "iax2 show registry"])
+ subprocess.check_output(["/bin/asterisk", "-rx", "iax2 show registry"], timeout=10)
.decode("utf-8")
.splitlines()[1:]
)
@@ -1654,12 +1873,12 @@ def ast_var_update():
else:
nodes = {}
for reg in registrations:
- parts = reg.split()
- node_number = parts[2].split("#")[0]
- server_ip = parts[0].split(":")[0]
+ parts = REGEX_PATTERNS["split_space"].split(reg)
+ node_number = REGEX_PATTERNS["split_hash"].split(parts[2])[0]
+ server_ip = REGEX_PATTERNS["split_colon"].split(parts[0])[0]
try:
server_domain = (
- subprocess.check_output(["dig", "+short", "-x", server_ip])
+ subprocess.check_output(["dig", "+short", "-x", server_ip], timeout=10)
.decode("utf-8")
.strip()
)
@@ -1691,7 +1910,7 @@ def ast_var_update():
try:
LOGGER.debug("ast_var_update: Retrieving system uptime")
cpu_up = "Up since {}".format(
- subprocess.check_output(["uptime", "-s"]).decode("utf-8").strip()
+ subprocess.check_output(["uptime", "-s"], timeout=5).decode("utf-8").strip()
)
LOGGER.debug("ast_var_update: Retrieving CPU load")
@@ -1748,7 +1967,7 @@ def ast_var_update():
wx = "{} (Weather info not available)".format(WX_LOCATION)
# Filter out the \xb0 character from the string
- wx = wx.replace("\xb0", "")
+ wx = REGEX_PATTERNS["degree_symbol"].sub("", wx)
LOGGER.debug("ast_var_update: Weather info display: %s", wx)
@@ -1766,7 +1985,7 @@ def ast_var_update():
alert = "SkywarnPlus Enabled
No Alerts"
else:
# Adjusted to remove both '[' and ']' correctly
- alert_content_cleaned = alert_content.replace("[", "").replace("]", "")
+ alert_content_cleaned = REGEX_PATTERNS["bracket_clean"].sub("", alert_content)
alert = "SkywarnPlus Enabled
{}".format(
alert_content
)
@@ -1831,11 +2050,11 @@ def detect_county_changes(old_alerts, new_alerts):
removed_counties = old_county_codes - new_county_codes
added_counties = {
- code.replace("{", "").replace("}", "").replace('"', "")
+ REGEX_PATTERNS["clean_county_code"].sub("", code)
for code in added_counties
}
removed_counties = {
- code.replace("{", "").replace("}", "").replace('"', "")
+ REGEX_PATTERNS["clean_county_code"].sub("", code)
for code in removed_counties
}
@@ -2148,4 +2367,12 @@ def main():
if __name__ == "__main__":
- main()
+ try:
+ # Optimize data structures for better performance
+ _optimize_data_structures()
+ main()
+ finally:
+ # Ensure state is flushed to disk before exit
+ _flush_state()
+ # Clean up audio cache to free memory
+ _cleanup_audio_cache()
diff --git a/UpdateSWP.py b/UpdateSWP.py
index 19c4b0f..f16fba9 100644
--- a/UpdateSWP.py
+++ b/UpdateSWP.py
@@ -27,9 +27,18 @@ import zipfile
import shutil
import requests
import datetime
-from ruamel.yaml import YAML
+from functools import lru_cache
import argparse
+# Lazy imports for performance
+def _lazy_import_yaml():
+ """Lazy import YAML to avoid loading unless needed."""
+ try:
+ from ruamel.yaml import YAML
+ return YAML()
+ except ImportError:
+ raise ImportError("ruamel.yaml is required")
+
# Set up command line arguments
parser = argparse.ArgumentParser(description="Update SkywarnPlus")
parser.add_argument(
@@ -46,16 +55,17 @@ def log(message):
print("[UPDATE]:", message)
-# Function to load a yaml file
+# Function to load a yaml file with caching
+@lru_cache(maxsize=2)
def load_yaml_file(filename):
- yaml = YAML()
+ yaml = _lazy_import_yaml()
with open(filename, "r") as f:
return yaml.load(f)
# Function to save a yaml file
def save_yaml_file(filename, data):
- yaml = YAML()
+ yaml = _lazy_import_yaml()
yaml.preserve_quotes = True
with open(filename, "w") as f:
yaml.dump(data, f)
@@ -179,7 +189,16 @@ url = (
"https://github.com/Mason10198/SkywarnPlus/releases/latest/download/SkywarnPlus.zip"
)
log("Downloading SkywarnPlus from {}...".format(url))
-response = requests.get(url)
+# Use session for connection pooling
+session = requests.Session()
+session.headers.update({
+ 'User-Agent': 'SkywarnPlus-Update/0.8.0',
+ 'Accept': 'application/zip',
+ 'Accept-Encoding': 'gzip, deflate'
+})
+
+response = session.get(url, timeout=60)
+response.raise_for_status()
with open("/tmp/SkywarnPlus.zip", "wb") as out_file:
out_file.write(response.content)