implement support for reporting activity, diagnostic and call events to an InfluxDB instance;

pull/51/head
Bryan Biedenkapp 2 years ago
parent a8adeeaad7
commit 7d1af0235b

@ -76,6 +76,19 @@ master:
# Flag indicating whether or not a P25 ADJ_STS_BCAST will pass to connected external peers. # Flag indicating whether or not a P25 ADJ_STS_BCAST will pass to connected external peers.
disallowExtAdjStsBcast: true disallowExtAdjStsBcast: true
# Flag indicating whether or not InfluxDB logging and metrics recording is enabled.
enableInflux: false
# Hostname/IP address of the InfluxDB instance to connect to.
influxServerAddress: 127.0.0.1
# Port number of the InfluxDB instance to connect to.
influxServerPort: 8086
# API Token to access the InfluxDB instance API.
influxServerToken: "APITOKEN"
# Organization Name on InfluxDB instance API.
influxOrg: "dvm"
# Data Bucket Name on InfluxDB instance API.
influxBucket: "dvm"
# #
# Talkgroup Rules Configuration # Talkgroup Rules Configuration
# #

@ -16,6 +16,7 @@ file(GLOB dvmfne_SRC
"src/fne/network/fne/*.h" "src/fne/network/fne/*.h"
"src/fne/network/fne/*.cpp" "src/fne/network/fne/*.cpp"
"src/fne/network/influxdb/*.h"
"src/fne/network/*.h" "src/fne/network/*.h"
"src/fne/network/*.cpp" "src/fne/network/*.cpp"
"src/fne/*.h" "src/fne/*.h"

@ -223,6 +223,16 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
std::string payload(rawPayload, rawPayload + (req->length - 11U)); std::string payload(rawPayload, rawPayload + (req->length - 11U));
::ActivityLog("%u %s", peerId, payload.c_str()); ::ActivityLog("%u %s", peerId, payload.c_str());
// report activity log to InfluxDB
if (network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("activity")
.tag("peerId", std::to_string(peerId))
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
}
} }
else { else {
network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -249,6 +259,16 @@ void* DiagNetwork::threadedNetworkRx(void* arg)
g_disableTimeDisplay = true; g_disableTimeDisplay = true;
::Log(9999U, nullptr, "%u %s", peerId, payload.c_str()); ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str());
g_disableTimeDisplay = currState; g_disableTimeDisplay = currState;
// report diagnostic log to InfluxDB
if (network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("diag")
.tag("peerId", std::to_string(peerId))
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
}
} }
else { else {
network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);

@ -95,6 +95,12 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port,
m_callInProgress(false), m_callInProgress(false),
m_disallowAdjStsBcast(false), m_disallowAdjStsBcast(false),
m_disallowExtAdjStsBcast(true), m_disallowExtAdjStsBcast(true),
m_enableInfluxDB(false),
m_influxServerAddress("127.0.0.1"),
m_influxServerPort(8086U),
m_influxServerToken(),
m_influxOrg("dvm"),
m_influxBucket("dvm"),
m_reportPeerPing(reportPeerPing), m_reportPeerPing(reportPeerPing),
m_verbose(verbose) m_verbose(verbose)
{ {
@ -139,6 +145,16 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
m_disallowExtAdjStsBcast = true; m_disallowExtAdjStsBcast = true;
} }
m_enableInfluxDB = conf["enableInflux"].as<bool>(false);
m_influxServerAddress = conf["influxServerAddress"].as<std::string>("127.0.0.1");
m_influxServerPort = conf["influxServerPort"].as<uint16_t>(8086U);
m_influxServerToken = conf["influxServerToken"].as<std::string>();
m_influxOrg = conf["influxOrg"].as<std::string>("dvm");
m_influxBucket = conf["influxBucket"].as<std::string>("dvm");
if (m_enableInfluxDB) {
m_influxServer = influxdb::ServerInfo(m_influxServerAddress, m_influxServerPort, m_influxOrg, m_influxServerToken, m_influxBucket);
}
if (printOptions) { if (printOptions) {
LogInfo(" Maximum Permitted Connections: %u", m_softConnLimit); LogInfo(" Maximum Permitted Connections: %u", m_softConnLimit);
LogInfo(" Disable P25 ADJ_STS_BCAST to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); LogInfo(" Disable P25 ADJ_STS_BCAST to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no");
@ -146,6 +162,13 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions)
LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!");
} }
LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no"); LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no");
LogInfo(" InfluxDB Reporting Enabled: %s", m_enableInfluxDB ? "yes" : "no");
if (m_enableInfluxDB) {
LogInfo(" InfluxDB Address: %s", m_influxServerAddress.c_str());
LogInfo(" InfluxDB Port: %u", m_influxServerPort);
LogInfo(" InfluxDB Organization: %s", m_influxOrg.c_str());
LogInfo(" InfluxDB Bucket: %s", m_influxBucket.c_str());
}
} }
} }
@ -832,6 +855,16 @@ void* FNENetwork::threadedNetworkRx(void* arg)
std::string payload(rawPayload, rawPayload + (req->length - 11U)); std::string payload(rawPayload, rawPayload + (req->length - 11U));
::ActivityLog("%u %s", peerId, payload.c_str()); ::ActivityLog("%u %s", peerId, payload.c_str());
// report activity log to InfluxDB
if (network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("activity")
.tag("peerId", std::to_string(peerId))
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
}
} }
else { else {
network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);
@ -858,6 +891,16 @@ void* FNENetwork::threadedNetworkRx(void* arg)
g_disableTimeDisplay = true; g_disableTimeDisplay = true;
::Log(9999U, nullptr, "%u %s", peerId, payload.c_str()); ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str());
g_disableTimeDisplay = currState; g_disableTimeDisplay = currState;
// report diagnostic log to InfluxDB
if (network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("diag")
.tag("peerId", std::to_string(peerId))
.field("msg", payload)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(network->m_influxServer);
}
} }
else { else {
network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED);

@ -19,6 +19,7 @@
#include "common/lookups/AffiliationLookup.h" #include "common/lookups/AffiliationLookup.h"
#include "common/lookups/RadioIdLookup.h" #include "common/lookups/RadioIdLookup.h"
#include "common/lookups/TalkgroupRulesLookup.h" #include "common/lookups/TalkgroupRulesLookup.h"
#include "fne/network/influxdb/InfluxDB.h"
#include "host/network/Network.h" #include "host/network/Network.h"
#include <string> #include <string>
@ -287,6 +288,14 @@ namespace network
bool m_disallowAdjStsBcast; bool m_disallowAdjStsBcast;
bool m_disallowExtAdjStsBcast; bool m_disallowExtAdjStsBcast;
bool m_enableInfluxDB;
std::string m_influxServerAddress;
uint16_t m_influxServerPort;
std::string m_influxServerToken;
std::string m_influxOrg;
std::string m_influxBucket;
influxdb::ServerInfo m_influxServer;
bool m_reportPeerPing; bool m_reportPeerPing;
bool m_verbose; bool m_verbose;

@ -156,6 +156,22 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
LogMessage(LOG_NET, "DMR, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", LogMessage(LOG_NET, "DMR, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u",
peerId, srcId, dstId, duration / 1000, streamId, external); peerId, srcId, dstId, duration / 1000, streamId, external);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "DMR")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.field("slot", slotNo)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
}
m_network->m_callInProgress = false; m_network->m_callInProgress = false;
} }
} }
@ -200,7 +216,9 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
status.slotNo = slotNo; status.slotNo = slotNo;
status.streamId = streamId; status.streamId = streamId;
m_status[dstId] = status; // this *could* be an issue if a dstId appears on both slots somehow... m_status[dstId] = status; // this *could* be an issue if a dstId appears on both slots somehow...
LogMessage(LOG_NET, "DMR, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); LogMessage(LOG_NET, "DMR, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
m_network->m_callInProgress = true; m_network->m_callInProgress = true;
} }
} }

