From 4520aa598b7c100eefbb4ce9a9f73cb50bb69514 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Fri, 19 Jul 2024 18:12:48 +0800 Subject: [PATCH] [SDK] Fix crash in `PeriodicExportingMetricReader`. (#2983) --- api/include/opentelemetry/common/macros.h | 31 ++++++++ examples/plugin/plugin/tracer.cc | 1 + exporters/otlp/src/otlp_file_client.cc | 22 +++--- .../periodic_exporting_metric_reader.cc | 77 ++++++++++++++----- .../periodic_exporting_metric_reader_test.cc | 38 +++++++-- 5 files changed, 133 insertions(+), 36 deletions(-) diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index b4a270084d..b74c1048fc 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -387,6 +387,37 @@ point. #endif +// OPENTELEMETRY_HAVE_EXCEPTIONS +// +// Checks whether the compiler both supports and enables exceptions. Many +// compilers support a "no exceptions" mode that disables exceptions. +// +// Generally, when OPENTELEMETRY_HAVE_EXCEPTIONS is not defined: +// +// * Code using `throw` and `try` may not compile. +// * The `noexcept` specifier will still compile and behave as normal. +// * The `noexcept` operator may still return `false`. +// +// For further details, consult the compiler's documentation. +#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS +# if defined(__clang__) && ((__clang_major__ * 100) + __clang_minor__) < 306 +// Clang < 3.6 +// http://releases.llvm.org/3.6.0/tools/clang/docs/ReleaseNotes.html#the-exceptions-macro +# if defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +# endif // defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# elif OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +// Handle remaining special cases and default to exceptions being supported. +# elif !(defined(__GNUC__) && !defined(__EXCEPTIONS) && !defined(__cpp_exceptions)) && \ + !(defined(_MSC_VER) && !defined(_CPPUNWIND)) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +# endif +#endif +#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS +# define OPENTELEMETRY_HAVE_EXCEPTIONS 0 +#endif + /* OPENTELEMETRY_ATTRIBUTE_LIFETIME_BOUND indicates that a resource owned by a function parameter or implicit object parameter is retained by the return value of the diff --git a/examples/plugin/plugin/tracer.cc b/examples/plugin/plugin/tracer.cc index 5361197584..05ff971fb1 100644 --- a/examples/plugin/plugin/tracer.cc +++ b/examples/plugin/plugin/tracer.cc @@ -9,6 +9,7 @@ #include "opentelemetry/common/attribute_value.h" #include "opentelemetry/common/timestamp.h" #include "opentelemetry/context/context_value.h" +#include "opentelemetry/nostd/utility.h" #include "opentelemetry/trace/span_context.h" #include "opentelemetry/trace/span_metadata.h" #include "tracer.h" diff --git a/exporters/otlp/src/otlp_file_client.cc b/exporters/otlp/src/otlp_file_client.cc index 62552bdeff..8efa5c15b5 100644 --- a/exporters/otlp/src/otlp_file_client.cc +++ b/exporters/otlp/src/otlp_file_client.cc @@ -14,8 +14,6 @@ // clang-format on #include "google/protobuf/message.h" -#include "google/protobuf/reflection.h" -#include "google/protobuf/stubs/common.h" #include "nlohmann/json.hpp" // clang-format off @@ -28,15 +26,26 @@ #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/version.h" +#ifdef _MSC_VER +# include +# define strcasecmp _stricmp +#else +# include +#endif + +#include #include #include +#include #include -#include #include +#include #include +#include #include #include #include +#include #include #if !defined(__CYGWIN__) && defined(_WIN32) @@ -64,11 +73,8 @@ #else -# include -# include # include # include -# include # include # define FS_ACCESS(x) access(x, F_OK) @@ -89,10 +95,6 @@ # undef GetMessage #endif -#ifdef _MSC_VER -# define strcasecmp _stricmp -#endif - #if (defined(_MSC_VER) && _MSC_VER >= 1600) || \ (defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L) || defined(__STDC_LIB_EXT1__) # ifdef _MSC_VER diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index f6ae47977d..7ca2747337 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -13,6 +13,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/common/timestamp.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/metrics/export/metric_producer.h" @@ -29,6 +30,10 @@ # include #endif +#if OPENTELEMETRY_HAVE_EXCEPTIONS +# include +#endif + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { @@ -90,31 +95,61 @@ void PeriodicExportingMetricReader::DoBackgroundWork() bool PeriodicExportingMetricReader::CollectAndExportOnce() { std::atomic cancel_export_for_timeout{false}; - auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { - Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - this->exporter_->Export(metric_data); - return true; - }); - }); - std::future_status status; std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); - do + std::unique_ptr task_thread; + +#if OPENTELEMETRY_HAVE_EXCEPTIONS + try { - status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) +#endif + std::promise sender; + auto receiver = sender.get_future(); + + task_thread.reset(new std::thread([this, &cancel_export_for_timeout] { + this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (cancel_export_for_timeout.load(std::memory_order_acquire)) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << this->export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + return true; + }); + })); + + std::future_status status; + do { - cancel_export_for_timeout = true; - break; - } - } while (status != std::future_status::ready); + status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout.store(true, std::memory_order_release); + break; + } + } while (status != std::future_status::ready); +#if OPENTELEMETRY_HAVE_EXCEPTIONS + } + catch (std::exception &e) + { + OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect failed with exception " + << e.what()); + return false; + } + catch (...) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect failed with unknown exception"); + return false; + } +#endif + + if (task_thread && task_thread->joinable()) + { + task_thread->join(); + } std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); while (notify_force_flush > notified_sequence) diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index e115f79f75..f65c10ca05 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -7,6 +7,10 @@ #include +#include +#include +#include + using namespace opentelemetry; using namespace opentelemetry::sdk::instrumentationscope; using namespace opentelemetry::sdk::metrics; @@ -14,8 +18,14 @@ using namespace opentelemetry::sdk::metrics; class MockPushMetricExporter : public PushMetricExporter { public: + MockPushMetricExporter(std::chrono::milliseconds wait) : wait_(wait) {} + opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override { + if (wait_ > std::chrono::milliseconds::zero()) + { + std::this_thread::sleep_for(wait_); + } records_.push_back(record); return opentelemetry::sdk::common::ExportResult::kSuccess; } @@ -34,6 +44,7 @@ class MockPushMetricExporter : public PushMetricExporter private: std::vector records_; + std::chrono::milliseconds wait_; }; class MockMetricProducer : public MetricProducer @@ -61,17 +72,34 @@ class MockMetricProducer : public MetricProducer TEST(PeriodicExporingMetricReader, BasicTests) { - std::unique_ptr exporter(new MockPushMetricExporter()); + std::unique_ptr exporter( + new MockPushMetricExporter(std::chrono::milliseconds{0})); PeriodicExportingMetricReaderOptions options; options.export_timeout_millis = std::chrono::milliseconds(200); options.export_interval_millis = std::chrono::milliseconds(500); auto exporter_ptr = exporter.get(); - PeriodicExportingMetricReader reader(std::move(exporter), options); + std::shared_ptr reader = + std::make_shared(std::move(exporter), options); MockMetricProducer producer; - reader.SetMetricProducer(&producer); + reader->SetMetricProducer(&producer); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - EXPECT_NO_THROW(reader.ForceFlush()); - reader.Shutdown(); + EXPECT_NO_THROW(reader->ForceFlush()); + reader->Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); } + +TEST(PeriodicExporingMetricReader, Timeout) +{ + std::unique_ptr exporter( + new MockPushMetricExporter(std::chrono::milliseconds{2000})); + PeriodicExportingMetricReaderOptions options; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + std::shared_ptr reader = + std::make_shared(std::move(exporter), options); + MockMetricProducer producer; + reader->SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + reader->Shutdown(); +}