@ -1889,7 +1889,7 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
FNEPeerConnection * connection = network - > m_peers [ peerId ] ;
if ( connection ! = nullptr ) {
std : : string ip = udp : : Socket : : address ( req - > address ) ;
lookups: : AffiliationLookup * aff = network - > m_peerAffiliations [ peerId ] ;
std: : shared_ptr < fne_ lookups: : AffiliationLookup > aff = network - > getPeerAffiliations ( peerId ) ;
if ( aff = = nullptr ) {
LogError ( LOG_MASTER , " PEER %u (%s) has uninitialized affiliations lookup? " , peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
network - > writePeerNAK ( peerId , streamId , TAG_ANNOUNCE , NET_CONN_NAK_INVALID ) ;
@ -1928,7 +1928,7 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
FNEPeerConnection * connection = network - > m_peers [ peerId ] ;
if ( connection ! = nullptr ) {
std : : string ip = udp : : Socket : : address ( req - > address ) ;
fne_lookups: : AffiliationLookup * aff = network - > m_peerAffiliations [ peerId ] ;
std: : shared_ptr < fne_lookups: : AffiliationLookup > aff = network - > getPeerAffiliations ( peerId ) ;
if ( aff = = nullptr ) {
LogError ( LOG_MASTER , " PEER %u (%s) has uninitialized affiliations lookup? " , peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
network - > writePeerNAK ( peerId , streamId , TAG_ANNOUNCE , NET_CONN_NAK_INVALID ) ;
@ -1965,7 +1965,7 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
FNEPeerConnection * connection = network - > m_peers [ peerId ] ;
if ( connection ! = nullptr ) {
std : : string ip = udp : : Socket : : address ( req - > address ) ;
lookups: : AffiliationLookup * aff = network - > m_peerAffiliations [ peerId ] ;
std: : shared_ptr < fne_ lookups: : AffiliationLookup > aff = network - > getPeerAffiliations ( peerId ) ;
if ( aff = = nullptr ) {
LogError ( LOG_MASTER , " PEER %u (%s) has uninitialized affiliations lookup? " , peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
network - > writePeerNAK ( peerId , streamId , TAG_ANNOUNCE , NET_CONN_NAK_INVALID ) ;
@ -2003,7 +2003,7 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
FNEPeerConnection * connection = network - > m_peers [ peerId ] ;
if ( connection ! = nullptr ) {
std : : string ip = udp : : Socket : : address ( req - > address ) ;
lookups: : AffiliationLookup * aff = network - > m_peerAffiliations [ peerId ] ;
std: : shared_ptr < fne_ lookups: : AffiliationLookup > aff = network - > getPeerAffiliations ( peerId ) ;
if ( aff = = nullptr ) {
LogError ( LOG_MASTER , " PEER %u (%s) has uninitialized affiliations lookup? " , peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
network - > writePeerNAK ( peerId , streamId , TAG_ANNOUNCE , NET_CONN_NAK_INVALID ) ;
@ -2044,7 +2044,7 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req)
// validate peer (simple validation really)
if ( connection - > connected ( ) & & connection - > address ( ) = = ip ) {
lookups: : AffiliationLookup * aff = network - > m_peerAffiliations [ peerId ] ;
std: : shared_ptr < fne_ lookups: : AffiliationLookup > aff = network - > getPeerAffiliations ( peerId ) ;
if ( aff = = nullptr ) {
LogError ( LOG_MASTER , " PEER %u (%s) has uninitialized affiliations lookup? " , peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
network - > writePeerNAK ( peerId , streamId , TAG_ANNOUNCE , NET_CONN_NAK_INVALID ) ;
@ -2227,34 +2227,74 @@ void TrafficNetwork::eraseStreamPktSeq(uint32_t peerId, uint32_t streamId)
void TrafficNetwork : : createPeerAffiliations ( uint32_t peerId , std : : string peerName )
{
erasePeerAffiliations ( peerId ) ;
std : : lock_guard < std : : mutex > lock ( m_peerAffiliationsMutex ) ;
auto it = m_peerAffiliations . find ( peerId ) ;
if ( it ! = m_peerAffiliations . end ( ) ) {
m_peerAffiliations . erase ( peerId ) ;
}
lookups : : ChannelLookup * chLookup = new lookups : : ChannelLookup ( ) ;
m_peerAffiliations [ peerId ] = new fne_lookups : : AffiliationLookup ( peerName , chLookup , m_verbose ) ;
m_peerAffiliations [ peerId ] - > setDisableUnitRegTimeout ( true ) ; // FNE doesn't allow unit registration timeouts (notification must come from the peers)
std : : shared_ptr < fne_lookups : : AffiliationLookup > aff (
new fne_lookups : : AffiliationLookup ( peerName , chLookup , m_verbose ) ,
[ ] ( fne_lookups : : AffiliationLookup * p ) {
if ( p ! = nullptr ) {
lookups : : ChannelLookup * rfCh = p - > rfCh ( ) ;
if ( rfCh ! = nullptr ) {
delete rfCh ;
}
delete p ;
}
} ) ;
aff - > setDisableUnitRegTimeout ( true ) ; // FNE doesn't allow unit registration timeouts (notification must come from the peers)
m_peerAffiliations . insert ( peerId , aff ) ;
}
/* Helper to erase the peer from the peers affiliations list. */
bool TrafficNetwork : : erasePeerAffiliations ( uint32_t peerId )
{
auto it = std : : find_if ( m_peerAffiliations . begin ( ) , m_peerAffiliations . end ( ) , [ & ] ( PeerAffiliationMapPair x ) { return x . first = = peerId ; } ) ;
std : : lock_guard < std : : mutex > lock ( m_peerAffiliationsMutex ) ;
auto it = m_peerAffiliations . find ( peerId ) ;
if ( it ! = m_peerAffiliations . end ( ) ) {
lookups : : AffiliationLookup * aff = m_peerAffiliations [ peerId ] ;
if ( aff ! = nullptr ) {
lookups : : ChannelLookup * rfCh = aff - > rfCh ( ) ;
if ( rfCh ! = nullptr )
delete rfCh ;
delete aff ;
}
m_peerAffiliations . erase ( peerId ) ;
return true ;
}
return false ;
}
/* Helper to get the peer affiliations entry for a peer. */
std : : shared_ptr < fne_lookups : : AffiliationLookup > TrafficNetwork : : getPeerAffiliations ( uint32_t peerId ) const
{
std : : lock_guard < std : : mutex > lock ( m_peerAffiliationsMutex ) ;
auto it = m_peerAffiliations . find ( peerId ) ;
if ( it ! = m_peerAffiliations . end ( ) ) {
return it - > second ;
}
return nullptr ;
}
/* Helper to create a snapshot of all peer affiliation entries. */
std : : vector < TrafficNetwork : : PeerAffiliationMapPair > TrafficNetwork : : peerAffiliationsSnapshot ( ) const
{
std : : vector < TrafficNetwork : : PeerAffiliationMapPair > snapshot ;
std : : lock_guard < std : : mutex > lock ( m_peerAffiliationsMutex ) ;
snapshot . reserve ( m_peerAffiliations . size ( ) ) ;
for ( auto it = m_peerAffiliations . begin ( ) ; it ! = m_peerAffiliations . end ( ) ; + + it ) {
snapshot . push_back ( * it ) ;
}
return snapshot ;
}
/* Helper to disconnect a downstream peer. */
void TrafficNetwork : : disconnectPeer ( uint32_t peerId , FNEPeerConnection * connection )
@ -2356,8 +2396,9 @@ bool TrafficNetwork::isPeerLocal(uint32_t peerId)
uint32_t TrafficNetwork : : findPeerUnitReg ( uint32_t srcId )
{
for ( auto it = m_peerAffiliations . begin ( ) ; it ! = m_peerAffiliations . end ( ) ; + + it ) {
fne_lookups : : AffiliationLookup * aff = it - > second ;
std : : vector < PeerAffiliationMapPair > affSnapshot = peerAffiliationsSnapshot ( ) ;
for ( const auto & entry : affSnapshot ) {
std : : shared_ptr < fne_lookups : : AffiliationLookup > aff = entry . second ;
if ( aff ! = nullptr ) {
if ( aff - > isUnitReg ( srcId ) ) {
return aff - > getSSRCByUnitReg ( srcId ) ;