Initial commit

main
swanie98635 2 months ago
commit f46e53000d

20
.gitignore vendored

@ -0,0 +1,20 @@
# Python
__pycache__/
*.py[cod]
*$py.class
# Config (Keep local secrets private)
config.yaml
asl3_wx_announce/config.yaml
# Audio Output
*.wav
/tmp/asl3_wx/
# IDE
.vscode/
.idea/
# OS
.DS_Store
Thumbs.db

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,93 @@
# ASL3 Weather Announcer
**ASL3 Weather Announcer** is a flexible, multi-country weather alert and reporting system designed for AllStarLink 3 (Asterisk) nodes.
It provides **automated verbal announcements** for:
* **Active Weather Alerts**: Warnings, watches, and advisories as they are issued.
* **Daily Reports**: Detailed forecast, current conditions, sunrise/sunset, and moon phase.
* **Time Announcements**: Current local time at start of report.
## Features
* **Multi-Provider Support**:
* 🇺🇸 **USA**: Uses National Weather Service (NWS) API.
* 🇨🇦 **Canada**: Uses Environment Canada data.
* *Extensible*: Plugin architecture allows adding more countries easily.
* **Smart Location**:
* **GPS/GNSS**: Automatically detects location using `gpsd`.
* **Static**: Configurable fallback lat/lon.
* **Auto-Zone**: Automatically monitors the correct County, Forecast Zone, and Fire Weather Zone for your location.
* **Customizable**:
* **Extra Zones**: Manually monitor adjacent counties or specific stations (e.g., `VAC001` or `ON/s0000430`).
* **Audio**: Works with `pico2wave`, `flite`, or any CLI TTS engine. Plays to multiple ASL3 nodes.
## Installation
### Prerequisites
On your ASL3 server (Debian/Raspbian):
```bash
sudo apt update
sudo apt install python3-pip libttspico-utils gpsd
```
### Install Package
1. Clone this repository to your scripts directory (e.g., `/etc/asterisk/scripts/`).
2. Install python dependencies:
```bash
pip3 install -r requirements.txt
```
## Configuration
Copy the example config:
```bash
cp config.yaml.example config.yaml
```
Edit `config.yaml`:
```yaml
location:
type: auto # Use 'auto' for GPS, or 'static' for fixed lat/lon
# latitude: 45.123
# longitude: -75.123
voice:
tts_command: 'pico2wave -w {file} "{text}"'
audio:
nodes:
- "1966" # Your Private Node
- "92394" # Your Public Node
alerts:
min_severity: "Watch"
extra_zones: # Optional: Monitor extra areas
- "VAC001" # US County FIPS
- "ON/s0000430" # Canadian Station ID
```
## Usage
### Test Full Report
Announce current conditions, forecast, and time immediately:
```bash
python3 -m asl3_wx_announce.main --config config.yaml --report
```
### Run Alert Monitor
Run in the background to announce *new* alerts as they happen:
```bash
python3 -m asl3_wx_announce.main --config config.yaml --monitor
```
### Scheduled Hourly Reports
To announce the weather every hour, add to `crontab -u asterisk -e`:
```cron
0 * * * * /usr/bin/python3 -m asl3_wx_announce.main --config /path/to/config.yaml --report
```
## Contributing
Pull requests are welcome! See `provider/` directory to add support for new countries.
## License
MIT License

@ -0,0 +1,44 @@
import os
import subprocess
import logging
from typing import List
class AudioHandler:
def __init__(self, config: dict):
self.config = config
self.logger = logging.getLogger(__name__)
# Default to echo if no TTS configured (just for safety)
self.tts_template = config.get('voice', {}).get('tts_command', 'echo "{text}" > {file}')
self.output_dir = "/tmp/asl3_wx"
os.makedirs(self.output_dir, exist_ok=True)
def generate_audio(self, text: str, filename: str = "announcement.wav") -> str:
filepath = os.path.join(self.output_dir, filename)
# Simple cleanup?
if os.path.exists(filepath):
os.remove(filepath)
cmd = self.tts_template.format(file=filepath, text=text)
self.logger.info(f"Generating audio: {cmd}")
try:
subprocess.run(cmd, shell=True, check=True)
return filepath
except subprocess.CalledProcessError as e:
self.logger.error(f"TTS Failed: {e}")
raise e
def play_on_nodes(self, filepath: str, nodes: List[str]):
# Asterisk uses file path WITHOUT extension usually for play commands,
# but rpt localplay might require full path or specific format.
# Usually: rpt localplay <node> <path_no_ext>
path_no_ext = os.path.splitext(filepath)[0]
for node in nodes:
asterisk_cmd = f'asterisk -rx "rpt localplay {node} {path_no_ext}"'
self.logger.info(f"Playing on {node}: {asterisk_cmd}")
try:
subprocess.run(asterisk_cmd, shell=True, check=True)
except Exception as e:
self.logger.error(f"Playback failed on {node}: {e}")

