From 3e840b03783d4c1f595e414b192bf238a88f73f6 Mon Sep 17 00:00:00 2001 From: owent Date: Tue, 14 Nov 2023 16:36:41 +0800 Subject: [PATCH 01/18] Use protobuf arena to reduce memory fragments and improve performence by using a modern malloc library. --- .../exporters/otlp/otlp_grpc_client.h | 1 + exporters/otlp/src/otlp_grpc_exporter.cc | 20 +++++++++++++++---- .../otlp/src/otlp_grpc_log_record_exporter.cc | 20 +++++++++++++++---- .../otlp/src/otlp_grpc_metric_exporter.cc | 20 +++++++++++++++---- exporters/otlp/src/otlp_http_exporter.cc | 17 +++++++++++++--- .../otlp/src/otlp_http_log_record_exporter.cc | 18 ++++++++++++++--- .../otlp/src/otlp_http_metric_exporter.cc | 20 +++++++++++++++---- 7 files changed, 94 insertions(+), 22 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index fa1a69d619..73447ce123 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -11,6 +11,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h" #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h" diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 30a9b63fe4..8fc4ce9272 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -52,14 +52,26 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::trace::v1::ExportTraceServiceRequest request; - OtlpRecordableUtils::PopulateRequest(spans, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::trace::v1::ExportTraceServiceRequest *request = + google::protobuf::Arena::CreateMessage< + proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + OtlpRecordableUtils::PopulateRequest(spans, request); auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::trace::v1::ExportTraceServiceResponse response; + proto::collector::trace::v1::ExportTraceServiceResponse *response = + google::protobuf::Arena::CreateMessage< + proto::collector::trace::v1::ExportTraceServiceResponse>(&arena); grpc::Status status = - OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), context.get(), request, &response); + OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), context.get(), *request, response); if (!status.ok()) { diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index 2b397262b0..b0da987c43 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -65,14 +65,26 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::logs::v1::ExportLogsServiceRequest request; - OtlpRecordableUtils::PopulateRequest(logs, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::logs::v1::ExportLogsServiceRequest *request = + google::protobuf::Arena::CreateMessage( + &arena); + OtlpRecordableUtils::PopulateRequest(logs, request); auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::logs::v1::ExportLogsServiceResponse response; + proto::collector::logs::v1::ExportLogsServiceResponse *response = + google::protobuf::Arena::CreateMessage( + &arena); grpc::Status status = - OtlpGrpcClient::DelegateExport(log_service_stub_.get(), context.get(), request, &response); + OtlpGrpcClient::DelegateExport(log_service_stub_.get(), context.get(), *request, response); if (!status.ok()) { diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index fd503f9836..921eb4c481 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -60,14 +60,26 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::metrics::v1::ExportMetricsServiceRequest request; - OtlpMetricUtils::PopulateRequest(data, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::metrics::v1::ExportMetricsServiceRequest *request = + google::protobuf::Arena::CreateMessage< + proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + OtlpMetricUtils::PopulateRequest(data, request); auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::metrics::v1::ExportMetricsServiceResponse response; + proto::collector::metrics::v1::ExportMetricsServiceResponse *response = + google::protobuf::Arena::CreateMessage< + proto::collector::metrics::v1::ExportMetricsServiceResponse>(&arena); grpc::Status status = OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), context.get(), - request, &response); + *request, response); if (!status.ok()) { diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 6732e31431..977ea2a0cb 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -7,6 +7,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -95,12 +96,22 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::trace::v1::ExportTraceServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(spans, &service_request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::trace::v1::ExportTraceServiceRequest *service_request = + google::protobuf::Arena::CreateMessage< + proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + OtlpRecordableUtils::PopulateRequest(spans, service_request); std::size_t span_count = spans.size(); #ifdef ENABLE_ASYNC_EXPORT http_client_->Export( - service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { + *service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index 9414e2fd76..db4b813c02 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -7,6 +7,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -98,12 +99,23 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( { return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::logs::v1::ExportLogsServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(logs, &service_request); + + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::logs::v1::ExportLogsServiceRequest *service_request = + google::protobuf::Arena::CreateMessage( + &arena); + OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); #ifdef ENABLE_ASYNC_EXPORT http_client_->Export( - service_request, [log_count](opentelemetry::sdk::common::ExportResult result) { + *service_request, [log_count](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index e4f667eb3e..52c25b3a34 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -6,6 +6,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -101,12 +102,23 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( { return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::metrics::v1::ExportMetricsServiceRequest service_request; - OtlpMetricUtils::PopulateRequest(data, &service_request); + + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::metrics::v1::ExportMetricsServiceRequest *service_request = + google::protobuf::Arena::CreateMessage< + proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + OtlpMetricUtils::PopulateRequest(data, service_request); std::size_t metric_count = data.scope_metric_data_.size(); #ifdef ENABLE_ASYNC_EXPORT - http_client_->Export(service_request, [metric_count]( - opentelemetry::sdk::common::ExportResult result) { + http_client_->Export(*service_request, [metric_count]( + opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " From 9b3b057bddbdeeb7806de75eb6745eaf7ff20d7e Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 15 Nov 2023 21:08:19 +0800 Subject: [PATCH 02/18] Add async exporting for OTLP gRPC exporter --- api/include/opentelemetry/common/macros.h | 7 +- .../exporters/otlp/otlp_grpc_client.h | 112 +++- .../exporters/otlp/otlp_grpc_client_options.h | 8 + .../exporters/otlp/otlp_grpc_exporter.h | 6 + .../otlp/otlp_grpc_log_record_exporter.h | 6 + .../otlp/otlp_grpc_metric_exporter.h | 6 + exporters/otlp/src/otlp_grpc_client.cc | 489 +++++++++++++++++- exporters/otlp/src/otlp_grpc_exporter.cc | 68 ++- .../otlp/src/otlp_grpc_exporter_options.cc | 5 + .../otlp/src/otlp_grpc_log_record_exporter.cc | 66 ++- .../otlp_grpc_log_record_exporter_options.cc | 5 + .../otlp/src/otlp_grpc_metric_exporter.cc | 61 ++- .../src/otlp_grpc_metric_exporter_options.cc | 5 + exporters/otlp/src/otlp_http_exporter.cc | 4 +- .../otlp/src/otlp_http_log_record_exporter.cc | 3 +- .../otlp/src/otlp_http_metric_exporter.cc | 4 +- 16 files changed, 787 insertions(+), 68 deletions(-) diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index dd40c63bfa..c3b9fb82c0 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -7,9 +7,7 @@ // GCC 9 has likely attribute but do not support declare it at the beginning of statement # if defined(__has_cpp_attribute) && (defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 9) # if __has_cpp_attribute(likely) -# define OPENTELEMETRY_LIKELY_IF(...) \ - if (__VA_ARGS__) \ - [[likely]] +# define OPENTELEMETRY_LIKELY_IF(...) if (__VA_ARGS__) [[likely]] # endif # endif @@ -164,16 +162,19 @@ point. #if defined(__clang__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #elif defined(__GNUC__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #else /* Add support for other compilers here. */ # define OPENTELEMETRY_API_SINGLETON +# define OPENTELEMETRY_LOCAL_SYMBOL #endif diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 73447ce123..924a94e466 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -3,10 +3,13 @@ #pragma once +#include #include #include +#include "opentelemetry/sdk/common/exporter_utils.h" + #include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h" #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" @@ -26,12 +29,21 @@ namespace otlp struct OtlpGrpcClientOptions; +#ifdef ENABLE_ASYNC_EXPORT +struct OtlpGrpcClientAsyncData; +#endif + /** * The OTLP gRPC client contains utility functions of gRPC. */ class OtlpGrpcClient { public: +#ifdef ENABLE_ASYNC_EXPORT + OtlpGrpcClient(); + ~OtlpGrpcClient(); +#endif + /** * Create gRPC channel from the exporter options. */ @@ -68,21 +80,109 @@ class OtlpGrpcClient static grpc::Status DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response); static grpc::Status DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response); static grpc::Status DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response); + +#ifdef ENABLE_ASYNC_EXPORT + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Force flush the gRPC client. + */ + bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + + /** + * Shut down the gRPC client. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return the status of this operation + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + + std::shared_ptr MutableAsyncData(const OtlpGrpcClientOptions &options); + +private: + // Stores if this gRPC client had its Shutdown() method called + bool is_shutdown_; + + // Stores shared data between threads of this gRPC client + std::shared_ptr async_data_; +#endif }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h index 310fc94d4d..1191c2118a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions /** User agent. */ std::string user_agent; + + /** max number of threads that can be allocated from this */ + std::size_t max_threads; + +#ifdef ENABLE_ASYNC_EXPORT + // Concurrent requests + std::size_t max_concurrent_requests; +#endif }; } // namespace otlp diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index 7aff1e24a5..393b54f6f4 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -23,6 +23,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format. */ @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter // The configuration options associated with this exporter. const OtlpGrpcExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcExporterTestPeer; friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h index 29333703b1..149234f9fa 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo // Configuration options for the exporter const OtlpGrpcLogRecordExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h index 5f975c8ce3..1385c381ec 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp // The configuration options associated with this exporter. const OtlpGrpcMetricExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // Aggregation Temporality selector const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_; diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index c0436eddaf..7a6cf4ccc1 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -9,12 +9,19 @@ # include #endif +#include +#include #include #include #include +#include #include +#include +#include +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/ext/http/common/url_parser.h" +#include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/sdk/common/global_log_handler.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -23,6 +30,77 @@ namespace exporter namespace otlp { +#ifdef ENABLE_ASYNC_EXPORT + +namespace +{ +struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase +{ + using GrpcAsyncCallback = bool (*)(OtlpGrpcAsyncCallDataBase *); + + std::unique_ptr arena; + grpc::Status grpc_status; + std::unique_ptr grpc_context; + + opentelemetry::sdk::common::ExportResult export_result = + opentelemetry::sdk::common::ExportResult::kFailure; + GrpcAsyncCallback grpc_async_callback = nullptr; + + OtlpGrpcAsyncCallDataBase() {} + virtual ~OtlpGrpcAsyncCallDataBase() {} +}; + +template +struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCallDataBase +{ + using RequestType = GrpcRequestType; + using ResponseType = GrpcResponseType; + + RequestType *request = nullptr; + ResponseType *response = nullptr; + std::unique_ptr> response_reader; + + std::function &&, + const RequestType &, + ResponseType *)> + result_callback; + + OtlpGrpcAsyncCallData() {} + virtual ~OtlpGrpcAsyncCallData() {} +}; +} // namespace + +struct OtlpGrpcClientAsyncData +{ + std::chrono::system_clock::duration export_timeout = std::chrono::seconds{10}; + + // The best performance trade-off of gRPC is having numcpu's threads and one completion queue + // per thread, but this exporter should not cost a lot resource and we don't want to create + // too many threads in the process. So we use one completion queue. + grpc::CompletionQueue cq; + + // Running requests, this is used to limit the number of concurrent requests. + std::atomic running_requests{0}; + // Request counter is used to record ForceFlush. + std::atomic start_request_counter{0}; + std::atomic finished_request_counter{0}; + std::size_t max_concurrent_requests = 64; + + // Condition variable and mutex to control the concurrency count of running requests. + std::mutex session_waker_lock; + std::condition_variable session_waker; + + std::mutex background_thread_m; + std::unique_ptr background_thread; + + // Do not use OtlpGrpcClientAsyncData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + OtlpGrpcClientAsyncData() {} +}; +#endif + namespace { // ----------------------------- Helper functions ------------------------------ @@ -46,16 +124,216 @@ static std::string GetFileContentsOrInMemoryContents(const std::string &file_pat return contents; } +static void MaybeSpawnBackgroundThread(std::shared_ptr async_data) +{ + std::lock_guard lock_guard{async_data->background_thread_m}; + if (async_data->background_thread) + { + return; + } + + async_data->background_thread.reset(new std::thread([async_data]() { + bool running = true; + while (running) + { + void *tag = nullptr; + bool ok = false; + // If there is no job in exporting_timeout+5 minutes, we can exit this thread and start + // another thread later when new datas arrived. + auto got_status = async_data->cq.AsyncNext( + &tag, &ok, + std::chrono::system_clock::now() + std::chrono::minutes{5} + async_data->export_timeout); + if (grpc::CompletionQueue::SHUTDOWN == got_status) + { + std::lock_guard internal_lock_guard{async_data->background_thread_m}; + if (async_data->background_thread) + { + async_data->background_thread->detach(); + async_data->background_thread.reset(); + } + break; + } + if (grpc::CompletionQueue::TIMEOUT == got_status) + { + std::lock_guard internal_lock_guard{async_data->background_thread_m}; + running = async_data->running_requests.load(std::memory_order_acquire) > 0; + + if (!running) + { + if (async_data->background_thread) + { + async_data->background_thread->detach(); + async_data->background_thread.reset(); + } + break; + } + } + + if (nullptr == tag) + { + continue; + } + + auto callback_data = reinterpret_cast(tag); + --async_data->running_requests; + ++async_data->finished_request_counter; + + if (callback_data->grpc_status.ok()) + { + callback_data->export_result = opentelemetry::sdk::common::ExportResult::kSuccess; + } + + if (callback_data->grpc_async_callback) + { + callback_data->grpc_async_callback(callback_data); + delete callback_data; + } + + // Maybe wake up blocking DelegateAsyncExport() call + async_data->session_waker.notify_all(); + } + })); +} + +template +static sdk::common::ExportResult InternalDelegateAsyncExport( + std::shared_ptr async_data, + StubType *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + RequestType &&request, + std::function &&, + const RequestType &, + ResponseType *)> &&result_callback, + int32_t export_data_count, + const char *export_data_name) noexcept +{ + if (async_data->running_requests.load(std::memory_order_acquire) >= + async_data->max_concurrent_requests) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, exporter queue is full"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailureFull, std::move(arena), + request, nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailureFull; + } + + OtlpGrpcAsyncCallData *call_data = + new OtlpGrpcAsyncCallData(); + call_data->arena.swap(arena); + call_data->result_callback.swap(result_callback); + + call_data->request = + google::protobuf::Arena::Create(call_data->arena.get(), std::move(request)); + call_data->response = google::protobuf::Arena::Create(call_data->arena.get()); + + if (call_data->request == nullptr || call_data->response == nullptr) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, create gRPC request/response failed"); + + if (call_data->result_callback) + { + call_data->result_callback( + opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), + nullptr == call_data->request ? request : *call_data->request, call_data->response); + } + + delete call_data; + return opentelemetry::sdk::common::ExportResult::kFailure; + } + call_data->grpc_context.swap(context); + + call_data->grpc_async_callback = [](OtlpGrpcAsyncCallDataBase *base_call_data) { + OtlpGrpcAsyncCallData *real_call_data = + static_cast *>(base_call_data); + if (real_call_data->result_callback) + { + return real_call_data->result_callback(real_call_data->export_result, + std::move(real_call_data->arena), + *real_call_data->request, real_call_data->response); + } + + return true; + }; + + call_data->response_reader = + stub->AsyncExport(call_data->grpc_context.get(), *call_data->request, &async_data->cq); + if (!call_data->response_reader) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, call AsyncExport failed"); + + if (call_data->result_callback) + { + call_data->result_callback( + opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), + nullptr == call_data->request ? request : *call_data->request, call_data->response); + } + delete call_data; + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + call_data->response_reader->Finish(call_data->response, &call_data->grpc_status, + reinterpret_cast(call_data)); + + { + ++async_data->start_request_counter; + ++async_data->running_requests; + } + + // Maybe spawn background thread to handle the completion queue + MaybeSpawnBackgroundThread(async_data); + + // Can not cancle when start the request + { + std::unique_lock lock{async_data->session_waker_lock}; + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) <= + async_data->max_concurrent_requests; + }); + } + + return opentelemetry::sdk::common::ExportResult::kSuccess; +} + } // namespace +#ifdef ENABLE_ASYNC_EXPORT +OtlpGrpcClient::OtlpGrpcClient() : is_shutdown_{false} {} + +OtlpGrpcClient::~OtlpGrpcClient() +{ + std::shared_ptr async_data; + async_data.swap(async_data_); + + while (async_data->running_requests.load(std::memory_order_acquire) > 0) + { + std::unique_lock lock{async_data->session_waker_lock}; + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) <= + async_data->max_concurrent_requests; + }); + } +} + +#endif + std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { std::shared_ptr channel; // - // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC channel. - // Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some unexpected - // address. + // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC + // channel. Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some + // unexpected address. // ext::http::common::UrlParser url(options.endpoint); @@ -70,6 +348,13 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO grpc::ChannelArguments grpc_arguments; grpc_arguments.SetUserAgentPrefix(options.user_agent); + if (options.max_threads > 0) + { + grpc::ResourceQuota quota; + quota.SetMaxThreads(static_cast(options.max_threads)); + grpc_arguments.SetResourceQuota(quota); + } + if (options.use_ssl_credentials) { grpc::SslCredentialsOptions ssl_opts; @@ -79,7 +364,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO ssl_opts.pem_private_key = GetFileContentsOrInMemoryContents(options.ssl_client_key_path, options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, - options.ssl_client_cert_string); + options.ssl_client_cert_string); #endif channel = grpc::CreateCustomChannel(grpc_target, grpc::SslCredentials(ssl_opts), grpc_arguments); @@ -140,31 +425,209 @@ OtlpGrpcClient::MakeLogsServiceStub(const OtlpGrpcClientOptions &options) grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); +} + +#ifdef ENABLE_ASYNC_EXPORT + +/** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept +{ + auto span_count = request.resource_spans_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << span_count << " trace span(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), span_count, + "trace span(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept +{ + auto metrics_count = request.resource_metrics_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << metrics_count << " metric(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), metrics_count, + "metric(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept +{ + auto logs_count = request.resource_logs_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << logs_count << " log(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), logs_count, + "log(s)"); +} + +std::shared_ptr OtlpGrpcClient::MutableAsyncData( + const OtlpGrpcClientOptions &options) +{ + if (!async_data_) + { + async_data_ = std::make_shared(); + async_data_->export_timeout = options.timeout; + async_data_->max_concurrent_requests = options.max_concurrent_requests; + } + + return async_data_; +} + +bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (!async_data_) + { + return true; + } + + std::size_t request_counter = async_data_->start_request_counter.load(std::memory_order_acquire); + if (request_counter <= async_data_->finished_request_counter.load(std::memory_order_acquire)) + { + return true; + } + + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + // Wait for all the sessions to finish + std::unique_lock lock(async_data_->session_waker_lock); + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + while (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + // When changes of running_sessions_ and notify_one/notify_all happen between predicate + // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon + // as possible to call FinishSession() and cleanup resources. + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::timeout != + async_data_->session_waker.wait_for(lock, async_data_->export_timeout)) + { + break; + } + + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); +} + +bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (!async_data_) + { + return true; + } + + async_data_->cq.Shutdown(); + return ForceFlush(timeout); } +#endif + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 8fc4ce9272..fcf4b884fd 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -23,12 +23,20 @@ namespace otlp OtlpGrpcExporter::OtlpGrpcExporter() : OtlpGrpcExporter(OtlpGrpcExporterOptions()) {} OtlpGrpcExporter::OtlpGrpcExporter(const OtlpGrpcExporterOptions &options) - : options_(options), trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) {} OtlpGrpcExporter::OtlpGrpcExporter( std::unique_ptr stub) - : options_(OtlpGrpcExporterOptions()), trace_service_stub_(std::move(stub)) + : options_(OtlpGrpcExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + trace_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -58,28 +66,58 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::trace::v1::ExportTraceServiceRequest *request = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + google::protobuf::Arena::Create( + arena.get()); OtlpRecordableUtils::PopulateRequest(spans, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::trace::v1::ExportTraceServiceResponse *response = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceResponse>(&arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = - OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), context.get(), *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" - << grpc_utils::grpc_status_code_to_string(status.error_code()) - << "\" error_message: \"" << status.error_message() << "\""); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, trace_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &request, + proto::collector::trace::v1::ExportTraceServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << request.resource_spans_size() + << " trace span(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_spans_size() + << " trace span(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" + << grpc_utils::grpc_status_code_to_string(status.error_code()) + << "\" error_message: \"" << status.error_message() << "\""); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_exporter_options.cc b/exporters/otlp/src/otlp_grpc_exporter_options.cc index 3099303ac4..c5dc94ec9d 100644 --- a/exporters/otlp/src/otlp_grpc_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcExporterOptions::OtlpGrpcExporterOptions() timeout = GetOtlpDefaultTracesTimeout(); metadata = GetOtlpDefaultTracesHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcExporterOptions::~OtlpGrpcExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index b0da987c43..50fcd1d491 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -35,12 +35,20 @@ OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter() OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( const OtlpGrpcLogRecordExporterOptions &options) - : options_(options), log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) {} OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( std::unique_ptr stub) - : options_(OtlpGrpcLogRecordExporterOptions()), log_service_stub_(std::move(stub)) + : options_(OtlpGrpcLogRecordExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -71,26 +79,58 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::logs::v1::ExportLogsServiceRequest *request = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create( + arena.get()); OtlpRecordableUtils::PopulateRequest(logs, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::logs::v1::ExportLogsServiceResponse *response = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = - OtlpGrpcClient::DelegateExport(log_service_stub_.get(), context.get(), *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, log_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &request, + proto::collector::logs::v1::ExportLogsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << request.resource_logs_size() + << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_logs_size() + << " log(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(log_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc index 697ef28f2a..5925808abe 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc @@ -27,6 +27,11 @@ OtlpGrpcLogRecordExporterOptions::OtlpGrpcLogRecordExporterOptions() timeout = GetOtlpDefaultLogsTimeout(); metadata = GetOtlpDefaultLogsHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcLogRecordExporterOptions::~OtlpGrpcLogRecordExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index 921eb4c481..8d87d20a38 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -23,6 +23,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter() OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options) : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(OtlpGrpcClient::MakeMetricsServiceStub(options)) @@ -31,6 +34,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptio OtlpGrpcMetricExporter::OtlpGrpcMetricExporter( std::unique_ptr stub) : options_(OtlpGrpcMetricExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(std::move(stub)) @@ -66,27 +72,58 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::metrics::v1::ExportMetricsServiceRequest *request = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + google::protobuf::Arena::Create( + arena.get()); OtlpMetricUtils::PopulateRequest(data, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::metrics::v1::ExportMetricsServiceResponse *response = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceResponse>(&arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), context.get(), - *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR( - "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, metrics_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + proto::collector::metrics::v1::ExportMetricsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << request.resource_metrics_size() + << " metric(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_metrics_size() + << " metric(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return opentelemetry::sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc index cb99d1cc5a..ff8466b0b2 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcMetricExporterOptions::OtlpGrpcMetricExporterOptions() user_agent = GetOtlpDefaultUserAgent(); aggregation_temporality = PreferredAggregationTemporality::kCumulative; + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcMetricExporterOptions::~OtlpGrpcMetricExporterOptions() {} diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 977ea2a0cb..3ba0d19424 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -105,8 +105,8 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::trace::v1::ExportTraceServiceRequest *service_request = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + google::protobuf::Arena::Create( + &arena); OtlpRecordableUtils::PopulateRequest(spans, service_request); std::size_t span_count = spans.size(); #ifdef ENABLE_ASYNC_EXPORT diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index db4b813c02..6e046f888c 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -109,8 +109,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::logs::v1::ExportLogsServiceRequest *service_request = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create(&arena); OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); #ifdef ENABLE_ASYNC_EXPORT diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 52c25b3a34..a4a3e3b586 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -112,8 +112,8 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::metrics::v1::ExportMetricsServiceRequest *service_request = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + google::protobuf::Arena::Create( + &arena); OtlpMetricUtils::PopulateRequest(data, service_request); std::size_t metric_count = data.scope_metric_data_.size(); #ifdef ENABLE_ASYNC_EXPORT From 74175a38cb3f3b8dc16a88ab5100bdb7e2c4ca48 Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 15 Nov 2023 21:23:57 +0800 Subject: [PATCH 03/18] Fix OTLP HTTP exporter without async exporting. --- exporters/otlp/src/otlp_http_exporter.cc | 2 +- exporters/otlp/src/otlp_http_log_record_exporter.cc | 2 +- exporters/otlp/src/otlp_http_metric_exporter.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 3ba0d19424..5b2c664334 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -127,7 +127,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index 6e046f888c..de1567a955 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -128,7 +128,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index a4a3e3b586..40ed8272f3 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -132,7 +132,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " From 3cc277686099099d06537f2a67d68967fbd0109a Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 15 Nov 2023 21:25:32 +0800 Subject: [PATCH 04/18] Fix condition_variable including --- exporters/otlp/src/otlp_grpc_client.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 7a6cf4ccc1..84f402116a 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include #include From aeb7b55e761fa5f384aa37eaae181c1eb8b14132 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 10:23:39 +0800 Subject: [PATCH 05/18] Fix compile problem without `ENABLE_ASYNC_EXPORT` --- exporters/otlp/src/otlp_grpc_client.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 84f402116a..1c06875a9a 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -125,6 +125,7 @@ static std::string GetFileContentsOrInMemoryContents(const std::string &file_pat return contents; } +#ifdef ENABLE_ASYNC_EXPORT static void MaybeSpawnBackgroundThread(std::shared_ptr async_data) { std::lock_guard lock_guard{async_data->background_thread_m}; @@ -304,7 +305,7 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( return opentelemetry::sdk::common::ExportResult::kSuccess; } - +#endif } // namespace #ifdef ENABLE_ASYNC_EXPORT @@ -622,6 +623,7 @@ bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept { return true; } + is_shutdown_ = true; async_data_->cq.Shutdown(); return ForceFlush(timeout); From 25419f6f9e76d04a7955425cd86491829cab9bb6 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 15:05:35 +0800 Subject: [PATCH 06/18] Use callback API for OTLP gRPC exporter. --- CHANGELOG.md | 2 + exporters/otlp/src/otlp_grpc_client.cc | 146 +++++------------- exporters/otlp/src/otlp_grpc_exporter.cc | 14 +- .../otlp/src/otlp_grpc_log_record_exporter.cc | 14 +- .../otlp/src/otlp_grpc_metric_exporter.cc | 13 +- 5 files changed, 69 insertions(+), 120 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f89ef1505..fbf4fe1342 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ Increment the: [#2398](https://github.com/open-telemetry/opentelemetry-cpp/pull/2398) * [EXPORTER] Rework OTLP/HTTP and OTLP/GRPC exporter options [#2388](https://github.com/open-telemetry/opentelemetry-cpp/pull/2388) +* [EXPORTER] Add async exporting for OTLP/GRPC exporter + [#2407](https://github.com/open-telemetry/opentelemetry-cpp/pull/2407) Important changes: diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 1c06875a9a..fb48c77364 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -59,7 +59,6 @@ struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCa RequestType *request = nullptr; ResponseType *response = nullptr; - std::unique_ptr> response_reader; std::function &&, @@ -92,9 +91,6 @@ struct OtlpGrpcClientAsyncData std::mutex session_waker_lock; std::condition_variable session_waker; - std::mutex background_thread_m; - std::unique_ptr background_thread; - // Do not use OtlpGrpcClientAsyncData() = default; here, some versions of GCC&Clang have BUGs // and may not initialize the member correctly. See also // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be @@ -126,77 +122,6 @@ static std::string GetFileContentsOrInMemoryContents(const std::string &file_pat } #ifdef ENABLE_ASYNC_EXPORT -static void MaybeSpawnBackgroundThread(std::shared_ptr async_data) -{ - std::lock_guard lock_guard{async_data->background_thread_m}; - if (async_data->background_thread) - { - return; - } - - async_data->background_thread.reset(new std::thread([async_data]() { - bool running = true; - while (running) - { - void *tag = nullptr; - bool ok = false; - // If there is no job in exporting_timeout+5 minutes, we can exit this thread and start - // another thread later when new datas arrived. - auto got_status = async_data->cq.AsyncNext( - &tag, &ok, - std::chrono::system_clock::now() + std::chrono::minutes{5} + async_data->export_timeout); - if (grpc::CompletionQueue::SHUTDOWN == got_status) - { - std::lock_guard internal_lock_guard{async_data->background_thread_m}; - if (async_data->background_thread) - { - async_data->background_thread->detach(); - async_data->background_thread.reset(); - } - break; - } - if (grpc::CompletionQueue::TIMEOUT == got_status) - { - std::lock_guard internal_lock_guard{async_data->background_thread_m}; - running = async_data->running_requests.load(std::memory_order_acquire) > 0; - - if (!running) - { - if (async_data->background_thread) - { - async_data->background_thread->detach(); - async_data->background_thread.reset(); - } - break; - } - } - - if (nullptr == tag) - { - continue; - } - - auto callback_data = reinterpret_cast(tag); - --async_data->running_requests; - ++async_data->finished_request_counter; - - if (callback_data->grpc_status.ok()) - { - callback_data->export_result = opentelemetry::sdk::common::ExportResult::kSuccess; - } - - if (callback_data->grpc_async_callback) - { - callback_data->grpc_async_callback(callback_data); - delete callback_data; - } - - // Maybe wake up blocking DelegateAsyncExport() call - async_data->session_waker.notify_all(); - } - })); -} - template static sdk::common::ExportResult InternalDelegateAsyncExport( std::shared_ptr async_data, @@ -225,8 +150,8 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( return opentelemetry::sdk::common::ExportResult::kFailureFull; } - OtlpGrpcAsyncCallData *call_data = - new OtlpGrpcAsyncCallData(); + std::shared_ptr> call_data = + std::make_shared>(); call_data->arena.swap(arena); call_data->result_callback.swap(result_callback); @@ -247,7 +172,6 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( nullptr == call_data->request ? request : *call_data->request, call_data->response); } - delete call_data; return opentelemetry::sdk::common::ExportResult::kFailure; } call_data->grpc_context.swap(context); @@ -265,34 +189,32 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( return true; }; - call_data->response_reader = - stub->AsyncExport(call_data->grpc_context.get(), *call_data->request, &async_data->cq); - if (!call_data->response_reader) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " - << export_data_count << " " << export_data_name - << " failed, call AsyncExport failed"); - - if (call_data->result_callback) - { - call_data->result_callback( - opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), - nullptr == call_data->request ? request : *call_data->request, call_data->response); - } - delete call_data; - return opentelemetry::sdk::common::ExportResult::kFailure; - } - - call_data->response_reader->Finish(call_data->response, &call_data->grpc_status, - reinterpret_cast(call_data)); - - { - ++async_data->start_request_counter; - ++async_data->running_requests; - } - - // Maybe spawn background thread to handle the completion queue - MaybeSpawnBackgroundThread(async_data); + ++async_data->start_request_counter; + ++async_data->running_requests; +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + stub->async() +# else + stub->experimental_async() +# endif + ->Export(call_data->grpc_context.get(), call_data->request, call_data->response, + [call_data, async_data](::grpc::Status grpc_status) { + --async_data->running_requests; + ++async_data->finished_request_counter; + + call_data->grpc_status = grpc_status; + if (call_data->grpc_status.ok()) + { + call_data->export_result = opentelemetry::sdk::common::ExportResult::kSuccess; + } + + if (call_data->grpc_async_callback) + { + call_data->grpc_async_callback(call_data.get()); + } + + // Maybe wake up blocking DelegateAsyncExport() call + async_data->session_waker.notify_all(); + }); // Can not cancle when start the request { @@ -599,7 +521,8 @@ bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept timeout_steady = std::chrono::steady_clock::duration::max(); } - while (timeout_steady > std::chrono::steady_clock::duration::zero()) + while (timeout_steady > std::chrono::steady_clock::duration::zero() && + request_counter > async_data_->finished_request_counter.load(std::memory_order_acquire)) { // When changes of running_sessions_ and notify_one/notify_all happen between predicate // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon @@ -623,9 +546,14 @@ bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept { return true; } - is_shutdown_ = true; - async_data_->cq.Shutdown(); + if (!is_shutdown_) + { + is_shutdown_ = true; + + async_data_->cq.Shutdown(); + } + return ForceFlush(timeout); } diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index fcf4b884fd..c3602f857d 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -121,18 +121,24 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds timeout) noexcept { - // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the - // running exporting finished here. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } -bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } bool OtlpGrpcExporter::isShutdown() const noexcept diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index 50fcd1d491..8793328844 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -134,18 +134,24 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } -bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept { - // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the - // running exporting finished here. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } bool OtlpGrpcLogRecordExporter::isShutdown() const noexcept diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index 8d87d20a38..de8f5cf8f1 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -127,17 +127,24 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( return opentelemetry::sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcMetricExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcMetricExporter::ForceFlush(std::chrono::microseconds timeout) noexcept { - // TODO: OTLP gRPC exporter does not support concurrency exporting now. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } -bool OtlpGrpcMetricExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcMetricExporter::Shutdown(std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } bool OtlpGrpcMetricExporter::isShutdown() const noexcept From e7f119a37b19fe80ff7bf32a46834ee32cbf6582 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 19:14:20 +0800 Subject: [PATCH 07/18] Fix unit test for OTLP gRPC async exporting. --- exporters/otlp/src/otlp_grpc_client.cc | 2 +- .../otlp/test/otlp_grpc_exporter_test.cc | 81 +++++++++- .../otlp_grpc_log_record_exporter_test.cc | 153 +++++++++++++++++- .../test/otlp_grpc_metric_exporter_test.cc | 1 + 4 files changed, 225 insertions(+), 12 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index fb48c77364..27fb170a0d 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -238,7 +238,7 @@ OtlpGrpcClient::~OtlpGrpcClient() std::shared_ptr async_data; async_data.swap(async_data_); - while (async_data->running_requests.load(std::memory_order_acquire) > 0) + while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0) { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 3be2bcc653..a525175078 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER) @@ -45,6 +46,73 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +# else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +# endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +# else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +# endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +# else + async_interface_base *experimental_async() override { return &async_interface_; } +# endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcExporterTestPeer : public ::testing::Test { public: @@ -64,7 +132,7 @@ class OtlpGrpcExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -78,6 +146,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -90,7 +159,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -112,13 +181,19 @@ TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +# if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +# else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +# endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 6a31639219..143be576e6 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -25,6 +25,7 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +#include #include #if defined(_MSC_VER) @@ -41,6 +42,137 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +#endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +#else + async_interface_base *experimental_async() override { return &async_interface_; } +#endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; + +class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServiceStub +{ +public: +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::experimental_async_interface; +#endif + + OtlpMockLogsServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockLogsServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest *request, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockLogsServiceStub *stub_; + }; + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +#else + async_interface_base *experimental_async() override { return &async_interface_; } +#endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test { public: @@ -68,7 +200,7 @@ class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -77,12 +209,10 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // exporter shuold not be shutdown by default nostd::span> batch_1(&recordable_1, 1); - EXPECT_CALL(*mock_stub, Export(_, _, _)) - .Times(Exactly(1)) - .WillOnce(Return(grpc::Status::OK)) - .WillOnce(Return(grpc::Status::CANCELLED)); + EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -95,7 +225,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -106,6 +236,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); // Test failed RPC @@ -114,13 +245,19 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +#if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +#else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +#endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -149,7 +286,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) '3', '2', '1', '0'}; opentelemetry::trace::SpanId span_id{span_id_bin}; - auto trace_mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto trace_mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr trace_stub_interface( trace_mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc index dc6dde9d79..40587e5a52 100644 --- a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER) From 4301f23348604456639c189594653645d29a4f92 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 19:30:17 +0800 Subject: [PATCH 08/18] Fix grpcpp header --- exporters/otlp/test/otlp_grpc_exporter_test.cc | 2 +- exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc | 2 +- exporters/otlp/test/otlp_grpc_metric_exporter_test.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index a525175078..017108f182 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -29,7 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" -# include +# include # include # if defined(_MSC_VER) diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 143be576e6..549168b937 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -25,7 +25,7 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" -#include +#include #include #if defined(_MSC_VER) diff --git a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc index 40587e5a52..6689233a9d 100644 --- a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc @@ -29,7 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" -# include +# include # include # if defined(_MSC_VER) From 1862010854dad4aa7593f886fb4369f7b2af8eef Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 21:00:40 +0800 Subject: [PATCH 09/18] There is no compatible way to detect gRPC version. --- exporters/otlp/test/otlp_grpc_exporter_test.cc | 7 ++----- .../test/otlp_grpc_log_record_exporter_test.cc | 14 ++++---------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 017108f182..bced58b833 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -99,11 +99,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe OtlpMockTraceServiceStub *stub_; }; -# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 - async_interface_base *async() override { return &async_interface_; } -# else - async_interface_base *experimental_async() override { return &async_interface_; } -# endif + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 549168b937..c47afb646e 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -95,11 +95,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe OtlpMockTraceServiceStub *stub_; }; -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 - async_interface_base *async() override { return &async_interface_; } -#else - async_interface_base *experimental_async() override { return &async_interface_; } -#endif + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } @@ -159,11 +156,8 @@ class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServi OtlpMockLogsServiceStub *stub_; }; -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 - async_interface_base *async() override { return &async_interface_; } -#else - async_interface_base *experimental_async() override { return &async_interface_; } -#endif + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } From 5fef32a8c627d06b3fd9c4ce6e1f4eaf7aba659b Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 21:35:11 +0800 Subject: [PATCH 10/18] Fix style and benchmark --- api/include/opentelemetry/common/macros.h | 4 +- exporters/otlp/src/otlp_grpc_client.cc | 4 +- .../otlp/test/otlp_grpc_exporter_benchmark.cc | 74 +++++++++++++++++-- .../otlp/test/otlp_grpc_exporter_test.cc | 6 +- .../otlp_grpc_log_record_exporter_test.cc | 12 ++- 5 files changed, 84 insertions(+), 16 deletions(-) diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index c3b9fb82c0..88ec2f0182 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -7,7 +7,9 @@ // GCC 9 has likely attribute but do not support declare it at the beginning of statement # if defined(__has_cpp_attribute) && (defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 9) # if __has_cpp_attribute(likely) -# define OPENTELEMETRY_LIKELY_IF(...) if (__VA_ARGS__) [[likely]] +# define OPENTELEMETRY_LIKELY_IF(...) \ + if (__VA_ARGS__) \ + [[likely]] # endif # endif diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 27fb170a0d..1c589b963c 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -191,7 +191,8 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( ++async_data->start_request_counter; ++async_data->running_requests; -# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 stub->async() # else stub->experimental_async() @@ -289,6 +290,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, options.ssl_client_cert_string); + #endif channel = grpc::CreateCustomChannel(grpc_target, grpc::SslCredentials(ssl_opts), grpc_arguments); diff --git a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc index b4a23c04a5..e791dd4459 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc @@ -49,6 +49,59 @@ const trace_api::SpanContext kSpanContext{ // Create a fake service stub to avoid dependency on gmock class FakeServiceStub : public proto::collector::trace::v1::TraceService::StubInterface { +public: +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +#endif + + FakeServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(FakeServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + callback(stub_->Export(context, *request, response)); + } + +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + FakeServiceStub *stub_; + }; + + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } + grpc::Status Export(grpc::ClientContext *, const proto::collector::trace::v1::ExportTraceServiceRequest &, proto::collector::trace::v1::ExportTraceServiceResponse *) override @@ -56,21 +109,26 @@ class FakeServiceStub : public proto::collector::trace::v1::TraceService::StubIn return grpc::Status::OK; } - grpc::ClientAsyncResponseReaderInterface - *AsyncExportRaw(grpc::ClientContext *, - const proto::collector::trace::v1::ExportTraceServiceRequest &, - grpc::CompletionQueue *) override + grpc::ClientAsyncResponseReaderInterface< + proto::collector::trace::v1::ExportTraceServiceResponse> * + AsyncExportRaw(grpc::ClientContext *, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + grpc::CompletionQueue *) override { return nullptr; } - grpc::ClientAsyncResponseReaderInterface - *PrepareAsyncExportRaw(grpc::ClientContext *, - const proto::collector::trace::v1::ExportTraceServiceRequest &, - grpc::CompletionQueue *) override + grpc::ClientAsyncResponseReaderInterface< + proto::collector::trace::v1::ExportTraceServiceResponse> * + PrepareAsyncExportRaw(grpc::ClientContext *, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + grpc::CompletionQueue *) override { return nullptr; } + +private: + async_interface async_interface_; }; // OtlpGrpcExporterTestPeer is a friend class of OtlpGrpcExporter diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index bced58b833..233de4eef7 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -51,7 +51,8 @@ namespace class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub { public: -# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 using async_interface_base = proto::collector::trace::v1::TraceService::StubInterface::async_interface; # else @@ -78,7 +79,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe callback(stub_->last_async_status_); } -# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) void Export( ::grpc::ClientContext * /*context*/, diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index c47afb646e..c0c04c577a 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -47,7 +47,8 @@ namespace class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub { public: -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 using async_interface_base = proto::collector::trace::v1::TraceService::StubInterface::async_interface; #else @@ -74,7 +75,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe callback(stub_->last_async_status_); } -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) void Export( ::grpc::ClientContext * /*context*/, @@ -108,7 +110,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServiceStub { public: -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 using async_interface_base = proto::collector::logs::v1::LogsService::StubInterface::async_interface; #else @@ -135,7 +138,8 @@ class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServi callback(stub_->last_async_status_); } -#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) void Export( ::grpc::ClientContext * /*context*/, From 0dc3f73c5ed600c11651265aad96411028faad74 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 21:53:57 +0800 Subject: [PATCH 11/18] Fix styles --- exporters/otlp/src/otlp_grpc_client.cc | 2 +- .../otlp/test/otlp_grpc_exporter_benchmark.cc | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 1c589b963c..0a3b9f43be 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -289,7 +289,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO ssl_opts.pem_private_key = GetFileContentsOrInMemoryContents(options.ssl_client_key_path, options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, - options.ssl_client_cert_string); + options.ssl_client_cert_string); #endif channel = diff --git a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc index e791dd4459..e03985e3ae 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc @@ -109,20 +109,18 @@ class FakeServiceStub : public proto::collector::trace::v1::TraceService::StubIn return grpc::Status::OK; } - grpc::ClientAsyncResponseReaderInterface< - proto::collector::trace::v1::ExportTraceServiceResponse> * - AsyncExportRaw(grpc::ClientContext *, - const proto::collector::trace::v1::ExportTraceServiceRequest &, - grpc::CompletionQueue *) override + grpc::ClientAsyncResponseReaderInterface + *AsyncExportRaw(grpc::ClientContext *, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + grpc::CompletionQueue *) override { return nullptr; } - grpc::ClientAsyncResponseReaderInterface< - proto::collector::trace::v1::ExportTraceServiceResponse> * - PrepareAsyncExportRaw(grpc::ClientContext *, - const proto::collector::trace::v1::ExportTraceServiceRequest &, - grpc::CompletionQueue *) override + grpc::ClientAsyncResponseReaderInterface + *PrepareAsyncExportRaw(grpc::ClientContext *, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + grpc::CompletionQueue *) override { return nullptr; } From 0dadc1cd84a372eba9d1697607ce30511ec630a5 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 1 Dec 2023 23:21:45 +0800 Subject: [PATCH 12/18] Fix feedbacks. --- .../exporters/otlp/otlp_grpc_client.h | 3 +- exporters/otlp/src/otlp_grpc_client.cc | 15 ++++++--- exporters/otlp/src/otlp_grpc_exporter.cc | 13 +++++--- .../otlp/src/otlp_grpc_log_record_exporter.cc | 13 +++++--- .../otlp/src/otlp_grpc_metric_exporter.cc | 13 +++++--- exporters/otlp/src/otlp_http_exporter.cc | 13 ++++---- .../otlp/src/otlp_http_log_record_exporter.cc | 32 +++++++++---------- .../otlp/src/otlp_http_metric_exporter.cc | 12 ++++--- 8 files changed, 65 insertions(+), 49 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 924a94e466..7d980ab521 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -39,10 +39,9 @@ struct OtlpGrpcClientAsyncData; class OtlpGrpcClient { public: -#ifdef ENABLE_ASYNC_EXPORT OtlpGrpcClient(); + ~OtlpGrpcClient(); -#endif /** * Create gRPC channel from the exporter options. diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 0a3b9f43be..941ce86dae 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -35,8 +35,9 @@ namespace otlp namespace { -struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase +class OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase { +public: using GrpcAsyncCallback = bool (*)(OtlpGrpcAsyncCallDataBase *); std::unique_ptr arena; @@ -52,8 +53,9 @@ struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase }; template -struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCallDataBase +class OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCallDataBase { +public: using RequestType = GrpcRequestType; using ResponseType = GrpcResponseType; @@ -231,11 +233,15 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( #endif } // namespace +OtlpGrpcClient::OtlpGrpcClient() #ifdef ENABLE_ASYNC_EXPORT -OtlpGrpcClient::OtlpGrpcClient() : is_shutdown_{false} {} + : is_shutdown_{false} +#endif +{} OtlpGrpcClient::~OtlpGrpcClient() { +#ifdef ENABLE_ASYNC_EXPORT std::shared_ptr async_data; async_data.swap(async_data_); @@ -247,9 +253,8 @@ OtlpGrpcClient::~OtlpGrpcClient() async_data->max_concurrent_requests; }); } -} - #endif +} std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index c3602f857d..607c5ac7c1 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" @@ -90,14 +91,14 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( proto::collector::trace::v1::ExportTraceServiceResponse *) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] ERROR: Export " << request.resource_spans_size() << " trace span(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_spans_size() - << " trace span(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " + << request.resource_spans_size() << " trace span(s) success"); } return true; }); @@ -121,7 +122,8 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { #ifdef ENABLE_ASYNC_EXPORT return client_->ForceFlush(timeout); @@ -130,7 +132,8 @@ bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds timeout) noexcept #endif } -bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index 8793328844..f5971228e9 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h" #include "opentelemetry/exporters/otlp/otlp_log_recordable.h" @@ -103,14 +104,14 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( proto::collector::logs::v1::ExportLogsServiceResponse *) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] ERROR: Export " << request.resource_logs_size() << " log(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_logs_size() - << " log(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG GRPC Exporter] Export " + << request.resource_logs_size() << " log(s) success"); } return true; }); @@ -134,7 +135,8 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcLogRecordExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; @@ -145,7 +147,8 @@ bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noex #endif } -bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcLogRecordExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { #ifdef ENABLE_ASYNC_EXPORT return client_->ForceFlush(timeout); diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index de8f5cf8f1..1e0aeb29f9 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" #include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h" #include "opentelemetry/exporters/otlp/otlp_metric_utils.h" @@ -96,14 +97,14 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( proto::collector::metrics::v1::ExportMetricsServiceResponse *) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC GRPC Exporter] ERROR: Export " << request.resource_metrics_size() << " metric(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_metrics_size() - << " metric(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC GRPC Exporter] Export " + << request.resource_metrics_size() << " metric(s) success"); } return true; }); @@ -127,7 +128,8 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( return opentelemetry::sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcMetricExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcMetricExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { #ifdef ENABLE_ASYNC_EXPORT return client_->ForceFlush(timeout); @@ -136,7 +138,8 @@ bool OtlpGrpcMetricExporter::ForceFlush(std::chrono::microseconds timeout) noexc #endif } -bool OtlpGrpcMetricExporter::Shutdown(std::chrono::microseconds timeout) noexcept +bool OtlpGrpcMetricExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 5b2c664334..74e8743b8a 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -86,7 +86,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( if (http_client_->IsShutdown()) { std::size_t span_count = spans.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -114,14 +114,14 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( *service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << span_count - << " trace span(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count + << " trace span(s) success"); } return true; }); @@ -130,12 +130,13 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << span_count << " trace span(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count + << " trace span(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index de1567a955..ceba1fd557 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -90,7 +90,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( if (http_client_->IsShutdown()) { std::size_t log_count = logs.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " << log_count << " log(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -113,30 +113,30 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); #ifdef ENABLE_ASYNC_EXPORT - http_client_->Export( - *service_request, [log_count](opentelemetry::sdk::common::ExportResult result) { - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " - << log_count << " log(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << log_count << " log(s) success"); - } - return true; - }); + http_client_->Export(*service_request, [log_count]( + opentelemetry::sdk::common::ExportResult result) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " + << log_count << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); + } + return true; + }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " << log_count << " log(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << log_count << " log(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 40ed8272f3..d654a5dad2 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -93,7 +93,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( if (http_client_->IsShutdown()) { std::size_t metric_count = data.scope_metric_data_.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -121,12 +121,13 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << metric_count << " metric(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC HTTP Exporter] Export " << metric_count + << " metric(s) success"); } return true; }); @@ -135,12 +136,13 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << metric_count << " metric(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC HTTP Exporter] Export " << metric_count + << " metric(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif From ac964f32b12fd374ca3a93e5315ae3a3c541ce32 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 1 Dec 2023 23:49:03 +0800 Subject: [PATCH 13/18] Fix style --- exporters/otlp/src/otlp_grpc_client.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 941ce86dae..f36a5e3b8f 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -235,7 +235,10 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( OtlpGrpcClient::OtlpGrpcClient() #ifdef ENABLE_ASYNC_EXPORT - : is_shutdown_{false} + : is_shutdown_ +{ + false +} #endif {} From 692cfede84c0dd5f6d13521d19e73b2899056019 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 8 Dec 2023 16:52:28 +0800 Subject: [PATCH 14/18] Fixes format --- exporters/otlp/src/otlp_grpc_client.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index d749786d63..5ddbc6d82d 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -241,7 +241,10 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( OtlpGrpcClient::OtlpGrpcClient() #ifdef ENABLE_ASYNC_EXPORT - : is_shutdown_{false} + : is_shutdown_ +{ + false +} #endif {} @@ -300,7 +303,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO ssl_opts.pem_private_key = GetFileContentsOrInMemoryContents(options.ssl_client_key_path, options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, - options.ssl_client_cert_string); + options.ssl_client_cert_string); #endif channel = From 05525fdbd353d907093943991da83d4ff65d47de Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 10 Jan 2024 10:47:50 +0800 Subject: [PATCH 15/18] Cleanup unused methods --- .../exporters/otlp/otlp_grpc_client.h | 7 ++----- exporters/otlp/src/otlp_grpc_client.cc | 21 ++++--------------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 7d980ab521..00f53f245b 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -39,9 +39,11 @@ struct OtlpGrpcClientAsyncData; class OtlpGrpcClient { public: +#ifdef ENABLE_ASYNC_EXPORT OtlpGrpcClient(); ~OtlpGrpcClient(); +#endif /** * Create gRPC channel from the exporter options. @@ -54,11 +56,6 @@ class OtlpGrpcClient static std::unique_ptr MakeClientContext( const OtlpGrpcClientOptions &options); - /** - * Create gRPC CompletionQueue to async call RPC. - */ - static std::unique_ptr MakeCompletionQueue(); - /** * Create trace service stub to communicate with the OpenTelemetry Collector. */ diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 5ddbc6d82d..3224343abb 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -225,7 +225,7 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( async_data->session_waker.notify_all(); }); - // Can not cancle when start the request + // Can not cancel when start the request { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { @@ -239,18 +239,11 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( #endif } // namespace -OtlpGrpcClient::OtlpGrpcClient() #ifdef ENABLE_ASYNC_EXPORT - : is_shutdown_ -{ - false -} -#endif -{} +OtlpGrpcClient::OtlpGrpcClient() : is_shutdown_(false) {} OtlpGrpcClient::~OtlpGrpcClient() { -#ifdef ENABLE_ASYNC_EXPORT std::shared_ptr async_data; async_data.swap(async_data_); @@ -258,12 +251,11 @@ OtlpGrpcClient::~OtlpGrpcClient() { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { - return async_data->running_requests.load(std::memory_order_acquire) <= - async_data->max_concurrent_requests; + return async_data->running_requests.load(std::memory_order_acquire) == 0; }); } -#endif } +#endif std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { @@ -340,11 +332,6 @@ std::unique_ptr OtlpGrpcClient::MakeClientContext( return context; } -std::unique_ptr OtlpGrpcClient::MakeCompletionQueue() -{ - return std::unique_ptr(new grpc::CompletionQueue()); -} - std::unique_ptr OtlpGrpcClient::MakeTraceServiceStub(const OtlpGrpcClientOptions &options) { From d07cdf68aa7abf261d12f94a3b65cad63ce8f74f Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 10 Jan 2024 19:18:21 +0800 Subject: [PATCH 16/18] Make constructor and destructors always available. --- .../opentelemetry/exporters/otlp/otlp_grpc_client.h | 2 -- exporters/otlp/src/otlp_grpc_client.cc | 11 ++++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 00f53f245b..74c2630e72 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -39,11 +39,9 @@ struct OtlpGrpcClientAsyncData; class OtlpGrpcClient { public: -#ifdef ENABLE_ASYNC_EXPORT OtlpGrpcClient(); ~OtlpGrpcClient(); -#endif /** * Create gRPC channel from the exporter options. diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 3224343abb..206e057bd5 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -239,11 +239,15 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( #endif } // namespace +OtlpGrpcClient::OtlpGrpcClient() #ifdef ENABLE_ASYNC_EXPORT -OtlpGrpcClient::OtlpGrpcClient() : is_shutdown_(false) {} + : is_shutdown_(false) +#endif +{} OtlpGrpcClient::~OtlpGrpcClient() { +#ifdef ENABLE_ASYNC_EXPORT std::shared_ptr async_data; async_data.swap(async_data_); @@ -251,11 +255,12 @@ OtlpGrpcClient::~OtlpGrpcClient() { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { - return async_data->running_requests.load(std::memory_order_acquire) == 0; + return async_data->running_requests.load(std::memory_order_acquire) <= + async_data->max_concurrent_requests; }); } -} #endif +} std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { From cdb8816aab8a3938b233af4a33b635174e8fbebf Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 12 Jan 2024 22:47:32 +0800 Subject: [PATCH 17/18] Fix wair_for in destructor --- exporters/otlp/src/otlp_grpc_client.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 206e057bd5..b90718a320 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -255,8 +255,7 @@ OtlpGrpcClient::~OtlpGrpcClient() { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { - return async_data->running_requests.load(std::memory_order_acquire) <= - async_data->max_concurrent_requests; + return async_data->running_requests.load(std::memory_order_acquire) == 0; }); } #endif From 0f4c77f395ddf59c95f261e7f74e3d524881eeb2 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 2 Feb 2024 01:26:05 +0800 Subject: [PATCH 18/18] Add comments for old version of gRPC, make `is_shutdown_` thread-safety --- .../opentelemetry/exporters/otlp/otlp_grpc_client.h | 3 ++- exporters/otlp/src/otlp_grpc_client.cc | 9 +++++---- exporters/otlp/test/otlp_grpc_exporter_test.cc | 2 ++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 74c2630e72..89224403a4 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -6,6 +6,7 @@ #include #include +#include #include #include "opentelemetry/sdk/common/exporter_utils.h" @@ -172,7 +173,7 @@ class OtlpGrpcClient private: // Stores if this gRPC client had its Shutdown() method called - bool is_shutdown_; + std::atomic is_shutdown_; // Stores shared data between threads of this gRPC client std::shared_ptr async_data_; diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index b90718a320..ed3c53b3a0 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -199,6 +199,7 @@ static sdk::common::ExportResult InternalDelegateAsyncExport( ++async_data->start_request_counter; ++async_data->running_requests; + // Some old toolchains can only use gRPC 1.33 and it's experimental. # if defined(GRPC_CPP_VERSION_MAJOR) && \ (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 stub->async() @@ -407,7 +408,7 @@ sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( &&result_callback) noexcept { auto span_count = request.resource_spans_size(); - if (is_shutdown_) + if (is_shutdown_.load(std::memory_order_acquire)) { OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " << span_count << " trace span(s) failed, exporter is shutdown"); @@ -438,7 +439,7 @@ sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( &&result_callback) noexcept { auto metrics_count = request.resource_metrics_size(); - if (is_shutdown_) + if (is_shutdown_.load(std::memory_order_acquire)) { OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " << metrics_count << " metric(s) failed, exporter is shutdown"); @@ -469,7 +470,7 @@ sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( &&result_callback) noexcept { auto logs_count = request.resource_logs_size(); - if (is_shutdown_) + if (is_shutdown_.load(std::memory_order_acquire)) { OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " << logs_count << " log(s) failed, exporter is shutdown"); @@ -554,7 +555,7 @@ bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept return true; } - if (!is_shutdown_) + if (false == is_shutdown_.exchange(true, std::memory_order_acq_rel)) { is_shutdown_ = true; diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 233de4eef7..b8c651a8fe 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -51,6 +51,7 @@ namespace class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub { public: +// Some old toolchains can only use gRPC 1.33 and it's experimental. # if defined(GRPC_CPP_VERSION_MAJOR) && \ (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 using async_interface_base = @@ -79,6 +80,7 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe callback(stub_->last_async_status_); } +// Some old toolchains can only use gRPC 1.33 and it's experimental. # if defined(GRPC_CPP_VERSION_MAJOR) && \ (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)