Skip to content

Commit

Permalink
[EXPORTER] Optimize OTLP HTTP compression (open-telemetry#3178)
Browse files Browse the repository at this point in the history
  • Loading branch information
chusitoo authored Dec 18, 2024
1 parent 2b9bff9 commit 0b94d71
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 29 deletions.
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ OtlpHttpClient::createSession(
// Parse uri and store it to cache
if (http_uri_.empty())
{
auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url));
const auto parse_url = opentelemetry::ext::http::common::UrlParser(options_.url);
if (!parse_url.success_)
{
std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url;
Expand Down
128 changes: 104 additions & 24 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

#include <curl/curl.h>
#include <curl/curlver.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -57,11 +60,85 @@ nostd::shared_ptr<HttpCurlGlobalInitializer> HttpCurlGlobalInitializer::GetInsta
return shared_initializer;
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
// Original source:
// https://stackoverflow.com/questions/12398377/is-it-possible-to-have-zlib-read-from-and-write-to-the-same-memory-buffer/12412863#12412863
int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max_len)
{
// must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the
// worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this
// code after the deflateReset(), then the 11 needs to be increased to accommodate the resulting
// gzip header size plus one
std::array<unsigned char, 11> temp{};

// kick start the process with a temporary output buffer -- this allows deflate to consume a large
// chunk of input data in order to make room for output data there
strm->next_in = buf;
strm->avail_in = len;
if (*max_len < len)
{
*max_len = len;
}
strm->next_out = temp.data();
strm->avail_out = (std::min)(static_cast<decltype(z_stream::avail_out)>(temp.size()), *max_len);
auto ret = deflate(strm, Z_FINISH);
if (ret == Z_STREAM_ERROR)
{
return ret;
}

// if we can, copy the temporary output data to the consumed portion of the input buffer, and then
// continue to write up to the start of the consumed input for as long as possible
auto have = strm->next_out - temp.data(); // number of bytes in temp[]
if (have <= static_cast<decltype(have)>(strm->avail_in ? len - strm->avail_in : *max_len))
{
std::memcpy(buf, temp.data(), have);
strm->next_out = buf + have;
have = 0;
while (ret == Z_OK)
{
strm->avail_out =
strm->avail_in ? strm->next_in - strm->next_out : (buf + *max_len) - strm->next_out;
ret = deflate(strm, Z_FINISH);
}
if (ret != Z_BUF_ERROR || strm->avail_in == 0)
{
*max_len = strm->next_out - buf;
return ret == Z_STREAM_END ? Z_OK : ret;
}
}

// the output caught up with the input due to insufficiently compressible data -- copy the
// remaining input data into an allocated buffer and complete the compression from there to the
// now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB
// for the default deflate memLevel of 8, or when *max_len is too small and less than the length
// of the header plus one byte)
auto hold = static_cast<std::remove_const_t<decltype(z_stream::next_in)>>(
strm->zalloc(strm->opaque, strm->avail_in, 1)); // allocated buffer to hold input data
if (hold == Z_NULL)
{
return Z_MEM_ERROR;
}
std::memcpy(hold, strm->next_in, strm->avail_in);
strm->next_in = hold;
if (have)
{
std::memcpy(buf, temp.data(), have);
strm->next_out = buf + have;
}
strm->avail_out = (buf + *max_len) - strm->next_out;
ret = deflate(strm, Z_FINISH);
strm->zfree(strm->opaque, hold);
*max_len = strm->next_out - buf;
return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret);
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW

void Session::SendRequest(
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept
{
is_session_active_.store(true, std::memory_order_release);
std::string url = host_ + std::string(http_request_->uri_);
const auto &url = host_ + http_request_->uri_;
auto callback_ptr = callback.get();
bool reuse_connection = false;

Expand All @@ -76,44 +153,47 @@ void Session::SendRequest(
if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip)
{
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
http_request_->AddHeader("Content-Encoding", "gzip");

opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size());
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = static_cast<uInt>(http_request_->body_.size());
zs.next_in = http_request_->body_.data();
zs.avail_out = static_cast<uInt>(compressed_body.size());
zs.next_out = compressed_body.data();
z_stream zs{};
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

// ZLIB: Have to maually specify 16 bits for the Gzip headers
const int window_bits = 15 + 16;
static constexpr int kWindowBits = MAX_WBITS + 16;
static constexpr int kMemLevel = MAX_MEM_LEVEL;

int stream =
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
Z_DEFAULT_STRATEGY);

if (stream == Z_OK)
{
deflate(&zs, Z_FINISH);
deflateEnd(&zs);
compressed_body.resize(zs.total_out);
http_request_->SetBody(compressed_body);
auto size = static_cast<uInt>(http_request_->body_.size());
auto max_size = size;
stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max_size);

if (stream == Z_OK)
{
http_request_->AddHeader("Content-Encoding", "gzip");
http_request_->body_.resize(max_size);
}
}
else

if (stream != Z_OK)
{
if (callback)
{
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, "");
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed,
zs.msg ? zs.msg : "");
}
is_session_active_.store(false, std::memory_order_release);
}