@ -129,6 +129,21 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
LogMessage(LOG_NET, "NXDN, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", LogMessage(LOG_NET, "NXDN, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u",
peerId, srcId, dstId, duration / 1000, streamId, external); peerId, srcId, dstId, duration / 1000, streamId, external);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "NXDN")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
}
m_network->m_callInProgress = false; m_network->m_callInProgress = false;
} }
} }
@ -172,7 +187,9 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI
status.dstId = dstId; status.dstId = dstId;
status.streamId = streamId; status.streamId = streamId;
m_status[dstId] = status; m_status[dstId] = status;
LogMessage(LOG_NET, "NXDN, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); LogMessage(LOG_NET, "NXDN, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
m_network->m_callInProgress = true; m_network->m_callInProgress = true;
} }
} }

@ -187,6 +187,21 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
LogMessage(LOG_NET, "P25, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", LogMessage(LOG_NET, "P25, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u",
peerId, srcId, dstId, duration / 1000, streamId, external); peerId, srcId, dstId, duration / 1000, streamId, external);
// report call event to InfluxDB
if (m_network->m_enableInfluxDB) {
influxdb::QueryBuilder()
.meas("call_event")
.tag("peerId", std::to_string(peerId))
.tag("mode", "P25")
.tag("streamId", std::to_string(streamId))
.tag("srcId", std::to_string(srcId))
.tag("dstId", std::to_string(dstId))
.field("duration", duration)
.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count())
.request(m_network->m_influxServer);
}
m_network->m_callInProgress = false; m_network->m_callInProgress = false;
} }
} }
@ -230,7 +245,9 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId
status.dstId = dstId; status.dstId = dstId;
status.streamId = streamId; status.streamId = streamId;
m_status[dstId] = status; m_status[dstId] = status;
LogMessage(LOG_NET, "P25, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); LogMessage(LOG_NET, "P25, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external);
m_network->m_callInProgress = true; m_network->m_callInProgress = true;
} }
} }

