diff --git a/hotspot_proxy_self_service.py b/hotspot_proxy_self_service.py new file mode 100644 index 0000000..86d971d --- /dev/null +++ b/hotspot_proxy_self_service.py @@ -0,0 +1,684 @@ +#!/usr/bin/env python3 +############################################################################### +# Copyright (C) 2020 Simon Adlem, G7RZU +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +############################################################################### +from hashlib import pbkdf2_hmac +import random +import ipaddress +import os +from datetime import datetime +from time import time + +from twisted.internet.protocol import DatagramProtocol +from twisted.internet.defer import inlineCallbacks +from twisted.internet import reactor, task +from setproctitle import setproctitle +from dmr_utils3.utils import int_id +import Pyro5.api +from proxy_db import ProxyDB + +# Does anybody read this stuff? There's a PEP somewhere that says I should do this. +__author__ = "Simon Adlem - G7RZU" +__verion__ = "23.10.14" +__copyright__ = "Copyright (c) Simon Adlem, G7RZU 2020,2021,2022,2023" +__credits__ = "Jon Lee, G4TSN; Norman Williams, M6NBP; Christian, OA4DOA" +__license__ = "GNU GPLv3" +__maintainer__ = "Simon Adlem G7RZU" +__email__ = "simon@gb7fr.org.uk" + + +def IsIPv4Address(ip): + try: + ipaddress.IPv4Address(ip) + return True + except ValueError as errorCode: + pass + return False + + +def IsIPv6Address(ip): + try: + ipaddress.IPv6Address(ip) + return True + except ValueError as errorCode: + pass + + +class privHelper: + def __init__(self): + self._netfilterURI = ( + "PYRO:netfilterControl@./u:/run/priv_control/priv_control.unixsocket" + ) + self._conntrackURI = ( + "PYRO:conntrackControl@./u:/run/priv_control/priv_control.unixsocket" + ) + + def addBL(self, dport, ip): + try: + with Pyro5.api.Proxy(self._netfilterURI) as nf: + nf.blocklistAdd(dport, ip) + except Exception as e: + print("(PrivError) {}".format(e)) + + def delBL(self, dport, ip): + try: + with Pyro5.api.Proxy(self._netfilterURI) as nf: + nf.blocklistDel(dport, ip) + except Exception as e: + print("(PrivError) {}".format(e)) + + def blocklistFlush(self): + try: + with Pyro5.api.Proxy(self._netfilterURI) as nf: + nf.blocklistFlush() + except Exception as e: + print("(PrivError) {}".format(e)) + + def flushCT(self): + try: + with Pyro5.api.Proxy(self._conntrackURI) as ct: + ct.flushUDPTarget(62031) + except Exception as e: + print("(PrivError) {}".format(e)) + + +class Proxy(DatagramProtocol): + + def __init__( + self, + Master, + ListenPort, + connTrack, + peerTrack, + blackList, + IPBlackList, + Timeout, + Debug, + ClientInfo, + DestportStart, + DestPortEnd, + privHelper, + rptlTrack, + db_proxy, + selfservice, + ): + self.master = Master + self.ListenPort = ListenPort + self.connTrack = connTrack + self.peerTrack = peerTrack + self.timeout = Timeout + self.debug = Debug + self.clientinfo = ClientInfo + self.blackList = blackList + self.IPBlackList = IPBlackList + self.destPortStart = DestportStart + self.destPortEnd = DestPortEnd + self.numPorts = DestPortEnd - DestportStart + self.privHelper = privHelper + self.rptlTrack = rptlTrack + self.db_proxy = db_proxy + self.selfserv = selfservice + + def reaper(self, _peer_id): + if self.debug: + print("dead", _peer_id) + if self.clientinfo and _peer_id != b"\xff\xff\xff\xff": + print( + f"{datetime.now().replace(microsecond=0)} Client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{self.peerTrack[_peer_id]['shost'].rjust(15)} Port:{self.peerTrack[_peer_id]['sport']} Removed." + ) + self.transport.write( + b"RPTCL" + _peer_id, (self.master, self.peerTrack[_peer_id]["dport"]) + ) + # Tell client we have closed the session - 3 times, in case they are on a lossy network + self.transport.write( + b"MSTCL", + (self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]), + ) + self.transport.write( + b"MSTCL", + (self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]), + ) + self.transport.write( + b"MSTCL", + (self.peerTrack[_peer_id]["shost"], self.peerTrack[_peer_id]["sport"]), + ) + self.connTrack[self.peerTrack[_peer_id]["dport"]] = False + if self.selfserv: + self.db_proxy.updt_tbl("log_out", _peer_id) + del self.peerTrack[_peer_id] + + def datagramReceived(self, data, addr): + + # HomeBrew Protocol Commands + DMRD = b"DMRD" + DMRA = b"DMRA" + MSTCL = b"MSTCL" + MSTNAK = b"MSTNAK" + MSTPONG = b"MSTPONG" + MSTN = b"MSTN" + MSTP = b"MSTP" + MSTC = b"MSTC" + RPTL = b"RPTL" + RPTPING = b"RPTPING" + RPTCL = b"RPTCL" + RPTL = b"RPTL" + RPTACK = b"RPTACK" + RPTK = b"RPTK" + RPTC = b"RPTC" + RPTP = b"RPTP" + RPTA = b"RPTA" + RPTO = b"RPTO" + + # Proxy control commands + PRBL = b"PRBL" + + # Proxy info commands + PRIN = b"PRIN" + + _peer_id = False + + host, port = addr + + nowtime = time() + + Debug = self.debug + + if host in self.IPBlackList: + return + + # If the packet comes from the master + if host == self.master: + _command = data[:4] + + if _command == PRBL: + _peer_id = data[4:8] + _bltime = data[8:].decode("UTF-8") + _bltime = float(_bltime) + try: + self.IPBlackList[self.peerTrack[_peer_id]["shost"]] = _bltime + except KeyError: + return + if self.clientinfo: + print( + "Add to blacklist: host {}. Expire time {}".format( + self.peerTrack[_peer_id]["shost"], _bltime + ) + ) + if self.privHelper: + print( + "Ask priv_helper to add to iptables: host {}, port {}.".format( + self.peerTrack[_peer_id]["shost"], self.ListenPort + ) + ) + reactor.callInThread( + self.privHelper.addBL, + self.ListenPort, + self.peerTrack[_peer_id]["shost"], + ) + return + + if _command == DMRD: + _peer_id = data[11:15] + elif _command == RPTA: + if data[6:10] in self.peerTrack: + _peer_id = data[6:10] + else: + _peer_id = self.connTrack[port] + elif _command == MSTN: + _peer_id = data[6:10] + elif _command == MSTP: + _peer_id = data[7:11] + elif _command == MSTC: + _peer_id = data[5:9] + + if self.debug: + print(data) + if _peer_id in self.peerTrack: + self.transport.write( + data, + ( + self.peerTrack[_peer_id]["shost"], + self.peerTrack[_peer_id]["sport"], + ), + ) + # Remove the client after send a MSTN or MSTC packet + if _command in (MSTN, MSTC): + # Give time to the client for a reply to prevent port reassignment + self.peerTrack[_peer_id]["timer"].reset(15) + + return + + else: + _command = data[:4] + + if _command == DMRD: # DMRData -- encapsulated DMR data frame + _peer_id = data[11:15] + elif _command == DMRA: # DMRAlias -- Talker Alias information + _peer_id = data[4:8] + elif _command == RPTL: # RPTLogin -- a repeater wants to login + _peer_id = data[4:8] + + # if we have seen more than 50 RPTL packets from this IP since the RPTL tracking table was reset (every 60 secs) + # blacklist IP for 10 minutes + if host not in self.rptlTrack: + self.rptlTrack[host] = 1 + else: + self.rptlTrack[host] += 1 + + if self.rptlTrack[host] > 50: + print("(RPTL) exceeded max: {}".format(self.rptlTrack[host])) + _bltime = nowtime + 600 + self.IPBlackList[host] = _bltime + self.rptlTrack.pop(host) + + if self.clientinfo: + print( + "(RPTL) Add to blacklist: host {}. Expire time {}".format( + host, _bltime + ) + ) + if self.privHelper: + print( + "(RPTL) Ask priv_helper to add to iptables: host {}, port {}.".format( + host, self.ListenPort + ) + ) + reactor.callInThread( + self.privHelper.addBL, self.ListenPort, host + ) + return + + elif _command == RPTK: # Repeater has answered our login challenge + _peer_id = data[4:8] + elif ( + _command == RPTC + ): # Repeater is sending it's configuraiton OR disconnecting + if data[:5] == RPTCL: # Disconnect command + _peer_id = data[5:9] + else: + _peer_id = data[4:8] # Configure Command + if self.selfserv and _peer_id in self.peerTrack: + mode = data[97:98].decode() + callsign = data[8:16].rstrip().decode() + self.db_proxy.ins_conf( + int_id(_peer_id), _peer_id, callsign, addr[0], mode + ) + # Self Service options will be send 10 sec. after login + self.peerTrack[_peer_id]["opt_timer"] = reactor.callLater( + 10, self.login_opt, _peer_id + ) + + elif _command == RPTO: # options + _peer_id = data[4:8] + if self.selfserv and _peer_id in self.peerTrack: + # Store Self Service password in database + if data[8:].upper().startswith(b"PASS="): + _psswd = data[13:] + if len(_psswd) >= 6: + dk = pbkdf2_hmac("sha256", _psswd, b"FreeDMR", 2000).hex() + self.db_proxy.updt_tbl("psswd", _peer_id, psswd=dk) + self.transport.write(b"".join([RPTACK, _peer_id]), addr) + print(f"Password stored for: {int_id(_peer_id)}") + return + self.db_proxy.updt_tbl("opt_rcvd", _peer_id) + # Options send by peer overrides Self Service options + if self.peerTrack[_peer_id]["opt_timer"].active(): + self.peerTrack[_peer_id]["opt_timer"].cancel() + print(f"Options received from: {int_id(_peer_id)}") + + elif _command == RPTP: # RPTPing -- peer is pinging us + _peer_id = data[7:11] + else: + return + + if _peer_id in self.peerTrack: + _dport = self.peerTrack[_peer_id]["dport"] + self.peerTrack[_peer_id]["sport"] = port + self.peerTrack[_peer_id]["shost"] = host + self.transport.write(data, (self.master, _dport)) + self.peerTrack[_peer_id]["timer"].reset(self.timeout) + if self.debug: + print(data) + return + + else: + if int_id(_peer_id) in self.blackList: + return + # Make a list with the available ports + _ports_avail = [ + port for port in self.connTrack if not self.connTrack[port] + ] + if _ports_avail: + _dport = random.choice(_ports_avail) + else: + return + self.connTrack[_dport] = _peer_id + self.peerTrack[_peer_id] = {} + self.peerTrack[_peer_id]["dport"] = _dport + self.peerTrack[_peer_id]["sport"] = port + self.peerTrack[_peer_id]["shost"] = host + self.peerTrack[_peer_id]["timer"] = reactor.callLater( + self.timeout, self.reaper, _peer_id + ) + self.transport.write(data, (self.master, _dport)) + pripacket = b"".join( + [b"PRIN", host.encode("UTF-8"), b":", str(port).encode("UTF-8")] + ) + # Send IP and Port info to server + self.transport.write(pripacket, (self.master, _dport)) + + if self.clientinfo and _peer_id != b"\xff\xff\xff\xff": + print( + f"{datetime.now().replace(microsecond=0)} New client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{host.rjust(15)} Port:{port}, assigned to port:{_dport}." + ) + if self.debug: + print(data) + return + + @inlineCallbacks + def login_opt(self, _peer_id): + try: + res = yield db_proxy.slct_opt(_peer_id) + options = res[0][0] + if options: + bytes_pkt = b"".join((b"RPTO", _peer_id, options.encode())) + self.transport.write( + bytes_pkt, (self.master, self.peerTrack[_peer_id]["dport"]) + ) + print(f"Options sent at login for: {int_id(_peer_id)}, opt: {options}") + + except Exception as err: + print(f"login_opt error: {err}") + + @inlineCallbacks + def send_opts(self): + try: + results = yield db_proxy.slct_db() + for item in results: + _peer_id, options = item + if _peer_id not in self.peerTrack or not options: + continue + self.db_proxy.updt_tbl("rst_mod", _peer_id) + bytes_pkt = b"".join((b"RPTO", _peer_id, options.encode())) + self.transport.write( + bytes_pkt, (self.master, self.peerTrack[_peer_id]["dport"]) + ) + print(f"Options update sent for: {int_id(_peer_id)}") + + except Exception as err: + print(f"send_opts error: {err}") + + def lst_seen(self): + # Update last seen + dmrid_list = [(ite,) for ite in self.peerTrack] + if dmrid_list: + self.db_proxy.updt_lstseen(dmrid_list) + + +if __name__ == "__main__": + + import signal + import configparser + import argparse + import sys + import json + import stat + import functools + + print = functools.partial(print, flush=True) + + # Set process title early + setproctitle(__file__) + + # Change the current directory to the location of the application + os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) + + # CLI argument parser - handles picking up the config file from the command line, and sending a "help" message + parser = argparse.ArgumentParser() + parser.add_argument( + "-c", + "--config", + action="store", + dest="CONFIG_FILE", + help="/full/path/to/config.file (usually freedmr.cfg)", + ) + cli_args = parser.parse_args() + + # Ensure we have a path for the config file, if one wasn't specified, then use the execution directory + if not cli_args.CONFIG_FILE: + cli_args.CONFIG_FILE = ( + os.path.dirname(os.path.abspath(__file__)) + "/config/adn.cfg" + ) + + _config_file = cli_args.CONFIG_FILE + + config = configparser.ConfigParser() + + if not config.read(_config_file): + print( + "Configuration file '" + + _config_file + + "' is not a valid configuration file!" + ) + + try: + + Master = config.get("PROXY", "Master") + ListenPort = config.getint("PROXY", "ListenPort") + ListenIP = config.get("PROXY", "ListenIP") + DestportStart = config.getint("PROXY", "DestportStart") + DestPortEnd = config.getint("PROXY", "DestPortEnd") + Timeout = config.getint("PROXY", "Timeout") + Stats = config.getboolean("PROXY", "Stats") + Debug = config.getboolean("PROXY", "Debug") + ClientInfo = config.getboolean("PROXY", "ClientInfo") + BlackList = json.loads(config.get("PROXY", "BlackList")) + IPBlackList = json.loads(config.get("PROXY", "IPBlackList")) + # Self Service + use_selfservice = config.getboolean("SELF SERVICE", "use_selfservice") + db_server = config.get("SELF SERVICE", "server") + db_username = config.get("SELF SERVICE", "username") + db_password = config.get("SELF SERVICE", "password") + db_name = config.get("SELF SERVICE", "db_name") + db_port = config.getint("SELF SERVICE", "port") + + except configparser.Error as err: + print("Error processing configuration file -- {}".format(err)) + + print("Using default config") + # *** CONFIG HERE *** + + Master = "127.0.0.1" + ListenPort = 62031 + #'' = all IPv4, '::' = all IPv4 and IPv6 (Dual Stack) + ListenIP = "" + DestportStart = 56400 + DestPortEnd = 56500 + Timeout = 30 + Stats = False + Debug = False + ClientInfo = False + BlackList = [1234567] + # e.g. {10.0.0.1: 0, 10.0.0.2: 0} + IPBlackList = {} + + # Self Service database configuration + use_selfservice = True + db_server = "localhost" + db_username = "root" + db_password = "" + db_name = "test" + db_port = 3306 + + # ******************* + + CONNTRACK = {} + PEERTRACK = {} + RPTLTRACK = {} + PRIV_HELPER = None + + # Set up the signal handler + def sig_handler(_signal, _frame): + print( + "(GLOBAL) SHUTDOWN: PROXY IS TERMINATING WITH SIGNAL {}".format( + str(_signal) + ) + ) + reactor.stop() + + # Install signal handlers + signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGINT, sig_handler) + + # readState() + + # If IPv6 is enabled by enivornment variable... + if ( + ListenIP == "" + and "FDPROXY_IPV6" in os.environ + and bool(os.environ["FDPROXY_IPV6"]) + ): + ListenIP = "::" + + # Override static config from Environment + if "FDPROXY_STATS" in os.environ: + Stats = bool(os.environ["FDPROXY_STATS"]) + # if 'FDPROXY_DEBUG' in os.environ: + # Debug = bool(os.environ['FDPROXY_DEBUG']) + if "FDPROXY_CLIENTINFO" in os.environ: + ClientInfo = bool(os.environ["FDPROXY_CLIENTINFO"]) + if "FDPROXY_LISTENPORT" in os.environ: + ListenPort = int(os.environ["FDPROXY_LISTENPORT"]) + + unixSocket = "/run/priv_control/priv_control.unixsocket" + + if os.path.exists(unixSocket) and stat.S_ISSOCK(os.stat(unixSocket).st_mode): + print("(PRIV) Found UNIX socket. Enabling priv helper") + PRIV_HELPER = privHelper() + print("(PRIV) flush conntrack") + PRIV_HELPER.flushCT() + print("(PRIV) flush blocklist") + PRIV_HELPER.blocklistFlush() + + for port in range(DestportStart, DestPortEnd + 1, 1): + CONNTRACK[port] = False + + # If we are listening IPv6 and Master is an IPv4 IPv4Address + # IPv6ify the address. + if ListenIP == "::" and IsIPv4Address(Master): + Master = "::ffff:" + Master + + if use_selfservice: + # Create an instance of db_proxy and them pass it to the proxy + db_proxy = ProxyDB(db_server, db_username, db_password, db_name, db_port) + db_proxy.test_db(reactor) + else: + db_proxy = None + + srv_proxy = Proxy( + Master, + ListenPort, + CONNTRACK, + PEERTRACK, + BlackList, + IPBlackList, + Timeout, + Debug, + ClientInfo, + DestportStart, + DestPortEnd, + PRIV_HELPER, + RPTLTRACK, + db_proxy, + use_selfservice, + ) + + reactor.listenUDP(ListenPort, srv_proxy, interface=ListenIP) + + def loopingErrHandle(failure): + print( + "(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error innowtimed loop.\n {}".format( + failure + ) + ) + reactor.stop() + + if use_selfservice: + # Options loop + opts_loop = task.LoopingCall(srv_proxy.send_opts) + opts_loop.start(10).addErrback(loopingErrHandle) + + # Clean table every hour + cl_tbl = task.LoopingCall(db_proxy.clean_tbl) + cl_tbl.start(3600).addErrback(loopingErrHandle) + + # Update last seen loop + ls_loop = task.LoopingCall(srv_proxy.lst_seen) + ls_loop.start(120).addErrback(loopingErrHandle) + + def stats(): + count = 0 + nowtime = time() + for port in CONNTRACK: + if CONNTRACK[port]: + count = count + 1 + + totalPorts = DestPortEnd - DestportStart + freePorts = totalPorts - count + + print( + "{} ports out of {} in use ({} free)".format(count, totalPorts, freePorts) + ) + + def blackListTrimmer(): + _timenow = time() + _dellist = [] + for entry in IPBlackList: + deletetime = IPBlackList[entry] + if deletetime and deletetime < _timenow: + _dellist.append(entry) + + for delete in _dellist: + IPBlackList.pop(delete) + if ClientInfo: + print("Remove dynamic blacklist entry for {}".format(delete)) + if PRIV_HELPER: + print( + "Ask priv helper to remove blacklist entry for {} from iptables".format( + delete + ) + ) + reactor.callInThread(PRIV_HELPER.delBL, ListenPort, delete) + + def rptlTrimmer(): + RPTLTRACK.clear() + print("Purge RPTL table") + + if Stats == True: + stats_task = task.LoopingCall(stats) + statsa = stats_task.start(30) + statsa.addErrback(loopingErrHandle) + + blacklist_task = task.LoopingCall(blackListTrimmer) + blacklista = blacklist_task.start(15) + blacklista.addErrback(loopingErrHandle) + + rptlTrimmer_task = task.LoopingCall(rptlTrimmer) + rptlTrimmera = rptlTrimmer_task.start(60) + rptlTrimmera.addErrback(loopingErrHandle) + + reactor.run() diff --git a/hotspot_proxy_v2.py b/hotspot_proxy_v2.py index 03ce5c0..0878cfc 100644 --- a/hotspot_proxy_v2.py +++ b/hotspot_proxy_v2.py @@ -196,10 +196,10 @@ class Proxy(DatagramProtocol): if _command in (MSTN,MSTC): # Give time to the client for a reply to prevent port reassignment self.peerTrack[_peer_id]['timer'].reset(15) - + return - + else: _command = data[:4] @@ -307,7 +307,7 @@ if __name__ == '__main__': # Ensure we have a path for the config file, if one wasn't specified, then use the execution directory if not cli_args.CONFIG_FILE: - cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/freedmr.cfg' + cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/config/adn.cfg' _config_file = cli_args.CONFIG_FILE @@ -340,8 +340,8 @@ if __name__ == '__main__': ListenPort = 62031 #'' = all IPv4, '::' = all IPv4 and IPv6 (Dual Stack) ListenIP = '' - DestportStart = 54000 - DestPortEnd = 54100 + DestportStart = 56400 + DestPortEnd = 56500 Timeout = 30 Stats = False Debug = False diff --git a/proxy_db.py b/proxy_db.py new file mode 100644 index 0000000..6d062cd --- /dev/null +++ b/proxy_db.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# +############################################################################### +# Copyright (C) 2021-2022 Christian Quiroz, OA4DOA +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +############################################################################### +import sys + +from twisted.enterprise import adbapi +from twisted.internet.defer import inlineCallbacks + + +__author__ = 'Christian Quiroz, OA4DOA' +__version__ = '1.0.0' +__copyright__ = 'Copyright (c) 2021-2022 Christian Quiroz, OA4DOA' +__license__ = 'GNU GPLv3' +__maintainer__ = 'Christian Quiroz, OA4DOA' +__email__ = 'adm@dmr-peru.pe' + + +class ProxyDB: + def __init__(self, host, user, psswd, db_name, port): + self.db_name = db_name + self.dbpool = adbapi.ConnectionPool("MySQLdb", host, user, psswd, db_name, + port=port, charset="utf8mb4") + + @inlineCallbacks + def make_clients_tbl(self): + try: + yield self.dbpool.runOperation( + ''' CREATE TABLE IF NOT EXISTS Clients( + int_id INT UNIQUE PRIMARY KEY NOT NULL, + dmr_id TINYBLOB NOT NULL, + callsign VARCHAR(10) NOT NULL, + host VARCHAR(15), + options VARCHAR(100), + opt_rcvd TINYINT(1) DEFAULT False NOT NULL, + mode TINYINT(1) DEFAULT 4 NOT NULL, + logged_in TINYINT(1) DEFAULT False NOT NULL, + modified TINYINT(1) DEFAULT False NOT NULL, + psswd BLOB(256), + last_seen INT NOT NULL) CHARSET=utf8mb4''') + + except Exception as err: + print(f"make_clientss_tbl error: {err}") + + @inlineCallbacks + def test_db(self, _reactor): + try: + res = yield self.dbpool.runQuery("SELECT 1") + if res: + self.updt_tbl("start") + print("Database connection test: OK") + + except Exception as err: + if _reactor.running: + print(f"Database connection error: {err}, stopping the reactor.") + _reactor.stop() + else: + sys.exit(f"Database connection error: {err}, exiting.") + + @inlineCallbacks + def ins_conf(self, int_id, dmr_id, callsign, host, mode): + try: + yield self.dbpool.runOperation( + '''INSERT IGNORE INTO Clients ( + int_id, dmr_id, callsign, host, mode, logged_in, last_seen, psswd) + VALUES (%s, %s, %s, %s, %s, True, UNIX_TIMESTAMP(), NULL) ON DUPLICATE KEY UPDATE + callsign = %s, host = %s, mode = %s, logged_in = True, opt_rcvd = False, + last_seen = UNIX_TIMESTAMP(), psswd = NULL''', + (int_id, dmr_id, callsign, host, mode, callsign, host, mode)) + + except Exception as err: + print(f"ins_conf error: {err}") + + @inlineCallbacks + def clean_tbl(self): + try: + yield self.dbpool.runOperation( + "DELETE FROM Clients WHERE last_seen < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 7 DAY))") + + except Exception as err: + print(f"clean_tbl error: {err}") + + def slct_db(self): + return self.dbpool.runQuery( + "SELECT dmr_id, options FROM Clients WHERE modified = True and logged_in = True") + + def slct_opt(self, _peer_id): + return self.dbpool.runQuery("SELECT options FROM Clients WHERE dmr_id = %s", (_peer_id,)) + + @inlineCallbacks + def updt_tbl(self, actn, dmr_id=None, psswd=None): + try: + if actn == "start": + yield self.dbpool.runOperation("UPDATE Clients SET logged_in=False, opt_rcvd=False") + elif actn == "opt_rcvd": + yield self.dbpool.runOperation( + "UPDATE Clients SET opt_rcvd = True, options = NULL WHERE dmr_id = %s", + (dmr_id,)) + elif actn == "last_seen": + yield self.dbpool.runOperation( + "UPDATE Clients SET last_seen = UNIX_TIMESTAMP() WHERE dmr_id = %s and logged_in = True", + (dmr_id,)) + elif actn == "log_out": + yield self.dbpool.runOperation( + "UPDATE Clients SET logged_in = False, modified = False WHERE dmr_id = %s", + (dmr_id,)) + elif actn == "rst_mod": + yield self.dbpool.runOperation( + "UPDATE Clients SET modified = False WHERE dmr_id = %s", (dmr_id,)) + elif actn == "psswd": + yield self.dbpool.runOperation( + "UPDATE Clients SET psswd = %s WHERE dmr_id = %s", (psswd, dmr_id)) + + except Exception as err: + print(f"updt_tbl error: {err}") + + @inlineCallbacks + def updt_lstseen(self, dmrid_list): + try: + def db_actn(txn): + txn.executemany( + "UPDATE Clients SET last_seen = UNIX_TIMESTAMP() WHERE dmr_id = %s", dmrid_list) + yield self.dbpool.runInteraction(db_actn) + + except Exception as err: + print(f"updt_lstseen error: {err}") + + +if __name__ == "__main__": + db_test = ProxyDB('localhost', 'root', '', 'test', 3306) + print(db_test) + \ No newline at end of file