make InfluxDB queries async (non-blocking);

pull/85/head
Bryan Biedenkapp 10 months ago
parent d669410e46
commit 4fc8990d16

@ -13,6 +13,7 @@ file(GLOB dvmfne_SRC
"src/fne/network/callhandler/packetdata/*.h"
"src/fne/network/callhandler/packetdata/*.cpp"
"src/fne/network/influxdb/*.h"
"src/fne/network/influxdb/*.cpp"
"src/fne/network/*.h"
"src/fne/network/*.cpp"
"src/fne/xml/*.h"

@ -231,7 +231,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
.field("identity", connection->identity())
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
.requestAsync(network->m_influxServer);
}
// repeat traffic to the connected SysView peers
@ -301,7 +301,7 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
.field("identity", connection->identity())
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
.requestAsync(network->m_influxServer);
}
}
else {

@ -1268,7 +1268,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
.field("identity", connection->identity())
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
.requestAsync(network->m_influxServer);
}
}
else {
@ -1309,7 +1309,7 @@ void* FNENetwork::threadedNetworkRx(void* arg)
.field("identity", connection->identity())
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
.requestAsync(network->m_influxServer);
}
}
else {

@ -194,7 +194,7 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
.field("duration", duration)
.field("slot", slotNo)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
m_network->eraseStreamPktSeq(peerId, streamId);
@ -619,7 +619,7 @@ bool TagDMRData::processCSBK(uint8_t* buffer, uint32_t peerId, dmr::data::NetDat
.tag("csbk", csbk->toString())
.field("raw", ss.str())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
}
@ -766,7 +766,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -803,7 +803,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_DISABLED_DST_RID)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
return false;
@ -824,7 +824,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "DMR slot %s, illegal/unknown RID attempted access, srcId = %u, dstId = %u", data.getSlotNo(), data.getSrcId(), data.getDstId());
@ -851,7 +851,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_INV_TALKGROUP)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -883,7 +883,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "DMR slot %s, illegal/unknown RID attempted access, srcId = %u, dstId = %u", data.getSlotNo(), data.getSrcId(), data.getDstId());
@ -906,7 +906,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_INV_SLOT)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -927,7 +927,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.field("message", INFLUXDB_ERRSTR_DISABLED_TALKGROUP)
.field("slot", data.getSlotNo())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -952,7 +952,7 @@ bool TagDMRData::validate(uint32_t peerId, data::NetData& data, uint32_t streamI
.tag("dstId", std::to_string(data.getDstId()))
.field("message", INFLUXDB_ERRSTR_RID_NOT_PERMITTED)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic

@ -147,7 +147,7 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
m_network->eraseStreamPktSeq(peerId, streamId);
@ -572,7 +572,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -608,7 +608,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_DST_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -630,7 +630,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "NXDN, illegal/unknown RID attempted access, srcId = %u, dstId = %u", lc.getSrcId(), lc.getDstId());
@ -658,7 +658,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_INV_TALKGROUP)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -689,7 +689,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "NXDN, illegal/unknown RID attempted access, srcId = %u, dstId = %u", lc.getSrcId(), lc.getDstId());
@ -711,7 +711,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_TALKGROUP)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -736,7 +736,7 @@ bool TagNXDNData::validate(uint32_t peerId, lc::RTCH& lc, uint8_t messageType, u
.tag("dstId", std::to_string(lc.getDstId()))
.field("message", INFLUXDB_ERRSTR_RID_NOT_PERMITTED)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic

@ -220,7 +220,7 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
m_network->eraseStreamPktSeq(peerId, streamId);
@ -731,7 +731,7 @@ bool TagP25Data::processTSDUFrom(uint8_t* buffer, uint32_t peerId, uint8_t duid)
.tag("tsbk", tsbk->toString())
.field("raw", ss.str())
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
}
@ -1090,7 +1090,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -1158,7 +1158,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_DST_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -1180,7 +1180,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "P25, illegal/unknown RID attempted access, srcId = %u, dstId = %u", control.getSrcId(), control.getDstId());
@ -1256,7 +1256,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_INV_TALKGROUP)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -1287,7 +1287,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
LogWarning(LOG_NET, "P25, illegal/unknown RID attempted access, srcId = %u, dstId = %u", control.getSrcId(), control.getDstId());
@ -1309,7 +1309,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_TALKGROUP)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic
@ -1334,7 +1334,7 @@ bool TagP25Data::validate(uint32_t peerId, lc::LC& control, DUID::E duid, const
.tag("dstId", std::to_string(control.getDstId()))
.field("message", INFLUXDB_ERRSTR_RID_NOT_PERMITTED)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
// report In-Call Control to the peer sending traffic

@ -226,7 +226,7 @@ bool DMRPacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
.field("duration", duration)
.field("slot", slotNo)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
delete status;

@ -177,7 +177,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
.tag("dstId", std::to_string(status->header.getLLId()))
.field("message", INFLUXDB_ERRSTR_DISABLED_SRC_RID)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
delete status;
@ -318,7 +318,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
.requestAsync(m_network->m_influxServer);
}
delete status;