deflateEnd(&zs);
#else
OTEL_INTERNAL_LOG_ERROR(
"[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the "
"OTLP HTTP Exporter");
#endif
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
}

curl_operation_.reset(new HttpOperation(
Expand Down Expand Up @@ -226,7 +306,7 @@ HttpClient::~HttpClient()
std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSession(
nostd::string_view url) noexcept
{
auto parsedUrl = common::UrlParser(std::string(url));
const auto parsedUrl = common::UrlParser(std::string(url));
if (!parsedUrl.success_)
{
return std::make_shared<Session>(*this);
Expand Down
4 changes: 1 addition & 3 deletions ext/src/http/client/curl/http_operation_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method,
{
for (auto &kv : this->request_headers_)
{
std::string header = std::string(kv.first);
header += ": ";
header += std::string(kv.second);
const auto header = std::string(kv.first).append(": ").append(kv.second);
curl_resource_.headers_chunk =
curl_slist_append(curl_resource_.headers_chunk, header.c_str());
}
Expand Down
127 changes: 126 additions & 1 deletion ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -558,7 +559,6 @@ TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}

TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
{
{
Expand All @@ -581,3 +581,128 @@ TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
}
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
struct GzipEventHandler : public CustomEventHandler
{
~GzipEventHandler() override = default;

void OnResponse(http_client::Response & /* response */) noexcept override {}

void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
{
is_called_ = true;
state_ = state;
reason_ = std::string{reason};
}

bool is_called_ = false;
http_client::SessionState state_ = static_cast<http_client::SessionState>(-1);
std::string reason_;
};

TEST_F(BasicCurlHttpTests, GzipCompressibleData)
{
received_requests_.clear();
auto session_manager = http_client::HttpClientFactory::Create();
EXPECT_TRUE(session_manager != nullptr);

auto session = session_manager->CreateSession("http://127.0.0.1:19000");
auto request = session->CreateRequest();
request->SetUri("post/");
request->SetMethod(http_client::Method::Post);

const auto original_size = 500UL;
http_client::Body body(original_size);
std::iota(body.begin(), body.end(), 0);
request->SetBody(body);
request->AddHeader("Content-Type", "text/plain");
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
auto handler = std::make_shared<GzipEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
ASSERT_TRUE(handler->reason_.empty());

auto http_request =
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
ASSERT_TRUE(http_request != nullptr);
ASSERT_LT(http_request->body_.size(), original_size);

session_manager->CancelAllSessions();
session_manager->FinishAllSessions();
}

TEST_F(BasicCurlHttpTests, GzipIncompressibleData)
{
received_requests_.clear();
auto session_manager = http_client::HttpClientFactory::Create();
EXPECT_TRUE(session_manager != nullptr);

auto session = session_manager->CreateSession("http://127.0.0.1:19000");
auto request = session->CreateRequest();
request->SetUri("post/");
request->SetMethod(http_client::Method::Post);

// Random data generated using code snippet below.
// const auto original_size = 500UL;
// http_client::Body body(original_size);
// std::random_device rd;
// std::mt19937 gen(rd());
// std::uniform_int_distribution<> uid(1, 255);
// std::generate(body.begin(), body.end(), [&]() { return uid(gen); });

// The input values are fixed to make the test repeatable in the event that some distributions
// might yield results that are, in fact, compressible.
http_client::Body body = {
140, 198, 12, 56, 165, 185, 173, 20, 13, 83, 127, 223, 77, 38, 224, 43, 236, 10, 178,
75, 169, 157, 136, 199, 74, 30, 148, 195, 51, 30, 225, 21, 121, 219, 7, 155, 198, 121,
205, 102, 80, 38, 132, 202, 45, 229, 206, 90, 150, 202, 53, 221, 54, 37, 172, 90, 238,
248, 191, 240, 109, 227, 248, 41, 251, 121, 35, 226, 107, 122, 15, 242, 203, 45, 64, 195,
186, 23, 1, 158, 61, 196, 182, 26, 201, 47, 211, 241, 251, 209, 255, 170, 181, 192, 89,
133, 176, 60, 178, 97, 168, 223, 152, 9, 118, 98, 169, 240, 170, 15, 13, 161, 24, 57,
123, 117, 230, 30, 244, 117, 238, 255, 198, 232, 95, 148, 37, 61, 67, 103, 31, 240, 52,
21, 145, 175, 201, 86, 19, 61, 228, 76, 131, 185, 111, 149, 203, 143, 16, 142, 95, 173,
42, 106, 39, 203, 116, 235, 20, 162, 112, 173, 112, 70, 126, 191, 210, 219, 90, 145, 126,
118, 43, 241, 101, 66, 175, 179, 5, 233, 208, 164, 180, 83, 214, 194, 173, 29, 179, 149,
75, 202, 17, 152, 139, 130, 94, 247, 142, 249, 159, 224, 205, 131, 93, 82, 186, 226, 210,
84, 17, 212, 155, 61, 226, 103, 152, 37, 3, 193, 216, 219, 203, 101, 99, 33, 59, 38,
106, 62, 232, 127, 44, 125, 90, 169, 148, 238, 34, 106, 12, 221, 90, 173, 67, 122, 232,
161, 89, 198, 43, 241, 195, 248, 219, 35, 47, 200, 11, 227, 168, 246, 243, 103, 38, 17,
203, 237, 203, 158, 204, 89, 231, 19, 24, 25, 199, 160, 233, 43, 117, 144, 196, 117, 152,
42, 121, 189, 217, 202, 221, 250, 157, 237, 47, 29, 64, 32, 10, 32, 243, 28, 114, 158,
228, 102, 36, 191, 139, 217, 161, 162, 186, 19, 141, 212, 49, 1, 239, 153, 107, 249, 31,
235, 138, 73, 80, 58, 152, 15, 149, 50, 42, 84, 75, 95, 82, 56, 86, 143, 45, 214,
11, 184, 164, 181, 249, 74, 184, 26, 207, 165, 162, 240, 154, 90, 56, 175, 72, 4, 166,
188, 78, 232, 87, 243, 50, 59, 62, 175, 213, 210, 182, 31, 123, 91, 118, 98, 249, 23,
170, 240, 228, 236, 121, 87, 132, 129, 250, 41, 227, 204, 250, 147, 145, 109, 149, 210, 21,
174, 165, 127, 234, 64, 211, 52, 93, 126, 117, 231, 216, 210, 15, 16, 2, 167, 215, 178,
104, 245, 119, 211, 235, 120, 135, 202, 117, 150, 101, 94, 201, 136, 179, 205, 167, 212, 236,
7, 178, 132, 228, 65, 230, 90, 171, 109, 31, 83, 31, 210, 123, 136, 76, 186, 81, 205,
63, 35, 21, 121, 152, 22, 242, 199, 106, 217, 199, 211, 206, 165, 88, 77, 112, 108, 193,
122, 8, 193, 74, 91, 50, 6, 156, 185, 165, 15, 92, 116, 3, 18, 244, 165, 191, 2,
183, 9, 164, 116, 75, 127};
const auto original_size = body.size();

request->SetBody(body);
request->AddHeader("Content-Type", "text/plain");
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
auto handler = std::make_shared<GzipEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
ASSERT_TRUE(handler->reason_.empty());

auto http_request =
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
ASSERT_TRUE(http_request != nullptr);
ASSERT_EQ(http_request->body_.size(), original_size);

session_manager->CancelAllSessions();
session_manager->FinishAllSessions();
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW

0 comments on commit 0b94d71

Please sign in to comment.