@ -0,0 +1,45 @@
from typing import Tuple, Dict, Optional
import json
import logging
import socket
class LocationService:
def __init__(self, config: Dict):
self.config = config
self.logger = logging.getLogger(__name__)
def get_coordinates(self) -> Tuple[float, float]:
"""
Returns (lat, lon).
Tries GPS first if configured, then falls back to static config.
"""
if self.config.get('location', {}).get('type') == 'auto':
try:
return self._read_gpsd()
except Exception as e:
self.logger.warning(f"GPS read failed: {e}. Falling back to static.")
# Static fallback
lat = self.config.get('location', {}).get('latitude')
lon = self.config.get('location', {}).get('longitude')
if lat is None or lon is None:
raise ValueError("No valid location coordinates found in config or GPS.")
return float(lat), float(lon)
def _read_gpsd(self) -> Tuple[float, float]:
# Simple socket reader for GPSD standard port 2947
# We start the WATCH command and wait for a TPV report
try:
with socket.create_connection(('localhost', 2947), timeout=2) as sock:
sock.sendall(b'?WATCH={"enable":true,"json":true}\n')
fp = sock.makefile()
for line in fp:
data = json.loads(line)
if data.get('class') == 'TPV':
lat = data.get('lat')
lon = data.get('lon')
if lat and lon:
return lat, lon
except Exception as e:
raise e
raise RuntimeError("No TPV data received from GPSD")

@ -0,0 +1,109 @@
import argparse
import yaml
import time
import logging
from .models import AlertSeverity
from .location import LocationService
from .provider.factory import get_provider_instance
from .provider.astro import AstroProvider
from .narrator import Narrator
from .audio import AudioHandler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("asl3_wx")
def load_config(path):
with open(path, 'r') as f:
return yaml.safe_load(f)
def do_full_report(config):
loc_svc = LocationService(config)
lat, lon = loc_svc.get_coordinates()
# Provider
prov_code = config.get('location', {}).get('provider')
provider = get_provider_instance(CountryCode=prov_code, Lat=lat, Lon=lon, Config=config)
# Fetch Data
loc_info = provider.get_location_info(lat, lon)
logger.info(f"Resolved Location: {loc_info.city}, {loc_info.region} ({loc_info.country_code})")
conditions = provider.get_conditions(lat, lon)
forecast = provider.get_forecast(lat, lon)
alerts = provider.get_alerts(lat, lon)
# Astro
astro = AstroProvider()
sun_info = astro.get_astro_info(loc_info)
# Narrate
narrator = Narrator()
text = narrator.build_full_report(loc_info, conditions, forecast, alerts, sun_info)
logger.info(f"Report Text: {text}")
# Audio
handler = AudioHandler(config)
wav_file = handler.generate_audio(text, "report.wav")
# Play
nodes = config.get('audio', {}).get('nodes', [])
handler.play_on_nodes(wav_file, nodes)
def monitor_loop(config):
interval = config.get('alerts', {}).get('check_interval_minutes', 10) * 60
known_alerts = set()
loc_svc = LocationService(config)
lat, lon = loc_svc.get_coordinates()
prov_code = config.get('location', {}).get('provider')
provider = get_provider_instance(CountryCode=prov_code, Lat=lat, Lon=lon, Config=config)
narrator = Narrator()
handler = AudioHandler(config)
nodes = config.get('audio', {}).get('nodes', [])
logger.info("Starting Alert Monitor...")
while True:
try:
alerts = provider.get_alerts(lat, lon)
current_ids = {a.id for a in alerts}
new_alerts = []
for a in alerts:
if a.id not in known_alerts:
new_alerts.append(a)
known_alerts.add(a.id)
# Announce new items
if new_alerts:
logger.info(f"New Alerts detected: {len(new_alerts)}")
text = narrator.announce_alerts(new_alerts)
wav = handler.generate_audio(text, "alert.wav")
handler.play_on_nodes(wav, nodes)
# Cleanup expired from known
known_alerts = known_alerts.intersection(current_ids)
except Exception as e:
logger.error(f"Monitor error: {e}")
time.sleep(interval)
def main():
parser = argparse.ArgumentParser(description="ASL3 Weather Announcer")
parser.add_argument("--config", default="config.yaml", help="Path to config file")
parser.add_argument("--report", action="store_true", help="Play full weather report now")
parser.add_argument("--monitor", action="store_true", help="Run in continuous monitor mode")
args = parser.parse_args()
config = load_config(args.config)
if args.report:
do_full_report(config)
elif args.monitor:
monitor_loop(config)
else:
parser.print_help()
if __name__ == "__main__":
main()

