diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index 69693952..4fc2f286 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -76,6 +76,19 @@ master: # Flag indicating whether or not a P25 ADJ_STS_BCAST will pass to connected external peers. disallowExtAdjStsBcast: true + # Flag indicating whether or not InfluxDB logging and metrics recording is enabled. + enableInflux: false + # Hostname/IP address of the InfluxDB instance to connect to. + influxServerAddress: 127.0.0.1 + # Port number of the InfluxDB instance to connect to. + influxServerPort: 8086 + # API Token to access the InfluxDB instance API. + influxServerToken: "APITOKEN" + # Organization Name on InfluxDB instance API. + influxOrg: "dvm" + # Data Bucket Name on InfluxDB instance API. + influxBucket: "dvm" + # # Talkgroup Rules Configuration # diff --git a/src/fne/CMakeLists.txt b/src/fne/CMakeLists.txt index 7af5e992..bb2c6b87 100644 --- a/src/fne/CMakeLists.txt +++ b/src/fne/CMakeLists.txt @@ -16,6 +16,7 @@ file(GLOB dvmfne_SRC "src/fne/network/fne/*.h" "src/fne/network/fne/*.cpp" + "src/fne/network/influxdb/*.h" "src/fne/network/*.h" "src/fne/network/*.cpp" "src/fne/*.h" diff --git a/src/fne/network/DiagNetwork.cpp b/src/fne/network/DiagNetwork.cpp index cbb29d45..a07b0aa5 100644 --- a/src/fne/network/DiagNetwork.cpp +++ b/src/fne/network/DiagNetwork.cpp @@ -223,6 +223,16 @@ void* DiagNetwork::threadedNetworkRx(void* arg) std::string payload(rawPayload, rawPayload + (req->length - 11U)); ::ActivityLog("%u %s", peerId, payload.c_str()); + + // report activity log to InfluxDB + if (network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("activity") + .tag("peerId", std::to_string(peerId)) + .field("msg", payload) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(network->m_influxServer); + } } else { network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -249,6 +259,16 @@ void* DiagNetwork::threadedNetworkRx(void* arg) g_disableTimeDisplay = true; ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str()); g_disableTimeDisplay = currState; + + // report diagnostic log to InfluxDB + if (network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("diag") + .tag("peerId", std::to_string(peerId)) + .field("msg", payload) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(network->m_influxServer); + } } else { network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index ff836968..e1206110 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -95,6 +95,12 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port, m_callInProgress(false), m_disallowAdjStsBcast(false), m_disallowExtAdjStsBcast(true), + m_enableInfluxDB(false), + m_influxServerAddress("127.0.0.1"), + m_influxServerPort(8086U), + m_influxServerToken(), + m_influxOrg("dvm"), + m_influxBucket("dvm"), m_reportPeerPing(reportPeerPing), m_verbose(verbose) { @@ -139,6 +145,16 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions) m_disallowExtAdjStsBcast = true; } + m_enableInfluxDB = conf["enableInflux"].as(false); + m_influxServerAddress = conf["influxServerAddress"].as("127.0.0.1"); + m_influxServerPort = conf["influxServerPort"].as(8086U); + m_influxServerToken = conf["influxServerToken"].as(); + m_influxOrg = conf["influxOrg"].as("dvm"); + m_influxBucket = conf["influxBucket"].as("dvm"); + if (m_enableInfluxDB) { + m_influxServer = influxdb::ServerInfo(m_influxServerAddress, m_influxServerPort, m_influxOrg, m_influxServerToken, m_influxBucket); + } + if (printOptions) { LogInfo(" Maximum Permitted Connections: %u", m_softConnLimit); LogInfo(" Disable P25 ADJ_STS_BCAST to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); @@ -146,6 +162,13 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions) LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); } LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no"); + LogInfo(" InfluxDB Reporting Enabled: %s", m_enableInfluxDB ? "yes" : "no"); + if (m_enableInfluxDB) { + LogInfo(" InfluxDB Address: %s", m_influxServerAddress.c_str()); + LogInfo(" InfluxDB Port: %u", m_influxServerPort); + LogInfo(" InfluxDB Organization: %s", m_influxOrg.c_str()); + LogInfo(" InfluxDB Bucket: %s", m_influxBucket.c_str()); + } } } @@ -832,6 +855,16 @@ void* FNENetwork::threadedNetworkRx(void* arg) std::string payload(rawPayload, rawPayload + (req->length - 11U)); ::ActivityLog("%u %s", peerId, payload.c_str()); + + // report activity log to InfluxDB + if (network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("activity") + .tag("peerId", std::to_string(peerId)) + .field("msg", payload) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(network->m_influxServer); + } } else { network->writePeerNAK(peerId, TAG_TRANSFER_ACT_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -858,6 +891,16 @@ void* FNENetwork::threadedNetworkRx(void* arg) g_disableTimeDisplay = true; ::Log(9999U, nullptr, "%u %s", peerId, payload.c_str()); g_disableTimeDisplay = currState; + + // report diagnostic log to InfluxDB + if (network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("diag") + .tag("peerId", std::to_string(peerId)) + .field("msg", payload) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(network->m_influxServer); + } } else { network->writePeerNAK(peerId, TAG_TRANSFER_DIAG_LOG, NET_CONN_NAK_FNE_UNAUTHORIZED); diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index 565af180..58980841 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -19,6 +19,7 @@ #include "common/lookups/AffiliationLookup.h" #include "common/lookups/RadioIdLookup.h" #include "common/lookups/TalkgroupRulesLookup.h" +#include "fne/network/influxdb/InfluxDB.h" #include "host/network/Network.h" #include @@ -287,6 +288,14 @@ namespace network bool m_disallowAdjStsBcast; bool m_disallowExtAdjStsBcast; + bool m_enableInfluxDB; + std::string m_influxServerAddress; + uint16_t m_influxServerPort; + std::string m_influxServerToken; + std::string m_influxOrg; + std::string m_influxBucket; + influxdb::ServerInfo m_influxServer; + bool m_reportPeerPing; bool m_verbose; diff --git a/src/fne/network/fne/TagDMRData.cpp b/src/fne/network/fne/TagDMRData.cpp index 824eef1e..380d331a 100644 --- a/src/fne/network/fne/TagDMRData.cpp +++ b/src/fne/network/fne/TagDMRData.cpp @@ -156,6 +156,22 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId LogMessage(LOG_NET, "DMR, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", peerId, srcId, dstId, duration / 1000, streamId, external); + + // report call event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_event") + .tag("peerId", std::to_string(peerId)) + .tag("mode", "DMR") + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(srcId)) + .tag("dstId", std::to_string(dstId)) + .field("duration", duration) + .field("slot", slotNo) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(m_network->m_influxServer); + } + m_network->m_callInProgress = false; } } @@ -200,7 +216,9 @@ bool TagDMRData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId status.slotNo = slotNo; status.streamId = streamId; m_status[dstId] = status; // this *could* be an issue if a dstId appears on both slots somehow... + LogMessage(LOG_NET, "DMR, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); + m_network->m_callInProgress = true; } } diff --git a/src/fne/network/fne/TagNXDNData.cpp b/src/fne/network/fne/TagNXDNData.cpp index 0b34bb44..967040d7 100644 --- a/src/fne/network/fne/TagNXDNData.cpp +++ b/src/fne/network/fne/TagNXDNData.cpp @@ -129,6 +129,21 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI LogMessage(LOG_NET, "NXDN, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", peerId, srcId, dstId, duration / 1000, streamId, external); + + // report call event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_event") + .tag("peerId", std::to_string(peerId)) + .tag("mode", "NXDN") + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(srcId)) + .tag("dstId", std::to_string(dstId)) + .field("duration", duration) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(m_network->m_influxServer); + } + m_network->m_callInProgress = false; } } @@ -172,7 +187,9 @@ bool TagNXDNData::processFrame(const uint8_t* data, uint32_t len, uint32_t peerI status.dstId = dstId; status.streamId = streamId; m_status[dstId] = status; + LogMessage(LOG_NET, "NXDN, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); + m_network->m_callInProgress = true; } } diff --git a/src/fne/network/fne/TagP25Data.cpp b/src/fne/network/fne/TagP25Data.cpp index 7a7096e0..b9a5423f 100644 --- a/src/fne/network/fne/TagP25Data.cpp +++ b/src/fne/network/fne/TagP25Data.cpp @@ -187,6 +187,21 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId LogMessage(LOG_NET, "P25, Call End, peer = %u, srcId = %u, dstId = %u, duration = %u, streamId = %u, external = %u", peerId, srcId, dstId, duration / 1000, streamId, external); + + // report call event to InfluxDB + if (m_network->m_enableInfluxDB) { + influxdb::QueryBuilder() + .meas("call_event") + .tag("peerId", std::to_string(peerId)) + .tag("mode", "P25") + .tag("streamId", std::to_string(streamId)) + .tag("srcId", std::to_string(srcId)) + .tag("dstId", std::to_string(dstId)) + .field("duration", duration) + .timestamp(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count()) + .request(m_network->m_influxServer); + } + m_network->m_callInProgress = false; } } @@ -230,7 +245,9 @@ bool TagP25Data::processFrame(const uint8_t* data, uint32_t len, uint32_t peerId status.dstId = dstId; status.streamId = streamId; m_status[dstId] = status; + LogMessage(LOG_NET, "P25, Call Start, peer = %u, srcId = %u, dstId = %u, streamId = %u, external = %u", peerId, srcId, dstId, streamId, external); + m_network->m_callInProgress = true; } } diff --git a/src/fne/network/influxdb/InfluxDB.h b/src/fne/network/influxdb/InfluxDB.h new file mode 100644 index 00000000..030b34ec --- /dev/null +++ b/src/fne/network/influxdb/InfluxDB.h @@ -0,0 +1,551 @@ +// SPDX-License-Identifier: MIT-only +/** +* Digital Voice Modem - Converged FNE Software +* MIT Open Source. Use is subject to license terms. +* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. +* +* @package DVM / Converged FNE Software +* @derivedfrom influxdb-cpp (https://github.com/orca-zhang/influxdb-cpp) +* @license MIT License (https://opensource.org/licenses/MIT) +* +* Copyright (c) 2010-2018 +* Copyright (C) 2024 Bryan Biedenkapp, N2PLL +* +*/ +#if !defined(__INFLUXDB_H__) +#define __INFLUXDB_H__ + +#include "fne/Defines.h" +#include "common/Log.h" + +#include +#include +#include +#include + +#define DEFAULT_PRECISION 5 + +#ifdef _WIN32 + #define NOMINMAX + #include + #include + + #pragma comment(lib, "ws2_32") + typedef struct iovec { void* iov_base; size_t iov_len; } iovec; + + inline __int64 writev(int sock, struct iovec* iov, int cnt) { + __int64 r = send(sock, (const char*)iov->iov_base, iov->iov_len, 0); + return (r < 0 || cnt == 1) ? r : r + writev(sock, iov + 1, cnt - 1); + } +#else + #include + #include + #include + #include + #include + #include + #include + #define closesocket close +#endif + +namespace network +{ + namespace influxdb + { + // --------------------------------------------------------------------------- + // Class Declaration + // Implements the diagnostic/activity log networking logic. + // --------------------------------------------------------------------------- + + class HOST_SW_API ServerInfo { + public: + /// Initializes a new instance of the ServerInfo class. + ServerInfo() : + m_host(), + m_port(8086U), + m_org(), + m_bucket(), + m_token() + { + /* stub */ + } + + /// Initializes a new instance of the ServerInfo class. + /// + /// + /// + /// + /// + ServerInfo(const std::string& host, uint16_t port, const std::string& org, const std::string& token, const std::string& bucket = "") : + m_host(host), + m_port(port), + m_org(org), + m_bucket(bucket), + m_token(token) + { + /* stub */ + } + + public: + /// + __PROPERTY_PLAIN(std::string, host); + /// + __PROPERTY_PLAIN(uint16_t, port); + /// + __PROPERTY_PLAIN(std::string, org); + /// + __PROPERTY_PLAIN(std::string, bucket); + /// + __PROPERTY_PLAIN(std::string, token); + }; + + namespace detail + { + struct MeasCaller; + struct TagCaller; + struct FieldCaller; + struct TSCaller; + + struct HOST_SW_API inner { + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + static int request(const char* method, const char* uri, const std::string& queryString, const std::string& body, + const ServerInfo& si, std::string* resp) + { + std::string header; + struct iovec iv[2]; + int fd, contentLength = 0, len = 0; + char ch; + unsigned char chunked = 0; + + if (resp) + resp->clear(); + + struct addrinfo hints, *addr = nullptr; + struct in6_addr serverAddr; + memset(&hints, 0x00, sizeof(hints)); + hints.ai_flags = AI_NUMERICSERV; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + // check to see if the address is a valid IPv4 address + int ret = inet_pton(AF_INET, si.host().c_str(), &serverAddr); + if (ret == 1) { + hints.ai_family = AF_INET; // IPv4 + hints.ai_flags |= AI_NUMERICHOST; + // not a valid IPv4 -> check to see if address is a valid IPv6 address + } else { + ret = inet_pton(AF_INET6, si.host().c_str(), &serverAddr); + if (ret == 1) { + hints.ai_family = AF_INET6; // IPv6 + hints.ai_flags |= AI_NUMERICHOST; + } + } + + ret = getaddrinfo(si.host().c_str(), std::to_string(si.port()).c_str(), &hints, &addr); + if (ret != 0) { + LogError(LOG_NET, "Failed to determine InfluxDB server host, err: %d", errno); + return 1; + } + + // open the socket + fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (fd < 0) { + LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno); + closesocket(fd); + return 1; + } + + // connect to the server + ret = connect(fd, addr->ai_addr, addr->ai_addrlen); + if (ret < 0) { + LogError(LOG_NET, "Failed to connect to InfluxDB server, err: %d", errno); + closesocket(fd); + return 1; + } + + header.resize(len = 0x100); + while (true) { + if (!si.token().empty()) { + iv[0].iov_len = snprintf(&header[0], len, + "%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nAuthorization: Token %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n", + method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length()); + } else { + iv[0].iov_len = snprintf(&header[0], len, + "%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n", + method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), (int)body.length()); + } +#ifdef INFLUX_DEBUG + LogDebug(LOG_HOST, "InfluxDB Request: %s\n%s", &header[0], body.c_str()); +#endif + if ((int)iv[0].iov_len >= len) + header.resize(len *= 2); + else + break; + } + + iv[0].iov_base = &header[0]; + iv[1].iov_base = (void*)&body[0]; + iv[1].iov_len = body.length(); + + if (writev(fd, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) { + ret = -6; + goto END; + } + + iv[0].iov_len = len; + +#define _NO_MORE() (len >= (int)iv[0].iov_len && (iv[0].iov_len = recv(fd, &header[0], header.length(), len = 0)) == size_t(-1)) +#define _GET_NEXT_CHAR() (ch = _NO_MORE() ? 0 : header[len++]) +#define _LOOP_NEXT(statement) for(;;) { if(!(_GET_NEXT_CHAR())) { ret = -7; goto END; } statement } +#define _UNTIL(c) _LOOP_NEXT( if(ch == c) break; ) +#define _GET_NUMBER(n) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 10 + (ch - '0'); else break; ) +#define _GET_CHUNKED_LEN(n, c) _LOOP_NEXT( if(ch >= '0' && ch <= '9') n = n * 16 + (ch - '0'); \ + else if(ch >= 'A' && ch <= 'F') n = n * 16 + (ch - 'A') + 10; \ + else if(ch >= 'a' && ch <= 'f') n = n * 16 + (ch - 'a') + 10; else {if(ch != c) { ret = -8; goto END; } break;} ) +#define _(c) if((_GET_NEXT_CHAR()) != c) break; +#define __(c) if((_GET_NEXT_CHAR()) != c) { ret = -9; goto END; } + + if (resp) + resp->clear(); + + _UNTIL(' ')_GET_NUMBER(ret) + while (true) { + _UNTIL('\n') + switch (_GET_NEXT_CHAR()) { + case 'C':_('o')_('n')_('t')_('e')_('n')_('t')_('-') + _('L')_('e')_('n')_('g')_('t')_('h')_(':')_(' ') + _GET_NUMBER(contentLength) + break; + case 'T':_('r')_('a')_('n')_('s')_('f')_('e')_('r')_('-') + _('E')_('n')_('c')_('o')_('d')_('i')_('n')_('g')_(':') + _(' ')_('c')_('h')_('u')_('n')_('k')_('e')_('d') + chunked = 1; + break; + case '\r':__('\n') + switch (chunked) { + do {__('\r')__('\n') + case 1: + _GET_CHUNKED_LEN(contentLength, '\r')__('\n') + if (!contentLength) { + __('\r')__('\n') + goto END; + } + case 0: + while (contentLength > 0 && !_NO_MORE()) { + contentLength -= (iv[1].iov_len = std::min(contentLength, (int)iv[0].iov_len - len)); + if (resp) + resp->append(&header[len], iv[1].iov_len); + len += iv[1].iov_len; + } + } while(chunked); + } + goto END; + } + + if (!ch) { + ret = -10; + goto END; + } + } + + ret = -11; + END: + closesocket(fd); + return ret / 100 == 2 ? 0 : ret; +#undef _NO_MORE +#undef _GET_NEXT_CHAR +#undef _LOOP_NEXT +#undef _UNTIL +#undef _GET_NUMBER +#undef _GET_CHUNKED_LEN +#undef _ +#undef __ + } + + private: + /// + /// + /// + /// + /// + static inline uint8_t toHex(uint8_t x) { return x > 9 ? x + 55 : x + 48; } + + /// + /// + /// + /// + /// + static void urlEncode(std::string& out, const std::string& src) + { + size_t pos = 0, start = 0; + while((pos = src.find_first_not_of("abcdefghijklmnopqrstuvwxyqABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~", start)) != std::string::npos) { + out.append(src.c_str() + start, pos - start); + + if(src[pos] == ' ') + out += "+"; + else { + out += '%'; + out += toHex((uint8_t)src[pos] >> 4); + out += toHex((uint8_t)src[pos] & 0xF); + } + + start = ++pos; + } + + out.append(src.c_str() + start, src.length() - start); + } + }; + + /// + /// + /// + /// + /// + /// + inline int fluxQL(std::string& resp, const std::string& query, const ServerInfo& si) + { + // query JSON body + std::stringstream body; + body << "{\"query\": \""; + body << query; + body << "\", \"type\": \"flux\" }"; + + return detail::inner::request("POST", "query", "", body.str(), si, &resp); + } + } // namespace detail + + // --------------------------------------------------------------------------- + // Structure Declaration + // + // --------------------------------------------------------------------------- + + struct HOST_SW_API QueryBuilder { + public: + /// + /// + /// + /// + /// + detail::TagCaller& meas(const std::string& m) { + m_lines.imbue(std::locale("C")); + m_lines.clear(); + return this->m(m); + } + + protected: + /// + /// + /// + /// + /// + detail::TagCaller& m(const std::string& m) { + escape(m, ", "); + return (detail::TagCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + detail::TagCaller& t(const std::string& k, const std::string& v) { + m_lines << ","; + + escape(k, ",= "); + m_lines << '='; + escape(std::string(v.c_str()), ",= "); + + return (detail::TagCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + /// + detail::FieldCaller& f_s(char delim, const std::string& k, const std::string& v) { + m_lines << delim; + m_lines << std::fixed; + + escape(k, ",= "); + m_lines << "=\""; + escape(std::string(v.c_str()), "\""); + m_lines << "\""; + + return (detail::FieldCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + /// + detail::FieldCaller& f_i(char delim, const std::string& k, long long v) { + m_lines << delim; + m_lines << std::fixed; + + escape(k, ",= "); + m_lines << "="; + m_lines << v << "i"; + + return (detail::FieldCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + /// + detail::FieldCaller& f_ui(char delim, const std::string& k, unsigned long long v) { + m_lines << delim; + m_lines << std::fixed; + + escape(k, ",= "); + m_lines << "="; + m_lines << v << "i"; + + return (detail::FieldCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + /// + /// + detail::FieldCaller& f_f(char delim, const std::string& k, double v, int prec) { + m_lines << delim; + + escape(k, ",= "); + m_lines << std::fixed; + m_lines.precision(prec); + m_lines << "=" << v; + + return (detail::FieldCaller&)*this; + } + + /// + /// + /// + /// + /// + /// + /// + detail::FieldCaller& f_b(char delim, const std::string& k, bool v) { + m_lines << delim; + + escape(k, ",= "); + m_lines << std::fixed; + m_lines << "=" << (v ? "t" : "f"); + + return (detail::FieldCaller&)*this; + } + + /// + /// + /// + /// + /// + detail::TSCaller& ts(uint64_t ts) { + m_lines << " " << ts; + return (detail::TSCaller&)*this; + } + + /// + /// + /// + /// + /// + void escape(const std::string& src, const char* escapeSeq) + { + size_t pos = 0, start = 0; + + while ((pos = src.find_first_of(escapeSeq, start)) != std::string::npos) { + m_lines.write(src.c_str() + start, pos - start); + m_lines << "\\" << src[pos]; + start = ++pos; + } + + m_lines.write(src.c_str() + start, src.length() - start); + } + + std::stringstream m_lines; + }; + + namespace detail { + // --------------------------------------------------------------------------- + // Structure Declaration + // + // --------------------------------------------------------------------------- + + struct HOST_SW_API TagCaller : public QueryBuilder + { + detail::TagCaller& tag(const std::string& k, const std::string& v) { return t(k, v); } + detail::FieldCaller& field(const std::string& k, const std::string& v) { return f_s(' ', k, v); } + detail::FieldCaller& field(const std::string& k, bool v) { return f_b(' ', k, v); } + detail::FieldCaller& field(const std::string& k, short v) { return f_i(' ', k, v); } + detail::FieldCaller& field(const std::string& k, int v) { return f_i(' ', k, v); } + detail::FieldCaller& field(const std::string& k, long v) { return f_i(' ', k, v); } + detail::FieldCaller& field(const std::string& k, uint16_t v) { return f_ui(' ', k, v); } + detail::FieldCaller& field(const std::string& k, uint32_t v) { return f_ui(' ', k, v); } + detail::FieldCaller& field(const std::string& k, uint64_t v) { return f_ui(' ', k, v); } + detail::FieldCaller& field(const std::string& k, long long v) { return f_i(' ', k, v); } + detail::FieldCaller& field(const std::string& k, double v, int prec = DEFAULT_PRECISION) { return f_f(' ', k, v, prec); } + + private: + detail::TagCaller& meas(const std::string& m); + }; + + // --------------------------------------------------------------------------- + // Structure Declaration + // + // --------------------------------------------------------------------------- + + struct HOST_SW_API TSCaller : public QueryBuilder + { + detail::TagCaller& meas(const std::string& m) { m_lines << '\n'; return this->m(m); } + int request(const ServerInfo& si, std::string* resp = nullptr) { return detail::inner::request("POST", "write", "", m_lines.str(), si, resp); } + }; + + // --------------------------------------------------------------------------- + // Structure Declaration + // + // --------------------------------------------------------------------------- + + struct HOST_SW_API FieldCaller : public TSCaller + { + detail::FieldCaller& field(const std::string& k, const std::string& v) { return f_s(',', k, v); } + detail::FieldCaller& field(const std::string& k, bool v) { return f_b(',', k, v); } + detail::FieldCaller& field(const std::string& k, short v) { return f_i(',', k, v); } + detail::FieldCaller& field(const std::string& k, int v) { return f_i(',', k, v); } + detail::FieldCaller& field(const std::string& k, long v) { return f_i(',', k, v); } + detail::FieldCaller& field(const std::string& k, uint16_t v) { return f_ui(',', k, v); } + detail::FieldCaller& field(const std::string& k, uint32_t v) { return f_ui(',', k, v); } + detail::FieldCaller& field(const std::string& k, uint64_t v) { return f_ui(',', k, v); } + detail::FieldCaller& field(const std::string& k, long long v) { return f_i(',', k, v); } + detail::FieldCaller& field(const std::string& k, double v, int prec = 2) { return f_f(',', k, v, prec); } + detail::TSCaller& timestamp(uint64_t ts) { return this->ts(ts); } + }; + } + } // namespace influxdb +} // namespace network + +#endif // __INFLUXDB_H__ \ No newline at end of file