diff --git a/bridge.py b/bridge.py index 7cb6d04..48dde59 100755 --- a/bridge.py +++ b/bridge.py @@ -249,7 +249,7 @@ class routerOBP(OPENBRIDGE): self._lastSeq = False - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] diff --git a/bridge_master.py b/bridge_master.py index e7a7393..25c9522 100644 --- a/bridge_master.py +++ b/bridge_master.py @@ -40,6 +40,9 @@ import re import copy from setproctitle import setproctitle +#from crccheck.crc import Crc32 +from hashlib import sha1 + # Twisted is pretty important, so I keep it separate from twisted.internet.protocol import Factory, Protocol from twisted.protocols.basic import NetstringReceiver @@ -1459,11 +1462,15 @@ class routerOBP(OPENBRIDGE): systems[_target]._report.send_bridgeEvent('UNIT DATA,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), 1, _int_dst_id).encode(encoding='utf-8', errors='ignore')) - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] + #pkt_crc = Crc32.calc(_data[4:53]) + #_pkt_crc = Crc32.calc(dmrpkt) + _pkt_crc = _hash + # Match UNIT data, SMS/GPS, and send it to the dst_id if it is in SUB_MAP if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or ((_stream_id not in self.STATUS) and _dtype_vseq == 3)): @@ -1485,7 +1492,8 @@ class routerOBP(OPENBRIDGE): 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, - 'packets': 0 + 'packets': 0, + 'crcs': [] } @@ -1603,6 +1611,9 @@ class routerOBP(OPENBRIDGE): else: logger.info('(%s) UNIT Data not bridged to HBP on slot %s - target busy: %s DST_ID: %s',self._system,_d_slot,_d_system,_int_dst_id) + + self.STATUS[_stream_id]['crcs'].add(_pkt_crc) + if _call_type == 'group' or _call_type == 'vcsbk': # Is this a new call stream? @@ -1618,7 +1629,8 @@ class routerOBP(OPENBRIDGE): 'lastSeq': False, 'lastData': False, 'RX_PEER': _peer_id, - 'packets': 0 + 'packets': 0, + 'crcs': set() } @@ -1693,11 +1705,12 @@ class routerOBP(OPENBRIDGE): return #Duplicate handling# + #Handle inbound duplicates #Duplicate complete packet if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: logger.warning("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) return - #Handle inbound duplicates + #Duplicate SEQ number if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: logger.warning("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s",self._system,_seq,int_id(_stream_id),int_id(_dst_id)) return @@ -1705,9 +1718,14 @@ class routerOBP(OPENBRIDGE): if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) return + #Duplicate DMR payload to previuos packet (by SHA1 + if _seq > 0 and _pkt_crc in self.STATUS[_stream_id]['crcs']: + logger.warning("(%s) *PacketControl* DMR packet payload with SHA1: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s: SEQ:%s packets: %s ",self._system,_pkt_crc,int_id(_stream_id),int_id(_dst_id),_seq, self.STATUS[_stream_id]['packets']) + return #Inbound missed packets if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + #Save this sequence number self.STATUS[_stream_id]['lastSeq'] = _seq @@ -1715,7 +1733,9 @@ class routerOBP(OPENBRIDGE): self.STATUS[_stream_id]['lastData'] = _data - + + self.STATUS[_stream_id]['crcs'].add(_pkt_crc) + self.STATUS[_stream_id]['LAST'] = pkt_time @@ -1781,7 +1801,8 @@ class routerHBP(HBSYSTEM): }, 'lastSeq': False, 'lastData': False, - 'packets': 0 + 'packets': 0, + 'crcs': set() }, 2: { 'RX_START': time(), @@ -1810,7 +1831,8 @@ class routerHBP(HBSYSTEM): }, 'lastSeq': False, 'lastData': False, - 'packets': 0 + 'packets': 0, + 'crcs': set() } } @@ -2041,6 +2063,9 @@ class routerHBP(HBSYSTEM): dmrpkt = _data[20:53] _bits = _data[15] + #_pkt_crc = Crc32.calc(_data[4:53]) + _pkt_crc = sha1(_data).digest() + _nine = bytes_3(9) _lang = CONFIG['SYSTEMS'][self._system]['ANNOUNCEMENT_LANGUAGE'] @@ -2070,6 +2095,9 @@ class routerHBP(HBSYSTEM): if _call_type == 'unit' and (_dtype_vseq == 6 or _dtype_vseq == 7 or _dtype_vseq == 8 or (_stream_id != self.STATUS[_slot]['RX_STREAM_ID'] and _dtype_vseq == 3)): _data_call = True + self.STATUS[_slot]['packets'] = 0 + self.STATUS[_slot]['crcs'] = set() + if _dtype_vseq == 3: logger.info('(%s) *UNIT CSBK* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) DST_ID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -2168,6 +2196,7 @@ class routerHBP(HBSYSTEM): if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): self.STATUS[_slot]['packets'] = 0 + self.STATUS[_slot]['crcs'] = set() self.STATUS[_slot]['_stopTgAnnounce'] = False @@ -2317,6 +2346,7 @@ class routerHBP(HBSYSTEM): if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): self.STATUS[_slot]['packets'] = 0 + self.STATUS[_slot]['crcs'] = set() if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): logger.warning('(%s) Packet received with STREAM ID: %s SUB: %s PEER: %s TGID %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot) @@ -2409,6 +2439,10 @@ class routerHBP(HBSYSTEM): if _seq and self.STATUS[_slot]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_slot]['lastSeq']): logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) return + #Duplicate DMR payload to previuos packet (by SHA1) + if _seq > 0 and _pkt_crc in self.STATUS[_slot]['crcs']: + logger.warning("(%s) *PacketControl* DMR packet payload with SHA1: %s seen before in this stream, disgarding. Stream ID:, %s TGID: %s, SEQ: %s, packets %s: ",self._system,_pkt_crc,int_id(_stream_id),int_id(_dst_id),_seq,self.STATUS[_slot]['packets']) + return #Inbound missed packets if _seq and self.STATUS[_slot]['lastSeq'] and _seq > (self.STATUS[_slot]['lastSeq']+1): logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) @@ -2539,6 +2573,8 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_TGID'] = _dst_id self.STATUS[_slot]['RX_TIME'] = pkt_time self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id + + self.STATUS[_slot]['crcs'].add(_pkt_crc) # # Socket-based reporting section diff --git a/hblink.py b/hblink.py index b12b529..8017b48 100755 --- a/hblink.py +++ b/hblink.py @@ -166,7 +166,7 @@ class OPENBRIDGE(DatagramProtocol): logger.debug('(%s) *BridgeControl* Not sent BCSQ Source Quench TARGET_IP not known , TG: %s, Stream ID: %s',self._system,int_id(_tgid)) - def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash): pass #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) @@ -245,7 +245,7 @@ class OPENBRIDGE(DatagramProtocol): return # Userland actions -- typically this is the function you subclass for an application - self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) + self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data,_hash) #Silently treat a DMRD packet like a keepalive - this is because it's traffic and the #Other end may not have enabled ENAHNCED_OBP self._config['_bcka'] = time()