@ -0,0 +1,48 @@
from typing import List, Optional
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
class AlertSeverity(str, Enum):
UNKNOWN = "Unknown"
ADVISORY = "Advisory"
WATCH = "Watch"
WARNING = "Warning"
CRITICAL = "Critical"
class WeatherAlert(BaseModel):
id: str
severity: AlertSeverity
title: str
description: str
instruction: Optional[str] = None
area_description: str
effective: datetime
expires: datetime
@property
def is_active(self) -> bool:
now = datetime.now(self.expires.tzinfo)
return self.effective <= now < self.expires
class WeatherForecast(BaseModel):
period_name: str # e.g., "Today", "Tonight", "Monday"
high_temp: Optional[float] # Celsius
low_temp: Optional[float] # Celsius
summary: str
precip_probability: Optional[int]
class CurrentConditions(BaseModel):
temperature: float # Celsius
humidity: Optional[int]
wind_speed: Optional[float] # km/h
wind_direction: Optional[str]
description: str
class LocationInfo(BaseModel):
latitude: float
longitude: float
city: str
region: str # State/Province
country_code: str
timezone: str

@ -0,0 +1,70 @@
from datetime import datetime
from typing import List
from .models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert
class Narrator:
def __init__(self):
pass
def announce_conditions(self, loc: LocationInfo, current: CurrentConditions) -> str:
wind = ""
if current.wind_speed and current.wind_speed > 5:
wind = f", with winds from the {current.wind_direction} at {int(current.wind_speed)} kilometers per hour"
return (
f"Current conditions for {loc.city}, {loc.region}. "
f"The temperature is {int(current.temperature)} degrees celsius. "
f"Conditions are {current.description}{wind}."
)
def announce_forecast(self, forecasts: List[WeatherForecast]) -> str:
if not forecasts:
return ""
text = "Here is the forecast. "
for f in forecasts[:3]: # Read first 3 periods
temp = ""
if f.high_temp is not None:
temp = f" with a high of {int(f.high_temp)}"
elif f.low_temp is not None:
temp = f" with a low of {int(f.low_temp)}"
text += f"{f.period_name}: {f.summary}{temp}. "
return text
def announce_alerts(self, alerts: List[WeatherAlert]) -> str:
if not alerts:
return "There are no active weather alerts."
text = f"There are {len(alerts)} active weather alerts. "
for a in alerts:
# "A Severe Thunderstorm Warning is in effect until 5 PM."
expires_str = a.expires.strftime("%I %M %p")
text += f"A {a.title} is in effect until {expires_str}. "
return text
def build_full_report(self, loc: LocationInfo, current: CurrentConditions, forecast: List[WeatherForecast], alerts: List[WeatherAlert], sun_info: str = "") -> str:
parts = []
# Time string: "10 30 PM"
now_str = datetime.now().strftime("%I %M %p")
# Remove leading zero from hour if desired, but TTS usually handles "09" fine.
# For cleaner TTS:
if now_str.startswith("0"):
now_str = now_str[1:]
parts.append(f"Good day. The time is {now_str}.")
parts.append(f"This is the automated weather report for {loc.city}.")
if alerts:
parts.append("Please be advised: " + self.announce_alerts(alerts))
parts.append(self.announce_conditions(loc, current))
parts.append(self.announce_forecast(forecast))
if sun_info:
parts.append(sun_info)
return " ".join(parts)

