You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dvmhost/tools/dvmcfggen/config_manager.py

274 lines
10 KiB

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only
#/**
#* Digital Voice Modem - Host Configuration Generator
#* GPLv2 Open Source. Use is subject to license terms.
#* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
#*
#*/
#/*
#* Copyright (C) 2026 by Bryan Biedenkapp N2PLL
#*
#* This program is free software; you can redistribute it and/or modify
#* it under the terms of the GNU General Public License as published by
#* the Free Software Foundation; either version 2 of the License, or
#* (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program; if not, write to the Free Software
#* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#*/
"""
dvmcfggen - DVMHost Configuration Generator
Core module for managing DVMHost configuration files
"""
import yaml
import copy
from pathlib import Path
from typing import Dict, Any, Optional, List
import re
class ConfigValidator:
"""Validates DVMHost configuration values"""
@staticmethod
def validate_ip_address(ip: str) -> bool:
"""Validate IP address format"""
if ip in ['0.0.0.0', 'localhost']:
return True
pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
if not re.match(pattern, ip):
return False
parts = ip.split('.')
return all(0 <= int(part) <= 255 for part in parts)
@staticmethod
def validate_port(port: int) -> bool:
"""Validate port number"""
return 1 <= port <= 65535
@staticmethod
def validate_hex_key(key: str, required_length: int) -> bool:
"""Validate hexadecimal key"""
if len(key) != required_length:
return False
return all(c in '0123456789ABCDEFabcdef' for c in key)
@staticmethod
def validate_range(value: int, min_val: int, max_val: int) -> bool:
"""Validate value is within range"""
return min_val <= value <= max_val
@staticmethod
def validate_identity(identity: str) -> bool:
"""Validate system identity format"""
return len(identity) > 0 and len(identity) <= 32
@staticmethod
def validate_callsign(callsign: str) -> bool:
"""Validate callsign format"""
return len(callsign) > 0 and len(callsign) <= 16
class DVMConfig:
"""DVMHost configuration manager"""
def __init__(self, config_path: Optional[Path] = None):
"""
Initialize configuration manager
Args:
config_path: Path to existing config file to load
"""
self.config_path = config_path
self.config: Dict[str, Any] = {}
self.validator = ConfigValidator()
if config_path and config_path.exists():
self.load(config_path)
def load(self, config_path: Path) -> None:
"""Load configuration from YAML file"""
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.config_path = config_path
def save(self, output_path: Optional[Path] = None) -> None:
"""Save configuration to YAML file"""
path = output_path or self.config_path
if not path:
raise ValueError("No output path specified")
# Create backup if file exists
if path.exists():
backup_path = path.with_suffix(path.suffix + '.bak')
path.rename(backup_path)
with open(path, 'w') as f:
yaml.dump(self.config, f, default_flow_style=False, sort_keys=False)
def get(self, key_path: str, default: Any = None) -> Any:
"""
Get configuration value using dot notation
Args:
key_path: Path to value (e.g., 'network.id')
default: Default value if not found
"""
keys = key_path.split('.')
value = self.config
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
return value
def set(self, key_path: str, value: Any) -> None:
"""
Set configuration value using dot notation
Args:
key_path: Path to value (e.g., 'network.id')
value: Value to set
"""
keys = key_path.split('.')
config = self.config
# Navigate to the parent dict
for key in keys[:-1]:
if key not in config:
config[key] = {}
config = config[key]
# Set the value
config[keys[-1]] = value
def validate(self) -> List[str]:
"""
Validate configuration
Returns:
List of error messages (empty if valid)
"""
errors = []
# Network validation
if self.get('network.enable'):
if not self.validator.validate_ip_address(self.get('network.address', '')):
errors.append("Invalid network.address")
if not self.validator.validate_port(self.get('network.port', 0)):
errors.append("Invalid network.port")
if self.get('network.encrypted'):
psk = self.get('network.presharedKey', '')
if not self.validator.validate_hex_key(psk, 64):
errors.append("Invalid network.presharedKey (must be 64 hex characters)")
# RPC validation
rpc_addr = self.get('network.rpcAddress', '')
if rpc_addr and not self.validator.validate_ip_address(rpc_addr):
errors.append("Invalid network.rpcAddress")
rpc_port = self.get('network.rpcPort', 0)
if rpc_port and not self.validator.validate_port(rpc_port):
errors.append("Invalid network.rpcPort")
# REST API validation
if self.get('network.restEnable'):
rest_addr = self.get('network.restAddress', '')
if not self.validator.validate_ip_address(rest_addr):
errors.append("Invalid network.restAddress")
rest_port = self.get('network.restPort', 0)
if not self.validator.validate_port(rest_port):
errors.append("Invalid network.restPort")
# System validation
identity = self.get('system.identity', '')
if not self.validator.validate_identity(identity):
errors.append("Invalid system.identity")
# Color code / NAC / RAN validation
color_code = self.get('system.config.colorCode', 1)
if not self.validator.validate_range(color_code, 0, 15):
errors.append("Invalid system.config.colorCode (must be 0-15)")
nac = self.get('system.config.nac', 0)
if not self.validator.validate_range(nac, 0, 0xFFF):
errors.append("Invalid system.config.nac (must be 0-4095)")
ran = self.get('system.config.ran', 0)
if not self.validator.validate_range(ran, 0, 63):
errors.append("Invalid system.config.ran (must be 0-63)")
# CW ID validation
if self.get('system.cwId.enable'):
callsign = self.get('system.cwId.callsign', '')
if not self.validator.validate_callsign(callsign):
errors.append("Invalid system.cwId.callsign")
# Modem validation
rx_level = self.get('system.modem.rxLevel', 0)
if not self.validator.validate_range(rx_level, 0, 100):
errors.append("Invalid system.modem.rxLevel (must be 0-100)")
tx_level = self.get('system.modem.txLevel', 0)
if not self.validator.validate_range(tx_level, 0, 100):
errors.append("Invalid system.modem.txLevel (must be 0-100)")
# LLA key validation
lla_key = self.get('system.config.secure.key', '')
if lla_key and not self.validator.validate_hex_key(lla_key, 32):
errors.append("Invalid system.config.secure.key (must be 32 hex characters)")
return errors
def get_summary(self) -> Dict[str, Any]:
"""Get configuration summary"""
return {
'identity': self.get('system.identity', 'UNKNOWN'),
'peer_id': self.get('network.id', 0),
'fne_address': self.get('network.address', 'N/A'),
'fne_port': self.get('network.port', 0),
'rpc_password': self.get('network.rpcPassword', 'N/A'),
'rest_enabled': self.get('network.restEnable', False),
'rest_password': self.get('network.restPassword', 'N/A'),
'protocols': {
'dmr': self.get('protocols.dmr.enable', False),
'p25': self.get('protocols.p25.enable', False),
'nxdn': self.get('protocols.nxdn.enable', False),
},
'color_code': self.get('system.config.colorCode', 1),
'nac': self.get('system.config.nac', 0x293),
'site_id': self.get('system.config.siteId', 1),
'is_control': self.get('protocols.p25.control.enable', False) or
self.get('protocols.dmr.control.enable', False),
'modem_type': self.get('system.modem.protocol.type', 'null'),
'modem_mode': self.get('system.modem.protocol.mode', 'air'),
'modem_port': self.get('system.modem.protocol.uart.port', 'N/A'),
'mode': 'Duplex' if self.get('system.duplex', True) else 'Simplex',
'channel_id': self.get('system.config.channelId', 0),
'channel_no': self.get('system.config.channelNo', 0),
'tx_frequency': self.get('system.config.txFrequency', 0),
'rx_frequency': self.get('system.config.rxFrequency', 0),
}
if __name__ == '__main__':
# Quick test
config = DVMConfig()
config.set('system.identity', 'TEST001')
config.set('network.id', 100001)
print("Config manager initialized successfully")

Powered by TurnKey Linux.