#!/usr/bin/env python
|
|
import os
|
|
import logging
|
|
import urllib.request
|
|
import urllib.error
|
|
from urllib.parse import quote
|
|
import tempfile
|
|
import shutil
|
|
import socket
|
|
from time import time
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Minimum number of seconds between two non-forced password downloads; it is
# compared against time() deltas in download_user_passwords().
DOWNLOAD_INTERVAL_PASSWORDS = 300
# Not referenced by any code visible in this file.
# NOTE(review): presumably None means "no periodic refresh of the encryption
# key" - confirm against callers before relying on it.
DOWNLOAD_INTERVAL_ENCRYPTION = None

# State kept between calls to download_user_passwords():
# time() value recorded at the last download attempt (0 = never attempted).
_last_passwords_download = 0
# Byte length of the last passwords payload written to disk.
_last_passwords_size = 0
# Raw bytes of the last passwords payload written to disk; used to skip
# rewriting the file when the server returns identical content.
_last_passwords_content = None
|
|
|
|
def resolve_hostname(hostname, timeout=10):
    """Resolve *hostname* to an address string via ``socket.gethostbyname``.

    The process-wide default socket timeout is set to *timeout* while the
    lookup runs and is always restored afterwards, including when the lookup
    fails, so a failed resolution no longer leaves the global default changed
    for the rest of the process.

    Args:
        hostname: Host name (or dotted-quad address) to resolve.
        timeout: Value handed to ``socket.setdefaulttimeout`` for the lookup.

    Returns:
        The address string returned by ``socket.gethostbyname``, or ``None``
        when resolution fails for any reason (the failure is logged).
    """
    old_timeout = socket.getdefaulttimeout()
    try:
        # NOTE(review): the default timeout applies to newly created socket
        # objects; whether it actually bounds gethostbyname() should be
        # confirmed - the call is kept for backward compatibility.
        socket.setdefaulttimeout(timeout)
        ip = socket.gethostbyname(hostname)
        logger.debug('(SECURITY) Resolved %s to %s', hostname, ip)
        return ip
    except socket.gaierror as e:
        logger.error('(SECURITY) DNS resolution failed for %s: %s', hostname, str(e))
        return None
    except Exception as e:
        # Best-effort helper: callers only distinguish "address" from None.
        logger.error('(SECURITY) Unexpected error resolving %s: %s', hostname, str(e))
        return None
    finally:
        # Runs on success and on every failure path above.
        socket.setdefaulttimeout(old_timeout)
|
|
|
|
def build_download_url(config, filename):
    """Build the download URL for *filename* on the configured security server.

    Reads ``URL_SECURITY``, ``PORT_SECURITY`` and ``PASS_SECURITY`` from
    ``config['GLOBAL']``. If ``URL_SECURITY`` is not accepted by
    ``socket.inet_aton`` it is treated as a host name and resolved with
    ``resolve_hostname``; the resolved address is what ends up in the URL.

    Args:
        config: Mapping whose ``'GLOBAL'`` entry supports ``.get(key, default)``.
        filename: Name of the file to request; it is percent-encoded before
            being placed in the query string.

    Returns:
        ``(url, configured_host)`` on success, where ``configured_host`` is the
        stripped ``URL_SECURITY`` value, or ``(None, None)`` when any of the
        three settings is empty or the host name cannot be resolved.
    """
    section = config['GLOBAL']
    url_security = section.get('URL_SECURITY', '').strip()
    port_security = section.get('PORT_SECURITY', '').strip()
    pass_security = section.get('PASS_SECURITY', '').strip()

    if not (url_security and port_security and pass_security):
        return None, None

    try:
        socket.inet_aton(url_security)
    except socket.error:
        # Not an IPv4 literal: treat the value as a host name.
        host = resolve_hostname(url_security)
        if not host:
            logger.error('(SECURITY) Could not resolve hostname: %s', url_security)
            return None, None
    else:
        host = url_security

    # Both query values are percent-encoded; previously only the password was,
    # so a file name containing '&', '#', '=' or spaces corrupted the request.
    # NOTE(review): the shared secret is sent in the query string over plain
    # http:// - consider whether the server can offer TLS.
    query = (
        f"pass={quote(pass_security, safe='')}"
        f"&file={quote(str(filename), safe='')}"
    )
    url = f"http://{host}:{port_security}/descargar?{query}"
    return url, url_security