@ -0,0 +1,33 @@
from datetime import datetime
from astral import LocationInfo as AstralLoc
from astral.sun import sun
from astral.moon import phase
from ..models import LocationInfo
class AstroProvider:
def get_astro_info(self, loc: LocationInfo) -> str:
try:
# Astral uses its own LocationInfo
City = AstralLoc(loc.city, loc.region, loc.timezone, loc.latitude, loc.longitude)
s = sun(City.observer, date=datetime.now(), tzinfo=City.timezone)
sunrise = s['sunrise'].strftime("%I %M %p")
sunset = s['sunset'].strftime("%I %M %p")
# Moon phase: 0..27
ph = phase(datetime.now())
moon_desc = self._describe_phase(ph)
return f"Sunrise is at {sunrise}. Sunset is at {sunset}. The moon is {moon_desc}."
except Exception as e:
return ""
def _describe_phase(self, day: float) -> str:
if day < 1: return "New"
if day < 7: return "Waxing Crescent"
if day < 8: return "First Quarter"
if day < 14: return "Waxing Gibbous"
if day < 15: return "Full"
if day < 21: return "Waning Gibbous"
if day < 22: return "Last Quarter"
return "Waning Crescent"

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import List, Tuple
from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert
class WeatherProvider(ABC):
"""
Abstract Base Class for Country-Specific Weather Providers.
"""
def __init__(self, **kwargs):
"""
Initialize provider with arbitrary config.
"""
pass
@abstractmethod
def get_location_info(self, lat: float, lon: float) -> LocationInfo:
"""
Resolve lat/lon to standard location info (City, Region, etc.).
"""
pass
@abstractmethod
def get_conditions(self, lat: float, lon: float) -> CurrentConditions:
"""
Get current weather observations.
"""
pass
@abstractmethod
def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]:
"""
Get daily forecast periods.
"""
pass
@abstractmethod
def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]:
"""
Get active weather alerts.
"""
pass

@ -0,0 +1,106 @@
from datetime import datetime
from typing import List
from env_canada import ECData
from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert, AlertSeverity
from .base import WeatherProvider
class ECProvider(WeatherProvider):
def __init__(self, **kwargs):
self.points_cache = {}
self.extra_zones = kwargs.get('alerts', {}).get('extra_zones', [])
def _get_ec_data(self, lat, lon):
# ECData auto-selects station based on lat/lon
ec = ECData(coordinates=(lat, lon))
ec.update()
return ec
def get_location_info(self, lat: float, lon: float) -> LocationInfo:
ec = self._get_ec_data(lat, lon)
# ECData metadata is nested
meta = ec.metadata
return LocationInfo(
latitude=lat,
longitude=lon,
city=meta.get('location', 'Unknown'),
region=meta.get('province', 'Canada'),
country_code="CA",
timezone="Unknown" # EC lib doesn't trivialy expose TZ
)
def get_conditions(self, lat: float, lon: float) -> CurrentConditions:
ec = self._get_ec_data(lat, lon)
cond = ec.conditions
return CurrentConditions(
temperature=float(cond.get('temperature', {}).get('value', 0)),
humidity=int(cond.get('humidity', {}).get('value') or 0),
wind_speed=float(cond.get('wind_speed', {}).get('value') or 0),
wind_direction=cond.get('wind_direction', {}).get('value'),
description=cond.get('condition', 'Unknown')
)
def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]:
ec = self._get_ec_data(lat, lon)
daily = ec.daily_forecasts
forecasts = []
for d in daily:
forecasts.append(WeatherForecast(
period_name=d.get('period'),
high_temp=float(d.get('temperature')) if d.get('temperature') else None,
low_temp=None, # EC daily structure is period-based (e.g. "Monday", "Monday Night")
summary=d.get('text_summary', ''),
precip_probability=int(d.get('precip_probability') or 0)
))
return forecasts
def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]:
ec_objects = [self._get_ec_data(lat, lon)]
# Add extra zones (Station IDs e.g., 'ON/s0000430')
for zone_id in self.extra_zones:
if "/" in zone_id: # Basic check if it looks like EC station ID
try:
ec_objects.append(ECData(station_id=zone_id))
except Exception:
pass
results = []
now = datetime.now()
seen_titles = set()
for ec in ec_objects:
try:
if not getattr(ec, 'conditions', None): # Ensure updated
ec.update()
for a in ec.alerts:
title = a.get('title', '')
if title in seen_titles: continue
seen_titles.add(title)
# Mapping severity roughly
severity = AlertSeverity.UNKNOWN
if "warning" in title.lower(): severity = AlertSeverity.WARNING
elif "watch" in title.lower(): severity = AlertSeverity.WATCH
elif "advisory" in title.lower(): severity = AlertSeverity.ADVISORY
elif "statement" in title.lower(): severity = AlertSeverity.ADVISORY
# Using current time as dummy effective/expires if missing
eff = a.get('date')
results.append(WeatherAlert(
id=title,
severity=severity,
title=title,
description=a.get('detail', ''),
area_description=ec.metadata.get('location', 'Local Area'),
effective=now,
expires=now
))
except Exception:
continue
return results

