@ -922,50 +922,84 @@ _tts_tasks = {}
_FRAME_INTERVAL = 0.058
_broadcast_queue = [ ]
_broadcast_active = False
_broadcast_active _tgs = set ( )
_BROADCAST_GAP = 1.5
def _enqueue_broadcast ( _type , _targets , _pkts_by_ts , _source_id , _dst_id , _tg , _num , _label ) :
global _broadcast_queue , _broadcast_active
_broadcast_queue . append ( {
' type ' : _type ,
' targets ' : _targets ,
' pkts_by_ts ' : _pkts_by_ts ,
' source_id ' : _source_id ,
' dst_id ' : _dst_id ,
' tg ' : _tg ,
' num ' : _num ,
' label ' : _label
} )
_pos = len ( _broadcast_queue )
if _broadcast_active :
logger . info ( ' ( %s ) Enqueued broadcast (position %s in queue) ' , _label , _pos )
global _broadcast_queue , _broadcast_active_tgs
_tg_key = str ( _tg )
if _tg_key in _broadcast_active_tgs :
_broadcast_queue . append ( {
' type ' : _type ,
' targets ' : _targets ,
' pkts_by_ts ' : _pkts_by_ts ,
' source_id ' : _source_id ,
' dst_id ' : _dst_id ,
' tg ' : _tg ,
' num ' : _num ,
' label ' : _label
} )
_pos = len ( _broadcast_queue )
logger . info ( ' ( %s ) Enqueued broadcast for same TG %s (position %s in queue) ' , _label , _tg , _pos )
else :
_start_next_broadcast ( )
_broadcast_active_tgs . add ( _tg_key )
_mark_slots_busy ( _targets )
logger . info ( ' ( %s ) Starting broadcast immediately for TG %s (active TGs: %s ) ' , _label , _tg , len ( _broadcast_active_tgs ) )
if _type == ' ann ' :
reactor . callLater ( 0.5 , _announcementSendBroadcast , _targets , _pkts_by_ts , 0 , _source_id , _dst_id , _tg , 0 , _num )
elif _type == ' tts ' :
reactor . callLater ( 0.5 , _ttsSendBroadcast , _targets , _pkts_by_ts , 0 , _source_id , _dst_id , _tg , 0 , _num )
def _start_next_broadcast ( ) :
global _broadcast_queue , _broadcast_active
global _broadcast_queue , _broadcast_active _tgs
if not _broadcast_queue :
_broadcast_active = False
return
_broadcast_active = True
_item = _broadcast_queue . pop ( 0 )
_type = _item [ ' type ' ]
_label = _item [ ' label ' ]
logger . info ( ' ( %s ) Starting broadcast from queue ( %s remaining) ' , _label , len ( _broadcast_queue ) )
_next = None
for i , _item in enumerate ( _broadcast_queue ) :
_tg_key = str ( _item [ ' tg ' ] )
if _tg_key not in _broadcast_active_tgs :
_next = _broadcast_queue . pop ( i )
break
if not _next :
return
_type = _next [ ' type ' ]
_label = _next [ ' label ' ]
_tg_key = str ( _next [ ' tg ' ] )
_broadcast_active_tgs . add ( _tg_key )
_mark_slots_busy ( _next [ ' targets ' ] )
logger . info ( ' ( %s ) Starting broadcast from queue for TG %s ( %s remaining, active TGs: %s ) ' , _label , _next [ ' tg ' ] , len ( _broadcast_queue ) , len ( _broadcast_active_tgs ) )
if _type == ' ann ' :
reactor . callLater ( 0.5 , _announcementSendBroadcast , _item [ ' targets ' ] , _item [ ' pkts_by_ts ' ] , 0 , _item [ ' source_id ' ] , _item [ ' dst_id ' ] , _item [ ' tg ' ] , 0 , _item [ ' num ' ] )
reactor . callLater ( 0.5 , _announcementSendBroadcast , _ next[ ' targets ' ] , _next [ ' pkts_by_ts ' ] , 0 , _ next[ ' source_id ' ] , _next [ ' dst_id ' ] , _next [ ' tg ' ] , 0 , _next [ ' num ' ] )
elif _type == ' tts ' :
reactor . callLater ( 0.5 , _ttsSendBroadcast , _item [ ' targets ' ] , _item [ ' pkts_by_ts ' ] , 0 , _item [ ' source_id ' ] , _item [ ' dst_id ' ] , _item [ ' tg ' ] , 0 , _item [ ' num ' ] )
reactor . callLater ( 0.5 , _ttsSendBroadcast , _ next[ ' targets ' ] , _next [ ' pkts_by_ts ' ] , 0 , _ next[ ' source_id ' ] , _next [ ' dst_id ' ] , _next [ ' tg ' ] , 0 , _next [ ' num ' ] )
def _broadcast_finished ( ) :
global _broadcast_active
def _broadcast_finished ( _tg = None ) :
global _broadcast_active_tgs
if _tg is not None :
_tg_key = str ( _tg )
_broadcast_active_tgs . discard ( _tg_key )
if _broadcast_queue :
logger . info ( ' (QUEUE) Broadcast finished, next in %.1f s ( %s queued) ' , _BROADCAST_GAP , len ( _broadcast_queue ) )
logger . info ( ' (QUEUE) Broadcast finished for TG %s , checking queue ( %s queued, active TGs: %s ) ' , _tg , len ( _broadcast_queue ) , len ( _broadcast_active_tgs ) )
reactor . callLater ( _BROADCAST_GAP , _start_next_broadcast )
else :
_broadcast_active = False
logger . info ( ' (QUEUE) Broadcast finished, queue empty ' )
if not _broadcast_active_tgs :
logger . info ( ' (QUEUE) All broadcasts finished, queue empty ' )
else :
logger . info ( ' (QUEUE) Broadcast finished for TG %s , %s TGs still active ' , _tg , len ( _broadcast_active_tgs ) )
def _mark_slots_busy ( _targets ) :
for _t in _targets :
try :
_t [ ' slot ' ] [ ' TX_TYPE ' ] = HBPF_SLT_VHEAD
except ( KeyError , TypeError ) :
pass
def _mark_slots_free ( _targets ) :
for _t in _targets :
try :
_t [ ' slot ' ] [ ' TX_TYPE ' ] = HBPF_SLT_VTERM
except ( KeyError , TypeError ) :
pass
_RECORDING_MAX_FRAMES = 2750
_recording_state = {
@ -1068,6 +1102,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst
_total_pkts = len ( _pkts_by_ts [ 1 ] )
if _pkt_idx > = _total_pkts :
_mark_slots_free ( _targets )
for _t in _targets :
try :
for _sid in list ( _t [ ' sys_obj ' ] . STATUS . keys ( ) ) :
@ -1077,7 +1112,7 @@ def _announcementSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst
pass
_announcement_running [ _ann_num ] = False
logger . info ( ' ( %s ) Broadcast complete: %s packets sent to %s targets ' , _label , _total_pkts , len ( _targets ) )
_broadcast_finished ( )
_broadcast_finished ( _tg )
return
_now = time ( )
@ -1143,14 +1178,14 @@ def scheduledAnnouncement(_ann_num=1, _retry=0):
return
_announcement_last_hour [ _ann_num ] = _now . hour
if _broadcast_active and _retry < 60 :
_tg = CONFIG [ ' GLOBAL ' ] [ ' {} _TG ' . format ( _prefix ) ]
if str ( _tg ) in _broadcast_active_tgs and _retry < 60 :
if _retry == 0 :
logger . debug ( ' ( %s ) Broadcast queue busy , deferring prep' , _label )
logger . debug ( ' ( %s ) Same TG %s already broadcasting , deferring prep' , _label , _tg )
reactor . callLater ( 3.0 + _ann_num * 0.5 , scheduledAnnouncement , _ann_num , _retry + 1 )
return
_file = CONFIG [ ' GLOBAL ' ] [ ' {} _FILE ' . format ( _prefix ) ]
_tg = CONFIG [ ' GLOBAL ' ] [ ' {} _TG ' . format ( _prefix ) ]
_lang = CONFIG [ ' GLOBAL ' ] [ ' {} _LANGUAGE ' . format ( _prefix ) ]
_dst_id = bytes_3 ( _tg )
_source_id = bytes_3 ( 5000 )
@ -1341,14 +1376,14 @@ def scheduledTTSAnnouncement(_tts_num=1, _retry=0):
return
_tts_last_hour [ _tts_num ] = _now . hour
if _broadcast_active and _retry < 60 :
_tg = CONFIG [ ' GLOBAL ' ] [ ' {} _TG ' . format ( _prefix ) ]
if str ( _tg ) in _broadcast_active_tgs and _retry < 60 :
if _retry == 0 :
logger . debug ( ' ( %s ) Broadcast queue busy , deferring TTS prep' , _label )
logger . debug ( ' ( %s ) Same TG %s already broadcasting , deferring TTS prep' , _label , _tg )
reactor . callLater ( 3.0 + _tts_num * 0.5 , scheduledTTSAnnouncement , _tts_num , _retry + 1 )
return
_file = CONFIG [ ' GLOBAL ' ] [ ' {} _FILE ' . format ( _prefix ) ]
_tg = CONFIG [ ' GLOBAL ' ] [ ' {} _TG ' . format ( _prefix ) ]
_lang = CONFIG [ ' GLOBAL ' ] [ ' {} _LANGUAGE ' . format ( _prefix ) ]
_tts_running [ _tts_num ] = True
@ -1367,9 +1402,9 @@ def _ttsConversionDone(_ambe_path, _tts_num, _file, _tg, _lang, _mode, _label, _
logger . warning ( ' ( %s ) No AMBE file available for TTS announcement %s ' , _label , _file )
return
if _broadcast_active and _retry < 60 :
if str ( _tg ) in _broadcast_active _tgs and _retry < 60 :
if _retry == 0 :
logger . debug ( ' ( %s ) Broadcast queue busy , deferring TTS packet prep' , ' TTS- {} ' . format ( _tts_num ) )
logger . debug ( ' ( %s ) Same TG %s already broadcasting , deferring TTS packet prep' , ' TTS- {} ' . format ( _tts_num ) , _tg )
reactor . callLater ( 3.0 + _tts_num * 0.5 , _ttsConversionDone , _ambe_path , _tts_num , _file , _tg , _lang , _mode , _label , _retry + 1 )
return
@ -1484,6 +1519,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg,
_total_pkts = len ( _pkts_by_ts [ 1 ] )
if _pkt_idx > = _total_pkts :
_mark_slots_free ( _targets )
for _t in _targets :
try :
for _sid in list ( _t [ ' sys_obj ' ] . STATUS . keys ( ) ) :
@ -1493,7 +1529,7 @@ def _ttsSendBroadcast(_targets, _pkts_by_ts, _pkt_idx, _source_id, _dst_id, _tg,
pass
_tts_running [ _tts_num ] = False
logger . info ( ' ( %s ) Broadcast complete: %s packets sent to %s targets ' , _label , _total_pkts , len ( _targets ) )
_broadcast_finished ( )
_broadcast_finished ( _tg )
return
_now = time ( )