diff --git a/src/fne/network/influxdb/InfluxDB.cpp b/src/fne/network/influxdb/InfluxDB.cpp index 0cf1857d..64fd1700 100644 --- a/src/fne/network/influxdb/InfluxDB.cpp +++ b/src/fne/network/influxdb/InfluxDB.cpp @@ -79,9 +79,8 @@ int detail::inner::request(const char* method, const char* uri, const std::strin #else ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno)); #endif // defined(_WIN32) - closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } @@ -92,7 +91,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %lu", ::GetLastError()); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #else @@ -100,7 +99,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #endif // defined(_WIN32) @@ -112,7 +111,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed ioctlsocket, err: %lu", ::GetLastError()); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #else @@ -121,7 +120,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_GETFL), err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } @@ -129,7 +128,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_SETFL), err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #endif // defined(_WIN32) @@ -141,7 +140,11 @@ int detail::inner::request(const char* method, const char* uri, const std::strin uint8_t retryCnt = 0U; ret = connect(fd, addr->ai_addr, addr->ai_addrlen); if (ret < 0) { +#if defined(_WIN32) + if (WSAGetLastError() == WSAEWOULDBLOCK) { +#else if (errno == EINPROGRESS) { +#endif do { tv.tv_sec = SOCK_CONNECT_TIMEOUT; tv.tv_usec = 0; @@ -157,7 +160,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, timed out while connecting"); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } @@ -172,7 +175,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin #endif // defined(_WIN32) closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } else if (ret > 0) { #if !defined(_WIN32) @@ -184,7 +187,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } @@ -192,7 +195,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, err: %d", valopt); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #endif // !defined(_WIN32) @@ -201,7 +204,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, timed out while connecting"); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } } while (true); @@ -215,7 +218,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed ioctlsocket, err: %lu", ::GetLastError()); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #else @@ -224,7 +227,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_GETFL), err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } @@ -232,7 +235,7 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ::LogError(LOG_HOST, "Failed to connect to InfluxDB server, failed fcntl(F_SETFL), err: %d (%s)", errno, strerror(errno)); closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return 1; } #endif // defined(_WIN32) @@ -249,16 +252,22 @@ int detail::inner::request(const char* method, const char* uri, const std::strin setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); #endif // defined(_WIN32) + // URL encode org and bucket parameters to handle special characters + std::string encodedOrg; + std::string encodedBucket; + detail::inner::urlEncode(encodedOrg, si.org()); + detail::inner::urlEncode(encodedBucket, si.bucket()); + header.resize(len = 0x100); while (true) { if (!si.token().empty()) { iv[0].iov_len = snprintf(&header[0], len, "%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\nAuthorization: Token %s\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n", - method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length()); + method, uri, encodedOrg.c_str(), encodedBucket.c_str(), queryString.c_str(), si.host().c_str(), si.token().c_str(), (int)body.length()); } else { iv[0].iov_len = snprintf(&header[0], len, "%s /api/v2/%s?org=%s&bucket=%s%s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: %d\r\n\r\n", - method, uri, si.org().c_str(), si.bucket().c_str(), queryString.c_str(), si.host().c_str(), (int)body.length()); + method, uri, encodedOrg.c_str(), encodedBucket.c_str(), queryString.c_str(), si.host().c_str(), (int)body.length()); } #ifdef INFLUX_DEBUG LogDebug(LOG_HOST, "InfluxDB Request: %s\n%s", &header[0], body.c_str()); @@ -275,9 +284,102 @@ int detail::inner::request(const char* method, const char* uri, const std::strin ret = 0; - if (writev(fd, iv, 2) < (int)(iv[0].iov_len + iv[1].iov_len)) { - ::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %d (%s)", errno, strerror(errno)); - ret = -6; + // handle partial writes by looping until all data is sent + size_t totalToWrite = iv[0].iov_len + iv[1].iov_len; + size_t totalWritten = 0; + int iovIndex = 0; + + while (totalWritten < totalToWrite) { + ssize_t bytesWritten = writev(fd, &iv[iovIndex], 2 - iovIndex); + + if (bytesWritten < 0) { +#if !defined(_WIN32) + if (errno == EINTR) { + // interrupted by signal, retry + continue; + } +#endif +#if defined(_WIN32) + ::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %lu", ::GetLastError()); +#else + ::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, err: %d (%s)", errno, strerror(errno)); +#endif + ret = -6; + break; + } + + if (bytesWritten == 0) { + ::LogError(LOG_HOST, "Failed to write statistical data to InfluxDB server, connection closed"); + ret = -6; + break; + } + + totalWritten += bytesWritten; + + // adjust iovec for partial writes + size_t remaining = bytesWritten; + while (remaining > 0 && iovIndex < 2) { + if (remaining >= iv[iovIndex].iov_len) { + remaining -= iv[iovIndex].iov_len; + iovIndex++; + } else { + iv[iovIndex].iov_base = (char*)iv[iovIndex].iov_base + remaining; + iv[iovIndex].iov_len -= remaining; + remaining = 0; + } + } + } + + // read and validate HTTP response + if (ret == 0) { + char response[2048]; + ::memset(response, 0x00U, sizeof(response)); + + ssize_t bytesRead = recv(fd, response, sizeof(response) - 1, 0); + if (bytesRead > 0) { + response[bytesRead] = '\0'; + +#ifdef INFLUX_DEBUG + LogDebug(LOG_HOST, "InfluxDB Response: %s", response); +#endif + + // check for successful HTTP status codes + // InfluxDB v2 returns 204 No Content on successful write + if (strstr(response, "HTTP/1.1 204") == nullptr && + strstr(response, "HTTP/1.0 204") == nullptr && + strstr(response, "HTTP/1.1 200") == nullptr && + strstr(response, "HTTP/1.0 200") == nullptr) { + + // extract status line for logging + char statusLine[256]; + ::memset(statusLine, 0x00U, sizeof(statusLine)); + const char* lineEnd = strstr(response, "\r\n"); + if (lineEnd != nullptr) { + size_t lineLen = std::min((size_t)(lineEnd - response), sizeof(statusLine) - 1); + ::memcpy(statusLine, response, lineLen); + statusLine[lineLen] = '\0'; + ::LogError(LOG_HOST, "InfluxDB returned error: %s", statusLine); + } else { + ::LogError(LOG_HOST, "InfluxDB returned non-success response"); + } + ret = -7; + } + } else if (bytesRead < 0) { +#if defined(_WIN32) + int wsaError = WSAGetLastError(); + if (wsaError != WSAEWOULDBLOCK && wsaError != WSAETIMEDOUT) { + ::LogError(LOG_HOST, "Failed to read response from InfluxDB server, err: %d", wsaError); + ret = -8; + } +#else + if (errno != EAGAIN && errno != EWOULDBLOCK) { + ::LogError(LOG_HOST, "Failed to read response from InfluxDB server, err: %d (%s)", errno, strerror(errno)); + ret = -8; + } +#endif + // EAGAIN/EWOULDBLOCK/WSAETIMEDOUT with socket timeout is acceptable (server closed connection after write) + } + // bytesRead == 0 means connection closed gracefully, which is acceptable } // set SO_LINGER option @@ -292,6 +394,6 @@ int detail::inner::request(const char* method, const char* uri, const std::strin // close socket closesocket(fd); if (addr != nullptr) - free(addr); + freeaddrinfo(addr); return ret; } diff --git a/src/fne/network/influxdb/InfluxDB.h b/src/fne/network/influxdb/InfluxDB.h index 1e9ea955..1d40ee17 100644 --- a/src/fne/network/influxdb/InfluxDB.h +++ b/src/fne/network/influxdb/InfluxDB.h @@ -41,8 +41,44 @@ typedef struct iovec { void* iov_base; size_t iov_len; } iovec; inline __int64 writev(int sock, struct iovec* iov, int cnt) { - __int64 r = send(sock, (const char*)iov->iov_base, iov->iov_len, 0); - return (r < 0 || cnt == 1) ? r : r + writev(sock, iov + 1, cnt - 1); + if (cnt <= 0 || iov == nullptr) + return -1; + + __int64 totalWritten = 0; + + // iterate through all iovec entries + for (int i = 0; i < cnt; i++) { + if (iov[i].iov_len == 0) + continue; + + size_t remaining = iov[i].iov_len; + char* ptr = (char*)iov[i].iov_base; + + // handle partial writes for this iovec entry + while (remaining > 0) { + int bytesWritten = send(sock, ptr, (int)remaining, 0); + + if (bytesWritten < 0) { + // error occurred + if (totalWritten > 0) { + // return bytes written so far + return totalWritten; + } + return -1; + } + + if (bytesWritten == 0) { + // connection closed + return totalWritten; + } + + totalWritten += bytesWritten; + remaining -= bytesWritten; + ptr += bytesWritten; + } + } + + return totalWritten; } #else #include