@ -0,0 +1,45 @@
import reverse_geocoder as rg
from typing import Type
from .base import WeatherProvider
from .nws import NWSProvider
from .ec import ECProvider
# Registry to hold provider classes
_PROVIDERS = {
"US": NWSProvider,
"CA": ECProvider
}
def register_provider(country_code: str, provider_cls: Type[WeatherProvider]):
_PROVIDERS[country_code.upper()] = provider_cls
def get_provider_class(lat: float, lon: float) -> Type[WeatherProvider]:
"""
Determines the appropriate provider class based on location.
"""
try:
# result is a list of dicts: [{'lat': '...', 'lon': '...', 'name': 'City', 'admin1': 'Region', 'cc': 'US'}]
results = rg.search((lat, lon))
cc = results[0]['cc'].upper()
if cc in _PROVIDERS:
return _PROVIDERS[cc]
print(f"Warning: No explicit provider for {cc}, defaulting to generic if possible or erroring.")
raise ValueError(f"No weather provider found for country code: {cc}")
except Exception as e:
raise e
def get_provider_instance(CountryCode: str = None, Lat: float = None, Lon: float = None, Config: dict = None) -> WeatherProvider:
if CountryCode:
cls = _PROVIDERS.get(CountryCode.upper())
if not cls:
raise ValueError(f"Unknown country code: {CountryCode}")
return cls(**(Config or {}))
if Lat is not None and Lon is not None:
cls = get_provider_class(Lat, Lon)
return cls(**(Config or {}))
raise ValueError("Must provide either CountryCode or Lat/Lon to select provider.")

