diff --git a/bridge_master.py b/bridge_master.py index 3f6a666..1a965c7 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -377,6 +377,19 @@ def kaReporting(): logger.warning('(ROUTER) not sending to system %s as KeepAlive never seen',system) elif CONFIG['SYSTEMS'][system]['_bcka'] < time() - 60: logger.warning('(ROUTER) not sending to system %s as last KeepAlive was %s seconds ago',system, int(time() - CONFIG['SYSTEMS'][system]['_bcka'])) + +#Subscriber Map trimmer loop +def SubMapTrimmer(): + logger.debug('(SUBSCRIBER) Subscriber Map trimmer loop started') + _sub_time = time() + _remove_list = [] + for _subscriber in SUB_MAP: + if _subscriber[2] < (_sub_time - 86400): + _remove.append(_subscriber) + + for _remove in _remove_list: + SUB_MAP.pop(_remove) + # run this every 10 seconds to trim stream ids def stream_trimmer_loop(): @@ -1829,8 +1842,7 @@ class routerHBP(HBSYSTEM): _data_call = True SUB_MAP[_rf_src] = (self._system,_slot,pkt_time) - logger.info('(%s) Added subscriber %s to SUB_MAP. Slot: %s, Time: %s', self._system,int_id(_rf_src),_slot,pkt_time) - + if _dtype_vseq == 3: logger.info('(%s) *UNIT CSBK* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -1896,14 +1908,11 @@ class routerHBP(HBSYSTEM): elif CONFIG['SYSTEMS'][CONFIG['GLOBAL']['DATA_GATEWAY']]['MODE'] != 'OPENBRIDGE': logger.warning('(%s) UNIT Data not Bridged - DATA_GATEWAY: %s not OPENBRIDGE. DST_ID: %s',self._system, CONFIG['GLOBAL']['DATA_GATEWAY'],_int_dst_id) - logger.info(SUB_MAP) - logger.info(_dst_id) if _dst_id in SUB_MAP: (_d_system,_d_slot,_d_time) = SUB_MAP[_dst_id] _dst_slot = systems[_d_system].STATUS[_d_slot] logger.info('(%s) SUB_MAP matched, System: %s Slot: %s, Time: %s',self._system, _d_system,_d_slot,_d_time) #If slot is idle for RX and TX - #print("RX:"+str(_slot['RX_TYPE'])+" TX:"+str(_slot['TX_TYPE'])+" TIME:"+str(time() - _slot['TX_TIME'])) if (_dst_slot['RX_TYPE'] == HBPF_SLT_VTERM) and (_dst_slot['TX_TYPE'] == HBPF_SLT_VTERM) and (time() - _dst_slot['TX_TIME'] > CONFIG['SYSTEMS'][_d_system]['GROUP_HANGTIME']): #Currently we send on the same slot received on _tmp_bits = _bits # & ~(1 << 7) @@ -2554,6 +2563,11 @@ if __name__ == '__main__': ka = ka_task.start(60) ka.addErrback(loopingErrHandle) + #Subscriber map trimmer + sub_trimmer_task = task.LoopingCall(SubMapTrimmer) + sub_trimmer = stat_trimmer_task.start(3600)#3600 + sub_trimmer.addErrback(loopingErrHandle) + #more threads reactor.suggestThreadPoolSize(100)