You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ADN-DMR-Peer-Server/hotspot_proxy_v2.py

461 lines
17 KiB

###############################################################################
# Copyright (C) 2020 Simon Adlem, G7RZU <g7rzu@gb7fr.org.uk>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
###############################################################################
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor, task
from time import time
from dmr_utils3.utils import int_id
import random
import ipaddress
import os
from setproctitle import setproctitle
from datetime import datetime
import Pyro5.api
# Does anybody read this stuff? There's a PEP somewhere that says I should do this.
__author__ = 'Simon Adlem - G7RZU'
__copyright__ = 'Copyright (c) Simon Adlem, G7RZU 2020,2021,2022,2023'
__credits__ = 'Jon Lee, G4TSN; Norman Williams, M6NBP; Christian, OA4DOA'
__license__ = 'GNU GPLv3'
__maintainer__ = 'Simon Adlem G7RZU'
__email__ = 'simon@gb7fr.org.uk'
def IsIPv4Address(ip):
try:
ipaddress.IPv4Address(ip)
return True
except ValueError as errorCode:
pass
return False
def IsIPv6Address(ip):
try:
ipaddress.IPv6Address(ip)
return True
except ValueError as errorCode:
pass
class privHelper():
def __init__(self):
self._netfilterURI = 'PYRO:netfilterControl@./u:/run/priv_control/priv_control.unixsocket'
self._conntrackURI = 'PYRO:conntrackControl@./u:/run/priv_control/priv_control.unixsocket'
def addBL(self,dport,ip):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistAdd(dport,ip)
except Exception as e:
print('(PrivError) {}'.format(e))
def delBL(self,dport,ip):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistDel(dport,ip)
except Exception as e:
print('(PrivError) {}'.format(e))
def blocklistFlush(self):
try:
with Pyro5.api.Proxy(self._netfilterURI) as nf:
nf.blocklistFlush()
except Exception as e:
print('(PrivError) {}'.format(e))
def flushCT(self):
try:
with Pyro5.api.Proxy(self._conntrackURI) as ct:
ct.flushUDPTarget(62031)
except Exception as e:
print('(PrivError) {}'.format(e))
class Proxy(DatagramProtocol):
def __init__(self,Master,ListenPort,connTrack,peerTrack,blackList,IPBlackList,Timeout,Debug,ClientInfo,DestportStart,DestPortEnd,privHelper,rptlTrack):
self.master = Master
self.ListenPort = ListenPort
self.connTrack = connTrack
self.peerTrack = peerTrack
self.timeout = Timeout
self.debug = Debug
self.clientinfo = ClientInfo
self.blackList = blackList
self.IPBlackList = IPBlackList
self.destPortStart = DestportStart
self.destPortEnd = DestPortEnd
self.numPorts = DestPortEnd - DestportStart
self.privHelper = privHelper
self.rptlTrack = rptlTrack
def reaper(self,_peer_id):
if self.debug:
print("dead",_peer_id)
if self.clientinfo and _peer_id != b'\xff\xff\xff\xff':
print(f"{datetime.now().replace(microsecond=0)} Client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{self.peerTrack[_peer_id]['shost'].rjust(15)} Port:{self.peerTrack[_peer_id]['sport']} Removed.")
self.transport.write(b'RPTCL'+_peer_id, (self.master,self.peerTrack[_peer_id]['dport']))
#Tell client we have closed the session - 3 times, in case they are on a lossy network
self.transport.write(b'MSTCL',(self.peerTrack[_peer_id]['shost'],self.peerTrack[_peer_id]['sport']))
self.transport.write(b'MSTCL',(self.peerTrack[_peer_id]['shost'],self.peerTrack[_peer_id]['sport']))
self.transport.write(b'MSTCL',(self.peerTrack[_peer_id]['shost'],self.peerTrack[_peer_id]['sport']))
self.connTrack[self.peerTrack[_peer_id]['dport']] = False
del self.peerTrack[_peer_id]
def datagramReceived(self, data, addr):
# HomeBrew Protocol Commands
DMRD = b'DMRD'
DMRA = b'DMRA'
MSTCL = b'MSTCL'
MSTNAK = b'MSTNAK'
MSTPONG = b'MSTPONG'
MSTN = b'MSTN'
MSTP = b'MSTP'
MSTC = b'MSTC'
RPTL = b'RPTL'
RPTPING = b'RPTPING'
RPTCL = b'RPTCL'
RPTL = b'RPTL'
RPTACK = b'RPTACK'
RPTK = b'RPTK'
RPTC = b'RPTC'
RPTP = b'RPTP'
RPTA = b'RPTA'
RPTO = b'RPTO'
#Proxy control commands
PRBL = b'PRBL'
#Proxy info commands
PRIN = b'PRIN'
_peer_id = False
host,port = addr
nowtime = time()
Debug = self.debug
if host in self.IPBlackList:
return
#If the packet comes from the master
if host == self.master:
_command = data[:4]
if _command == PRBL:
_peer_id = data[4:8]
_bltime = data[8:].decode('UTF-8')
_bltime = float(_bltime)
try:
self.IPBlackList[self.peerTrack[_peer_id]['shost']] = _bltime
except KeyError:
return
if self.clientinfo:
print('Add to blacklist: host {}. Expire time {}'.format(self.peerTrack[_peer_id]['shost'],_bltime))
if self.privHelper:
print('Ask priv_helper to add to iptables: host {}, port {}.'.format(self.peerTrack[_peer_id]['shost'],self.ListenPort))
reactor.callInThread(self.privHelper.addBL,self.ListenPort,self.peerTrack[_peer_id]['shost'])
return
if _command == DMRD:
_peer_id = data[11:15]
elif _command == RPTA:
if data[6:10] in self.peerTrack:
_peer_id = data[6:10]
else:
_peer_id = self.connTrack[port]
elif _command == MSTN:
_peer_id = data[6:10]
elif _command == MSTP:
_peer_id = data[7:11]
elif _command == MSTC:
_peer_id = data[5:9]
if self.debug:
print(data)
if _peer_id in self.peerTrack:
self.transport.write(data,(self.peerTrack[_peer_id]['shost'],self.peerTrack[_peer_id]['sport']))
# Remove the client after send a MSTN or MSTC packet
if _command in (MSTN,MSTC):
# Give time to the client for a reply to prevent port reassignment
self.peerTrack[_peer_id]['timer'].reset(15)
return
else:
_command = data[:4]
if _command == DMRD: # DMRData -- encapsulated DMR data frame
_peer_id = data[11:15]
elif _command == DMRA: # DMRAlias -- Talker Alias information
_peer_id = data[4:8]
elif _command == RPTL: # RPTLogin -- a repeater wants to login
_peer_id = data[4:8]
#if we have seen more than 20 RPTL packets from this IP since the RPTL tracking table was reset (every 60 secs)
#blacklist IP for 10 minutes
if host not in self.rptlTrack:
self.rptlTrack[host] = 1
else:
self.rptlTrack[host] += 1
if self.rptlTrack[host] > 50:
print('(RPTL) exceeded max: {}'.format(self.rptlTrack[host]))
_bltime = nowtime + 600
self.IPBlackList[host] = _bltime
self.rptlTrack.pop(host)
if self.clientinfo:
print('(RPTL) Add to blacklist: host {}. Expire time {}'.format(host,_bltime))
if self.privHelper:
print('(RPTL) Ask priv_helper to add to iptables: host {}, port {}.'.format(host,self.ListenPort))
reactor.callInThread(self.privHelper.addBL,self.ListenPort,host)
return
elif _command == RPTK: # Repeater has answered our login challenge
_peer_id = data[4:8]
elif _command == RPTC: # Repeater is sending it's configuraiton OR disconnecting
if data[:5] == RPTCL: # Disconnect command
_peer_id = data[5:9]
else:
_peer_id = data[4:8] # Configure Command
elif _command == RPTO: # options
_peer_id = data[4:8]
elif _command == RPTP: # RPTPing -- peer is pinging us
_peer_id = data[7:11]
else:
return
if _peer_id in self.peerTrack:
_dport = self.peerTrack[_peer_id]['dport']
self.peerTrack[_peer_id]['sport'] = port
self.peerTrack[_peer_id]['shost'] = host
self.transport.write(data, (self.master,_dport))
self.peerTrack[_peer_id]['timer'].reset(self.timeout)
if self.debug:
print(data)
return
else:
if int_id(_peer_id) in self.blackList:
return
# Make a list with the available ports
_ports_avail = [port for port in self.connTrack if not self.connTrack[port]]
if _ports_avail:
_dport = random.choice(_ports_avail)
else:
return
self.connTrack[_dport] = _peer_id
self.peerTrack[_peer_id] = {}
self.peerTrack[_peer_id]['dport'] = _dport
self.peerTrack[_peer_id]['sport'] = port
self.peerTrack[_peer_id]['shost'] = host
self.peerTrack[_peer_id]['timer'] = reactor.callLater(self.timeout,self.reaper,_peer_id)
self.transport.write(data, (self.master,_dport))
pripacket = b''.join([b'PRIN',host.encode('UTF-8'),b':',str(port).encode('UTF-8')])
#Send IP and Port info to server
self.transport.write(pripacket, (self.master,_dport))
if self.clientinfo and _peer_id != b'\xff\xff\xff\xff':
print(f'{datetime.now().replace(microsecond=0)} New client: ID:{str(int_id(_peer_id)).rjust(9)} IP:{host.rjust(15)} Port:{port}, assigned to port:{_dport}.')
if self.debug:
print(data)
return
if __name__ == '__main__':
import signal
import configparser
import argparse
import sys
import json
import stat
import functools
print = functools.partial(print, flush=True)
#Set process title early
setproctitle(__file__)
# Change the current directory to the location of the application
os.chdir(os.path.dirname(os.path.realpath(sys.argv[0])))
# CLI argument parser - handles picking up the config file from the command line, and sending a "help" message
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', action='store', dest='CONFIG_FILE', help='/full/path/to/config.file (usually adn.cfg)')
cli_args = parser.parse_args()
# Ensure we have a path for the config file, if one wasn't specified, then use the execution directory
if not cli_args.CONFIG_FILE:
cli_args.CONFIG_FILE = os.path.dirname(os.path.abspath(__file__))+'/config/adn.cfg'
_config_file = cli_args.CONFIG_FILE
config = configparser.ConfigParser()
if not config.read(_config_file):
print('Configuration file \''+_config_file+'\' is not a valid configuration file!')
try:
Master = config.get('PROXY','Master')
ListenPort = config.getint('PROXY','ListenPort')
ListenIP = config.get('PROXY','ListenIP')
DestportStart = config.getint('PROXY','DestportStart')
DestPortEnd = config.getint('PROXY','DestPortEnd')
Timeout = config.getint('PROXY','Timeout')
Stats = config.getboolean('PROXY','Stats')
Debug = config.getboolean('PROXY','Debug')
ClientInfo = config.getboolean('PROXY','ClientInfo')
BlackList = json.loads(config.get('PROXY','BlackList'))
IPBlackList = json.loads(config.get('PROXY','IPBlackList'))
except configparser.Error as err:
print('Error processing configuration file -- {}'.format(err))
print('Using default config')
#*** CONFIG HERE ***
Master = "127.0.0.1"
ListenPort = 62031
#'' = all IPv4, '::' = all IPv4 and IPv6 (Dual Stack)
ListenIP = ''
DestportStart = 56400
DestPortEnd = 56500
Timeout = 30
Stats = False
Debug = False
ClientInfo = False
BlackList = [1234567]
#e.g. {10.0.0.1: 0, 10.0.0.2: 0}
IPBlackList = {}
#*******************
CONNTRACK = {}
PEERTRACK = {}
RPTLTRACK = {}
PRIV_HELPER = None
# Set up the signal handler
def sig_handler(_signal, _frame):
print('(GLOBAL) SHUTDOWN: PROXY IS TERMINATING WITH SIGNAL {}'.format(str(_signal)))
reactor.stop()
#Install signal handlers
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
#readState()
#If IPv6 is enabled by enivornment variable...
if ListenIP == '' and 'FDPROXY_IPV6' in os.environ and bool(os.environ['FDPROXY_IPV6']):
ListenIP = '::'
#Override static config from Environment
if 'FDPROXY_STATS' in os.environ:
Stats = bool(os.environ['FDPROXY_STATS'])
#if 'FDPROXY_DEBUG' in os.environ:
# Debug = bool(os.environ['FDPROXY_DEBUG'])
if 'FDPROXY_CLIENTINFO' in os.environ:
ClientInfo = bool(os.environ['FDPROXY_CLIENTINFO'])
if 'FDPROXY_LISTENPORT' in os.environ:
ListenPort = int(os.environ['FDPROXY_LISTENPORT'])
unixSocket = '/run/priv_control/priv_control.unixsocket'
if os.path.exists(unixSocket) and stat.S_ISSOCK(os.stat(unixSocket).st_mode):
print('(PRIV) Found UNIX socket. Enabling priv helper')
PRIV_HELPER = privHelper()
print('(PRIV) flush conntrack')
PRIV_HELPER.flushCT()
print('(PRIV) flush blocklist')
PRIV_HELPER.blocklistFlush()
for port in range(DestportStart,DestPortEnd+1,1):
CONNTRACK[port] = False
#If we are listening IPv6 and Master is an IPv4 IPv4Address
#IPv6ify the address.
if ListenIP == '::' and IsIPv4Address(Master):
Master = '::ffff:' + Master
reactor.listenUDP(ListenPort,Proxy(Master,ListenPort,CONNTRACK,PEERTRACK,BlackList,IPBlackList,Timeout,Debug,ClientInfo,DestportStart,DestPortEnd,PRIV_HELPER, RPTLTRACK),interface=ListenIP)
def loopingErrHandle(failure):
print('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error innowtimed loop.\n {}'.format(failure))
reactor.stop()
def stats():
count = 0
nowtime = time()
for port in CONNTRACK:
if CONNTRACK[port]:
count = count+1
totalPorts = DestPortEnd - DestportStart
freePorts = totalPorts - count
print("{} ports out of {} in use ({} free)".format(count,totalPorts,freePorts))
def blackListTrimmer():
_timenow = time()
_dellist = []
for entry in IPBlackList:
deletetime = IPBlackList[entry]
if deletetime and deletetime < _timenow:
_dellist.append(entry)
for delete in _dellist:
IPBlackList.pop(delete)
if ClientInfo:
print('Remove dynamic blacklist entry for {}'.format(delete))
if PRIV_HELPER:
print('Ask priv helper to remove blacklist entry for {} from iptables'.format(delete))
reactor.callInThread(PRIV_HELPER.delBL,ListenPort,delete)
def rptlTrimmer():
RPTLTRACK.clear()
print('Purge RPTL table')
if Stats == True:
stats_task = task.LoopingCall(stats)
statsa = stats_task.start(30)
statsa.addErrback(loopingErrHandle)
blacklist_task = task.LoopingCall(blackListTrimmer)
blacklista = blacklist_task.start(15)
blacklista.addErrback(loopingErrHandle)
rptlTrimmer_task = task.LoopingCall(rptlTrimmer)
rptlTrimmera = rptlTrimmer_task.start(60)
rptlTrimmera.addErrback(loopingErrHandle)
reactor.run()

Powered by TurnKey Linux.