enhance InfluxDB implementation: fix bad handling on Windows; fix bad use of free() for address info freeaddrinfo(); fix partial data writes for bigger payloads; enhance getting responses back from InfluxDB (this makes it so that InfluxDB cannot fail silently easily);

r05a04_dev
Bryan Biedenkapp 2 months ago
parent 463bdee4bf
commit b5d480ed24

@ -79,9 +79,8 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
#else
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno));
#endif // defined(_WIN32)
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
@ -92,7 +91,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %lu", ::GetLastError());
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#else
@ -100,7 +99,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#endif // defined(_WIN32)
@ -112,7 +111,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed ioctlsocket, err: %lu", ::GetLastError());
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#else
@ -121,7 +120,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_GETFL), err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
@ -129,7 +128,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_SETFL), err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#endif // defined(_WIN32)
@ -141,7 +140,11 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
uint8_t retryCnt = 0U;
ret = connect(fd, addr->ai_addr, addr->ai_addrlen);
if (ret < 0) {
#if defined(_WIN32)
if (WSAGetLastError() == WSAEWOULDBLOCK) {
#else
if (errno == EINPROGRESS) {
#endif
do {
tv.tv_sec = SOCK_CONNECT_TIMEOUT;
tv.tv_usec = 0;
@ -157,7 +160,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, timed out while connecting");
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
@ -172,7 +175,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
#endif // defined(_WIN32)
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
} else if (ret > 0) {
#if !defined(_WIN32)
@ -184,7 +187,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
@ -192,7 +195,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", valopt);
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#endif // !defined(_WIN32)
@ -201,7 +204,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, timed out while connecting");
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
} while (true);
@ -215,7 +218,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed ioctlsocket, err: %lu", ::GetLastError());
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#else
@ -224,7 +227,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_GETFL), err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
@ -232,7 +235,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_SETFL), err: %d (%s)", errno, strerror(errno));
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return 1;
}
#endif // defined(_WIN32)
@ -249,16 +252,22 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif // defined(_WIN32)
// URL encode org and bucket parameters to handle special characters
std::string encodedOrg;
std::string encodedBucket;
detail::inner::urlEncode(encodedOrg, si.org());
detail::inner::urlEncode(encodedBucket, si.bucket());
header.resize(len = 0x100);
while (true) {
if (!si.token().empty()) {
iv[0].iov_len = snprintf(&header[0], len,
"%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\nAuthorization: Token %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n",
method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length());
method, uri, encodedOrg.c_str(), encodedBucket.c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length());
} else {
iv[0].iov_len = snprintf(&header[0], len,
"%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n",
method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), (int)body.length());
method, uri, encodedOrg.c_str(), encodedBucket.c_str(), queryString.c_str(), si.host().c_str(), (int)body.length());
}
#ifdef INFLUX_DEBUG
LogDebug(LOG_HOST, "InfluxDB Request: %s\n%s", &header[0], body.c_str());
@ -275,9 +284,102 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
ret = 0;
if (writev(fd, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) {
::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %d (%s)", errno, strerror(errno));
ret = -6;
// handle partial writes by looping until all data is sent
size_t totalToWrite = iv[0].iov_len + iv[1].iov_len;
size_t totalWritten = 0;
int iovIndex = 0;
while (totalWritten < totalToWrite) {
ssize_t bytesWritten = writev(fd, &iv[iovIndex], 2 - iovIndex);
if (bytesWritten < 0) {
#if !defined(_WIN32)
if (errno == EINTR) {
// interrupted by signal, retry
continue;
}
#endif
#if defined(_WIN32)
::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %lu", ::GetLastError());
#else
::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %d (%s)", errno, strerror(errno));
#endif
ret = -6;
break;
}
if (bytesWritten == 0) {
::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, connection closed");
ret = -6;
break;
}
totalWritten += bytesWritten;
// adjust iovec for partial writes
size_t remaining = bytesWritten;
while (remaining > 0 && iovIndex < 2) {
if (remaining >= iv[iovIndex].iov_len) {
remaining -= iv[iovIndex].iov_len;
iovIndex++;
} else {
iv[iovIndex].iov_base = (char*)iv[iovIndex].iov_base + remaining;
iv[iovIndex].iov_len -= remaining;
remaining = 0;
}
}
}
// read and validate HTTP response
if (ret == 0) {
char response[2048];
::memset(response, 0x00U, sizeof(response));
ssize_t bytesRead = recv(fd, response, sizeof(response) - 1, 0);
if (bytesRead > 0) {
response[bytesRead] = '\0';
#ifdef INFLUX_DEBUG
LogDebug(LOG_HOST, "InfluxDB Response: %s", response);
#endif
// check for successful HTTP status codes
// InfluxDB v2 returns 204 No Content on successful write
if (strstr(response, "HTTP/1.1 204") == nullptr &&
strstr(response, "HTTP/1.0 204") == nullptr &&
strstr(response, "HTTP/1.1 200") == nullptr &&
strstr(response, "HTTP/1.0 200") == nullptr) {
// extract status line for logging
char statusLine[256];
::memset(statusLine, 0x00U, sizeof(statusLine));
const char* lineEnd = strstr(response, "\r\n");
if (lineEnd != nullptr) {
size_t lineLen = std::min((size_t)(lineEnd - response), sizeof(statusLine) - 1);
::memcpy(statusLine, response, lineLen);
statusLine[lineLen] = '\0';
::LogError(LOG_HOST, "InfluxDB returned error: %s", statusLine);
} else {
::LogError(LOG_HOST, "InfluxDB returned non-success response");
}
ret = -7;
}
} else if (bytesRead < 0) {
#if defined(_WIN32)
int wsaError = WSAGetLastError();
if (wsaError != WSAEWOULDBLOCK && wsaError != WSAETIMEDOUT) {
::LogError(LOG_HOST, "Failed to read response from InfluxDB server, err: %d", wsaError);
ret = -8;
}
#else
if (errno != EAGAIN && errno != EWOULDBLOCK) {
::LogError(LOG_HOST, "Failed to read response from InfluxDB server, err: %d (%s)", errno, strerror(errno));
ret = -8;
}
#endif
// EAGAIN/EWOULDBLOCK/WSAETIMEDOUT with socket timeout is acceptable (server closed connection after write)
}
// bytesRead == 0 means connection closed gracefully, which is acceptable
}
// set SO_LINGER option
@ -292,6 +394,6 @@ int detail::inner::request(const char* method, const char* uri, const std::strin
// close socket
closesocket(fd);
if (addr != nullptr)
free(addr);
freeaddrinfo(addr);
return ret;
}

@ -41,8 +41,44 @@
typedef struct iovec { void* iov_base; size_t iov_len; } iovec;
inline __int64 writev(int sock, struct iovec* iov, int cnt) {
__int64 r = send(sock, (const char*)iov->iov_base, iov->iov_len, 0);
return (r < 0 || cnt == 1) ? r : r + writev(sock, iov + 1, cnt - 1);
if (cnt <= 0 || iov == nullptr)
return -1;
__int64 totalWritten = 0;
// iterate through all iovec entries
for (int i = 0; i < cnt; i++) {
if (iov[i].iov_len == 0)
continue;
size_t remaining = iov[i].iov_len;
char* ptr = (char*)iov[i].iov_base;
// handle partial writes for this iovec entry
while (remaining > 0) {
int bytesWritten = send(sock, ptr, (int)remaining, 0);
if (bytesWritten < 0) {
// error occurred
if (totalWritten > 0) {
// return bytes written so far
return totalWritten;
}
return -1;
}
if (bytesWritten == 0) {
// connection closed
return totalWritten;
}
totalWritten += bytesWritten;
remaining -= bytesWritten;
ptr += bytesWritten;
}
}
return totalWritten;
}
#else
#include <unistd.h>

Loading…
Cancel
Save

Powered by TurnKey Linux.