diff --git a/CHANGELOG.md b/CHANGELOG.md index 22176df917..e1ee4af753 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ Increment the: ## [Unreleased] +* [SDK] Do not frequently create and destroy http client threads + [#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198) + ## [1.18 2024-11-25] * [EXPORTER] Fix crash in ElasticsearchLogRecordExporter diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index 97386890f2..bef15997fc 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -322,25 +322,16 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient inline CURLM *GetMultiHandle() noexcept { return multi_handle_; } - void MaybeSpawnBackgroundThread(); + // return true if create background thread, false is already exist background thread + bool MaybeSpawnBackgroundThread(); void ScheduleAddSession(uint64_t session_id); void ScheduleAbortSession(uint64_t session_id); void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource); - void WaitBackgroundThreadExit() - { - std::unique_ptr background_thread; - { - std::lock_guard lock_guard{background_thread_m_}; - background_thread.swap(background_thread_); - } + void SetBackgroundWaitFor(std::chrono::milliseconds ms); - if (background_thread && background_thread->joinable()) - { - background_thread->join(); - } - } + void WaitBackgroundThreadExit(); private: void wakeupBackgroundThread(); @@ -366,6 +357,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient std::unique_ptr background_thread_; std::chrono::milliseconds scheduled_delay_milliseconds_; + std::chrono::milliseconds background_thread_wait_for_; + std::atomic is_shutdown_{false}; + nostd::shared_ptr curl_global_initializer_; }; diff --git a/ext/src/http/client/curl/http_client_curl.cc b/ext/src/http/client/curl/http_client_curl.cc index 6827b9f9c7..e6292f54f2 100644 --- a/ext/src/http/client/curl/http_client_curl.cc +++ b/ext/src/http/client/curl/http_client_curl.cc @@ -189,11 +189,13 @@ HttpClient::HttpClient() next_session_id_{0}, max_sessions_per_connection_{8}, scheduled_delay_milliseconds_{std::chrono::milliseconds(256)}, + background_thread_wait_for_{std::chrono::minutes{1}}, curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) {} HttpClient::~HttpClient() { + is_shutdown_.store(true, std::memory_order_release); while (true) { std::unique_ptr background_thread; @@ -211,6 +213,7 @@ HttpClient::~HttpClient() } if (background_thread->joinable()) { + wakeupBackgroundThread(); // if delay quit, wake up first background_thread->join(); } } @@ -335,29 +338,33 @@ void HttpClient::CleanupSession(uint64_t session_id) } } -void HttpClient::MaybeSpawnBackgroundThread() +bool HttpClient::MaybeSpawnBackgroundThread() { std::lock_guard lock_guard{background_thread_m_}; if (background_thread_) { - return; + return false; } background_thread_.reset(new std::thread( [](HttpClient *self) { int still_running = 1; + std::chrono::system_clock::time_point last_free_job_timepoint = + std::chrono::system_clock::now(); + bool need_wait_more = false; while (true) { CURLMsg *msg; int queued; CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running); + // According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we // can not curl_multi_perform it again if (mc != CURLM_OK) { self->resetMultiHandle(); } - else if (still_running) + else if (still_running || need_wait_more) { // curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util // timeout to do the rest jobs @@ -416,6 +423,32 @@ void HttpClient::MaybeSpawnBackgroundThread() still_running = 1; } + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + if (still_running > 0) + { + last_free_job_timepoint = now; + need_wait_more = false; + continue; + } + + std::chrono::milliseconds wait_for = std::chrono::milliseconds::zero(); + +#if LIBCURL_VERSION_NUM >= 0x074400 + // only available with curl_multi_poll+curl_multi_wakeup, because curl_multi_wait would + // cause CPU busy, curl_multi_wait+sleep could not wakeup quickly + wait_for = self->background_thread_wait_for_; +#endif + if (self->is_shutdown_.load(std::memory_order_acquire)) + { + wait_for = std::chrono::milliseconds::zero(); + } + + if (now - last_free_job_timepoint < wait_for) + { + need_wait_more = true; + continue; + } + if (still_running == 0) { std::lock_guard lock_guard{self->background_thread_m_}; @@ -454,6 +487,7 @@ void HttpClient::MaybeSpawnBackgroundThread() } }, this)); + return true; } void HttpClient::ScheduleAddSession(uint64_t session_id) @@ -502,6 +536,28 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource wakeupBackgroundThread(); } +void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms) +{ + background_thread_wait_for_ = ms; +} + +void HttpClient::WaitBackgroundThreadExit() +{ + is_shutdown_.store(true, std::memory_order_release); + std::unique_ptr background_thread; + { + std::lock_guard lock_guard{background_thread_m_}; + background_thread.swap(background_thread_); + } + + if (background_thread && background_thread->joinable()) + { + wakeupBackgroundThread(); + background_thread->join(); + } + is_shutdown_.store(false, std::memory_order_release); +} + void HttpClient::wakeupBackgroundThread() { // Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index 1b4289f400..497c5bb529 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +#include #include #include #include @@ -11,6 +12,7 @@ #include #include #include +#include #include #include @@ -531,3 +533,51 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback) } } } + +TEST_F(BasicCurlHttpTests, ElegantQuitQuick) +{ + auto http_client = http_client::HttpClientFactory::Create(); + std::static_pointer_cast(http_client)->MaybeSpawnBackgroundThread(); + // start background first, then test it could wakeup + auto session = http_client->CreateSession("http://127.0.0.1:19000/get/"); + auto request = session->CreateRequest(); + request->SetUri("get/"); + auto handler = std::make_shared(); + session->SendRequest(handler); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); // let it enter poll state + auto beg = std::chrono::system_clock::now(); + http_client->FinishAllSessions(); + http_client.reset(); + // when background_thread_wait_for_ is used, it should have no side effect on elegant quit + // wait should be less than scheduled_delay_milliseconds_ + // Due to load on CI hosts (some take 10ms), we assert it is less than 20ms + auto cost = std::chrono::system_clock::now() - beg; + ASSERT_TRUE(cost < std::chrono::milliseconds{20}) + << "cost ms: " << std::chrono::duration_cast(cost).count() + << " libcurl version: 0x" << std::hex << LIBCURL_VERSION_NUM; + ASSERT_TRUE(handler->is_called_); + ASSERT_TRUE(handler->got_response_); +} + +TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore) +{ + { + curl::HttpClient http_client; + http_client.MaybeSpawnBackgroundThread(); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); +#if LIBCURL_VERSION_NUM >= 0x074200 + ASSERT_FALSE(http_client.MaybeSpawnBackgroundThread()); +#else + // low version curl do not support delay quit, so old background would quit + ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread()); +#endif + } + { + curl::HttpClient http_client; + http_client.SetBackgroundWaitFor(std::chrono::milliseconds::zero()); + http_client.MaybeSpawnBackgroundThread(); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + // we can disable delay quit by set wait for 0 + ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread()); + } +}