@ -440,8 +440,13 @@ def stream_trimmer_loop():
# RX slot check
if _slot [ ' RX_TYPE ' ] != HBPF_SLT_VTERM and _slot [ ' RX_TIME ' ] < _now - 5 :
_slot [ ' RX_TYPE ' ] = HBPF_SLT_VTERM
logger . debug ( ' ( %s ) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s , TS %s , Duration: %.2f ' , \
system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , int_id ( _slot [ ' RX_TGID ' ] ) , slot , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] )
if ' loss ' in _slot and ' packets ' in _slot and _slot [ ' packets ' ] :
loss = ( _slot [ ' loss ' ] / _slot [ ' packets ' ] ) * 100
logger . info ( ' ( %s ) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s , TS %s , Duration: %.2f , LOSS: %.2f %% ' , \
system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , int_id ( _slot [ ' RX_TGID ' ] ) , slot , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] , loss )
else :
logger . info ( ' ( %s ) *TIME OUT* RX STREAM ID: %s SUB: %s TGID %s , TS %s , Duration: %.2f ' , \
system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , int_id ( _slot [ ' RX_TGID ' ] ) , slot , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
systems [ system ] . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( system , int_id ( _slot [ ' RX_STREAM_ID ' ] ) , int_id ( _slot [ ' RX_PEER ' ] ) , int_id ( _slot [ ' RX_RFS ' ] ) , slot , int_id ( _slot [ ' RX_TGID ' ] ) , _slot [ ' RX_TIME ' ] - _slot [ ' RX_START ' ] ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
#Null stream_id - for loop control
@ -481,15 +486,23 @@ def stream_trimmer_loop():
logger . debug ( ' ( %s ) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 (BCSQ) ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _stream [ ' RX_PEER ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) )
else :
logger . debug ( ' ( %s ) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _stream [ ' RX_PEER ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] )
if ' loss ' in _stream and ' packets ' in _stream and _stream [ ' packets ' ] :
loss = _stream [ ' loss ' ] / _stream [ ' packets ' ] * 100
#Only report this at INFO level if it has loss information as this will be a source
#stream not a target stream
#These represent streams where the stream has been lost - i.e. no TERM packet.
logger . info ( ' ( %s ) *TIME OUT - STREAM LOST* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f , Loss: %.2f %% ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _stream [ ' RX_PEER ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] , loss )
else :
logger . debug ( ' ( %s ) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f ' , \
system , int_id ( stream_id ) , get_alias ( int_id ( _stream [ ' RFS ' ] ) , subscriber_ids ) , get_alias ( int_id ( _stream [ ' RX_PEER ' ] ) , peer_ids ) , get_alias ( int_id ( _stream [ ' TGID ' ] ) , talkgroup_ids ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
systems [ system ] . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( system , int_id ( stream_id ) , int_id ( _stream [ ' RX_PEER ' ] ) , int_id ( _stream [ ' RFS ' ] ) , 1 , int_id ( _stream [ ' TGID ' ] ) , _stream [ ' LAST ' ] - _stream [ ' START ' ] ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
systems [ system ] . STATUS [ stream_id ] [ ' _to ' ] = True
continue
except :
logger . warning ( " ( %s ) Keyerror - stream trimmer Stream ID: %s " , system , stream_id )
except Exception as e :
logger . exception ( " ( %s ) Keyerror - stream trimmer Stream ID: %s " , system , stream_id , exc_info = e )
systems [ system ] . STATUS [ stream_id ] [ ' LAST ' ] = _now
continue
@ -1673,6 +1686,7 @@ class routerOBP(OPENBRIDGE):
' lastData ' : False ,
' RX_PEER ' : _peer_id ,
' packets ' : 0 ,
' loss ' : 0 ,
' crcs ' : set ( )
}
@ -1767,23 +1781,28 @@ class routerOBP(OPENBRIDGE):
#Handle inbound duplicates
#Duplicate complete packet
if self . STATUS [ _stream_id ] [ ' lastData ' ] and self . STATUS [ _stream_id ] [ ' lastData ' ] == _data and _seq > 1 :
logger . info ( " ( %s ) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _stream_id ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s , LOSS: %.2f %% " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) , ( ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100 ) )
return
#Duplicate SEQ number
if _seq and _seq == self . STATUS [ _stream_id ] [ ' lastSeq ' ] :
logger . info ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _stream_id ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s , LOSS: %.2f %% " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) , ( ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100 ) )
return
#Inbound out-of-order packets
if _seq and self . STATUS [ _stream_id ] [ ' lastSeq ' ] and ( _seq != 1 ) and ( _seq < self . STATUS [ _stream_id ] [ ' lastSeq ' ] ) :
logger . info ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _stream_id ] [ ' loss ' ] + = 1
logger . debug ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s , LOSS: %.2f %% " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) , ( ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100 ) )
return
#Duplicate DMR payload to previuos packet (by hash
if _seq > 0 and _pkt_crc in self . STATUS [ _stream_id ] [ ' crcs ' ] :
logger . info ( " ( %s ) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s : SEQ: %s packets: %s " , self . _system , _pkt_crc , int_id ( _stream_id ) , int_id ( _dst_id ) , _seq , self . STATUS [ _stream_id ] [ ' packets ' ] )
self . STATUS [ _stream_id ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s : SEQ: %s PACKETS: %s , LOSS: %.2f %% " , self . _system , _pkt_crc , int_id ( _stream_id ) , int_id ( _dst_id ) , _seq , self . STATUS [ _stream_id ] [ ' packets ' ] , ( ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100 ) )
return
#Inbound missed packets
if _seq and self . STATUS [ _stream_id ] [ ' lastSeq ' ] and _seq > ( self . STATUS [ _stream_id ] [ ' lastSeq ' ] + 1 ) :
logger . info ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _stream_id ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s , LOSS: %.2f %% " , self . _system , self . STATUS [ _stream_id ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) , ( ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100 ) )
#Save this sequence number
@ -1816,10 +1835,12 @@ class routerOBP(OPENBRIDGE):
if ( _frame_type == HBPF_DATA_SYNC ) and ( _dtype_vseq == HBPF_SLT_VTERM ) :
call_duration = pkt_time - self . STATUS [ _stream_id ] [ ' START ' ]
packet_rate = 0
loss = 0.00
if call_duration :
packet_rate = self . STATUS [ _stream_id ] [ ' packets ' ] / call_duration
logger . info ( ' ( %s ) *CALL END* STREAM ID: %s SUB: %s ( %s ) PEER: %s ( %s ) TGID %s ( %s ), TS %s , Duration: %.2f , Packet rate: %.2f /s ' , \
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot , call_duration , packet_rate )
loss = ( self . STATUS [ _stream_id ] [ ' loss ' ] / self . STATUS [ _stream_id ] [ ' packets ' ] ) * 100
logger . info ( ' ( %s ) *CALL END* STREAM ID: %s SUB: %s ( %s ) PEER: %s ( %s ) TGID %s ( %s ), TS %s , Duration: %.2f , Packet rate: %.2f /s, Loss: %.2f %% ' , \
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot , call_duration , packet_rate , loss )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
self . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( self . _system , int_id ( _stream_id ) , int_id ( _peer_id ) , int_id ( _rf_src ) , _slot , int_id ( _dst_id ) , call_duration ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )
self . STATUS [ _stream_id ] [ ' _fin ' ] = True
@ -2479,6 +2500,7 @@ class routerHBP(HBSYSTEM):
if ( _stream_id != self . STATUS [ _slot ] [ ' RX_STREAM_ID ' ] ) :
self . STATUS [ _slot ] [ ' packets ' ] = 0
self . STATUS [ _slot ] [ ' loss ' ] = 0
self . STATUS [ _slot ] [ ' crcs ' ] = set ( )
if ( self . STATUS [ _slot ] [ ' RX_TYPE ' ] != HBPF_SLT_VTERM ) and ( pkt_time < ( self . STATUS [ _slot ] [ ' RX_TIME ' ] + STREAM_TO ) ) and ( _rf_src != self . STATUS [ _slot ] [ ' RX_RFS ' ] ) :
@ -2545,7 +2567,7 @@ class routerHBP(HBSYSTEM):
#Timeout
if self . STATUS [ _slot ] [ ' RX_START ' ] + 180 < pkt_time :
if ' LOOPLOG ' not in self . STATUS [ _slot ] or not self . STATUS [ _slot ] [ ' LOOPLOG ' ] :
logger . info ( " ( %s ) HBP * SOURCE TIMEOUT* STREAM ID: %s , TG: %s , TS: %s , IGNORE THIS SOURCE " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) , _sysslot )
logger . info ( " ( %s ) HBP * SOURCE TIMEOUT* STREAM ID: %s , TG: %s , TS: %s , IGNORE THIS SOURCE " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) , _sysslot )
self . STATUS [ _slot ] [ ' LOOPLOG ' ] = True
self . STATUS [ _slot ] [ ' LAST ' ] = pkt_time
return
@ -2577,23 +2599,28 @@ class routerHBP(HBSYSTEM):
#Duplicate handling#
#Duplicate complete packet
if self . STATUS [ _slot ] [ ' lastData ' ] and self . STATUS [ _slot ] [ ' lastData ' ] == _data and _seq > 1 :
self . STATUS [ _slot ] [ ' loss ' ] + = 1
logger . info ( " ( %s ) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s " , self . _system , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Handle inbound duplicates
if _seq and _seq == self . STATUS [ _slot ] [ ' lastSeq ' ] :
logger . info ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _slot ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* Duplicate sequence number %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Inbound out-of-order packets
if _seq and self . STATUS [ _slot ] [ ' lastSeq ' ] and ( _seq != 1 ) and ( _seq < self . STATUS [ _slot ] [ ' lastSeq ' ] ) :
logger . info ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _slot ] [ ' loss ' ] + = 1
logger . debug ( " %s ) *PacketControl* Out of order packet - last SEQ: %s , this SEQ: %s , disgarding. Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
return
#Duplicate DMR payload to previuos packet (by hash)
if _seq > 0 and _pkt_crc in self . STATUS [ _slot ] [ ' crcs ' ] :
logger . info ( " ( %s ) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s , SEQ: %s , packets %s : " , self . _system , _pkt_crc , int_id ( _stream_id ) , int_id ( _dst_id ) , _seq , self . STATUS [ _slot ] [ ' packets ' ] )
self . STATUS [ _slot ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* DMR packet payload with hash: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s , SEQ: %s , packets %s : " , self . _system , _pkt_crc , int_id ( _stream_id ) , int_id ( _dst_id ) , _seq , self . STATUS [ _slot ] [ ' packets ' ] )
return
#Inbound missed packets
if _seq and self . STATUS [ _slot ] [ ' lastSeq ' ] and _seq > ( self . STATUS [ _slot ] [ ' lastSeq ' ] + 1 ) :
logger . info ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
self . STATUS [ _slot ] [ ' loss ' ] + = 1
logger . debug ( " ( %s ) *PacketControl* Missed packet(s) - last SEQ: %s , this SEQ: %s . Stream ID:, %s TGID: %s " , self . _system , self . STATUS [ _slot ] [ ' lastSeq ' ] , _seq , int_id ( _stream_id ) , int_id ( _dst_id ) )
#Save this sequence number
self . STATUS [ _slot ] [ ' lastSeq ' ] = _seq
@ -2619,11 +2646,13 @@ class routerHBP(HBSYSTEM):
# Final actions - Is this a voice terminator?
if ( _frame_type == HBPF_DATA_SYNC ) and ( _dtype_vseq == HBPF_SLT_VTERM ) and ( self . STATUS [ _slot ] [ ' RX_TYPE ' ] != HBPF_SLT_VTERM ) :
packet_rate = 0
loss = 0.00
call_duration = pkt_time - self . STATUS [ _slot ] [ ' RX_START ' ]
if call_duration :
packet_rate = self . STATUS [ _slot ] [ ' packets ' ] / call_duration
logger . info ( ' ( %s ) *CALL END* STREAM ID: %s SUB: %s ( %s ) PEER: %s ( %s ) TGID %s ( %s ), TS %s , Duration: %.2f , Packet rate: %.2f /s ' , \
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot , call_duration , packet_rate )
loss = ( self . STATUS [ _slot ] [ ' loss ' ] / self . STATUS [ _slot ] [ ' packets ' ] ) * 100
logger . info ( ' ( %s ) *CALL END* STREAM ID: %s SUB: %s ( %s ) PEER: %s ( %s ) TGID %s ( %s ), TS %s , Duration: %.2f , Packet rate: %.2f /s, LOSS: %.2f %% ' , \
self . _system , int_id ( _stream_id ) , get_alias ( _rf_src , subscriber_ids ) , int_id ( _rf_src ) , get_alias ( _peer_id , peer_ids ) , int_id ( _peer_id ) , get_alias ( _dst_id , talkgroup_ids ) , int_id ( _dst_id ) , _slot , call_duration , packet_rate , loss )
if CONFIG [ ' REPORTS ' ] [ ' REPORT ' ] :
self . _report . send_bridgeEvent ( ' GROUP VOICE,END,RX, {} , {} , {} , {} , {} , {} , {:.2f} ' . format ( self . _system , int_id ( _stream_id ) , int_id ( _peer_id ) , int_id ( _rf_src ) , _slot , int_id ( _dst_id ) , call_duration ) . encode ( encoding = ' utf-8 ' , errors = ' ignore ' ) )