diff --git a/config.py b/config.py index ae8fbc6..59424dd 100755 --- a/config.py +++ b/config.py @@ -205,6 +205,7 @@ def build_config(_config_file): 'MAX_PEERS': config.getint(section, 'MAX_PEERS'), 'IP': gethostbyname(config.get(section, 'IP')), 'PORT': config.getint(section, 'PORT'), + 'SOCK_ADDR': (gethostbyname(config.get(section, 'IP')), config.get(section, 'PORT')), 'PASSPHRASE': bytes(config.get(section, 'PASSPHRASE'), 'utf-8'), 'GROUP_HANGTIME': config.getint(section, 'GROUP_HANGTIME'), 'USE_ACL': config.getboolean(section, 'USE_ACL'), @@ -221,7 +222,8 @@ def build_config(_config_file): 'ENABLED': config.getboolean(section, 'ENABLED'), 'NETWORK_ID': config.getint(section, 'NETWORK_ID').to_bytes(4, 'big'), 'IP': gethostbyname(config.get(section, 'IP')), - 'PORT': config.getint(section, 'PORT'), + 'PORT': config.get(section, 'PORT'), + 'SOCK_ADDR': (gethostbyname(config.get(section, 'IP')), config.get(section, 'PORT')), 'PASSPHRASE': bytes(config.get(section, 'PASSPHRASE').ljust(20,'\x00')[:20], 'utf-8'), 'TARGET_SOCK': (gethostbyname(config.get(section, 'TARGET_IP')), config.getint(section, 'TARGET_PORT')), 'TARGET_IP': gethostbyname(config.get(section, 'TARGET_IP')), diff --git a/hblink.py b/hblink.py index f2efbba..afb2b53 100755 --- a/hblink.py +++ b/hblink.py @@ -37,9 +37,12 @@ from time import time from collections import deque # Twisted is pretty important, so I keep it separate -from twisted.internet.protocol import DatagramProtocol, Factory, Protocol -from twisted.protocols.basic import NetstringReceiver -from twisted.internet import reactor, task +#from twisted.internet.protocol import DatagramProtocol, Factory, Protocol +#from twisted.protocols.basic import NetstringReceiver +#from twisted.internet import reactor, task +import asyncio +import uvloop + # Other files we pull from -- this is mostly for readability and segmentation import log @@ -48,8 +51,8 @@ from const import * from dmr_utils3.utils import int_id, bytes_4, try_download, mk_id_dict # Imports for the reporting server -import pickle -from reporting_const import * +#import pickle +#from reporting_const import * # The module needs logging logging, but handlers, etc. are controlled by the parent import logging @@ -65,23 +68,7 @@ __email__ = 'n0mjs@me.com' # Global variables used whether we are a module or __main__ systems = {} - -# Timed loop used for reporting HBP status -def config_reports(_config, _factory): - def reporting_loop(_logger, _server): - _logger.debug('(GLOBAL) Periodic reporting loop started') - _server.send_config() - - logger.info('(GLOBAL) HBlink TCP reporting server configured') - - report_server = _factory(_config) - report_server.clients = [] - reactor.listenTCP(_config['REPORTS']['REPORT_PORT'], report_server) - - reporting = task.LoopingCall(reporting_loop, logger, report_server) - reporting.start(_config['REPORTS']['REPORT_INTERVAL']) - - return report_server +transports = {} # Shut ourselves down gracefully by disconnecting from the masters and peers. @@ -104,7 +91,7 @@ def acl_check(_id, _acl): # OPENBRIDGE CLASS #************************************************ -class OPENBRIDGE(DatagramProtocol): +class OPENBRIDGE(asyncio.DatagramProtocol): def __init__(self, _name, _config, _report): # Define a few shortcuts to make the rest of the class more readable self._CONFIG = _config @@ -113,20 +100,23 @@ class OPENBRIDGE(DatagramProtocol): self._config = self._CONFIG['SYSTEMS'][self._system] self._laststrid = deque([], 20) + def connection_made(self, transport): + self.transport = transport + def dereg(self): logger.info('(%s) is mode OPENBRIDGE. No De-Registration required, continuing shutdown', self._system) def send_system(self, _packet): - if _packet[:4] == DMRD: + if _packet[:4] == 'DMRD': #_packet = _packet[:11] + self._config['NETWORK_ID'] + _packet[15:] _packet = b''.join([_packet[:11], self._config['NETWORK_ID'], _packet[15:]]) #_packet += hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest() _packet = b''.join([_packet, (hmac_new(self._config['PASSPHRASE'],_packet,sha1).digest())]) - self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) + self.transport.sendto(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! # logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet)) else: - logger.error('(%s) OpenBridge system was asked to send non DMRD packet: %s', self._system, _packet) + logger.error('(%s) OpenBridge system was asked to send non DMRD packet', self._system) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): pass @@ -136,7 +126,7 @@ class OPENBRIDGE(DatagramProtocol): # Keep This Line Commented Unless HEAVILY Debugging! #logger.debug('(%s) RX packet from %s -- %s', self._system, _sockaddr, ahex(_packet)) - if _packet[:4] == DMRD: # DMRData -- encapsulated DMR data frame + if _packet[:4] == b'DMRD': # DMRData -- encapsulated DMR data frame _data = _packet[:53] _hash = _packet[53:] _ckhs = hmac_new(self._config['PASSPHRASE'],_data,sha1).digest() @@ -199,7 +189,7 @@ class OPENBRIDGE(DatagramProtocol): # HB MASTER CLASS #************************************************ -class HBSYSTEM(DatagramProtocol): +class HBSYSTEM(asyncio.DatagramProtocol): def __init__(self, _name, _config, _report): # Define a few shortcuts to make the rest of the class more readable self._CONFIG = _config @@ -214,55 +204,64 @@ class HBSYSTEM(DatagramProtocol): self._peers = self._CONFIG['SYSTEMS'][self._system]['PEERS'] self.send_system = self.send_peers self.maintenance_loop = self.master_maintenance_loop - self.datagramReceived = self.master_datagramReceived + self.datagram_received = self.master_datagram_received self.dereg = self.master_dereg elif self._config['MODE'] == 'PEER': self._stats = self._config['STATS'] self.send_system = self.send_master self.maintenance_loop = self.peer_maintenance_loop - self.datagramReceived = self.peer_datagramReceived + self.datagram_received = self.peer_datagram_received self.dereg = self.peer_dereg - def startProtocol(self): - # Set up periodic loop for tracking pings from peers. Run every 'PING_TIME' seconds - self._system_maintenance = task.LoopingCall(self.maintenance_loop) - self._system_maintenance_loop = self._system_maintenance.start(self._CONFIG['GLOBAL']['PING_TIME']) + def connection_made(self, transport): + self.transport = transport + self._system_maintenance = loop.create_task(self.maintenance_loop()) # Aliased in __init__ to maintenance_loop if system is a master - def master_maintenance_loop(self): - logger.debug('(%s) Master maintenance loop started', self._system) - remove_list = [] - for peer in self._peers: - _this_peer = self._peers[peer] - # Check to see if any of the peers have been quiet (no ping) longer than allowed - if _this_peer['LAST_PING']+(self._CONFIG['GLOBAL']['PING_TIME']*self._CONFIG['GLOBAL']['MAX_MISSED']) < time(): - remove_list.append(peer) - for peer in remove_list: - logger.info('(%s) Peer %s (%s) has timed out and is being removed', self._system, self._peers[peer]['CALLSIGN'], self._peers[peer]['RADIO_ID']) - # Remove any timed out peers from the configuration - del self._CONFIG['SYSTEMS'][self._system]['PEERS'][peer] + async def master_maintenance_loop(self): + while True: + try: + logger.debug('(%s) Master maintenance loop started', self._system) + remove_list = [] + for peer in self._peers: + _this_peer = self._peers[peer] + # Check to see if any of the peers have been quiet (no ping) longer than allowed + if _this_peer['LAST_PING']+(self._CONFIG['GLOBAL']['PING_TIME']*self._CONFIG['GLOBAL']['MAX_MISSED']) < time(): + remove_list.append(peer) + for peer in remove_list: + logger.info('(%s) Peer %s (%s) has timed out and is being removed', self._system, self._peers[peer]['CALLSIGN'], self._peers[peer]['RADIO_ID']) + # Remove any timed out peers from the configuration + del self._CONFIG['SYSTEMS'][self._system]['PEERS'][peer] + except Exception as e: + logger.error('(%s) Master Maintenance Loop ERROR: %s', self._system, e) + await asyncio.sleep(self._CONFIG['GLOBAL']['PING_TIME']) # Aliased in __init__ to maintenance_loop if system is a peer - def peer_maintenance_loop(self): - logger.debug('(%s) Peer maintenance loop started', self._system) - if self._stats['PING_OUTSTANDING']: - self._stats['NUM_OUTSTANDING'] += 1 - # If we're not connected, zero out the stats and send a login request RPTL - if self._stats['CONNECTION'] != 'YES' or self._stats['NUM_OUTSTANDING'] >= self._CONFIG['GLOBAL']['MAX_MISSED']: - self._stats['PINGS_SENT'] = 0 - self._stats['PINGS_ACKD'] = 0 - self._stats['NUM_OUTSTANDING'] = 0 - self._stats['PING_OUTSTANDING'] = False - self._stats['CONNECTION'] = 'RPTL_SENT' - self.send_master(b''.join([RPTL, self._config['RADIO_ID']])) - logger.info('(%s) Sending login request to master %s:%s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT']) - # If we are connected, sent a ping to the master and increment the counter - if self._stats['CONNECTION'] == 'YES': - self.send_master(b''.join([RPTPING, self._config['RADIO_ID']])) - logger.debug('(%s) RPTPING Sent to Master. Total Sent: %s, Total Missed: %s, Currently Outstanding: %s', self._system, self._stats['PINGS_SENT'], self._stats['PINGS_SENT'] - self._stats['PINGS_ACKD'], self._stats['NUM_OUTSTANDING']) - self._stats['PINGS_SENT'] += 1 - self._stats['PING_OUTSTANDING'] = True + async def peer_maintenance_loop(self): + while True: + try: + logger.debug('(%s) Peer maintenance loop started', self._system) + if self._stats['PING_OUTSTANDING']: + self._stats['NUM_OUTSTANDING'] += 1 + # If we're not connected, zero out the stats and send a login request RPTL + if self._stats['CONNECTION'] != 'YES' or self._stats['NUM_OUTSTANDING'] >= self._CONFIG['GLOBAL']['MAX_MISSED']: + self._stats['PINGS_SENT'] = 0 + self._stats['PINGS_ACKD'] = 0 + self._stats['NUM_OUTSTANDING'] = 0 + self._stats['PING_OUTSTANDING'] = False + self._stats['CONNECTION'] = 'RPTL_SENT' + self.send_master(b''.join([RPTL, self._config['RADIO_ID']])) + logger.info('(%s) Sending login request to master %s:%s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT']) + # If we are connected, sent a ping to the master and increment the counter + if self._stats['CONNECTION'] == 'YES': + self.send_master(b''.join([RPTPING, self._config['RADIO_ID']])) + logger.debug('(%s) RPTPING Sent to Master. Total Sent: %s, Total Missed: %s, Currently Outstanding: %s', self._system, self._stats['PINGS_SENT'], self._stats['PINGS_SENT'] - self._stats['PINGS_ACKD'], self._stats['NUM_OUTSTANDING']) + self._stats['PINGS_SENT'] += 1 + self._stats['PING_OUTSTANDING'] = True + except Exception as e: + logger.error('(%s) Master Maintenance Loop ERROR: %s', self._system, e) + await asyncio.sleep(self._CONFIG['GLOBAL']['PING_TIME']) def send_peers(self, _packet): for _peer in self._peers: @@ -270,16 +269,16 @@ class HBSYSTEM(DatagramProtocol): #logger.debug('(%s) Packet sent to peer %s', self._system, self._peers[_peer]['RADIO_ID']) def send_peer(self, _peer, _packet): - if _packet[:4] == DMRD: + if _packet[:4] == 'DMRD': _packet = b''.join([_packet[:11], _peer, _packet[15:]]) - self.transport.write(_packet, self._peers[_peer]['SOCKADDR']) + self.transport.sendto(_packet, self._peers[_peer]['SOCKADDR']) # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! #logger.debug('(%s) TX Packet to %s on port %s: %s', self._peers[_peer]['RADIO_ID'], self._peers[_peer]['IP'], self._peers[_peer]['PORT'], ahex(_packet)) def send_master(self, _packet): - if _packet[:4] == DMRD: + if _packet[:4] == b'DMRD': _packet = b''.join([_packet[:11], self._config['RADIO_ID'], _packet[15:]]) - self.transport.write(_packet, self._config['MASTER_SOCKADDR']) + self.transport.sendto(_packet, self._config['MASTER_SOCKADDR']) # KEEP THE FOLLOWING COMMENTED OUT UNLESS YOU'RE DEBUGGING DEEPLY!!!! # logger.debug('(%s) TX Packet to %s:%s -- %s', self._system, self._config['MASTER_IP'], self._config['MASTER_PORT'], ahex(_packet)) @@ -295,15 +294,19 @@ class HBSYSTEM(DatagramProtocol): self.send_master(RPTCL + self._config['RADIO_ID']) logger.info('(%s) De-Registration sent to Master: %s:%s', self._system, self._config['MASTER_SOCKADDR'][0], self._config['MASTER_SOCKADDR'][1]) + + def datagram_received(self, data, addr, args=None): + print('(%s) Received %r from %s' % (self, data, addr)) + # Aliased in __init__ to datagramReceived if system is a master - def master_datagramReceived(self, _data, _sockaddr): + def master_datagram_received(self, _data, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! # logger.debug('(%s) RX packet from %s -- %s', self._system, _sockaddr, ahex(_data)) # Extract the command, which is various length, all but one 4 significant characters -- RPTCL _command = _data[:4] - if _command == DMRD: # DMRData -- encapsulated DMR data frame + if _command == b'DMRD': # DMRData -- encapsulated DMR data frame _peer_id = _data[11:15] if _peer_id in self._peers \ and self._peers[_peer_id]['CONNECTION'] == 'YES' \ @@ -370,7 +373,7 @@ class HBSYSTEM(DatagramProtocol): for _peer in self._peers: if _peer != _peer_id: pkt[1] = _peer - self.transport.write(b''.join(pkt), self._peers[_peer]['SOCKADDR']) + self.transport.sendto(b''.join(pkt), self._peers[_peer]['SOCKADDR']) #logger.debug('(%s) Packet on TS%s from %s (%s) for destination ID %s repeated to peer: %s (%s) [Stream ID: %s]', self._system, _slot, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id), int_id(_dst_id), self._peers[_peer]['CALLSIGN'], int_id(_peer), int_id(_stream_id)) @@ -415,10 +418,10 @@ class HBSYSTEM(DatagramProtocol): self._peers[_peer_id]['CONNECTION'] = 'CHALLENGE_SENT' logger.info('(%s) Sent Challenge Response to %s for login: %s', self._system, int_id(_peer_id), self._peers[_peer_id]['SALT']) else: - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Invalid Login from Radio ID: %s Denied by Registation ACL', self._system, int_id(_peer_id)) else: - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Registration denied from Radio ID: %s Maximum number of peers exceeded', self._system, int_id(_peer_id)) elif _command == RPTK: # Repeater has answered our login challenge @@ -437,10 +440,10 @@ class HBSYSTEM(DatagramProtocol): logger.info('(%s) Peer %s has completed the login exchange successfully', self._system, _this_peer['RADIO_ID']) else: logger.info('(%s) Peer %s has FAILED the login exchange successfully', self._system, _this_peer['RADIO_ID']) - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) del self._peers[_peer_id] else: - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Login challenge from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) elif _command == RPTC: # Repeater is sending it's configuraiton OR disconnecting @@ -450,7 +453,7 @@ class HBSYSTEM(DatagramProtocol): and self._peers[_peer_id]['CONNECTION'] == 'YES' \ and self._peers[_peer_id]['SOCKADDR'] == _sockaddr: logger.info('(%s) Peer is closing down: %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) del self._peers[_peer_id] else: @@ -480,7 +483,7 @@ class HBSYSTEM(DatagramProtocol): self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) logger.info('(%s) Peer %s (%s) has sent repeater configuration', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID']) else: - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Peer info from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) elif _command == RPTP: # RPTPing -- peer is pinging us @@ -493,14 +496,14 @@ class HBSYSTEM(DatagramProtocol): self.send_peer(_peer_id, b''.join([MSTPONG, _peer_id])) logger.debug('(%s) Received and answered RPTPING from peer %s (%s)', self._system, self._peers[_peer_id]['CALLSIGN'], int_id(_peer_id)) else: - self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) + self.transport.sendto(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Ping from Radio ID that is not logged in: %s', self._system, int_id(_peer_id)) else: logger.error('(%s) Unrecognized command. Raw HBP PDU: %s', self._system, ahex(_data)) # Aliased in __init__ to datagramReceived if system is a peer - def peer_datagramReceived(self, _data, _sockaddr): + def peer_datagram_received(self, _data, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! # logger.debug('(%s) RX packet from %s -- %s', self._system, _sockaddr, ahex(_data)) @@ -508,8 +511,8 @@ class HBSYSTEM(DatagramProtocol): if self._config['MASTER_SOCKADDR'] == _sockaddr: # Extract the command, which is various length, but only 4 significant characters _command = _data[:4] - if _command == DMRD: # DMRData -- encapsulated DMR data frame - + if _command == b'DMRD': # DMRData -- encapsulated DMR data frame + _peer_id = _data[11:15] if self._config['LOOSE'] or _peer_id == self._config['RADIO_ID']: # Validate the Radio_ID unless using loose validation _seq = _data[4:5] @@ -528,7 +531,7 @@ class HBSYSTEM(DatagramProtocol): _dtype_vseq = (_bits & 0xF) # data, 1=voice header, 2=voice terminator; voice, 0=burst A ... 5=burst F _stream_id = _data[16:20] #logger.debug('(%s) DMRD - Sequence: %s, RF Source: %s, Destination ID: %s', self._system, int_id(_seq), int_id(_rf_src), int_id(_dst_id)) - + # ACL Processing if self._CONFIG['GLOBAL']['USE_ACL']: if not acl_check(_rf_src, self._CONFIG['GLOBAL']['SUB_ACL']): @@ -663,53 +666,6 @@ class HBSYSTEM(DatagramProtocol): else: logger.error('(%s) Received an invalid command in packet: %s', self._system, ahex(_data)) -# -# Socket-based reporting section -# -class report(NetstringReceiver): - def __init__(self, factory): - self._factory = factory - - def connectionMade(self): - self._factory.clients.append(self) - logger.info('(REPORT) HBlink reporting client connected: %s', self.transport.getPeer()) - - def connectionLost(self, reason): - logger.info('(REPORT) HBlink reporting client disconnected: %s', self.transport.getPeer()) - self._factory.clients.remove(self) - - def stringReceived(self, data): - self.process_message(data) - - def process_message(self, _message): - opcode = _message[:1] - if opcode == REPORT_OPCODES['CONFIG_REQ']: - logger.info('(REPORT) HBlink reporting client sent \'CONFIG_REQ\': %s', self.transport.getPeer()) - self.send_config() - else: - logger.error('(REPORT) got unknown opcode') - -class reportFactory(Factory): - def __init__(self, config): - self._config = config - - def buildProtocol(self, addr): - if (addr.host) in self._config['REPORTS']['REPORT_CLIENTS'] or '*' in self._config['REPORTS']['REPORT_CLIENTS']: - logger.debug('(REPORT) Permitting report server connection attempt from: %s:%s', addr.host, addr.port) - return report(self) - else: - logger.error('(REPORT) Invalid report server connection attempt from: %s:%s', addr.host, addr.port) - return None - - def send_clients(self, _message): - for client in self.clients: - client.sendString(_message) - - def send_config(self): - serialized = pickle.dumps(self._config['SYSTEMS'], protocol=2) #.decode('utf-8', errors='ignore') #pickle.HIGHEST_PROTOCOL) - self.send_clients(b''.join([REPORT_OPCODES['CONFIG_SND'], serialized])) - - # ID ALIAS CREATION # Download def mk_aliases(_config): @@ -746,6 +702,7 @@ if __name__ == '__main__': import sys import os import signal + import functools # Change the current directory to the location of the application os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) @@ -774,8 +731,8 @@ if __name__ == '__main__': def sig_handler(_signal, _frame): logger.info('(GLOBAL) SHUTDOWN: HBLINK IS TERMINATING WITH SIGNAL %s', str(_signal)) hblink_handler(_signal, _frame) - logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') - reactor.stop() + logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING ASYNCIO LOOP') + loop.stop() # Set signal handers so that we can gracefully exit if need be for sig in [signal.SIGTERM, signal.SIGINT]: @@ -783,22 +740,29 @@ if __name__ == '__main__': peer_ids, subscriber_ids, talkgroup_ids = mk_aliases(CONFIG) - # INITIALIZE THE REPORTING LOOP - if CONFIG['REPORTS']['REPORT']: - report_server = config_reports(CONFIG, reportFactory) - else: - report_server = None - logger.info('(REPORT) TCP Socket reporting not configured') + report_server = None + + + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + loop = asyncio.get_event_loop() + # HBlink instance creation logger.info('(GLOBAL) HBlink \'HBlink.py\' -- SYSTEM STARTING...') for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': - systems[system] = OPENBRIDGE(system, CONFIG, report_server) + OBPfactory = functools.partial(OPENBRIDGE, system, CONFIG, report_server) + transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(OBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR'])) else: - systems[system] = HBSYSTEM(system, CONFIG, report_server) - reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) + HBPfactory = functools.partial(HBSYSTEM, system, CONFIG, report_server) + transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(HBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR'])) logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) - reactor.run() + + + + + loop.run_forever() + #reactor.run()