Skip to content

Commit

Permalink
[SDK] Do not frequently create and destroy http client threads (open-…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiehuc authored Dec 18, 2024
1 parent d9ad23e commit 2b9bff9
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Increment the:

## [Unreleased]

* [SDK] Do not frequently create and destroy http client threads
[#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198)

## [1.18 2024-11-25]

* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter
Expand Down
20 changes: 7 additions & 13 deletions ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,25 +322,16 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

inline CURLM *GetMultiHandle() noexcept { return multi_handle_; }

void MaybeSpawnBackgroundThread();
// return true if create background thread, false is already exist background thread
bool MaybeSpawnBackgroundThread();

void ScheduleAddSession(uint64_t session_id);
void ScheduleAbortSession(uint64_t session_id);
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);

void WaitBackgroundThreadExit()
{
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}
void SetBackgroundWaitFor(std::chrono::milliseconds ms);

if (background_thread && background_thread->joinable())
{
background_thread->join();
}
}
void WaitBackgroundThreadExit();

private:
void wakeupBackgroundThread();
Expand All @@ -366,6 +357,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::unique_ptr<std::thread> background_thread_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

std::chrono::milliseconds background_thread_wait_for_;
std::atomic<bool> is_shutdown_{false};

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
};

Expand Down
62 changes: 59 additions & 3 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,13 @@ HttpClient::HttpClient()
next_session_id_{0},
max_sessions_per_connection_{8},
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
background_thread_wait_for_{std::chrono::minutes{1}},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}

HttpClient::~HttpClient()
{
is_shutdown_.store(true, std::memory_order_release);
while (true)
{
std::unique_ptr<std::thread> background_thread;
Expand All @@ -211,6 +213,7 @@ HttpClient::~HttpClient()
}
if (background_thread->joinable())
{
wakeupBackgroundThread(); // if delay quit, wake up first
background_thread->join();
}
}
Expand Down Expand Up @@ -335,29 +338,33 @@ void HttpClient::CleanupSession(uint64_t session_id)
}
}

void HttpClient::MaybeSpawnBackgroundThread()
bool HttpClient::MaybeSpawnBackgroundThread()
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
if (background_thread_)
{
return;
return false;
}

background_thread_.reset(new std::thread(
[](HttpClient *self) {
int still_running = 1;
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
bool need_wait_more = false;
while (true)
{
CURLMsg *msg;
int queued;
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);

// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
// can not curl_multi_perform it again
if (mc != CURLM_OK)
{
self->resetMultiHandle();
}
else if (still_running)
else if (still_running || need_wait_more)
{
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
// timeout to do the rest jobs
Expand Down Expand Up @@ -416,6 +423,32 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
if (still_running > 0)
{
last_free_job_timepoint = now;
need_wait_more = false;
continue;
}

std::chrono::milliseconds wait_for = std::chrono::milliseconds::zero();

#if LIBCURL_VERSION_NUM >= 0x074400
// only available with curl_multi_poll+curl_multi_wakeup, because curl_multi_wait would
// cause CPU busy, curl_multi_wait+sleep could not wakeup quickly
wait_for = self->background_thread_wait_for_;
#endif
if (self->is_shutdown_.load(std::memory_order_acquire))
{
wait_for = std::chrono::milliseconds::zero();
}

if (now - last_free_job_timepoint < wait_for)
{
need_wait_more = true;
continue;
}

if (still_running == 0)
{
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
Expand Down Expand Up @@ -454,6 +487,7 @@ void HttpClient::MaybeSpawnBackgroundThread()
}
},
this));
return true;
}

void HttpClient::ScheduleAddSession(uint64_t session_id)
Expand Down Expand Up @@ -502,6 +536,28 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
wakeupBackgroundThread();
}

void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
{
background_thread_wait_for_ = ms;
}

void HttpClient::WaitBackgroundThreadExit()
{
is_shutdown_.store(true, std::memory_order_release);
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
background_thread.swap(background_thread_);
}

if (background_thread && background_thread->joinable())
{
wakeupBackgroundThread();
background_thread->join();
}
is_shutdown_.store(false, std::memory_order_release);
}

void HttpClient::wakeupBackgroundThread()
{
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs
Expand Down
50 changes: 50 additions & 0 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include <curl/curlver.h>
#include <gtest/gtest.h>
#include <string.h>
#include <atomic>
Expand All @@ -11,6 +12,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -531,3 +533,51 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
}
}
}

TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
{
auto http_client = http_client::HttpClientFactory::Create();
std::static_pointer_cast<curl::HttpClient>(http_client)->MaybeSpawnBackgroundThread();
// start background first, then test it could wakeup
auto session = http_client->CreateSession("http://127.0.0.1:19000/get/");
auto request = session->CreateRequest();
request->SetUri("get/");
auto handler = std::make_shared<GetEventHandler>();
session->SendRequest(handler);
std::this_thread::sleep_for(std::chrono::milliseconds{10}); // let it enter poll state
auto beg = std::chrono::system_clock::now();
http_client->FinishAllSessions();
http_client.reset();
// when background_thread_wait_for_ is used, it should have no side effect on elegant quit
// wait should be less than scheduled_delay_milliseconds_
// Due to load on CI hosts (some take 10ms), we assert it is less than 20ms
auto cost = std::chrono::system_clock::now() - beg;
ASSERT_TRUE(cost < std::chrono::milliseconds{20})
<< "cost ms: " << std::chrono::duration_cast<std::chrono::milliseconds>(cost).count()
<< " libcurl version: 0x" << std::hex << LIBCURL_VERSION_NUM;
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}

TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
{
{
curl::HttpClient http_client;
http_client.MaybeSpawnBackgroundThread();
std::this_thread::sleep_for(std::chrono::milliseconds{10});
#if LIBCURL_VERSION_NUM >= 0x074200
ASSERT_FALSE(http_client.MaybeSpawnBackgroundThread());
#else
// low version curl do not support delay quit, so old background would quit
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
#endif
}
{
curl::HttpClient http_client;
http_client.SetBackgroundWaitFor(std::chrono::milliseconds::zero());
http_client.MaybeSpawnBackgroundThread();
std::this_thread::sleep_for(std::chrono::milliseconds{10});
// we can disable delay quit by set wait for 0
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
}
}

0 comments on commit 2b9bff9

Please sign in to comment.