@ -0,0 +1,551 @@
// SPDX-License-Identifier: MIT-only
/**
* Digital Voice Modem - Converged FNE Software
* MIT Open Source. Use is subject to license terms.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* @package DVM / Converged FNE Software
* @derivedfrom influxdb-cpp (https://github.com/orca-zhang/influxdb-cpp)
* @license MIT License (https://opensource.org/licenses/MIT)
*
* Copyright (c) 2010-2018 <http://ez8.co> <orca.zhang@yahoo.com>
* Copyright (C) 2024 Bryan Biedenkapp, N2PLL
*
*/
#if !defined(__INFLUXDB_H__)
#define __INFLUXDB_H__
#include "fne/Defines.h"
#include "common/Log.h"
#include <sstream>
#include <cstring>
#include <cstdio>
#include <cstdlib>
#define DEFAULT_PRECISION 5
#ifdef _WIN32
#define NOMINMAX
#include <windows.h>
#include <algorithm>
#pragma comment(lib, "ws2_32")
typedef struct iovec { void* iov_base; size_t iov_len; } iovec;
inline __int64 writev(int sock, struct iovec* iov, int cnt) {
__int64 r = send(sock, (const char*)iov->iov_base, iov->iov_len, 0);
return (r < 0 || cnt == 1) ? r : r + writev(sock, iov + 1, cnt - 1);
}
#else
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#define closesocket close
#endif
namespace network
{
namespace influxdb
{
// ---------------------------------------------------------------------------
// Class Declaration
// Implements the diagnostic/activity log networking logic.
// ---------------------------------------------------------------------------
class HOST_SW_API ServerInfo {
public:
/// <summary>Initializes a new instance of the ServerInfo class.</summary>
ServerInfo() :
m_host(),
m_port(8086U),
m_org(),
m_bucket(),
m_token()
{
/* stub */
}
/// <summary>Initializes a new instance of the ServerInfo class.</summary>
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="org"></param>
/// <param name="token"></param>
/// <param name="bucket"></param>
ServerInfo(const std::string& host, uint16_t port, const std::string& org, const std::string& token, const std::string& bucket = "") :
m_host(host),
m_port(port),
m_org(org),
m_bucket(bucket),
m_token(token)
{
/* stub */
}
public:
/// <summary></summary>
__PROPERTY_PLAIN(std::string, host);
/// <summary></summary>
__PROPERTY_PLAIN(uint16_t, port);
/// <summary></summary>
__PROPERTY_PLAIN(std::string, org);
/// <summary></summary>
__PROPERTY_PLAIN(std::string, bucket);
/// <summary></summary>
__PROPERTY_PLAIN(std::string, token);
};
namespace detail
{
struct MeasCaller;
struct TagCaller;
struct FieldCaller;
struct TSCaller;
struct HOST_SW_API inner {
/// <summary>
///
/// </summary>
/// <param name="method"></param>
/// <param name="uri"></param>
/// <param name="queryString"></param>
/// <param name="body"></param>
/// <param name="si"></param>
/// <param name="resp"></param>
/// <returns></returns>
static int request(const char* method, const char* uri, const std::string& queryString, const std::string& body,
const ServerInfo& si, std::string* resp)
{
std::string header;
struct iovec iv[2];
int fd, contentLength = 0, len = 0;
char ch;
unsigned char chunked = 0;
if (resp)
resp->clear();
struct addrinfo hints, *addr = nullptr;
struct in6_addr serverAddr;
memset(&hints, 0x00, sizeof(hints));
hints.ai_flags = AI_NUMERICSERV;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
// check to see if the address is a valid IPv4 address
int ret = inet_pton(AF_INET, si.host().c_str(), &serverAddr);
if (ret == 1) {
hints.ai_family = AF_INET; // IPv4
hints.ai_flags |= AI_NUMERICHOST;
// not a valid IPv4 -> check to see if address is a valid IPv6 address
} else {
ret = inet_pton(AF_INET6, si.host().c_str(), &serverAddr);
if (ret == 1) {
hints.ai_family = AF_INET6; // IPv6
hints.ai_flags |= AI_NUMERICHOST;
}
}
ret = getaddrinfo(si.host().c_str(), std::to_string(si.port()).c_str(), &hints, &addr);
if (ret != 0) {
LogError(LOG_NET, "Failed to determine InfluxDB server host, err: %d", errno);
return 1;
}
// open the socket
fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (fd < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
// connect to the server
ret = connect(fd, addr->ai_addr, addr->ai_addrlen);
if (ret < 0) {
LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno);
closesocket(fd);
return 1;
}
header.resize(len = 0x100);
while (true) {
if (!si.token().empty()) {
iv[0].iov_len = snprintf(&header[0], len,
"%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nAuthorization: Token %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n",
method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length());
} else {
iv[0].iov_len = snprintf(&header[0], len,
"%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n",
method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), (int)body.length());
}
#ifdef INFLUX_DEBUG
LogDebug(LOG_HOST, "InfluxDB Request: %s\n%s", &header[0], body.c_str());
#endif
if ((int)iv[0].iov_len >= len)
header.resize(len *= 2);
else
break;
}
iv[0].iov_base = &header[0];
iv[1].iov_base = (void*)&body[0];
iv[1].iov_len = body.length();
if (writev(fd, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) {
ret = -6;
goto END;
}
iv[0].iov_len = len;
#define _NO_MORE() (len >= (int)iv[0].iov_len && (iv[0].iov_len = recv(fd, &header[0], header.length(), len = 0)) == size_t(-1))
#define _GET_NEXT_CHAR() (ch = _NO_MORE() ? 0 : header[len++])
#define _LOOP_NEXT(statement) for(;;) { if(!(_GET_NEXT_CHAR())) { ret = -7; goto END; } statement }
#define _UNTIL(c) _LOOP_NEXT( if(ch == c) break; )
#define _GET_NUMBER(n) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 10 + (ch - '0'); else break; )
#define _GET_CHUNKED_LEN(n, c) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 16 + (ch - '0'); \
else if(ch >= 'A' && ch <= 'F') n = n * 16 + (ch - 'A') + 10; \
else if(ch >= 'a' && ch <= 'f') n = n * 16 + (ch - 'a') + 10; else {if(ch != c) { ret = -8; goto END; } break;} )
#define _(c) if((_GET_NEXT_CHAR()) != c) break;
#define __(c) if((_GET_NEXT_CHAR()) != c) { ret = -9; goto END; }
if (resp)
resp->clear();
_UNTIL(' ')_GET_NUMBER(ret)
while (true) {
_UNTIL('\n')
switch (_GET_NEXT_CHAR()) {
case 'C':_('o')_('n')_('t')_('e')_('n')_('t')_('-')
_('L')_('e')_('n')_('g')_('t')_('h')_(':')_(' ')
_GET_NUMBER(contentLength)
break;
case 'T':_('r')_('a')_('n')_('s')_('f')_('e')_('r')_('-')
_('E')_('n')_('c')_('o')_('d')_('i')_('n')_('g')_(':')
_(' ')_('c')_('h')_('u')_('n')_('k')_('e')_('d')
chunked = 1;
break;
case '\r':__('\n')
switch (chunked) {
do {__('\r')__('\n')
case 1:
_GET_CHUNKED_LEN(contentLength, '\r')__('\n')
if (!contentLength) {
__('\r')__('\n')
goto END;
}
case 0:
while (contentLength > 0 && !_NO_MORE()) {
contentLength -= (iv[1].iov_len = std::min(contentLength, (int)iv[0].iov_len - len));
if (resp)
resp->append(&header[len], iv[1].iov_len);
len += iv[1].iov_len;
}
} while(chunked);
}
goto END;
}
if (!ch) {
ret = -10;
goto END;
}
}
ret = -11;
END:
closesocket(fd);
return ret / 100 == 2 ? 0 : ret;
#undef _NO_MORE
#undef _GET_NEXT_CHAR
#undef _LOOP_NEXT
#undef _UNTIL
#undef _GET_NUMBER
#undef _GET_CHUNKED_LEN
#undef _
#undef __
}
private:
/// <summary>
///
/// </summary>
/// <param name="x"></param>
/// <returns></returns>
static inline uint8_t toHex(uint8_t x) { return x > 9 ? x + 55 : x + 48; }
/// <summary>
///
/// </summary>
/// <param name="out"></param>
/// <param name="src"></param>
static void urlEncode(std::string& out, const std::string& src)
{
size_t pos = 0, start = 0;
while((pos = src.find_first_not_of("abcdefghijklmnopqrstuvwxyqABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~", start)) != std::string::npos) {
out.append(src.c_str() + start, pos - start);
if(src[pos] == ' ')
out += "+";
else {
out += '%';
out += toHex((uint8_t)src[pos] >> 4);
out += toHex((uint8_t)src[pos] & 0xF);
}
start = ++pos;
}
out.append(src.c_str() + start, src.length() - start);
}
};
/// <summary>
///
/// </summary>
/// <param name="resp"></param>
/// <param name="query"></param>
/// <param name="si"></param>
inline int fluxQL(std::string& resp, const std::string& query, const ServerInfo& si)
{
// query JSON body
std::stringstream body;
body << "{\"query\": \"";
body << query;
body << "\", \"type\": \"flux\" }";
return detail::inner::request("POST", "query", "", body.str(), si, &resp);
}
} // namespace detail
// ---------------------------------------------------------------------------
// Structure Declaration
//
// ---------------------------------------------------------------------------
struct HOST_SW_API QueryBuilder {
public:
/// <summary>
///
/// </summary>
/// <param name="m"></param>
/// <returns></returns>
detail::TagCaller& meas(const std::string& m) {
m_lines.imbue(std::locale("C"));
m_lines.clear();
return this->m(m);
}
protected:
/// <summary>
///
/// </summary>
/// <param name="m"></param>
/// <returns></returns>
detail::TagCaller& m(const std::string& m) {
escape(m, ", ");
return (detail::TagCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="k"></param>
/// <param name="v"></param>
/// <returns></returns>
detail::TagCaller& t(const std::string& k, const std::string& v) {
m_lines << ",";
escape(k, ",= ");
m_lines << '=';
escape(std::string(v.c_str()), ",= ");
return (detail::TagCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="delim"></param>
/// <param name="k"></param>
/// <param name="v"></param>
/// <returns></returns>
detail::FieldCaller& f_s(char delim, const std::string& k, const std::string& v) {
m_lines << delim;
m_lines << std::fixed;
escape(k, ",= ");
m_lines << "=\"";
escape(std::string(v.c_str()), "\"");
m_lines << "\"";
return (detail::FieldCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="delim"></param>
/// <param name="k"></param>
/// <param name="v"></param>
/// <returns></returns>
detail::FieldCaller& f_i(char delim, const std::string& k, long long v) {
m_lines << delim;
m_lines << std::fixed;
escape(k, ",= ");
m_lines << "=";
m_lines << v << "i";
return (detail::FieldCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="delim"></param>
/// <param name="k"></param>
/// <param name="v"></param>
/// <returns></returns>
detail::FieldCaller& f_ui(char delim, const std::string& k, unsigned long long v) {
m_lines << delim;
m_lines << std::fixed;
escape(k, ",= ");
m_lines << "=";
m_lines << v << "i";
return (detail::FieldCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="delim"></param>
/// <param name="k"></param>
/// <param name="v"></param>
/// <param name="prec"></param>
/// <returns></returns>
detail::FieldCaller& f_f(char delim, const std::string& k, double v, int prec) {
m_lines << delim;
escape(k, ",= ");
m_lines << std::fixed;
m_lines.precision(prec);
m_lines << "=" << v;
return (detail::FieldCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="delim"></param>
/// <param name="k"></param>
/// <param name="v"></param>
/// <returns></returns>
detail::FieldCaller& f_b(char delim, const std::string& k, bool v) {
m_lines << delim;
escape(k, ",= ");
m_lines << std::fixed;
m_lines << "=" << (v ? "t" : "f");
return (detail::FieldCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="ts"></param>
/// <returns></returns>
detail::TSCaller& ts(uint64_t ts) {
m_lines << " " << ts;
return (detail::TSCaller&)*this;
}
/// <summary>
///
/// </summary>
/// <param name="src"></param>
/// <param name="escapeSeq"></param>
void escape(const std::string& src, const char* escapeSeq)
{
size_t pos = 0, start = 0;
while ((pos = src.find_first_of(escapeSeq, start)) != std::string::npos) {
m_lines.write(src.c_str() + start, pos - start);
m_lines << "\\" << src[pos];
start = ++pos;
}
m_lines.write(src.c_str() + start, src.length() - start);
}
std::stringstream m_lines;
};
namespace detail {
// ---------------------------------------------------------------------------
// Structure Declaration
//
// ---------------------------------------------------------------------------
struct HOST_SW_API TagCaller : public QueryBuilder
{
detail::TagCaller& tag(const std::string& k, const std::string& v) { return t(k, v); }
detail::FieldCaller& field(const std::string& k, const std::string& v) { return f_s(' ', k, v); }
detail::FieldCaller& field(const std::string& k, bool v) { return f_b(' ', k, v); }
detail::FieldCaller& field(const std::string& k, short v) { return f_i(' ', k, v); }
detail::FieldCaller& field(const std::string& k, int v) { return f_i(' ', k, v); }
detail::FieldCaller& field(const std::string& k, long v) { return f_i(' ', k, v); }
detail::FieldCaller& field(const std::string& k, uint16_t v) { return f_ui(' ', k, v); }
detail::FieldCaller& field(const std::string& k, uint32_t v) { return f_ui(' ', k, v); }
detail::FieldCaller& field(const std::string& k, uint64_t v) { return f_ui(' ', k, v); }
detail::FieldCaller& field(const std::string& k, long long v) { return f_i(' ', k, v); }
detail::FieldCaller& field(const std::string& k, double v, int prec = DEFAULT_PRECISION) { return f_f(' ', k, v, prec); }
private:
detail::TagCaller& meas(const std::string& m);
};
// ---------------------------------------------------------------------------
// Structure Declaration
//
// ---------------------------------------------------------------------------
struct HOST_SW_API TSCaller : public QueryBuilder
{
detail::TagCaller& meas(const std::string& m) { m_lines << '\n'; return this->m(m); }
int request(const ServerInfo& si, std::string* resp = nullptr) { return detail::inner::request("POST", "write", "", m_lines.str(), si, resp); }
};
// ---------------------------------------------------------------------------
// Structure Declaration
//
// ---------------------------------------------------------------------------
struct HOST_SW_API FieldCaller : public TSCaller
{
detail::FieldCaller& field(const std::string& k, const std::string& v) { return f_s(',', k, v); }
detail::FieldCaller& field(const std::string& k, bool v) { return f_b(',', k, v); }
detail::FieldCaller& field(const std::string& k, short v) { return f_i(',', k, v); }
detail::FieldCaller& field(const std::string& k, int v) { return f_i(',', k, v); }
detail::FieldCaller& field(const std::string& k, long v) { return f_i(',', k, v); }
detail::FieldCaller& field(const std::string& k, uint16_t v) { return f_ui(',', k, v); }
detail::FieldCaller& field(const std::string& k, uint32_t v) { return f_ui(',', k, v); }
detail::FieldCaller& field(const std::string& k, uint64_t v) { return f_ui(',', k, v); }
detail::FieldCaller& field(const std::string& k, long long v) { return f_i(',', k, v); }
detail::FieldCaller& field(const std::string& k, double v, int prec = 2) { return f_f(',', k, v, prec); }
detail::TSCaller& timestamp(uint64_t ts) { return this->ts(ts); }
};
}
} // namespace influxdb
} // namespace network
#endif // __INFLUXDB_H__
Loading…
Cancel
Save

Powered by TurnKey Linux.