commit f46e53000dc6deaea2ef84c2ed1ce560a85e16c3 Author: swanie98635 Date: Sun Jan 11 19:14:07 2026 -0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4ab03c8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class + +# Config (Keep local secrets private) +config.yaml +asl3_wx_announce/config.yaml + +# Audio Output +*.wav +/tmp/asl3_wx/ + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store +Thumbs.db diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..14fac91 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c73f20c --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# ASL3 Weather Announcer + +**ASL3 Weather Announcer** is a flexible, multi-country weather alert and reporting system designed for AllStarLink 3 (Asterisk) nodes. + +It provides **automated verbal announcements** for: +* **Active Weather Alerts**: Warnings, watches, and advisories as they are issued. +* **Daily Reports**: Detailed forecast, current conditions, sunrise/sunset, and moon phase. +* **Time Announcements**: Current local time at start of report. + +## Features + +* **Multi-Provider Support**: + * πŸ‡ΊπŸ‡Έ **USA**: Uses National Weather Service (NWS) API. + * πŸ‡¨πŸ‡¦ **Canada**: Uses Environment Canada data. + * *Extensible*: Plugin architecture allows adding more countries easily. +* **Smart Location**: + * **GPS/GNSS**: Automatically detects location using `gpsd`. + * **Static**: Configurable fallback lat/lon. + * **Auto-Zone**: Automatically monitors the correct County, Forecast Zone, and Fire Weather Zone for your location. +* **Customizable**: + * **Extra Zones**: Manually monitor adjacent counties or specific stations (e.g., `VAC001` or `ON/s0000430`). + * **Audio**: Works with `pico2wave`, `flite`, or any CLI TTS engine. Plays to multiple ASL3 nodes. + +## Installation + +### Prerequisites +On your ASL3 server (Debian/Raspbian): +```bash +sudo apt update +sudo apt install python3-pip libttspico-utils gpsd +``` + +### Install Package +1. Clone this repository to your scripts directory (e.g., `/etc/asterisk/scripts/`). +2. Install python dependencies: + ```bash + pip3 install -r requirements.txt + ``` + +## Configuration + +Copy the example config: +```bash +cp config.yaml.example config.yaml +``` + +Edit `config.yaml`: +```yaml +location: + type: auto # Use 'auto' for GPS, or 'static' for fixed lat/lon + # latitude: 45.123 + # longitude: -75.123 + +voice: + tts_command: 'pico2wave -w {file} "{text}"' + +audio: + nodes: + - "1966" # Your Private Node + - "92394" # Your Public Node + +alerts: + min_severity: "Watch" + extra_zones: # Optional: Monitor extra areas + - "VAC001" # US County FIPS + - "ON/s0000430" # Canadian Station ID +``` + +## Usage + +### Test Full Report +Announce current conditions, forecast, and time immediately: +```bash +python3 -m asl3_wx_announce.main --config config.yaml --report +``` + +### Run Alert Monitor +Run in the background to announce *new* alerts as they happen: +```bash +python3 -m asl3_wx_announce.main --config config.yaml --monitor +``` + +### Scheduled Hourly Reports +To announce the weather every hour, add to `crontab -u asterisk -e`: +```cron +0 * * * * /usr/bin/python3 -m asl3_wx_announce.main --config /path/to/config.yaml --report +``` + +## Contributing +Pull requests are welcome! See `provider/` directory to add support for new countries. + +## License +MIT License diff --git a/asl3_wx_announce/audio.py b/asl3_wx_announce/audio.py new file mode 100644 index 0000000..a352134 --- /dev/null +++ b/asl3_wx_announce/audio.py @@ -0,0 +1,44 @@ +import os +import subprocess +import logging +from typing import List + +class AudioHandler: + def __init__(self, config: dict): + self.config = config + self.logger = logging.getLogger(__name__) + # Default to echo if no TTS configured (just for safety) + self.tts_template = config.get('voice', {}).get('tts_command', 'echo "{text}" > {file}') + self.output_dir = "/tmp/asl3_wx" + os.makedirs(self.output_dir, exist_ok=True) + + def generate_audio(self, text: str, filename: str = "announcement.wav") -> str: + filepath = os.path.join(self.output_dir, filename) + + # Simple cleanup? + if os.path.exists(filepath): + os.remove(filepath) + + cmd = self.tts_template.format(file=filepath, text=text) + self.logger.info(f"Generating audio: {cmd}") + + try: + subprocess.run(cmd, shell=True, check=True) + return filepath + except subprocess.CalledProcessError as e: + self.logger.error(f"TTS Failed: {e}") + raise e + + def play_on_nodes(self, filepath: str, nodes: List[str]): + # Asterisk uses file path WITHOUT extension usually for play commands, + # but rpt localplay might require full path or specific format. + # Usually: rpt localplay + path_no_ext = os.path.splitext(filepath)[0] + + for node in nodes: + asterisk_cmd = f'asterisk -rx "rpt localplay {node} {path_no_ext}"' + self.logger.info(f"Playing on {node}: {asterisk_cmd}") + try: + subprocess.run(asterisk_cmd, shell=True, check=True) + except Exception as e: + self.logger.error(f"Playback failed on {node}: {e}") diff --git a/asl3_wx_announce/location.py b/asl3_wx_announce/location.py new file mode 100644 index 0000000..9762c2a --- /dev/null +++ b/asl3_wx_announce/location.py @@ -0,0 +1,45 @@ +from typing import Tuple, Dict, Optional +import json +import logging +import socket + +class LocationService: + def __init__(self, config: Dict): + self.config = config + self.logger = logging.getLogger(__name__) + + def get_coordinates(self) -> Tuple[float, float]: + """ + Returns (lat, lon). + Tries GPS first if configured, then falls back to static config. + """ + if self.config.get('location', {}).get('type') == 'auto': + try: + return self._read_gpsd() + except Exception as e: + self.logger.warning(f"GPS read failed: {e}. Falling back to static.") + + # Static fallback + lat = self.config.get('location', {}).get('latitude') + lon = self.config.get('location', {}).get('longitude') + if lat is None or lon is None: + raise ValueError("No valid location coordinates found in config or GPS.") + return float(lat), float(lon) + + def _read_gpsd(self) -> Tuple[float, float]: + # Simple socket reader for GPSD standard port 2947 + # We start the WATCH command and wait for a TPV report + try: + with socket.create_connection(('localhost', 2947), timeout=2) as sock: + sock.sendall(b'?WATCH={"enable":true,"json":true}\n') + fp = sock.makefile() + for line in fp: + data = json.loads(line) + if data.get('class') == 'TPV': + lat = data.get('lat') + lon = data.get('lon') + if lat and lon: + return lat, lon + except Exception as e: + raise e + raise RuntimeError("No TPV data received from GPSD") diff --git a/asl3_wx_announce/main.py b/asl3_wx_announce/main.py new file mode 100644 index 0000000..202c525 --- /dev/null +++ b/asl3_wx_announce/main.py @@ -0,0 +1,109 @@ +import argparse +import yaml +import time +import logging +from .models import AlertSeverity +from .location import LocationService +from .provider.factory import get_provider_instance +from .provider.astro import AstroProvider +from .narrator import Narrator +from .audio import AudioHandler + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("asl3_wx") + +def load_config(path): + with open(path, 'r') as f: + return yaml.safe_load(f) + +def do_full_report(config): + loc_svc = LocationService(config) + lat, lon = loc_svc.get_coordinates() + + # Provider + prov_code = config.get('location', {}).get('provider') + provider = get_provider_instance(CountryCode=prov_code, Lat=lat, Lon=lon, Config=config) + + # Fetch Data + loc_info = provider.get_location_info(lat, lon) + logger.info(f"Resolved Location: {loc_info.city}, {loc_info.region} ({loc_info.country_code})") + + conditions = provider.get_conditions(lat, lon) + forecast = provider.get_forecast(lat, lon) + alerts = provider.get_alerts(lat, lon) + + # Astro + astro = AstroProvider() + sun_info = astro.get_astro_info(loc_info) + + # Narrate + narrator = Narrator() + text = narrator.build_full_report(loc_info, conditions, forecast, alerts, sun_info) + logger.info(f"Report Text: {text}") + + # Audio + handler = AudioHandler(config) + wav_file = handler.generate_audio(text, "report.wav") + + # Play + nodes = config.get('audio', {}).get('nodes', []) + handler.play_on_nodes(wav_file, nodes) + +def monitor_loop(config): + interval = config.get('alerts', {}).get('check_interval_minutes', 10) * 60 + known_alerts = set() + + loc_svc = LocationService(config) + lat, lon = loc_svc.get_coordinates() + prov_code = config.get('location', {}).get('provider') + provider = get_provider_instance(CountryCode=prov_code, Lat=lat, Lon=lon, Config=config) + narrator = Narrator() + handler = AudioHandler(config) + nodes = config.get('audio', {}).get('nodes', []) + + logger.info("Starting Alert Monitor...") + + while True: + try: + alerts = provider.get_alerts(lat, lon) + current_ids = {a.id for a in alerts} + + new_alerts = [] + for a in alerts: + if a.id not in known_alerts: + new_alerts.append(a) + known_alerts.add(a.id) + + # Announce new items + if new_alerts: + logger.info(f"New Alerts detected: {len(new_alerts)}") + text = narrator.announce_alerts(new_alerts) + wav = handler.generate_audio(text, "alert.wav") + handler.play_on_nodes(wav, nodes) + + # Cleanup expired from known + known_alerts = known_alerts.intersection(current_ids) + + except Exception as e: + logger.error(f"Monitor error: {e}") + + time.sleep(interval) + +def main(): + parser = argparse.ArgumentParser(description="ASL3 Weather Announcer") + parser.add_argument("--config", default="config.yaml", help="Path to config file") + parser.add_argument("--report", action="store_true", help="Play full weather report now") + parser.add_argument("--monitor", action="store_true", help="Run in continuous monitor mode") + + args = parser.parse_args() + config = load_config(args.config) + + if args.report: + do_full_report(config) + elif args.monitor: + monitor_loop(config) + else: + parser.print_help() + +if __name__ == "__main__": + main() diff --git a/asl3_wx_announce/models.py b/asl3_wx_announce/models.py new file mode 100644 index 0000000..b020928 --- /dev/null +++ b/asl3_wx_announce/models.py @@ -0,0 +1,48 @@ +from typing import List, Optional +from datetime import datetime +from enum import Enum +from pydantic import BaseModel + +class AlertSeverity(str, Enum): + UNKNOWN = "Unknown" + ADVISORY = "Advisory" + WATCH = "Watch" + WARNING = "Warning" + CRITICAL = "Critical" + +class WeatherAlert(BaseModel): + id: str + severity: AlertSeverity + title: str + description: str + instruction: Optional[str] = None + area_description: str + effective: datetime + expires: datetime + + @property + def is_active(self) -> bool: + now = datetime.now(self.expires.tzinfo) + return self.effective <= now < self.expires + +class WeatherForecast(BaseModel): + period_name: str # e.g., "Today", "Tonight", "Monday" + high_temp: Optional[float] # Celsius + low_temp: Optional[float] # Celsius + summary: str + precip_probability: Optional[int] + +class CurrentConditions(BaseModel): + temperature: float # Celsius + humidity: Optional[int] + wind_speed: Optional[float] # km/h + wind_direction: Optional[str] + description: str + +class LocationInfo(BaseModel): + latitude: float + longitude: float + city: str + region: str # State/Province + country_code: str + timezone: str diff --git a/asl3_wx_announce/narrator.py b/asl3_wx_announce/narrator.py new file mode 100644 index 0000000..965c7d7 --- /dev/null +++ b/asl3_wx_announce/narrator.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List +from .models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert + +class Narrator: + def __init__(self): + pass + + def announce_conditions(self, loc: LocationInfo, current: CurrentConditions) -> str: + wind = "" + if current.wind_speed and current.wind_speed > 5: + wind = f", with winds from the {current.wind_direction} at {int(current.wind_speed)} kilometers per hour" + + return ( + f"Current conditions for {loc.city}, {loc.region}. " + f"The temperature is {int(current.temperature)} degrees celsius. " + f"Conditions are {current.description}{wind}." + ) + + def announce_forecast(self, forecasts: List[WeatherForecast]) -> str: + if not forecasts: + return "" + + text = "Here is the forecast. " + for f in forecasts[:3]: # Read first 3 periods + temp = "" + if f.high_temp is not None: + temp = f" with a high of {int(f.high_temp)}" + elif f.low_temp is not None: + temp = f" with a low of {int(f.low_temp)}" + + text += f"{f.period_name}: {f.summary}{temp}. " + + return text + + def announce_alerts(self, alerts: List[WeatherAlert]) -> str: + if not alerts: + return "There are no active weather alerts." + + text = f"There are {len(alerts)} active weather alerts. " + for a in alerts: + # "A Severe Thunderstorm Warning is in effect until 5 PM." + expires_str = a.expires.strftime("%I %M %p") + text += f"A {a.title} is in effect until {expires_str}. " + + return text + + def build_full_report(self, loc: LocationInfo, current: CurrentConditions, forecast: List[WeatherForecast], alerts: List[WeatherAlert], sun_info: str = "") -> str: + parts = [] + + # Time string: "10 30 PM" + now_str = datetime.now().strftime("%I %M %p") + # Remove leading zero from hour if desired, but TTS usually handles "09" fine. + # For cleaner TTS: + if now_str.startswith("0"): + now_str = now_str[1:] + + parts.append(f"Good day. The time is {now_str}.") + parts.append(f"This is the automated weather report for {loc.city}.") + + if alerts: + parts.append("Please be advised: " + self.announce_alerts(alerts)) + + parts.append(self.announce_conditions(loc, current)) + parts.append(self.announce_forecast(forecast)) + + if sun_info: + parts.append(sun_info) + + return " ".join(parts) diff --git a/asl3_wx_announce/provider/astro.py b/asl3_wx_announce/provider/astro.py new file mode 100644 index 0000000..ad6eaf1 --- /dev/null +++ b/asl3_wx_announce/provider/astro.py @@ -0,0 +1,33 @@ +from datetime import datetime +from astral import LocationInfo as AstralLoc +from astral.sun import sun +from astral.moon import phase +from ..models import LocationInfo + +class AstroProvider: + def get_astro_info(self, loc: LocationInfo) -> str: + try: + # Astral uses its own LocationInfo + City = AstralLoc(loc.city, loc.region, loc.timezone, loc.latitude, loc.longitude) + s = sun(City.observer, date=datetime.now(), tzinfo=City.timezone) + + sunrise = s['sunrise'].strftime("%I %M %p") + sunset = s['sunset'].strftime("%I %M %p") + + # Moon phase: 0..27 + ph = phase(datetime.now()) + moon_desc = self._describe_phase(ph) + + return f"Sunrise is at {sunrise}. Sunset is at {sunset}. The moon is {moon_desc}." + except Exception as e: + return "" + + def _describe_phase(self, day: float) -> str: + if day < 1: return "New" + if day < 7: return "Waxing Crescent" + if day < 8: return "First Quarter" + if day < 14: return "Waxing Gibbous" + if day < 15: return "Full" + if day < 21: return "Waning Gibbous" + if day < 22: return "Last Quarter" + return "Waning Crescent" diff --git a/asl3_wx_announce/provider/base.py b/asl3_wx_announce/provider/base.py new file mode 100644 index 0000000..8253bbc --- /dev/null +++ b/asl3_wx_announce/provider/base.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod +from typing import List, Tuple +from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert + +class WeatherProvider(ABC): + """ + Abstract Base Class for Country-Specific Weather Providers. + """ + + def __init__(self, **kwargs): + """ + Initialize provider with arbitrary config. + """ + pass + + @abstractmethod + def get_location_info(self, lat: float, lon: float) -> LocationInfo: + """ + Resolve lat/lon to standard location info (City, Region, etc.). + """ + pass + + @abstractmethod + def get_conditions(self, lat: float, lon: float) -> CurrentConditions: + """ + Get current weather observations. + """ + pass + + @abstractmethod + def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]: + """ + Get daily forecast periods. + """ + pass + + @abstractmethod + def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]: + """ + Get active weather alerts. + """ + pass diff --git a/asl3_wx_announce/provider/ec.py b/asl3_wx_announce/provider/ec.py new file mode 100644 index 0000000..49f025f --- /dev/null +++ b/asl3_wx_announce/provider/ec.py @@ -0,0 +1,106 @@ +from datetime import datetime +from typing import List +from env_canada import ECData +from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert, AlertSeverity +from .base import WeatherProvider + +class ECProvider(WeatherProvider): + def __init__(self, **kwargs): + self.points_cache = {} + self.extra_zones = kwargs.get('alerts', {}).get('extra_zones', []) + + def _get_ec_data(self, lat, lon): + # ECData auto-selects station based on lat/lon + ec = ECData(coordinates=(lat, lon)) + ec.update() + return ec + + def get_location_info(self, lat: float, lon: float) -> LocationInfo: + ec = self._get_ec_data(lat, lon) + # ECData metadata is nested + meta = ec.metadata + return LocationInfo( + latitude=lat, + longitude=lon, + city=meta.get('location', 'Unknown'), + region=meta.get('province', 'Canada'), + country_code="CA", + timezone="Unknown" # EC lib doesn't trivialy expose TZ + ) + + def get_conditions(self, lat: float, lon: float) -> CurrentConditions: + ec = self._get_ec_data(lat, lon) + cond = ec.conditions + + return CurrentConditions( + temperature=float(cond.get('temperature', {}).get('value', 0)), + humidity=int(cond.get('humidity', {}).get('value') or 0), + wind_speed=float(cond.get('wind_speed', {}).get('value') or 0), + wind_direction=cond.get('wind_direction', {}).get('value'), + description=cond.get('condition', 'Unknown') + ) + + def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]: + ec = self._get_ec_data(lat, lon) + daily = ec.daily_forecasts + + forecasts = [] + for d in daily: + forecasts.append(WeatherForecast( + period_name=d.get('period'), + high_temp=float(d.get('temperature')) if d.get('temperature') else None, + low_temp=None, # EC daily structure is period-based (e.g. "Monday", "Monday Night") + summary=d.get('text_summary', ''), + precip_probability=int(d.get('precip_probability') or 0) + )) + return forecasts + + def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]: + ec_objects = [self._get_ec_data(lat, lon)] + + # Add extra zones (Station IDs e.g., 'ON/s0000430') + for zone_id in self.extra_zones: + if "/" in zone_id: # Basic check if it looks like EC station ID + try: + ec_objects.append(ECData(station_id=zone_id)) + except Exception: + pass + + results = [] + now = datetime.now() + seen_titles = set() + + for ec in ec_objects: + try: + if not getattr(ec, 'conditions', None): # Ensure updated + ec.update() + + for a in ec.alerts: + title = a.get('title', '') + if title in seen_titles: continue + seen_titles.add(title) + + # Mapping severity roughly + severity = AlertSeverity.UNKNOWN + if "warning" in title.lower(): severity = AlertSeverity.WARNING + elif "watch" in title.lower(): severity = AlertSeverity.WATCH + elif "advisory" in title.lower(): severity = AlertSeverity.ADVISORY + elif "statement" in title.lower(): severity = AlertSeverity.ADVISORY + + # Using current time as dummy effective/expires if missing + eff = a.get('date') + + results.append(WeatherAlert( + id=title, + severity=severity, + title=title, + description=a.get('detail', ''), + area_description=ec.metadata.get('location', 'Local Area'), + effective=now, + expires=now + )) + except Exception: + continue + + return results + diff --git a/asl3_wx_announce/provider/factory.py b/asl3_wx_announce/provider/factory.py new file mode 100644 index 0000000..93df593 --- /dev/null +++ b/asl3_wx_announce/provider/factory.py @@ -0,0 +1,45 @@ +import reverse_geocoder as rg +from typing import Type +from .base import WeatherProvider +from .nws import NWSProvider +from .ec import ECProvider + +# Registry to hold provider classes +_PROVIDERS = { + "US": NWSProvider, + "CA": ECProvider +} + +def register_provider(country_code: str, provider_cls: Type[WeatherProvider]): + _PROVIDERS[country_code.upper()] = provider_cls + +def get_provider_class(lat: float, lon: float) -> Type[WeatherProvider]: + """ + Determines the appropriate provider class based on location. + """ + try: + # result is a list of dicts: [{'lat': '...', 'lon': '...', 'name': 'City', 'admin1': 'Region', 'cc': 'US'}] + results = rg.search((lat, lon)) + cc = results[0]['cc'].upper() + + if cc in _PROVIDERS: + return _PROVIDERS[cc] + + print(f"Warning: No explicit provider for {cc}, defaulting to generic if possible or erroring.") + raise ValueError(f"No weather provider found for country code: {cc}") + + except Exception as e: + raise e + +def get_provider_instance(CountryCode: str = None, Lat: float = None, Lon: float = None, Config: dict = None) -> WeatherProvider: + if CountryCode: + cls = _PROVIDERS.get(CountryCode.upper()) + if not cls: + raise ValueError(f"Unknown country code: {CountryCode}") + return cls(**(Config or {})) + + if Lat is not None and Lon is not None: + cls = get_provider_class(Lat, Lon) + return cls(**(Config or {})) + + raise ValueError("Must provide either CountryCode or Lat/Lon to select provider.") diff --git a/asl3_wx_announce/provider/nws.py b/asl3_wx_announce/provider/nws.py new file mode 100644 index 0000000..664d8d5 --- /dev/null +++ b/asl3_wx_announce/provider/nws.py @@ -0,0 +1,145 @@ +import requests +from datetime import datetime +from typing import List +from dateutil.parser import parse as parse_date +from ..models import LocationInfo, CurrentConditions, WeatherForecast, WeatherAlert, AlertSeverity +from .base import WeatherProvider + +class NWSProvider(WeatherProvider): + USER_AGENT = "(asl3-wx-announce, contact@example.com)" + + def __init__(self, **kwargs): + self.points_cache = {} + self.extra_zones = kwargs.get('alerts', {}).get('extra_zones', []) + + def _headers(self): + return {"User-Agent": self.USER_AGENT, "Accept": "application/geo+json"} + + def _get_point_metadata(self, lat, lon): + key = f"{lat},{lon}" + if key in self.points_cache: + return self.points_cache[key] + + url = f"https://api.weather.gov/points/{lat},{lon}" + resp = requests.get(url, headers=self._headers()) + resp.raise_for_status() + data = resp.json() + self.points_cache[key] = data['properties'] + return data['properties'] + + def get_location_info(self, lat: float, lon: float) -> LocationInfo: + meta = self._get_point_metadata(lat, lon) + props = meta.get('relativeLocation', {}).get('properties', {}) + return LocationInfo( + latitude=lat, + longitude=lon, + city=props.get('city', 'Unknown'), + region=props.get('state', 'US'), + country_code="US", + timezone=meta.get('timeZone', 'UTC') + ) + + def get_conditions(self, lat: float, lon: float) -> CurrentConditions: + # NWS "current conditions" often requires finding a nearby station first + # For simplicity, we can sometimes pull from the gridpoint "now", but standard practice + # is to hit the stations endpoint. + meta = self._get_point_metadata(lat, lon) + stations_url = meta['observationStations'] + + # Get first station + s_resp = requests.get(stations_url, headers=self._headers()) + s_data = s_resp.json() + station_id = s_data['features'][0]['properties']['stationIdentifier'] + + # Get obs + obs_url = f"https://api.weather.gov/stations/{station_id}/observations/latest" + o_resp = requests.get(obs_url, headers=self._headers()) + props = o_resp.json()['properties'] + + temp_c = props.get('temperature', {}).get('value') + return CurrentConditions( + temperature=temp_c if temp_c is not None else 0.0, + humidity=props.get('relativeHumidity', {}).get('value'), + wind_speed=props.get('windSpeed', {}).get('value'), + wind_direction=str(props.get('windDirection', {}).get('value')), + description=props.get('textDescription', 'Unknown') + ) + + def get_forecast(self, lat: float, lon: float) -> List[WeatherForecast]: + meta = self._get_point_metadata(lat, lon) + forecast_url = meta['forecast'] + + resp = requests.get(forecast_url, headers=self._headers()) + periods = resp.json()['properties']['periods'] + + forecasts = [] + for p in periods[:4]: # Just next few periods + # NWS gives temp in F sometimes, but API usually defaults to F. + # We strictly want models in C, so check unit. + temp = p.get('temperature') + unit = p.get('temperatureUnit') + if unit == 'F': + temp = (temp - 32) * 5.0/9.0 + + summary = p['detailedForecast'] + forecasts.append(WeatherForecast( + period_name=p['name'], + high_temp=temp if p['isDaytime'] else None, + low_temp=temp if not p['isDaytime'] else None, + summary=summary, + precip_probability=p.get('probabilityOfPrecipitation', {}).get('value') + )) + return forecasts + + def get_alerts(self, lat: float, lon: float) -> List[WeatherAlert]: + meta = self._get_point_metadata(lat, lon) + + # Extract zone IDs (e.g., https://api.weather.gov/zones/forecast/MDZ013) + # We want the basename + def get_id(url): + return url.split('/')[-1] if url else None + + zones = set(self.extra_zones) + zones.add(get_id(meta.get('county'))) + zones.add(get_id(meta.get('fireWeatherZone'))) + zones.add(get_id(meta.get('forecastZone'))) + zones.discard(None) # Remove None if any failed + + if not zones: + # Fallback to point if no zones found (unlikely) + url = f"https://api.weather.gov/alerts/active?point={lat},{lon}" + else: + # Join zones: active?zone=MDZ013,MDC031 + zone_str = ",".join(zones) + url = f"https://api.weather.gov/alerts/active?zone={zone_str}" + + resp = requests.get(url, headers=self._headers()) + features = resp.json().get('features', []) + + alerts = [] + for f in features: + props = f['properties'] + + # Map severity + sev_str = props.get('severity', 'Unknown') + severity = AlertSeverity.UNKNOWN + if sev_str == 'Severe': severity = AlertSeverity.WARNING + + event_type = props.get('event', '').upper() + if "WARNING" in event_type: severity = AlertSeverity.WARNING + elif "WATCH" in event_type: severity = AlertSeverity.WATCH + elif "ADVISORY" in event_type: severity = AlertSeverity.ADVISORY + elif "STATEMENT" in event_type: severity = AlertSeverity.ADVISORY + + alerts.append(WeatherAlert( + id=props['id'], + severity=severity, + title=props['event'], + description=props['description'] or "", + instruction=props.get('instruction'), + area_description=props.get('areaDesc', ''), + effective=parse_date(props['effective']), + expires=parse_date(props['expires']) + )) + + return alerts diff --git a/config.yaml.example b/config.yaml.example new file mode 100644 index 0000000..b0748b4 --- /dev/null +++ b/config.yaml.example @@ -0,0 +1,20 @@ +location: + type: auto # or static + # latitude: 45.4215 + # longitude: -75.6972 + # provider: CA # Optional: force US or CA + +voice: + # Example utilizing pico2wave (sudo apt install libttspico-utils) + tts_command: 'pico2wave -w {file} "{text}"' + + nodes: + - "YOUR_NODE_NUMBER_HERE" + +alerts: + check_interval_minutes: 10 + min_severity: "Watch" + # Optional: List of NWS Zones/Counties to monitor in addition to current location + # extra_zones: + # - "VAC001" + # - "MDZ013" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5e39138 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +requests +astral +pydantic +pyyaml +env_canada +reverse_geocoder +numpy