You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ADN-DMR-Peer-Server/hotspot_proxy_self_service.py

685 lines
23 KiB

#!/usr/bin/env python3
###############################################################################
# Copyright (C) 2020 Simon Adlem, G7RZU <g7rzu@gb7fr.org.uk>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
###############################################################################
from hashlib import pbkdf2_hmac
import random
import ipaddress
import os
from datetime import datetime
from time import time
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor, task
from setproctitle import setproctitle
from dmr_utils3.utils import int_id
import Pyro5.api
from proxy_db import ProxyDB
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = "Simon Adlem - G7RZU"
__verion__ = "23.10.14"
__copyright__ = "Copyright (c) Simon Adlem, G7RZU 2020,2021,2022,2023"
__credits__ = "Jon Lee, G4TSN; Norman Williams, M6NBP; Christian, OA4DOA"
__license__ = "GNU GPLv3"
__maintainer__ = "Simon Adlem G7RZU"
__email__ = "simon@gb7fr.org.uk"
def IsIPv4Address(ip):
try:
ipaddress.IPv4Address(ip)
return True
except ValueError as errorCode:
pass
return False
def IsIPv6Address(ip):
try:
ipaddress.IPv6Address(ip)
return True
except ValueError as errorCode:
pass
class privHelper:
def __init__(self):
self._netfilterURI = (
"PYRO:netfilterControl@./u:/run/priv_control/priv_control.unixsocket"
)
self._conntrackURI = (
"PYRO:conntrackControl@./u:/run/priv_control/priv_control.unixsocket"
)
def addBL(self, dport, ip):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistAdd(dport, ip)
except Exception as e:
print("(PrivError) {}".format(e))
def delBL(self, dport, ip):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistDel(dport, ip)
except Exception as e:
print("(PrivError) {}".format(e))
def blocklistFlush(self):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistFlush()
except Exception as e:
print("(PrivError) {}".format(e))
def flushCT(self):
try:
with Pyro5.api.Proxy(self._conntrackURI) as ct:
ct.flushUDPTarget(62031)
except Exception as e:
print("(PrivError) {}".format(e))
class Proxy(DatagramProtocol):
def __init__(
self,
Master,
ListenPort,
connTrack,
peerTrack,
blackList,
IPBlackList,
Timeout,
Debug,
ClientInfo,
DestportStart,
DestPortEnd,
privHelper,
rptlTrack,
db_proxy,
selfservice,
):
self.master = Master
self.ListenPort = ListenPort
self.connTrack = connTrack
self.peerTrack = peerTrack
self.timeout = Timeout
self.debug = Debug
self.clientinfo = ClientInfo
self.blackList = blackList
self.IPBlackList = IPBlackList
self.destPortStart = DestportStart
self.destPortEnd = DestPortEnd
self.numPorts = DestPortEnd - DestportStart
self.privHelper = privHelper
self.rptlTrack = rptlTrack
self.db_proxy = db_proxy
self.selfserv = selfservice
def reaper(self, _peer_id):
if self.debug:
print("dead", _peer_id)
if self.clientinfo and _peer_id != b"\xff\xff\xff\xff":
print(
f"{datetime.now().replace(microsecond=0)} Client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{self.peerTrack[_peer_id]['shost'].rjust(15)} Port:{self.peerTrack[_peer_id]['sport']} Removed."
)
self.transport.write(
b"RPTCL" + _peer_id, (self.master, self.peerTrack[_peer_id]["dport"])
)
# Tell client we have closed the session - 3 times, in case they are on a lossy network
self.transport.write(
b"MSTCL",
(self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]),
)
self.transport.write(
b"MSTCL",
(self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]),
)
self.transport.write(
b"MSTCL",
(self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]),
)
self.connTrack[self.peerTrack[_peer_id]["dport"]] = False
if self.selfserv:
self.db_proxy.updt_tbl("log_out", _peer_id)
del self.peerTrack[_peer_id]
def datagramReceived(self, data, addr):
# HomeBrew Protocol Commands
DMRD = b"DMRD"
DMRA = b"DMRA"
MSTCL = b"MSTCL"
MSTNAK = b"MSTNAK"
MSTPONG = b"MSTPONG"
MSTN = b"MSTN"
MSTP = b"MSTP"
MSTC = b"MSTC"
RPTL = b"RPTL"
RPTPING = b"RPTPING"
RPTCL = b"RPTCL"
RPTL = b"RPTL"
RPTACK = b"RPTACK"
RPTK = b"RPTK"
RPTC = b"RPTC"
RPTP = b"RPTP"
RPTA = b"RPTA"
RPTO = b"RPTO"
# Proxy control commands
PRBL = b"PRBL"
# Proxy info commands
PRIN = b"PRIN"
_peer_id = False
host, port = addr
nowtime = time()
Debug = self.debug
if host in self.IPBlackList:
return
# If the packet comes from the master
if host == self.master:
_command = data[:4]
if _command == PRBL:
_peer_id = data[4:8]
_bltime = data[8:].decode("UTF-8")
_bltime = float(_bltime)
try:
self.IPBlackList[self.peerTrack[_peer_id]["shost"]] = _bltime
except KeyError:
return
if self.clientinfo:
print(
"Add to blacklist: host {}. Expire time {}".format(
self.peerTrack[_peer_id]["shost"], _bltime
)
)
if self.privHelper:
print(
"Ask priv_helper to add to iptables: host {}, port {}.".format(
self.peerTrack[_peer_id]["shost"], self.ListenPort
)
)
reactor.callInThread(
self.privHelper.addBL,
self.ListenPort,
self.peerTrack[_peer_id]["shost"],
)
return
if _command == DMRD:
_peer_id = data[11:15]
elif _command == RPTA:
if data[6:10] in self.peerTrack:
_peer_id = data[6:10]
else:
_peer_id = self.connTrack[port]
elif _command == MSTN:
_peer_id = data[6:10]
elif _command == MSTP:
_peer_id = data[7:11]
elif _command == MSTC:
_peer_id = data[5:9]
if self.debug:
print(data)
if _peer_id in self.peerTrack:
self.transport.write(
data,
(
self.peerTrack[_peer_id]["shost"],
self.peerTrack[_peer_id]["sport"],
),
)
# Remove the client after send a MSTN or MSTC packet
if _command in (MSTN, MSTC):
# Give time to the client for a reply to prevent port reassignment
self.peerTrack[_peer_id]["timer"].reset(15)
return
else:
_command = data[:4]
if _command == DMRD: # DMRData -- encapsulated DMR data frame
_peer_id = data[11:15]
elif _command == DMRA: # DMRAlias -- Talker Alias information
_peer_id = data[4:8]
elif _command == RPTL: # RPTLogin -- a repeater wants to login
_peer_id = data[4:8]
# if we have seen more than 50 RPTL packets from this IP since the RPTL tracking table was reset (every 60 secs)
# blacklist IP for 10 minutes
if host not in self.rptlTrack:
self.rptlTrack[host] = 1
else:
self.rptlTrack[host] += 1
if self.rptlTrack[host] > 50:
print("(RPTL) exceeded max: {}".format(self.rptlTrack[host]))
_bltime = nowtime + 600
self.IPBlackList[host] = _bltime
self.rptlTrack.pop(host)
if self.clientinfo:
print(
"(RPTL) Add to blacklist: host {}. Expire time {}".format(
host, _bltime
)
)
if self.privHelper:
print(
"(RPTL) Ask priv_helper to add to iptables: host {}, port {}.".format(
host, self.ListenPort
)
)
reactor.callInThread(
self.privHelper.addBL, self.ListenPort, host
)
return
elif _command == RPTK: # Repeater has answered our login challenge
_peer_id = data[4:8]
elif (
_command == RPTC
): # Repeater is sending it's configuraiton OR disconnecting
if data[:5] == RPTCL: # Disconnect command
_peer_id = data[5:9]
else:
_peer_id = data[4:8] # Configure Command
if self.selfserv and _peer_id in self.peerTrack:
mode = data[97:98].decode()
callsign = data[8:16].rstrip().decode()
self.db_proxy.ins_conf(
int_id(_peer_id), _peer_id, callsign, addr[0], mode
)
# Self Service options will be send 10 sec. after login
self.peerTrack[_peer_id]["opt_timer"] = reactor.callLater(
10, self.login_opt, _peer_id
)
elif _command == RPTO: # options
_peer_id = data[4:8]
if self.selfserv and _peer_id in self.peerTrack:
# Store Self Service password in database
if data[8:].upper().startswith(b"PASS="):
_psswd = data[13:]
if len(_psswd) >= 6:
dk = pbkdf2_hmac("sha256", _psswd, b"ADN", 2000).hex()
self.db_proxy.updt_tbl("psswd", _peer_id, psswd=dk)
self.transport.write(b"".join([RPTACK, _peer_id]), addr)
print(f"Password stored for: {int_id(_peer_id)}")
return
self.db_proxy.updt_tbl("opt_rcvd", _peer_id)
# Options send by peer overrides Self Service options
if self.peerTrack[_peer_id]["opt_timer"].active():
self.peerTrack[_peer_id]["opt_timer"].cancel()
print(f"Options received from: {int_id(_peer_id)}")
elif _command == RPTP: # RPTPing -- peer is pinging us
_peer_id = data[7:11]
else:
return
if _peer_id in self.peerTrack:
_dport = self.peerTrack[_peer_id]["dport"]
self.peerTrack[_peer_id]["sport"] = port
self.peerTrack[_peer_id]["shost"] = host
self.transport.write(data, (self.master, _dport))
self.peerTrack[_peer_id]["timer"].reset(self.timeout)
if self.debug:
print(data)
return
else:
if int_id(_peer_id) in self.blackList:
return
# Make a list with the available ports
_ports_avail = [
port for port in self.connTrack if not self.connTrack[port]
]
if _ports_avail:
_dport = random.choice(_ports_avail)
else:
return
self.connTrack[_dport] = _peer_id
self.peerTrack[_peer_id] = {}
self.peerTrack[_peer_id]["dport"] = _dport
self.peerTrack[_peer_id]["sport"] = port
self.peerTrack[_peer_id]["shost"] = host
self.peerTrack[_peer_id]["timer"] = reactor.callLater(
self.timeout, self.reaper, _peer_id
)
self.transport.write(data, (self.master, _dport))
pripacket = b"".join(
[b"PRIN", host.encode("UTF-8"), b":", str(port).encode("UTF-8")]
)
# Send IP and Port info to server
self.transport.write(pripacket, (self.master, _dport))
if self.clientinfo and _peer_id != b"\xff\xff\xff\xff":
print(
f"{datetime.now().replace(microsecond=0)} New client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{host.rjust(15)} Port:{port}, assigned to port:{_dport}."
)
if self.debug:
print(data)
return
@inlineCallbacks
def login_opt(self, _peer_id):
try:
res = yield db_proxy.slct_opt(_peer_id)
options = res[0][0]
if options:
bytes_pkt = b"".join((b"RPTO", _peer_id, options.encode()))
self.transport.write(
bytes_pkt, (self.master, self.peerTrack[_peer_id]["dport"])
)
print(f"Options sent at login for: {int_id(_peer_id)}, opt: {options}")
except Exception as err:
print(f"login_opt error: {err}")
@inlineCallbacks
def send_opts(self):
try:
results = yield db_proxy.slct_db()
for item in results:
_peer_id, options = item
if _peer_id not in self.peerTrack or not options:
continue
self.db_proxy.updt_tbl("rst_mod", _peer_id)
bytes_pkt = b"".join((b"RPTO", _peer_id, options.encode()))
self.transport.write(
bytes_pkt, (self.master, self.peerTrack[_peer_id]["dport"])
)
print(f"Options update sent for: {int_id(_peer_id)}")
except Exception as err:
print(f"send_opts error: {err}")
def lst_seen(self):
# Update last seen
dmrid_list = [(ite,) for ite in self.peerTrack]
if dmrid_list:
self.db_proxy.updt_lstseen(dmrid_list)
if __name__ == "__main__":
import signal
import configparser
import argparse
import sys
import json
import stat
import functools
print = functools.partial(print, flush=True)
# Set process title early
setproctitle(__file__)
# Change the current directory to the location of the application
os.chdir(os.path.dirname(os.path.realpath(sys.argv[0])))
# CLI argument parser - handles picking up the config file from the command line, and sending a "help" message
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config",
action="store",
dest="CONFIG_FILE",
help="/full/path/to/config.file (usually adn.cfg)",
)
cli_args = parser.parse_args()
# Ensure we have a path for the config file, if one wasn't specified, then use the execution directory
if not cli_args.CONFIG_FILE:
cli_args.CONFIG_FILE = (
os.path.dirname(os.path.abspath(__file__)) + "/config/adn.cfg"
)
_config_file = cli_args.CONFIG_FILE
config = configparser.ConfigParser()
if not config.read(_config_file):
print(
"Configuration file '"
+ _config_file
+ "' is not a valid configuration file!"
)
try:
Master = config.get("PROXY", "Master")
ListenPort = config.getint("PROXY", "ListenPort")
ListenIP = config.get("PROXY", "ListenIP")
DestportStart = config.getint("PROXY", "DestportStart")
DestPortEnd = config.getint("PROXY", "DestPortEnd")
Timeout = config.getint("PROXY", "Timeout")
Stats = config.getboolean("PROXY", "Stats")
Debug = config.getboolean("PROXY", "Debug")
ClientInfo = config.getboolean("PROXY", "ClientInfo")
BlackList = json.loads(config.get("PROXY", "BlackList"))
IPBlackList = json.loads(config.get("PROXY", "IPBlackList"))
# Self Service
use_selfservice = config.getboolean("SELF SERVICE", "use_selfservice")
db_server = config.get("SELF SERVICE", "server")
db_username = config.get("SELF SERVICE", "username")
db_password = config.get("SELF SERVICE", "password")
db_name = config.get("SELF SERVICE", "db_name")
db_port = config.getint("SELF SERVICE", "port")
except configparser.Error as err:
print("Error processing configuration file -- {}".format(err))
print("Using default config")
# *** CONFIG HERE ***
Master = "127.0.0.1"
ListenPort = 62031
#'' = all IPv4, '::' = all IPv4 and IPv6 (Dual Stack)
ListenIP = ""
DestportStart = 56400
DestPortEnd = 56500
Timeout = 30
Stats = False
Debug = False
ClientInfo = False
BlackList = [1234567]
# e.g. {10.0.0.1: 0, 10.0.0.2: 0}
IPBlackList = {}
# Self Service database configuration
use_selfservice = True
db_server = "localhost"
db_username = "root"
db_password = ""
db_name = "test"
db_port = 3306
# *******************
CONNTRACK = {}
PEERTRACK = {}
RPTLTRACK = {}
PRIV_HELPER = None
# Set up the signal handler
def sig_handler(_signal, _frame):
print(
"(GLOBAL) SHUTDOWN: PROXY IS TERMINATING WITH SIGNAL {}".format(
str(_signal)
)
)
reactor.stop()
# Install signal handlers
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
# readState()
# If IPv6 is enabled by enivornment variable...
if (
ListenIP == ""
and "FDPROXY_IPV6" in os.environ
and bool(os.environ["FDPROXY_IPV6"])
):
ListenIP = "::"
# Override static config from Environment
if "FDPROXY_STATS" in os.environ:
Stats = bool(os.environ["FDPROXY_STATS"])
# if 'FDPROXY_DEBUG' in os.environ:
# Debug = bool(os.environ['FDPROXY_DEBUG'])
if "FDPROXY_CLIENTINFO" in os.environ:
ClientInfo = bool(os.environ["FDPROXY_CLIENTINFO"])
if "FDPROXY_LISTENPORT" in os.environ:
ListenPort = int(os.environ["FDPROXY_LISTENPORT"])
unixSocket = "/run/priv_control/priv_control.unixsocket"
if os.path.exists(unixSocket) and stat.S_ISSOCK(os.stat(unixSocket).st_mode):
print("(PRIV) Found UNIX socket. Enabling priv helper")
PRIV_HELPER = privHelper()
print("(PRIV) flush conntrack")
PRIV_HELPER.flushCT()
print("(PRIV) flush blocklist")
PRIV_HELPER.blocklistFlush()
for port in range(DestportStart, DestPortEnd + 1, 1):
CONNTRACK[port] = False
# If we are listening IPv6 and Master is an IPv4 IPv4Address
# IPv6ify the address.
if ListenIP == "::" and IsIPv4Address(Master):
Master = "::ffff:" + Master
if use_selfservice:
# Create an instance of db_proxy and them pass it to the proxy
db_proxy = ProxyDB(db_server, db_username, db_password, db_name, db_port)
db_proxy.test_db(reactor)
else:
db_proxy = None
srv_proxy = Proxy(
Master,
ListenPort,
CONNTRACK,
PEERTRACK,
BlackList,
IPBlackList,
Timeout,
Debug,
ClientInfo,
DestportStart,
DestPortEnd,
PRIV_HELPER,
RPTLTRACK,
db_proxy,
use_selfservice,
)
reactor.listenUDP(ListenPort, srv_proxy, interface=ListenIP)
def loopingErrHandle(failure):
print(
"(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error innowtimed loop.\n {}".format(
failure
)
)
reactor.stop()
if use_selfservice:
# Options loop
opts_loop = task.LoopingCall(srv_proxy.send_opts)
opts_loop.start(10).addErrback(loopingErrHandle)
# Clean table every hour
cl_tbl = task.LoopingCall(db_proxy.clean_tbl)
cl_tbl.start(3600).addErrback(loopingErrHandle)
# Update last seen loop
ls_loop = task.LoopingCall(srv_proxy.lst_seen)
ls_loop.start(120).addErrback(loopingErrHandle)
def stats():
count = 0
nowtime = time()
for port in CONNTRACK:
if CONNTRACK[port]:
count = count + 1
totalPorts = DestPortEnd - DestportStart
freePorts = totalPorts - count
print(
"{} ports out of {} in use ({} free)".format(count, totalPorts, freePorts)
)
def blackListTrimmer():
_timenow = time()
_dellist = []
for entry in IPBlackList:
deletetime = IPBlackList[entry]
if deletetime and deletetime < _timenow:
_dellist.append(entry)
for delete in _dellist:
IPBlackList.pop(delete)
if ClientInfo:
print("Remove dynamic blacklist entry for {}".format(delete))
if PRIV_HELPER:
print(
"Ask priv helper to remove blacklist entry for {} from iptables".format(
delete
)
)
reactor.callInThread(PRIV_HELPER.delBL, ListenPort, delete)
def rptlTrimmer():
RPTLTRACK.clear()
print("Purge RPTL table")
if Stats == True:
stats_task = task.LoopingCall(stats)
statsa = stats_task.start(30)
statsa.addErrback(loopingErrHandle)
blacklist_task = task.LoopingCall(blackListTrimmer)
blacklista = blacklist_task.start(15)
blacklista.addErrback(loopingErrHandle)
rptlTrimmer_task = task.LoopingCall(rptlTrimmer)
rptlTrimmera = rptlTrimmer_task.start(60)
rptlTrimmera.addErrback(loopingErrHandle)
reactor.run()

Powered by TurnKey Linux.