From e6f11fa380d25fae98c03271740ebd656b0f7cf8 Mon Sep 17 00:00:00 2001 From: Cort Buffington Date: Fri, 18 Jan 2019 13:31:40 -0600 Subject: [PATCH] General asyncio progress --- bridge.py | 221 ++++++++++++++++++++++++++---------------------- hblink.py | 3 + rules_SAMPLE.py | 13 +-- 3 files changed, 132 insertions(+), 105 deletions(-) diff --git a/bridge.py b/bridge.py index 48c68fb..3a5c319 100755 --- a/bridge.py +++ b/bridge.py @@ -37,12 +37,14 @@ from time import time from importlib import import_module # Twisted is pretty important, so I keep it separate -from twisted.internet.protocol import Factory, Protocol -from twisted.protocols.basic import NetstringReceiver -from twisted.internet import reactor, task +#from twisted.internet.protocol import Factory, Protocol +#from twisted.protocols.basic import NetstringReceiver +#from twisted.internet import reactor, task +import asyncio +import uvloop # Things we import from the main hblink module -from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases +from hblink import HBSYSTEM, OPENBRIDGE, systems, transports, hblink_handler, mk_aliases, loop#, reportFactory, REPORT_OPCODES from dmr_utils3.utils import bytes_3, int_id, get_alias from dmr_utils3 import decode, bptc, const import config @@ -121,84 +123,98 @@ def make_bridges(_rules): # Run this every minute for rule timer updates -def rule_timer_loop(): - logger.debug('(ROUTER) routerHBP Rule timer loop started') - _now = time() - - for _bridge in BRIDGES: - for _system in BRIDGES[_bridge]: - if _system['TO_TYPE'] == 'ON': - if _system['ACTIVE'] == True: - if _system['TIMER'] < _now: - _system['ACTIVE'] = False - logger.info('(ROUTER) Conference Bridge TIMEOUT: DEACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) - else: - timeout_in = _system['TIMER'] - _now - logger.info('(ROUTER) Conference Bridge ACTIVE (ON timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) - elif _system['ACTIVE'] == False: - logger.debug('(ROUTER) Conference Bridge INACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) - elif _system['TO_TYPE'] == 'OFF': - if _system['ACTIVE'] == False: - if _system['TIMER'] < _now: - _system['ACTIVE'] = True - logger.info('(ROUTER) Conference Bridge TIMEOUT: ACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + +async def rule_timer_loop(): + while True: + try: + logger.debug('(ROUTER) routerHBP Rule timer loop started') + _now = time() + + for _bridge in BRIDGES: + for _system in BRIDGES[_bridge]: + if _system['TO_TYPE'] == 'ON': + if _system['ACTIVE'] == True: + if _system['TIMER'] < _now: + _system['ACTIVE'] = False + logger.info('(ROUTER) Conference Bridge TIMEOUT: DEACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + else: + timeout_in = _system['TIMER'] - _now + logger.info('(ROUTER) Conference Bridge ACTIVE (ON timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) + elif _system['ACTIVE'] == False: + logger.debug('(ROUTER) Conference Bridge INACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + elif _system['TO_TYPE'] == 'OFF': + if _system['ACTIVE'] == False: + if _system['TIMER'] < _now: + _system['ACTIVE'] = True + logger.info('(ROUTER) Conference Bridge TIMEOUT: ACTIVATE System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + else: + timeout_in = _system['TIMER'] - _now + logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) + elif _system['ACTIVE'] == True: + logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) else: - timeout_in = _system['TIMER'] - _now - logger.info('(ROUTER) Conference Bridge INACTIVE (OFF timer running): System: %s Bridge: %s, TS: %s, TGID: %s, Timeout in: %.2fs,', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']), timeout_in) - elif _system['ACTIVE'] == True: - logger.debug('(ROUTER) Conference Bridge ACTIVE (no change): System: %s Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) - else: - logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + + except Exception as e: + logger.error('(%s) Rule Timer Loop ERROR: %s', self._system, e) + + await asyncio.sleep(60) + - if CONFIG['REPORTS']['REPORT']: - report_server.send_clients(b'bridge updated') # run this every 10 seconds to trim orphaned stream ids -def stream_trimmer_loop(): - logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') - _now = time() - - for system in systems: - # HBP systems, master and peer - if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': - for slot in range(1,3): - _slot = systems[system].STATUS[slot] - - # RX slot check - if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: - _slot['RX_TYPE'] = HBPF_SLT_VTERM - logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ - system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) - if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) - - # TX slot check - if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: - _slot['TX_TYPE'] = HBPF_SLT_VTERM - logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ - system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) - if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) - - # OBP systems - # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': - remove_list = [] - for stream_id in systems[system].STATUS: - if systems[system].STATUS[stream_id]['LAST'] < _now - 5: - remove_list.append(stream_id) - for stream_id in remove_list: - if stream_id in systems[system].STATUS: - _stream = systems[system].STATUS[stream_id] - _sysconfig = CONFIG['SYSTEMS'][system] - logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ - system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) - if CONFIG['REPORTS']['REPORT']: - systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) - removed = systems[system].STATUS.pop(stream_id) - else: - logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) +async def stream_trimmer_loop(): + while True: + try: + logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') + _now = time() + + for system in systems: + # HBP systems, master and peer + if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for slot in range(1,3): + _slot = systems[system].STATUS[slot] + + # RX slot check + if _slot['RX_TYPE'] != HBPF_SLT_VTERM and _slot['RX_TIME'] < _now - 5: + _slot['RX_TYPE'] = HBPF_SLT_VTERM + logger.info('(%s) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ + system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) + if CONFIG['REPORTS']['REPORT']: + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + + # TX slot check + if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: + _slot['TX_TYPE'] = HBPF_SLT_VTERM + logger.info('(%s) *TIME OUT* TX STREAM ID: %s SUB: %s TGID %s, TS %s, Duration: %.2f', \ + system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_RFS']), int_id(_slot['TX_TGID']), slot, _slot['TX_TIME'] - _slot['TX_START']) + if CONFIG['REPORTS']['REPORT']: + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,TX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['TX_STREAM_ID']), int_id(_slot['TX_PEER']), int_id(_slot['TX_RFS']), slot, int_id(_slot['TX_TGID']), _slot['TX_TIME'] - _slot['TX_START']).encode(encoding='utf-8', errors='ignore')) + + # OBP systems + # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': + remove_list = [] + for stream_id in systems[system].STATUS: + if systems[system].STATUS[stream_id]['LAST'] < _now - 5: + remove_list.append(stream_id) + for stream_id in remove_list: + if stream_id in systems[system].STATUS: + _stream = systems[system].STATUS[stream_id] + _sysconfig = CONFIG['SYSTEMS'][system] + logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ + system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) + if CONFIG['REPORTS']['REPORT']: + systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + removed = systems[system].STATUS.pop(stream_id) + else: + logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) + + except Exception as e: + logger.error('(%s) Rule Timer Loop ERROR: %s', self._system, e) + + await asyncio.sleep(10) class routerOBP(OPENBRIDGE): @@ -708,7 +724,7 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id - +'''' # # Socket-based reporting section # @@ -722,7 +738,7 @@ class bridgeReportFactory(reportFactory): if isinstance(_data, str): _data = _data.decode('utf-8', error='ignore') self.send_clients(REPORT_OPCODES['BRDG_EVENT']+_data) - +''' #************************************************ # MAIN PROGRAM LOOP STARTS HERE @@ -734,6 +750,7 @@ if __name__ == '__main__': import sys import os import signal + import functools # Change the current directory to the location of the application os.chdir(os.path.dirname(os.path.realpath(sys.argv[0]))) @@ -760,10 +777,10 @@ if __name__ == '__main__': # Set up the signal handler def sig_handler(_signal, _frame): - logger.info('(GLOBAL) SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal)) + logger.info('(GLOBAL) SHUTDOWN: HBLINK IS TERMINATING WITH SIGNAL %s', str(_signal)) hblink_handler(_signal, _frame) - logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR') - reactor.stop() + logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING ASYNCIO LOOP') + loop.stop() # Set signal handers so that we can gracefully exit if need be for sig in [signal.SIGINT, signal.SIGTERM]: @@ -782,29 +799,35 @@ if __name__ == '__main__': report_server = None logger.info('(REPORT) TCP Socket reporting not configured') - # HBlink instance creation - logger.info('(GLOBAL) HBlink \'bridge.py\' -- SYSTEM STARTING...') - for system in CONFIG['SYSTEMS']: - if CONFIG['SYSTEMS'][system]['ENABLED']: - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': - systems[system] = routerOBP(system, CONFIG, report_server) - else: - systems[system] = routerHBP(system, CONFIG, report_server) - reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP']) - logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) + + def loopingErrHandle(failure): logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) reactor.stop() # Initialize the rule timer -- this if for user activated stuff - rule_timer_task = task.LoopingCall(rule_timer_loop) - rule_timer = rule_timer_task.start(60) - rule_timer.addErrback(loopingErrHandle) + rule_timer_task = loop.create_task(rule_timer_loop()) + # Initialize the stream trimmer - stream_trimmer_task = task.LoopingCall(stream_trimmer_loop) - stream_trimmer = stream_trimmer_task.start(5) - stream_trimmer.addErrback(loopingErrHandle) + stream_trimmer_tast = loop.create_task(stream_trimmer_loop()) + + + + + + # HBlink instance creation + logger.info('(GLOBAL) HBlink \'bridge.py\' -- SYSTEM STARTING...') + for system in CONFIG['SYSTEMS']: + if CONFIG['SYSTEMS'][system]['ENABLED']: + + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': + OBPfactory = functools.partial(routerOBP, system, CONFIG, report_server) + transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(OBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR'])) + else: + HBPfactory = functools.partial(routerHBP, system, CONFIG, report_server) + transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(HBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR'])) + logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system]) - reactor.run() + loop.run_forever() diff --git a/hblink.py b/hblink.py index 861021c..43f4b35 100755 --- a/hblink.py +++ b/hblink.py @@ -70,6 +70,9 @@ __email__ = 'n0mjs@me.com' systems = {} transports = {} +# Create the asyncio event loop that drives EVERYTHING +#asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) +loop = asyncio.get_event_loop() # Shut ourselves down gracefully by disconnecting from the masters and peers. def hblink_handler(_signal, _frame): diff --git a/rules_SAMPLE.py b/rules_SAMPLE.py index c1fa40a..213fa8f 100755 --- a/rules_SAMPLE.py +++ b/rules_SAMPLE.py @@ -32,16 +32,17 @@ configuration file. BRIDGES = { 'WORLDWIDE': [ - {'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 1, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []}, - {'SYSTEM': 'CLIENT-1', 'TS': 1, 'TGID': 3100, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []}, + {'SYSTEM': 'OBP-1', 'TS': 1, 'TGID': 1, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [], 'OFF': [], 'RESET': []}, + {'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 3100, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []}, ], 'ENGLISH': [ - {'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []}, - {'SYSTEM': 'CLIENT-2', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []}, + {'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []}, + {'SYSTEM': 'REPEATER-1', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []}, ], 'STATEWIDE': [ - {'SYSTEM': 'MASTER-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []}, - {'SYSTEM': 'CLIENT-2', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []}, + {'SYSTEM': 'OBP-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []}, + {'SYSTEM': 'MASTER-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []}, + {'SYSTEM': 'REPEATER-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []}, ] }