diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index 76a43754..48235fc2 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -42,7 +42,7 @@ MetadataNetwork::MetadataNetwork(HostFNE* host, TrafficNetwork* trafficNetwork, m_status(NET_STAT_INVALID), m_peerReplicaActPkt(), m_peerTreeListPkt(), - m_threadPool(workerCnt, "diag") + m_threadPool(workerCnt, "meta") { assert(trafficNetwork != nullptr); assert(host != nullptr); diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index 693d666b..ea7850a4 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -141,6 +141,9 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16 m_jitterMaxSize(4U), m_jitterMaxWait(40000U), m_threadPool(workerCnt, "fne"), + m_metadataUpdateThreadPool(workerCnt / 2U, "mupdt"), + m_metadataUpdateMutex(), + m_metadataUpdateState(), m_disablePacketData(false), m_dumpPacketData(false), m_verbosePacketData(false), @@ -724,6 +727,9 @@ bool TrafficNetwork::open() // start thread pool m_threadPool.start(); + // start metadata thread pool + m_metadataUpdateThreadPool.start(); + // start FluxQL thread pool if (m_enableInfluxDB) { influxdb::detail::TSCaller::start(); @@ -780,6 +786,16 @@ void TrafficNetwork::close() m_threadPool.stop(); m_threadPool.wait(); + // stop metadata thread pool + m_metadataUpdateThreadPool.stop(); + m_metadataUpdateThreadPool.wait(); + + // scope is intentional + { + std::lock_guard lock(m_metadataUpdateMutex); + m_metadataUpdateState.clear(); + } + // stop FluxQL thread pool if (m_enableInfluxDB) { influxdb::detail::TSCaller::stop(); @@ -2408,15 +2424,61 @@ void TrafficNetwork::processInCallCtrl(network::NET_ICC::ENUM command, network:: void TrafficNetwork::peerMetadataUpdate(uint32_t peerId) { + if (peerId == 0U) { + return; + } + + bool enqueueTask = false; + + // scope is intentional + { + std::lock_guard lock(m_metadataUpdateMutex); + MetadataUpdateState& state = m_metadataUpdateState[peerId]; + + if (state.inFlight) { + // coalesce duplicate requests while one update is running + LogWarning(LOG_MASTER, "PEER %u metadata update already in flight, coalescing duplicate request", peerId); + state.pending = true; + return; + } + + if (state.pending) { + // a request is already queued for this peer + LogWarning(LOG_MASTER, "PEER %u metadata update already pending, coalescing duplicate request", peerId); + return; + } + + state.pending = true; + enqueueTask = true; + } + + if (!enqueueTask) { + return; + } + MetadataUpdateRequest* req = new MetadataUpdateRequest(); req->obj = this; req->peerId = peerId; // enqueue the task - if (!m_threadPool.enqueue(new_pooltask(taskMetadataUpdate, req))) { + if (!m_metadataUpdateThreadPool.enqueue(new_pooltask(taskMetadataUpdate, req))) { LogError(LOG_NET, "Failed to task enqueue metadata update, peerId = %u", peerId); - if (req != nullptr) + + // scope is intentional + { + std::lock_guard lock(m_metadataUpdateMutex); + auto it = m_metadataUpdateState.find(peerId); + if (it != m_metadataUpdateState.end()) { + it->second.pending = false; + if (!it->second.inFlight) { + m_metadataUpdateState.erase(it); + } + } + } + + if (req != nullptr) { delete req; + } } } @@ -2435,37 +2497,76 @@ void TrafficNetwork::taskMetadataUpdate(MetadataUpdateRequest* req) if (req == nullptr) return; - std::string peerIdentity = network->resolvePeerIdentity(req->peerId); + while (true) { + // scope is intentional + { + std::lock_guard lock(network->m_metadataUpdateMutex); + + // check if there is a pending metadata update for this peer + MetadataUpdateState& state = network->m_metadataUpdateState[req->peerId]; + if (!state.pending) { + // no pending metadata update for this peer, exit the loop + state.inFlight = false; + network->m_metadataUpdateState.erase(req->peerId); + break; + } - FNEPeerConnection* connection = network->m_peers[req->peerId]; - if (connection != nullptr) { - if (connection->connected()) { - connection->lock(); - uint32_t streamId = network->createStreamId(); + // check if the peer connection is still valid and connected + FNEPeerConnection* connection = network->m_peers[req->peerId]; + if (connection != nullptr) { + if (!connection->connected()) { + // peer connection is not connected, skip the metadata update + LogWarning(LOG_MASTER, "PEER %u (%s) not connected, skipping metadata update", req->peerId, connection->identWithQualifier().c_str()); + state.pending = false; + state.inFlight = false; + network->m_metadataUpdateState.erase(req->peerId); + break; + } + } else { + // peer connection is not found, skip the metadata update + LogWarning(LOG_MASTER, "PEER %u not found, skipping metadata update", req->peerId); + state.pending = false; + state.inFlight = false; + network->m_metadataUpdateState.erase(req->peerId); + break; + } - // if the connection is a downstream neighbor FNE peer, and peer is participating in peer link, - // send the peer proper configuration data - if (connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR && connection->isReplica()) { - LogInfoEx(LOG_MASTER, "PEER %u (%s) sending replica network metadata updates", req->peerId, peerIdentity.c_str()); + state.pending = false; + state.inFlight = true; + } - network->writeWhitelistRIDs(req->peerId, streamId, true); - network->writeTGIDs(req->peerId, streamId, true); - network->writePeerList(req->peerId, streamId); + std::string peerIdentity = network->resolvePeerIdentity(req->peerId); - network->writeHAParameters(req->peerId, streamId, true); - } - else { - LogInfoEx(LOG_MASTER, "PEER %u (%s) sending network metadata updates", req->peerId, peerIdentity.c_str()); + FNEPeerConnection* connection = network->m_peers[req->peerId]; + if (connection != nullptr) { + if (connection->connected()) { + connection->lock(); + uint32_t streamId = network->createStreamId(); - network->writeWhitelistRIDs(req->peerId, streamId, false); - network->writeBlacklistRIDs(req->peerId, streamId); - network->writeTGIDs(req->peerId, streamId, false); - network->writeDeactiveTGIDs(req->peerId, streamId); + // if the connection is a downstream neighbor FNE peer, and peer is participating in peer link, + // send the peer proper configuration data + if (connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR && connection->isReplica()) { + LogInfoEx(LOG_MASTER, "PEER %u (%s) sending replica network metadata updates", req->peerId, peerIdentity.c_str()); - network->writeHAParameters(req->peerId, streamId, false); - } + network->writeWhitelistRIDs(req->peerId, streamId, true); + network->writeTGIDs(req->peerId, streamId, true); + network->writePeerList(req->peerId, streamId); - connection->unlock(); + network->writeHAParameters(req->peerId, streamId, true); + } + else { + LogInfoEx(LOG_MASTER, "PEER %u (%s) sending network metadata updates", req->peerId, peerIdentity.c_str()); + + network->writeWhitelistRIDs(req->peerId, streamId, false); + network->writeBlacklistRIDs(req->peerId, streamId); + network->writeTGIDs(req->peerId, streamId, false); + network->writeDeactiveTGIDs(req->peerId, streamId); + + network->writeHAParameters(req->peerId, streamId, false); + } + + connection->unlock(); + } } } diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index 4ec8972d..b7687b95 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -404,6 +404,24 @@ namespace network uint32_t m_jitterMaxWait; ThreadPool m_threadPool; + ThreadPool m_metadataUpdateThreadPool; + + /** + * @brief Represents the state of a metadata update for a given peer ID. + * @ingroup fne_network + */ + struct MetadataUpdateState { + /** + * @brief Flag indicating whether a metadata update is currently in flight for this peer ID. + */ + bool inFlight = false; + /** + * @brief Flag indicating whether a metadata update is pending for this peer ID. + */ + bool pending = false; + }; + std::mutex m_metadataUpdateMutex; + std::unordered_map m_metadataUpdateState; bool m_disablePacketData; bool m_dumpPacketData;