@ -141,6 +141,9 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16
m_jitterMaxSize ( 4U ) ,
m_jitterMaxSize ( 4U ) ,
m_jitterMaxWait ( 40000U ) ,
m_jitterMaxWait ( 40000U ) ,
m_threadPool ( workerCnt , " fne " ) ,
m_threadPool ( workerCnt , " fne " ) ,
m_metadataUpdateThreadPool ( workerCnt / 2U , " mupdt " ) ,
m_metadataUpdateMutex ( ) ,
m_metadataUpdateState ( ) ,
m_disablePacketData ( false ) ,
m_disablePacketData ( false ) ,
m_dumpPacketData ( false ) ,
m_dumpPacketData ( false ) ,
m_verbosePacketData ( false ) ,
m_verbosePacketData ( false ) ,
@ -724,6 +727,9 @@ bool TrafficNetwork::open()
// start thread pool
// start thread pool
m_threadPool . start ( ) ;
m_threadPool . start ( ) ;
// start metadata thread pool
m_metadataUpdateThreadPool . start ( ) ;
// start FluxQL thread pool
// start FluxQL thread pool
if ( m_enableInfluxDB ) {
if ( m_enableInfluxDB ) {
influxdb : : detail : : TSCaller : : start ( ) ;
influxdb : : detail : : TSCaller : : start ( ) ;
@ -780,6 +786,16 @@ void TrafficNetwork::close()
m_threadPool . stop ( ) ;
m_threadPool . stop ( ) ;
m_threadPool . wait ( ) ;
m_threadPool . wait ( ) ;
// stop metadata thread pool
m_metadataUpdateThreadPool . stop ( ) ;
m_metadataUpdateThreadPool . wait ( ) ;
// scope is intentional
{
std : : lock_guard < std : : mutex > lock ( m_metadataUpdateMutex ) ;
m_metadataUpdateState . clear ( ) ;
}
// stop FluxQL thread pool
// stop FluxQL thread pool
if ( m_enableInfluxDB ) {
if ( m_enableInfluxDB ) {
influxdb : : detail : : TSCaller : : stop ( ) ;
influxdb : : detail : : TSCaller : : stop ( ) ;
@ -2408,15 +2424,61 @@ void TrafficNetwork::processInCallCtrl(network::NET_ICC::ENUM command, network::
void TrafficNetwork : : peerMetadataUpdate ( uint32_t peerId )
void TrafficNetwork : : peerMetadataUpdate ( uint32_t peerId )
{
{
if ( peerId = = 0U ) {
return ;
}
bool enqueueTask = false ;
// scope is intentional
{
std : : lock_guard < std : : mutex > lock ( m_metadataUpdateMutex ) ;
MetadataUpdateState & state = m_metadataUpdateState [ peerId ] ;
if ( state . inFlight ) {
// coalesce duplicate requests while one update is running
LogWarning ( LOG_MASTER , " PEER %u metadata update already in flight, coalescing duplicate request " , peerId ) ;
state . pending = true ;
return ;
}
if ( state . pending ) {
// a request is already queued for this peer
LogWarning ( LOG_MASTER , " PEER %u metadata update already pending, coalescing duplicate request " , peerId ) ;
return ;
}
state . pending = true ;
enqueueTask = true ;
}
if ( ! enqueueTask ) {
return ;
}
MetadataUpdateRequest * req = new MetadataUpdateRequest ( ) ;
MetadataUpdateRequest * req = new MetadataUpdateRequest ( ) ;
req - > obj = this ;
req - > obj = this ;
req - > peerId = peerId ;
req - > peerId = peerId ;
// enqueue the task
// enqueue the task
if ( ! m_threadPool . enqueue ( new_pooltask ( taskMetadataUpdate , req ) ) ) {
if ( ! m_ me tadataUpdateT hreadPool. enqueue ( new_pooltask ( taskMetadataUpdate , req ) ) ) {
LogError ( LOG_NET , " Failed to task enqueue metadata update, peerId = %u " , peerId ) ;
LogError ( LOG_NET , " Failed to task enqueue metadata update, peerId = %u " , peerId ) ;
if ( req ! = nullptr )
// scope is intentional
{
std : : lock_guard < std : : mutex > lock ( m_metadataUpdateMutex ) ;
auto it = m_metadataUpdateState . find ( peerId ) ;
if ( it ! = m_metadataUpdateState . end ( ) ) {
it - > second . pending = false ;
if ( ! it - > second . inFlight ) {
m_metadataUpdateState . erase ( it ) ;
}
}
}
if ( req ! = nullptr ) {
delete req ;
delete req ;
}
}
}
}
}
@ -2435,37 +2497,76 @@ void TrafficNetwork::taskMetadataUpdate(MetadataUpdateRequest* req)
if ( req = = nullptr )
if ( req = = nullptr )
return ;
return ;
std : : string peerIdentity = network - > resolvePeerIdentity ( req - > peerId ) ;
while ( true ) {
// scope is intentional
{
std : : lock_guard < std : : mutex > lock ( network - > m_metadataUpdateMutex ) ;
// check if there is a pending metadata update for this peer
MetadataUpdateState & state = network - > m_metadataUpdateState [ req - > peerId ] ;
if ( ! state . pending ) {
// no pending metadata update for this peer, exit the loop
state . inFlight = false ;
network - > m_metadataUpdateState . erase ( req - > peerId ) ;
break ;
}
FNEPeerConnection * connection = network - > m_peers [ req - > peerId ] ;
// check if the peer connection is still valid and connected
if ( connection ! = nullptr ) {
FNEPeerConnection * connection = network - > m_peers [ req - > peerId ] ;
if ( connection - > connected ( ) ) {
if ( connection ! = nullptr ) {
connection - > lock ( ) ;
if ( ! connection - > connected ( ) ) {
uint32_t streamId = network - > createStreamId ( ) ;
// peer connection is not connected, skip the metadata update
LogWarning ( LOG_MASTER , " PEER %u (%s) not connected, skipping metadata update " , req - > peerId , connection - > identWithQualifier ( ) . c_str ( ) ) ;
state . pending = false ;
state . inFlight = false ;
network - > m_metadataUpdateState . erase ( req - > peerId ) ;
break ;
}
} else {
// peer connection is not found, skip the metadata update
LogWarning ( LOG_MASTER , " PEER %u not found, skipping metadata update " , req - > peerId ) ;
state . pending = false ;
state . inFlight = false ;
network - > m_metadataUpdateState . erase ( req - > peerId ) ;
break ;
}
// if the connection is a downstream neighbor FNE peer, and peer is participating in peer link,
state . pending = false ;
// send the peer proper configuration data
state . inFlight = true ;
if ( connection - > peerClass ( ) = = PEER_CONN_CLASS_NEIGHBOR & & connection - > isReplica ( ) ) {
}
LogInfoEx ( LOG_MASTER , " PEER %u (%s) sending replica network metadata updates " , req - > peerId , peerIdentity . c_str ( ) ) ;
network - > writeWhitelistRIDs ( req - > peerId , streamId , true ) ;
std : : string peerIdentity = network - > resolvePeerIdentity ( req - > peerId ) ;
network - > writeTGIDs ( req - > peerId , streamId , true ) ;
network - > writePeerList ( req - > peerId , streamId ) ;
network - > writeHAParameters ( req - > peerId , streamId , true ) ;
FNEPeerConnection * connection = network - > m_peers [ req - > peerId ] ;
}
if ( connection ! = nullptr ) {
else {
if ( connection - > connected ( ) ) {
LogInfoEx ( LOG_MASTER , " PEER %u (%s) sending network metadata updates " , req - > peerId , peerIdentity . c_str ( ) ) ;
connection - > lock ( ) ;
uint32_t streamId = network - > createStreamId ( ) ;
network - > writeWhitelistRIDs ( req - > peerId , streamId , false ) ;
// if the connection is a downstream neighbor FNE peer, and peer is participating in peer link,
network - > writeBlacklistRIDs ( req - > peerId , streamId ) ;
// send the peer proper configuration data
network - > writeTGIDs ( req - > peerId , streamId , false ) ;
if ( connection - > peerClass ( ) = = PEER_CONN_CLASS_NEIGHBOR & & connection - > isReplica ( ) ) {
network - > writeDeactiveTGIDs ( req - > peerId , streamId ) ;
LogInfoEx ( LOG_MASTER , " PEER %u (%s) sending replica network metadata updates " , req - > peerId , peerIdentity . c_str ( ) ) ;
network - > writeHAParameters ( req - > peerId , streamId , false ) ;
network - > writeWhitelistRIDs ( req - > peerId , streamId , true ) ;
}
network - > writeTGIDs ( req - > peerId , streamId , true ) ;
network - > writePeerList ( req - > peerId , streamId ) ;
connection - > unlock ( ) ;
network - > writeHAParameters ( req - > peerId , streamId , true ) ;
}
else {
LogInfoEx ( LOG_MASTER , " PEER %u (%s) sending network metadata updates " , req - > peerId , peerIdentity . c_str ( ) ) ;
network - > writeWhitelistRIDs ( req - > peerId , streamId , false ) ;
network - > writeBlacklistRIDs ( req - > peerId , streamId ) ;
network - > writeTGIDs ( req - > peerId , streamId , false ) ;
network - > writeDeactiveTGIDs ( req - > peerId , streamId ) ;
network - > writeHAParameters ( req - > peerId , streamId , false ) ;
}
connection - > unlock ( ) ;
}
}
}
}
}