You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
46 lines
1.7 KiB
46 lines
1.7 KiB
from typing import Tuple, Dict, Optional
|
|
import json
|
|
import logging
|
|
import socket
|
|
|
|
class LocationService:
|
|
def __init__(self, config: Dict):
|
|
self.config = config
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
def get_coordinates(self) -> Tuple[float, float]:
|
|
"""
|
|
Returns (lat, lon).
|
|
Tries GPS first if configured, then falls back to static config.
|
|
"""
|
|
if self.config.get('location', {}).get('type') == 'auto':
|
|
try:
|
|
return self._read_gpsd()
|
|
except Exception as e:
|
|
self.logger.warning(f"GPS read failed: {e}. Falling back to static.")
|
|
|
|
# Static fallback
|
|
lat = self.config.get('location', {}).get('latitude')
|
|
lon = self.config.get('location', {}).get('longitude')
|
|
if lat is None or lon is None:
|
|
raise ValueError("No valid location coordinates found in config or GPS.")
|
|
return float(lat), float(lon)
|
|
|
|
def _read_gpsd(self) -> Tuple[float, float]:
|
|
# Simple socket reader for GPSD standard port 2947
|
|
# We start the WATCH command and wait for a TPV report
|
|
try:
|
|
with socket.create_connection(('localhost', 2947), timeout=2) as sock:
|
|
sock.sendall(b'?WATCH={"enable":true,"json":true}\n')
|
|
fp = sock.makefile()
|
|
for line in fp:
|
|
data = json.loads(line)
|
|
if data.get('class') == 'TPV':
|
|
lat = data.get('lat')
|
|
lon = data.get('lon')
|
|
if lat and lon:
|
|
return lat, lon
|
|
except Exception as e:
|
|
raise e
|
|
raise RuntimeError("No TPV data received from GPSD")
|