|
|
|
|
def download_file_safely(url, dest_path, timeout=60):
    """Download *url* and atomically replace *dest_path* with the payload.

    The payload is first written to a temporary file created next to
    *dest_path* and then swapped into place with ``os.replace``, so the
    existing file is never left half-written. The temporary file is removed
    on every failure path. An empty response leaves the existing file alone.

    Args:
        url: URL to fetch with ``urllib.request.urlopen``.
        dest_path: Final location of the downloaded file; its parent
            directory is created when missing.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        ``True`` when a non-empty payload was stored at *dest_path*,
        ``False`` otherwise (every failure is logged, none is raised).
    """
    temp_path = None
    try:
        # NOTE(review): the URL is logged verbatim; callers in this file build
        # it with the shared secret in the query string.
        logger.debug('(SECURITY) Attempting download from: %s', url)
        req = urllib.request.Request(url)
        req.add_header('User-Agent', 'ADN-Systems-DMR/1.0')

        with urllib.request.urlopen(req, timeout=timeout) as response:
            content = response.read()

        if not content:
            logger.warning('(SECURITY) Downloaded file is empty, keeping existing: %s', dest_path)
            return False

        # dirname is '' for a bare file name; os.makedirs('') would raise.
        dest_dir = os.path.dirname(dest_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        # Same directory as the destination so os.replace is a same-filesystem
        # rename rather than a copy.
        temp_fd, temp_path = tempfile.mkstemp(dir=dest_dir or '.')
        with os.fdopen(temp_fd, 'wb') as f:
            f.write(content)

        os.replace(temp_path, dest_path)
        temp_path = None  # ownership transferred; nothing left to clean up

        logger.info('(SECURITY) Successfully downloaded: %s (%d bytes)', dest_path, len(content))
        return True

    except urllib.error.HTTPError as e:
        logger.error('(SECURITY) HTTP error downloading %s: %s (Code: %d)', dest_path, str(e), e.code)
        return False
    except urllib.error.URLError as e:
        logger.error('(SECURITY) URL error downloading %s: %s', dest_path, str(e.reason))
        return False
    except socket.timeout:
        logger.error('(SECURITY) Timeout downloading %s', dest_path)
        return False
    except Exception as e:
        # Best-effort download: report and signal failure instead of raising.
        logger.error('(SECURITY) Unexpected error downloading %s: %s', dest_path, str(e))
        return False
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Already gone or not removable; nothing more can be done here.
                pass
|
|
|
|
def download_encryption_key(config):
    """Fetch the encryption key file from the central security server.

    The file name comes from ``HASH_ENCRYPT`` in ``config['GLOBAL']``
    (default ``'encryption_key.secret'``) and the file is stored under the
    ``config`` directory.

    Args:
        config: Mapping whose ``'GLOBAL'`` entry supports ``.get(key, default)``.

    Returns:
        The result of ``download_file_safely`` for the key file, or ``False``
        when ``build_download_url`` yields no URL.
    """
    key_name = config['GLOBAL'].get('HASH_ENCRYPT', 'encryption_key.secret').strip()
    target_path = os.path.join('config', key_name)

    # The second element (the configured host) is not needed here.
    key_url, _configured_host = build_download_url(config, key_name)
    if key_url:
        logger.info('(SECURITY) Downloading encryption key from central server...')
        return download_file_safely(key_url, target_path)

    logger.debug('(SECURITY) Security server not configured, skipping encryption key download')
    return False
|
|
|
|
def download_user_passwords(config, force=False):
    """Download the user-passwords file and store it when its content changed.

    The file name comes from ``USERS_PASS`` in ``config['GLOBAL']`` (default
    ``'user_passwords.json'``) and the file is stored under the ``data``
    directory. Non-forced calls made less than
    ``DOWNLOAD_INTERVAL_PASSWORDS`` seconds after the previous attempt return
    immediately. A payload identical to the one stored by the previous
    successful call in this process is not rewritten.

    The payload is written to a temporary file next to the destination and
    swapped in with ``os.replace``; the temporary file is removed on every
    failure path.

    Args:
        config: Mapping whose ``'GLOBAL'`` entry supports ``.get(key, default)``.
        force: When true, ignore the minimum interval between downloads.

    Returns:
        ``True`` only when a new, non-empty payload was written to disk;
        ``False`` when skipped, unchanged, empty, not configured, or failed
        (failures are logged, none is raised).
    """
    global _last_passwords_download, _last_passwords_size, _last_passwords_content

    current_time = time()
    if not force and (current_time - _last_passwords_download) < DOWNLOAD_INTERVAL_PASSWORDS:
        return False

    users_pass = config['GLOBAL'].get('USERS_PASS', 'user_passwords.json').strip()
    dest_path = os.path.join('data', users_pass)

    url, _configured_host = build_download_url(config, users_pass)
    if not url:
        # Deliberately does not touch the attempt timestamp: nothing was tried.
        logger.debug('(SECURITY) Security server not configured, skipping passwords download')
        return False

    temp_path = None
    try:
        # NOTE(review): the URL is logged verbatim and carries the shared
        # secret in its query string.
        logger.debug('(SECURITY) Downloading passwords from: %s', url)
        req = urllib.request.Request(url)
        req.add_header('User-Agent', 'ADN-Systems-DMR/1.0')

        with urllib.request.urlopen(req, timeout=60) as response:
            new_content = response.read()
        new_size = len(new_content)

        if new_size == 0:
            logger.warning('(SECURITY) Downloaded passwords file is empty, keeping existing')
            return False

        if _last_passwords_content is not None and new_content == _last_passwords_content:
            logger.debug('(SECURITY) Passwords file unchanged, no update needed')
            return False

        dest_dir = os.path.dirname(dest_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        # Temp file beside the destination so the swap is a same-filesystem
        # rename and readers never observe a partially written file.
        temp_fd, temp_path = tempfile.mkstemp(dir=dest_dir or '.')
        with os.fdopen(temp_fd, 'wb') as f:
            f.write(new_content)

        os.replace(temp_path, dest_path)
        temp_path = None  # ownership transferred; nothing left to clean up

        _last_passwords_content = new_content
        _last_passwords_size = new_size

        logger.info('(SECURITY) Successfully updated passwords file: %s (%d bytes)', dest_path, new_size)
        return True

    except urllib.error.HTTPError as e:
        logger.error('(SECURITY) HTTP error downloading passwords: %s (Code: %d)', str(e), e.code)
        return False
    except urllib.error.URLError as e:
        logger.error('(SECURITY) URL error downloading passwords: %s', str(e.reason))
        return False
    except socket.timeout:
        logger.error('(SECURITY) Timeout downloading passwords')
        return False
    except Exception as e:
        # Best-effort refresh: report and signal failure instead of raising.
        logger.error('(SECURITY) Unexpected error downloading passwords: %s', str(e))
        return False
    finally:
        # Every outcome of an actual attempt counts towards the rate limit.
        _last_passwords_download = current_time
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Already gone or not removable; nothing more can be done here.
                pass
|
|
|
|
def init_security_downloads(config):
    """Check the security-server settings and run the initial downloads.

    Logs the configured server, verifies that ``URL_SECURITY`` is either
    accepted by ``socket.inet_aton`` or resolvable through
    ``resolve_hostname``, then calls ``download_encryption_key`` and a forced
    ``download_user_passwords``. The results of those two downloads are not
    inspected.

    Args:
        config: Mapping whose ``'GLOBAL'`` entry supports ``.get(key, default)``.

    Returns:
        ``False`` when ``URL_SECURITY`` is empty or the host name cannot be
        resolved; ``True`` otherwise.
    """
    server = config['GLOBAL'].get('URL_SECURITY', '').strip()
    if not server:
        logger.info('(SECURITY) Central security server not configured')
        return False

    port = config['GLOBAL'].get('PORT_SECURITY', '').strip()

    logger.info('(SECURITY) Initializing centralized security downloads...')
    logger.info('(SECURITY) Security server: %s:%s', server, port)

    try:
        socket.inet_aton(server)
        logger.info('(SECURITY) Using IP address: %s', server)
    except socket.error:
        # Not an IPv4 literal: make sure the name resolves before continuing.
        address = resolve_hostname(server)
        if not address:
            logger.error('(SECURITY) Failed to resolve hostname: %s', server)
            return False
        logger.info('(SECURITY) Resolved hostname %s to IP: %s', server, address)

    # Both downloads are best-effort; their return values are ignored.
    download_encryption_key(config)
    download_user_passwords(config, force=True)

    return True
|
|
|
|
def periodic_password_download(config):
    """Run a non-forced password download; the result is discarded.

    Args:
        config: Passed through unchanged to ``download_user_passwords``.
    """
    # No force flag, so download_user_passwords applies its own defaults.
    download_user_passwords(config)
|