@ -0,0 +1,145 @@
import requests
from datetime import datetime
from typing import List
from dateutil.parser import parse as parse_date
from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert, AlertSeverity
from .base import WeatherProvider
class NWSProvider(WeatherProvider):
USER_AGENT = "(asl3-wx-announce, contact@example.com)"
def __init__(self, **kwargs):
self.points_cache = {}
self.extra_zones = kwargs.get('alerts', {}).get('extra_zones', [])
def _headers(self):
return {"User-Agent": self.USER_AGENT, "Accept": "application/geo+json"}
def _get_point_metadata(self, lat, lon):
key = f"{lat},{lon}"
if key in self.points_cache:
return self.points_cache[key]
url = f"https://api.weather.gov/points/{lat},{lon}"
resp = requests.get(url, headers=self._headers())
resp.raise_for_status()
data = resp.json()
self.points_cache[key] = data['properties']
return data['properties']
def get_location_info(self, lat: float, lon: float) -> LocationInfo:
meta = self._get_point_metadata(lat, lon)
props = meta.get('relativeLocation', {}).get('properties', {})
return LocationInfo(
latitude=lat,
longitude=lon,
city=props.get('city', 'Unknown'),
region=props.get('state', 'US'),
country_code="US",
timezone=meta.get('timeZone', 'UTC')
)
def get_conditions(self, lat: float, lon: float) -> CurrentConditions:
# NWS "current conditions" often requires finding a nearby station first
# For simplicity, we can sometimes pull from the gridpoint "now", but standard practice
# is to hit the stations endpoint.
meta = self._get_point_metadata(lat, lon)
stations_url = meta['observationStations']
# Get first station
s_resp = requests.get(stations_url, headers=self._headers())
s_data = s_resp.json()
station_id = s_data['features'][0]['properties']['stationIdentifier']
# Get obs
obs_url = f"https://api.weather.gov/stations/{station_id}/observations/latest"
o_resp = requests.get(obs_url, headers=self._headers())
props = o_resp.json()['properties']
temp_c = props.get('temperature', {}).get('value')
return CurrentConditions(
temperature=temp_c if temp_c is not None else 0.0,
humidity=props.get('relativeHumidity', {}).get('value'),
wind_speed=props.get('windSpeed', {}).get('value'),
wind_direction=str(props.get('windDirection', {}).get('value')),
description=props.get('textDescription', 'Unknown')
)
def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]:
meta = self._get_point_metadata(lat, lon)
forecast_url = meta['forecast']
resp = requests.get(forecast_url, headers=self._headers())
periods = resp.json()['properties']['periods']
forecasts = []
for p in periods[:4]: # Just next few periods
# NWS gives temp in F sometimes, but API usually defaults to F.
# We strictly want models in C, so check unit.
temp = p.get('temperature')
unit = p.get('temperatureUnit')
if unit == 'F':
temp = (temp - 32) * 5.0/9.0
summary = p['detailedForecast']
forecasts.append(WeatherForecast(
period_name=p['name'],
high_temp=temp if p['isDaytime'] else None,
low_temp=temp if not p['isDaytime'] else None,
summary=summary,
precip_probability=p.get('probabilityOfPrecipitation', {}).get('value')
))
return forecasts
def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]:
meta = self._get_point_metadata(lat, lon)
# Extract zone IDs (e.g., https://api.weather.gov/zones/forecast/MDZ013)
# We want the basename
def get_id(url):
return url.split('/')[-1] if url else None
zones = set(self.extra_zones)
zones.add(get_id(meta.get('county')))
zones.add(get_id(meta.get('fireWeatherZone')))
zones.add(get_id(meta.get('forecastZone')))
zones.discard(None) # Remove None if any failed
if not zones:
# Fallback to point if no zones found (unlikely)
url = f"https://api.weather.gov/alerts/active?point={lat},{lon}"
else:
# Join zones: active?zone=MDZ013,MDC031
zone_str = ",".join(zones)
url = f"https://api.weather.gov/alerts/active?zone={zone_str}"
resp = requests.get(url, headers=self._headers())
features = resp.json().get('features', [])
alerts = []
for f in features:
props = f['properties']
# Map severity
sev_str = props.get('severity', 'Unknown')
severity = AlertSeverity.UNKNOWN
if sev_str == 'Severe': severity = AlertSeverity.WARNING
event_type = props.get('event', '').upper()
if "WARNING" in event_type: severity = AlertSeverity.WARNING
elif "WATCH" in event_type: severity = AlertSeverity.WATCH
elif "ADVISORY" in event_type: severity = AlertSeverity.ADVISORY
elif "STATEMENT" in event_type: severity = AlertSeverity.ADVISORY
alerts.append(WeatherAlert(
id=props['id'],
severity=severity,
title=props['event'],
description=props['description'] or "",
instruction=props.get('instruction'),
area_description=props.get('areaDesc', ''),
effective=parse_date(props['effective']),
expires=parse_date(props['expires'])
))
return alerts

@ -0,0 +1,20 @@
location:
type: auto # or static
# latitude: 45.4215
# longitude: -75.6972
# provider: CA # Optional: force US or CA
voice:
# Example utilizing pico2wave (sudo apt install libttspico-utils)
tts_command: 'pico2wave -w {file} "{text}"'
nodes:
- "YOUR_NODE_NUMBER_HERE"
alerts:
check_interval_minutes: 10
min_severity: "Watch"
# Optional: List of NWS Zones/Counties to monitor in addition to current location
# extra_zones:
# - "VAC001"
# - "MDZ013"

@ -0,0 +1,7 @@
requests
astral
pydantic
pyyaml
env_canada
reverse_geocoder
numpy
Loading…
Cancel
Save

Powered by TurnKey Linux.