@ -0,0 +1,19 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
* Digital Voice Modem - Converged FNE Software
* GPLv2 Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* Copyright (C) 2025 Bryan Biedenkapp, N2PLL
*
*/
#include "fne/Defines.h"
#include "fne/network/influxdb/InfluxDB.h"
using namespace network::influxdb;
// ---------------------------------------------------------------------------
// Static Class Members
// ---------------------------------------------------------------------------
uint32_t detail::TSCaller::m_currThreadCnt = 0U;

@ -21,6 +21,7 @@
#include "fne/Defines.h"
#include "common/Log.h"
#include "common/Thread.h"
#include <sstream>
#include <cstring>
@ -56,6 +57,12 @@ namespace network
{
namespace influxdb
{
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
#define MAX_INFLUXQL_THREAD_CNT 75U // this is a really extreme number of pending queries...
// ---------------------------------------------------------------------------
// Class Declaration
// ---------------------------------------------------------------------------
@ -177,14 +184,14 @@ namespace network
ret = getaddrinfo(si.host().c_str(), std::to_string(si.port()).c_str(), &hints, &addr);
if (ret != 0) {
LogError(LOG_NET, "Failed to determine InfluxDB server host, err: %d", errno);
::LogError(LOG_HOST, "Failed to determine InfluxDB server host, err: %d", errno);
return 1;
}
// open the socket
fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (fd < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
@ -193,13 +200,13 @@ namespace network
const int sockOptVal = 1;
#if defined(_WIN32)
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&sockOptVal, sizeof(int)) < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
#else
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockOptVal, sizeof(int)) < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
@ -207,7 +214,7 @@ namespace network
// connect to the server
ret = connect(fd, addr->ai_addr, addr->ai_addrlen);
if (ret < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
@ -575,14 +582,90 @@ namespace network
// Structure Declaration
// ---------------------------------------------------------------------------
/**
* @brief Represents the data required for a voice service packet handler thread.
* @ingroup rc_network
*/
struct TSCallerRequest : thread_t {
ServerInfo si; //!
std::string lines; //!
};
// ---------------------------------------------------------------------------
// Structure Declaration
// ---------------------------------------------------------------------------
/**
* @brief
* @ingroup fne_influx
*/
struct HOST_SW_API TSCaller : public QueryBuilder
{
detail::TagCaller& meas(const std::string& m) { m_lines << '\n'; return this->m(m); }
int request(const ServerInfo& si, std::string* resp = nullptr) { return detail::inner::request("POST", "write", "", m_lines.str(), si, resp); }
{
detail::TagCaller& meas(const std::string& m) { m_lines << '\n'; return this->m(m); }
int request(const ServerInfo& si, std::string* resp = nullptr) { return detail::inner::request("POST", "write", "", m_lines.str(), si, resp); }
int requestAsync(const ServerInfo& si)
{
if (m_currThreadCnt >= MAX_INFLUXQL_THREAD_CNT) {
::LogError(LOG_HOST, "Maximum concurrent FluxQL thread count reached, dropping request!");
return 1;
}
TSCallerRequest* req = new TSCallerRequest();
req->obj = this;
req->si = ServerInfo(si.host(), si.port(), si.org(), si.token(), si.bucket());
req->lines = std::string(m_lines.str());
if (!Thread::runAsThread(this, threadedRequest, req)) {
delete req;
return 1;
} else {
m_currThreadCnt++;
}
return 0;
}
private:
static uint32_t m_currThreadCnt;
/**
* @brief
*/
static void* threadedRequest(void* arg)
{
TSCallerRequest* req = (TSCallerRequest*)arg;
if (req != nullptr) {
::pthread_detach(req->thread);
#ifdef _GNU_SOURCE
::pthread_setname_np(req->thread, "fluxql:request");
#endif // _GNU_SOURCE
if (req == nullptr) {
m_currThreadCnt--;
return nullptr;
}
TSCaller* caller = static_cast<TSCaller*>(req->obj);
if (caller == nullptr) {
if (req != nullptr) {
delete req;
}
m_currThreadCnt--;
return nullptr;
}
const ServerInfo& si = req->si;
detail::inner::request("POST", "write", "", req->lines, si, nullptr);
delete req;
}
m_currThreadCnt--;
return nullptr;
}
};
// ---------------------------------------------------------------------------

Loading…
Cancel
Save

Powered by TurnKey Linux.