General asyncio progress

asyncio
Cort Buffington 7 years ago
parent 05c05cb027
commit e6f11fa380

@ -37,12 +37,14 @@ from time import time
from importlib import import_module
# Twisted is pretty important, so I keep it separate
from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.basic import NetstringReceiver
from twisted.internet import reactor, task
#from twisted.internet.protocol import Factory, Protocol
#from twisted.protocols.basic import NetstringReceiver
#from twisted.internet import reactor, task
import asyncio
import uvloop
# Things we import from the main hblink module
from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases
from hblink import HBSYSTEM, OPENBRIDGE, systems, transports, hblink_handler, mk_aliases, loop#, reportFactory, REPORT_OPCODES
from dmr_utils3.utils import bytes_3, int_id, get_alias
from dmr_utils3 import decode, bptc, const
import config
@ -121,7 +123,10 @@ def make_bridges(_rules):
# Run this every minute for rule timer updates
def rule_timer_loop():
async def rule_timer_loop():
while True:
try:
logger.debug('(ROUTER) routerHBP Rule timer loop started')
_now = time()
@ -150,12 +155,18 @@ def rule_timer_loop():
else:
logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID']))
if CONFIG['REPORTS']['REPORT']:
report_server.send_clients(b'bridge updated')
except Exception as e:
logger.error('(%s) Rule Timer Loop ERROR: %s', self._system, e)
await asyncio.sleep(60)
# run this every 10 seconds to trim orphaned stream ids
def stream_trimmer_loop():
async def stream_trimmer_loop():
while True:
try:
logger.debug('(ROUTER) Trimming inactive stream IDs from system lists')
_now = time()
@ -200,6 +211,11 @@ def stream_trimmer_loop():
else:
logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS])
except Exception as e:
logger.error('(%s) Rule Timer Loop ERROR: %s', self._system, e)
await asyncio.sleep(10)
class routerOBP(OPENBRIDGE):
def __init__(self, _name, _config, _report):
@ -708,7 +724,7 @@ class routerHBP(HBSYSTEM):
self.STATUS[_slot]['RX_TGID'] = _dst_id
self.STATUS[_slot]['RX_TIME'] = pkt_time
self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
''''
#
# Socket-based reporting section
#
@ -722,7 +738,7 @@ class bridgeReportFactory(reportFactory):
if isinstance(_data, str):
_data = _data.decode('utf-8', error='ignore')
self.send_clients(REPORT_OPCODES['BRDG_EVENT']+_data)
'''
#************************************************
# MAIN PROGRAM LOOP STARTS HERE
@ -734,6 +750,7 @@ if __name__ == '__main__':
import sys
import os
import signal
import functools
# Change the current directory to the location of the application
os.chdir(os.path.dirname(os.path.realpath(sys.argv[0])))
@ -760,10 +777,10 @@ if __name__ == '__main__':
# Set up the signal handler
def sig_handler(_signal, _frame):
logger.info('(GLOBAL) SHUTDOWN: CONFBRIDGE IS TERMINATING WITH SIGNAL %s', str(_signal))
logger.info('(GLOBAL) SHUTDOWN: HBLINK IS TERMINATING WITH SIGNAL %s', str(_signal))
hblink_handler(_signal, _frame)
logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING REACTOR')
reactor.stop()
logger.info('(GLOBAL) SHUTDOWN: ALL SYSTEM HANDLERS EXECUTED - STOPPING ASYNCIO LOOP')
loop.stop()
# Set signal handers so that we can gracefully exit if need be
for sig in [signal.SIGINT, signal.SIGTERM]:
@ -782,29 +799,35 @@ if __name__ == '__main__':
report_server = None
logger.info('(REPORT) TCP Socket reporting not configured')
# HBlink instance creation
logger.info('(GLOBAL) HBlink \'bridge.py\' -- SYSTEM STARTING...')
for system in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][system]['ENABLED']:
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
systems[system] = routerOBP(system, CONFIG, report_server)
else:
systems[system] = routerHBP(system, CONFIG, report_server)
reactor.listenUDP(CONFIG['SYSTEMS'][system]['PORT'], systems[system], interface=CONFIG['SYSTEMS'][system]['IP'])
logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
def loopingErrHandle(failure):
logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure)
reactor.stop()
# Initialize the rule timer -- this if for user activated stuff
rule_timer_task = task.LoopingCall(rule_timer_loop)
rule_timer = rule_timer_task.start(60)
rule_timer.addErrback(loopingErrHandle)
rule_timer_task = loop.create_task(rule_timer_loop())
# Initialize the stream trimmer
stream_trimmer_task = task.LoopingCall(stream_trimmer_loop)
stream_trimmer = stream_trimmer_task.start(5)
stream_trimmer.addErrback(loopingErrHandle)
stream_trimmer_tast = loop.create_task(stream_trimmer_loop())
# HBlink instance creation
logger.info('(GLOBAL) HBlink \'bridge.py\' -- SYSTEM STARTING...')
for system in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][system]['ENABLED']:
if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE':
OBPfactory = functools.partial(routerOBP, system, CONFIG, report_server)
transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(OBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR']))
else:
HBPfactory = functools.partial(routerHBP, system, CONFIG, report_server)
transports[system], systems[system] = loop.run_until_complete(loop.create_datagram_endpoint(HBPfactory, local_addr=CONFIG['SYSTEMS'][system]['SOCK_ADDR']))
logger.debug('(GLOBAL) %s instance created: %s, %s', CONFIG['SYSTEMS'][system]['MODE'], system, systems[system])
reactor.run()
loop.run_forever()

@ -70,6 +70,9 @@ __email__ = 'n0mjs@me.com'
systems = {}
transports = {}
# Create the asyncio event loop that drives EVERYTHING
#asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()
# Shut ourselves down gracefully by disconnecting from the masters and peers.
def hblink_handler(_signal, _frame):

@ -32,16 +32,17 @@ configuration file.
BRIDGES = {
'WORLDWIDE': [
{'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 1, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []},
{'SYSTEM': 'CLIENT-1', 'TS': 1, 'TGID': 3100, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []},
{'SYSTEM': 'OBP-1', 'TS': 1, 'TGID': 1, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [], 'OFF': [], 'RESET': []},
{'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 3100, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'ON', 'ON': [2,], 'OFF': [9,10], 'RESET': []},
],
'ENGLISH': [
{'SYSTEM': 'MASTER-1', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []},
{'SYSTEM': 'CLIENT-2', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []},
{'SYSTEM': 'REPEATER-1', 'TS': 1, 'TGID': 13, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [3,], 'OFF': [8,10], 'RESET': []},
],
'STATEWIDE': [
{'SYSTEM': 'OBP-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []},
{'SYSTEM': 'MASTER-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []},
{'SYSTEM': 'CLIENT-2', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []},
{'SYSTEM': 'REPEATER-1', 'TS': 2, 'TGID': 3129, 'ACTIVE': True, 'TIMEOUT': 2, 'TO_TYPE': 'NONE', 'ON': [4,], 'OFF': [7,10], 'RESET': []},
]
}

Loading…
Cancel
Save

Powered by TurnKey Linux.