@ -148,7 +148,7 @@ def rule_timer_loop():
report_server . send_clients ( b ' bridge updated ' )
# run this every 10 seconds to trim orphaned stream ids
# # run this every 10 seconds to trim orphaned stream ids
def stream_trimmer_loop ( ) :
logger . debug ( ' (ROUTER) Trimming inactive stream IDs from system lists ' )
_now = time ( )
@ -166,6 +166,9 @@ def stream_trimmer_loop():
system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , int_id ( _slot [ ' RX_TGID ' ] ) , slot , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
systems [ system ] . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_PEER ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , slot , int_id ( _slot [ ' RX_TGID ' ] ) , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
#Null stream_id - for loop control
if _slot [ ' RX_TIME ' ] < _now - 60 :
_slot [ ' RX_STREAM_ID ' ] = b ' \x00 '
# TX slot check
if _slot [ ' TX_TYPE ' ] != HBPF_SLT_VTERM and _slot [ ' TX_TIME ' ] < _now - 5 :
@ -179,23 +182,60 @@ def stream_trimmer_loop():
# We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later
if CONFIG [ ' SYSTEMS ' ] [ system ] [ ' MODE ' ] == ' OPENBRIDGE ' :
remove_list = [ ]
fin_list = [ ]
for stream_id in systems [ system ] . STATUS :
#if stream already marked as finished, just remove it
if ' _fin ' in systems [ system ] . STATUS [ stream_id ] and systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] < _now - 180 :
logger . info ( ' ( %s ) *FINISHED STREAM* STREAM ID: %s ' , system , int_id ( stream_id ) )
fin_list . append ( stream_id )
continue
#try:
if ' _to ' not in systems [ system ] . STATUS [ stream_id ] and ' _fin ' not in systems [ system ] . STATUS [ stream_id ] and systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] < _now - 5 :
_stream = systems [ system ] . STATUS [ stream_id ]
_sysconfig = CONFIG [ ' SYSTEMS ' ] [ system ]
#systems[system].STATUS[stream_id]['_fin'] = True
logger . info ( ' ( %s ) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _sysconfig [ ' NETWORK_ID ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
systems [ system ] . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( system , int_id ( stream_id ) , int_id ( _sysconfig [ ' NETWORK_ID ' ] ) , int_id ( _stream [ ' RFS ' ] ) , 1 , int_id ( _stream [ ' TGID ' ] ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
systems [ system ] . STATUS [ stream_id ] [ ' _to ' ] = True
continue
#except:
#logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id)
#systems[system].STATUS[stream_id]['LAST'] = _now
#continue
try :
if systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] < _now - 5 :
if systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] < _now - 180 :
remove_list . append ( stream_id )
except :
logger . debug ( " ( %s ) Keyerror - stream trimmer Stream ID: %s Start: %s Contention: %s RFS: %s TGID: %s " , stream_id , systems [ system ] . STATUS [ stream_id ] [ ' START ' ] , systems [ system ] . STATUS [ stream_id ] [ ' CONTENTION ' ] , systems [ system ] . STATUS [ stream_id ] [ ' RFS ' ] , int_id ( systems [ system ] . STATUS [ stream_id ] [ ' TGID ' ] ) )
logger . warnin g( " ( %s ) Keyerror - stream trimmer Stream ID: %s ", s ystem, s tream_id)
systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] = _now
continue
#remove finished
for stream_id in fin_list :
removed = systems [ system ] . STATUS . pop ( stream_id )
for stream_id in remove_list :
if stream_id in systems [ system ] . STATUS :
_stream = systems [ system ] . STATUS [ stream_id ]
_sysconfig = CONFIG [ ' SYSTEMS ' ] [ system ]
logger . info ( ' ( %s ) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _sysconfig [ ' NETWORK_ID ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
systems [ system ] . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( system , int_id ( stream_id ) , int_id ( _sysconfig [ ' NETWORK_ID ' ] ) , int_id ( _stream [ ' RFS ' ] ) , 1 , int_id ( _stream [ ' TGID ' ] ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
removed = systems [ system ] . STATUS . pop ( stream_id )
try :
_bcsq_remove = [ ]
for tgid in _sysconfig [ ' _bcsq ' ] :
if _sysconfig [ ' _bcsq ' ] [ tgid ] == stream_id :
_bcsq_remove . append ( tgid )
for bcrm in _bcsq_remove :
removed = _sysconfig [ ' _bcsq ' ] . pop ( bcrm )
except KeyError :
pass
else :
logger . error ( ' ( %s ) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s ' , system , int_id ( stream_id ) , [ id for id in systems [ system ] . STATUS ] )
@ -213,18 +253,6 @@ class routerOBP(OPENBRIDGE):
pkt_time = time ( )
dmrpkt = _data [ 20 : 53 ]
_bits = _data [ 15 ]
#Handle inbound duplicates
if _seq == True and _seq == self . _lastSeq :
logger . debug ( " %s ) Duplicate sequence number %s , disgarding " , self . _system , _seq )
return
#Inbound out-of-order packets
elif _seq == True and ( _seq != 1 ) and ( _seq < self . _lastSeq ) :
logger . debug ( " %s ) Out of order packet - last sequence number %s , this sequence number %s , disgarding " , self . _system , self . _lastSeq , _seq )
return
#Inbound missed packets
elif _seq == True and _seq > ( self . _lastSeq + 1 ) :
logger . debug ( " ( %s ) Missed packet - last sequence number %s , this sequence number %s " , self . _system , self . _lastSeq , _seq )
if _call_type == ' group ' :
# Is this a new call stream?
@ -235,6 +263,8 @@ class routerOBP(OPENBRIDGE):
' CONTENTION ' : False ,
' RFS ' : _rf_src ,
' TGID ' : _dst_id ,
' lastSeq ' : False ,
' lastData ' : False
}
# If we can, use the LC from the voice header as to keep all options intact
@ -252,6 +282,63 @@ class routerOBP(OPENBRIDGE):
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
self . _report . send_bridgeEvent ( ' GROUP VOICE,START,RX, {} , {} , {} , {} , {} , {} ' . format ( self . _system , int_id ( _stream_id ) , int_id ( _peer_id ) , int_id ( _rf_src ) , _slot , int_id ( _dst_id ) ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
else :
#Finished stream handling#
if ' _fin ' in self . STATUS [ _stream_id ] :
if ' _finlog ' not in self . STATUS [ _stream_id ] :
logger . warning ( " ( %s ) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING " , self . _system , int_id ( _stream_id ) )
self . STATUS [ _stream_id ] [ ' _finlog ' ] = True
return
#LoopControl#
for system in systems :
if system == self . _system :
continue
if CONFIG [ ' SYSTEMS ' ] [ system ] [ ' MODE ' ] != ' OPENBRIDGE ' :
for _sysslot in systems [ system ] . STATUS :
if ' RX_STREAM_ID ' in systems [ system ] . STATUS [ _sysslot ] and _stream_id == systems [ system ] . STATUS [ _sysslot ] [ ' RX_STREAM_ID ' ] :
if ' LOOPLOG ' not in self . STATUS [ _stream_id ] or not self . STATUS [ _stream_id ] [ ' LOOPLOG ' ] :
logger . warning ( " ( %s ) OBP *LoopControl* FIRST HBP: %s , STREAM ID: %s , TG: %s , TS: %s , IGNORE THIS SOURCE " , self . _system , system , int_id ( _stream_id ) , int_id ( _dst_id ) , _sysslot )
self . STATUS [ _stream_id ] [ ' LOOPLOG ' ] = True
self . STATUS [ _stream_id ] [ ' LAST ' ] = pkt_time
return
else :
#if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']:
if _stream_id in systems [ system ] . STATUS and ' 1ST ' in systems [ system ] . STATUS [ _stream_id ] and systems [ system ] . STATUS [ _stream_id ] [ ' TGID ' ] == _dst_id :
if ' LOOPLOG ' not in self . STATUS [ _stream_id ] or not self . STATUS [ _stream_id ] [ ' LOOPLOG ' ] :
logger . warning ( " ( %s ) OBP *LoopControl* FIRST OBP %s , STREAM ID: %s , TG %s , IGNORE THIS SOURCE " , self . _system , system , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _stream_id ] [ ' LOOPLOG ' ] = True
self . STATUS [ _stream_id ] [ ' LAST ' ] = pkt_time
if CONFIG [ ' SYSTEMS ' ] [ self . _system ] [ ' ENHANCED_OBP ' ] and ' _bcsq ' not in self . STATUS [ _stream_id ] :
systems [ self . _system ] . send_bcsq ( _dst_id , _stream_id )
#logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id))
self . STATUS [ _stream_id ] [ ' _bcsq ' ] = True
return
#Duplicate handling#
#Duplicate complete packet
if self . STATUS [ _stream_id ] [ ' lastData ' ] and self . STATUS [ _stream_id ] [ ' lastData ' ] == _data and _seq > 1 :
logger . warning ( " ( %s ) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Handle inbound duplicates
if _seq and _seq == self . STATUS [ _stream_id ] [ ' lastSeq ' ] :
logger . warning ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Inbound out-of-order packets
if _seq and self . STATUS [ _stream_id ] [ ' lastSeq ' ] and ( _seq != 1 ) and ( _seq < self . STATUS [ _stream_id ] [ ' lastSeq ' ] ) :
logger . warning ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Inbound missed packets
if _seq and self . STATUS [ _stream_id ] [ ' lastSeq ' ] and _seq > ( self . STATUS [ _stream_id ] [ ' lastSeq ' ] + 1 ) :
logger . warning ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
#Save this sequence number
self . STATUS [ _stream_id ] [ ' lastSeq ' ] = _seq
#Save this packet
self . STATUS [ _stream_id ] [ ' lastData ' ] = _data
self . STATUS [ _stream_id ] [ ' LAST ' ] = pkt_time
@ -408,10 +495,11 @@ class routerOBP(OPENBRIDGE):
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot , call_duration )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
self . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( self . _system , int_id ( _stream_id ) , int_id ( _peer_id ) , int_id ( _rf_src ) , _slot , int_id ( _dst_id ) , call_duration ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
removed = self . STATUS . pop ( _stream_id )
logger . debug ( ' ( %s ) OpenBridge sourced call stream end, remove terminated Stream ID: %s ' , self . _system , int_id ( _stream_id ) )
if not removed :
selflogger . error ( ' ( %s ) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM ' , self . _system , int_id ( _stream_id ) )
self . STATUS [ _stream_id ] [ ' _fin ' ] = True
#removed = self.STATUS.pop(_stream_id)
#logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id))
#if not removed:
#selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id))
#Reset sequence number
self . _lastSeq = False
@ -449,7 +537,10 @@ class routerHBP(HBSYSTEM):
2 : b ' \x00 ' ,
3 : b ' \x00 ' ,
4 : b ' \x00 ' ,
}
} ,
' lastSeq ' : False ,
' lastData ' : False
} ,
2 : {
' RX_START ' : time ( ) ,
@ -475,7 +566,9 @@ class routerHBP(HBSYSTEM):
2 : b ' \x00 ' ,
3 : b ' \x00 ' ,
4 : b ' \x00 ' ,
}
} ,
' lastSeq ' : False ,
' lastData ' : False
}
}
@ -509,6 +602,56 @@ class routerHBP(HBSYSTEM):
else :
self . STATUS [ _slot ] [ ' RX_LC ' ] = LC_OPT + _dst_id + _rf_src
#LoopControl#
for system in systems :
if system == self . _system :
continue
if CONFIG [ ' SYSTEMS ' ] [ system ] [ ' MODE ' ] != ' OPENBRIDGE ' :
for _sysslot in systems [ system ] . STATUS :
if ' RX_STREAM_ID ' in systems [ system ] . STATUS [ _sysslot ] and _stream_id == systems [ system ] . STATUS [ _sysslot ] [ ' RX_STREAM_ID ' ] :
if ' LOOPLOG ' not in self . STATUS [ _slot ] or not self . STATUS [ _slot ] [ ' LOOPLOG ' ] :
logger . warning ( " ( %s ) OBP *LoopControl* FIRST HBP: %s , STREAM ID: %s , TG: %s , TS: %s , IGNORE THIS SOURCE " , self . _system , system , int_id ( _stream_id ) , int_id ( _dst_id ) , _sysslot )
self . STATUS [ _slot ] [ ' LOOPLOG ' ] = True
self . STATUS [ _slot ] [ ' LAST ' ] = pkt_time
return
else :
#if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']:
if _stream_id in systems [ system ] . STATUS and ' 1ST ' in systems [ system ] . STATUS [ _stream_id ] and systems [ system ] . STATUS [ _stream_id ] [ ' TGID ' ] == _dst_id :
if ' LOOPLOG ' not in self . STATUS [ _slot ] or not self . STATUS [ _slot ] [ ' LOOPLOG ' ] :
logger . warning ( " ( %s ) OBP *LoopControl* FIRST OBP %s , STREAM ID: %s , TG %s , IGNORE THIS SOURCE " , self . _system , system , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _slot ] [ ' LOOPLOG ' ] = True
self . STATUS [ _slot ] [ ' LAST ' ] = pkt_time
if CONFIG [ ' SYSTEMS ' ] [ self . _system ] [ ' ENHANCED_OBP ' ] and ' _bcsq ' not in self . STATUS [ _slot ] :
systems [ self . _system ] . send_bcsq ( _dst_id , _stream_id )
#logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id))
self . STATUS [ _slot ] [ ' _bcsq ' ] = True
return
#Duplicate handling#
#Duplicate complete packet
if self . STATUS [ _slot ] [ ' lastData ' ] and self . STATUS [ _slot ] [ ' lastData ' ] == _data and _seq > 1 :
logger . warning ( " ( %s ) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Handle inbound duplicates
if _seq and _seq == self . STATUS [ _slot ] [ ' lastSeq ' ] :
logger . warning ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Inbound out-of-order packets
if _seq and self . STATUS [ _slot ] [ ' lastSeq ' ] and ( _seq != 1 ) and ( _seq < self . STATUS [ _slot ] [ ' lastSeq ' ] ) :
logger . warning ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Inbound missed packets
if _seq and self . STATUS [ _slot ] [ ' lastSeq ' ] and _seq > ( self . STATUS [ _slot ] [ ' lastSeq ' ] + 1 ) :
logger . warning ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
#Save this sequence number
self . STATUS [ _slot ] [ ' lastSeq ' ] = _seq
#Save this packet
self . STATUS [ _slot ] [ ' lastData ' ] = _data
for _bridge in BRIDGES :
for _system in BRIDGES [ _bridge ] :