diff --git a/CHANGELOG.md b/CHANGELOG.md index 23b80ce5b8..370d9f1049 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ Increment the: ## [Unreleased] +* [EXPORTER] Add async exporting for OTLP/GRPC exporter + [#2407](https://github.com/open-telemetry/opentelemetry-cpp/pull/2407) * [API] Fix b3, w3c and jaeger propagators: they will not overwrite the active span with a default invalid span, which is especially useful when used with CompositePropagator diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index dd40c63bfa..88ec2f0182 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -164,16 +164,19 @@ point. #if defined(__clang__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #elif defined(__GNUC__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #else /* Add support for other compilers here. */ # define OPENTELEMETRY_API_SINGLETON +# define OPENTELEMETRY_LOCAL_SYMBOL #endif diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index fa1a69d619..89224403a4 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -3,14 +3,19 @@ #pragma once +#include #include +#include #include +#include "opentelemetry/sdk/common/exporter_utils.h" + #include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h" #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h" #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h" @@ -25,12 +30,20 @@ namespace otlp struct OtlpGrpcClientOptions; +#ifdef ENABLE_ASYNC_EXPORT +struct OtlpGrpcClientAsyncData; +#endif + /** * The OTLP gRPC client contains utility functions of gRPC. */ class OtlpGrpcClient { public: + OtlpGrpcClient(); + + ~OtlpGrpcClient(); + /** * Create gRPC channel from the exporter options. */ @@ -42,11 +55,6 @@ class OtlpGrpcClient static std::unique_ptr MakeClientContext( const OtlpGrpcClientOptions &options); - /** - * Create gRPC CompletionQueue to async call RPC. - */ - static std::unique_ptr MakeCompletionQueue(); - /** * Create trace service stub to communicate with the OpenTelemetry Collector. */ @@ -67,21 +75,109 @@ class OtlpGrpcClient static grpc::Status DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response); static grpc::Status DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response); static grpc::Status DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response); + +#ifdef ENABLE_ASYNC_EXPORT + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Force flush the gRPC client. + */ + bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + + /** + * Shut down the gRPC client. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return the status of this operation + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + + std::shared_ptr MutableAsyncData(const OtlpGrpcClientOptions &options); + +private: + // Stores if this gRPC client had its Shutdown() method called + std::atomic is_shutdown_; + + // Stores shared data between threads of this gRPC client + std::shared_ptr async_data_; +#endif }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h index 310fc94d4d..1191c2118a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions /** User agent. */ std::string user_agent; + + /** max number of threads that can be allocated from this */ + std::size_t max_threads; + +#ifdef ENABLE_ASYNC_EXPORT + // Concurrent requests + std::size_t max_concurrent_requests; +#endif }; } // namespace otlp diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index 870e5a043a..fdd6bc4a17 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -23,6 +23,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format. */ @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter // The configuration options associated with this exporter. const OtlpGrpcExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcExporterTestPeer; friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h index f1cd96888c..e4c2e96617 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo // Configuration options for the exporter const OtlpGrpcLogRecordExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h index 5f975c8ce3..1385c381ec 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp // The configuration options associated with this exporter. const OtlpGrpcMetricExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // Aggregation Temporality selector const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_; diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index c0436eddaf..ed3c53b3a0 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -9,12 +9,20 @@ # include #endif +#include +#include +#include #include #include #include +#include #include +#include +#include +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/ext/http/common/url_parser.h" +#include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/sdk/common/global_log_handler.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -23,6 +31,81 @@ namespace exporter namespace otlp { +#ifdef ENABLE_ASYNC_EXPORT + +namespace +{ +// When building with -fvisibility=default, we hide the symbols and vtable to ensure we always use +// local version of OtlpGrpcAsyncCallDataBase and OtlpGrpcAsyncCallData. It's used to keep +// compatibility when executables links multiple versions of otel-cpp. +class OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase +{ +public: + using GrpcAsyncCallback = bool (*)(OtlpGrpcAsyncCallDataBase *); + + std::unique_ptr arena; + grpc::Status grpc_status; + std::unique_ptr grpc_context; + + opentelemetry::sdk::common::ExportResult export_result = + opentelemetry::sdk::common::ExportResult::kFailure; + GrpcAsyncCallback grpc_async_callback = nullptr; + + OtlpGrpcAsyncCallDataBase() {} + virtual ~OtlpGrpcAsyncCallDataBase() {} +}; + +// When building with -fvisibility=default, we hide the symbols and vtable to ensure we always use +// local version of OtlpGrpcAsyncCallDataBase and OtlpGrpcAsyncCallData. It's used to keep +// compatibility when executables links multiple versions of otel-cpp. +template +class OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCallDataBase +{ +public: + using RequestType = GrpcRequestType; + using ResponseType = GrpcResponseType; + + RequestType *request = nullptr; + ResponseType *response = nullptr; + + std::function &&, + const RequestType &, + ResponseType *)> + result_callback; + + OtlpGrpcAsyncCallData() {} + virtual ~OtlpGrpcAsyncCallData() {} +}; +} // namespace + +struct OtlpGrpcClientAsyncData +{ + std::chrono::system_clock::duration export_timeout = std::chrono::seconds{10}; + + // The best performance trade-off of gRPC is having numcpu's threads and one completion queue + // per thread, but this exporter should not cost a lot resource and we don't want to create + // too many threads in the process. So we use one completion queue. + grpc::CompletionQueue cq; + + // Running requests, this is used to limit the number of concurrent requests. + std::atomic running_requests{0}; + // Request counter is used to record ForceFlush. + std::atomic start_request_counter{0}; + std::atomic finished_request_counter{0}; + std::size_t max_concurrent_requests = 64; + + // Condition variable and mutex to control the concurrency count of running requests. + std::mutex session_waker_lock; + std::condition_variable session_waker; + + // Do not use OtlpGrpcClientAsyncData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + OtlpGrpcClientAsyncData() {} +}; +#endif + namespace { // ----------------------------- Helper functions ------------------------------ @@ -46,16 +129,147 @@ static std::string GetFileContentsOrInMemoryContents(const std::string &file_pat return contents; } +#ifdef ENABLE_ASYNC_EXPORT +template +static sdk::common::ExportResult InternalDelegateAsyncExport( + std::shared_ptr async_data, + StubType *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + RequestType &&request, + std::function &&, + const RequestType &, + ResponseType *)> &&result_callback, + int32_t export_data_count, + const char *export_data_name) noexcept +{ + if (async_data->running_requests.load(std::memory_order_acquire) >= + async_data->max_concurrent_requests) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, exporter queue is full"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailureFull, std::move(arena), + request, nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailureFull; + } + + std::shared_ptr> call_data = + std::make_shared>(); + call_data->arena.swap(arena); + call_data->result_callback.swap(result_callback); + + call_data->request = + google::protobuf::Arena::Create(call_data->arena.get(), std::move(request)); + call_data->response = google::protobuf::Arena::Create(call_data->arena.get()); + + if (call_data->request == nullptr || call_data->response == nullptr) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, create gRPC request/response failed"); + + if (call_data->result_callback) + { + call_data->result_callback( + opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), + nullptr == call_data->request ? request : *call_data->request, call_data->response); + } + + return opentelemetry::sdk::common::ExportResult::kFailure; + } + call_data->grpc_context.swap(context); + + call_data->grpc_async_callback = [](OtlpGrpcAsyncCallDataBase *base_call_data) { + OtlpGrpcAsyncCallData *real_call_data = + static_cast *>(base_call_data); + if (real_call_data->result_callback) + { + return real_call_data->result_callback(real_call_data->export_result, + std::move(real_call_data->arena), + *real_call_data->request, real_call_data->response); + } + + return true; + }; + + ++async_data->start_request_counter; + ++async_data->running_requests; + // Some old toolchains can only use gRPC 1.33 and it's experimental. +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + stub->async() +# else + stub->experimental_async() +# endif + ->Export(call_data->grpc_context.get(), call_data->request, call_data->response, + [call_data, async_data](::grpc::Status grpc_status) { + --async_data->running_requests; + ++async_data->finished_request_counter; + + call_data->grpc_status = grpc_status; + if (call_data->grpc_status.ok()) + { + call_data->export_result = opentelemetry::sdk::common::ExportResult::kSuccess; + } + + if (call_data->grpc_async_callback) + { + call_data->grpc_async_callback(call_data.get()); + } + + // Maybe wake up blocking DelegateAsyncExport() call + async_data->session_waker.notify_all(); + }); + + // Can not cancel when start the request + { + std::unique_lock lock{async_data->session_waker_lock}; + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) <= + async_data->max_concurrent_requests; + }); + } + + return opentelemetry::sdk::common::ExportResult::kSuccess; +} +#endif } // namespace +OtlpGrpcClient::OtlpGrpcClient() +#ifdef ENABLE_ASYNC_EXPORT + : is_shutdown_(false) +#endif +{} + +OtlpGrpcClient::~OtlpGrpcClient() +{ +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr async_data; + async_data.swap(async_data_); + + while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0) + { + std::unique_lock lock{async_data->session_waker_lock}; + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) == 0; + }); + } +#endif +} + std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { std::shared_ptr channel; // - // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC channel. - // Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some unexpected - // address. + // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC + // channel. Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some + // unexpected address. // ext::http::common::UrlParser url(options.endpoint); @@ -70,6 +284,13 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO grpc::ChannelArguments grpc_arguments; grpc_arguments.SetUserAgentPrefix(options.user_agent); + if (options.max_threads > 0) + { + grpc::ResourceQuota quota; + quota.SetMaxThreads(static_cast(options.max_threads)); + grpc_arguments.SetResourceQuota(quota); + } + if (options.use_ssl_credentials) { grpc::SslCredentialsOptions ssl_opts; @@ -80,6 +301,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, options.ssl_client_cert_string); + #endif channel = grpc::CreateCustomChannel(grpc_target, grpc::SslCredentials(ssl_opts), grpc_arguments); @@ -115,11 +337,6 @@ std::unique_ptr OtlpGrpcClient::MakeClientContext( return context; } -std::unique_ptr OtlpGrpcClient::MakeCompletionQueue() -{ - return std::unique_ptr(new grpc::CompletionQueue()); -} - std::unique_ptr OtlpGrpcClient::MakeTraceServiceStub(const OtlpGrpcClientOptions &options) { @@ -140,31 +357,216 @@ OtlpGrpcClient::MakeLogsServiceStub(const OtlpGrpcClientOptions &options) grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } +#ifdef ENABLE_ASYNC_EXPORT + +/** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept +{ + auto span_count = request.resource_spans_size(); + if (is_shutdown_.load(std::memory_order_acquire)) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << span_count << " trace span(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), span_count, + "trace span(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept +{ + auto metrics_count = request.resource_metrics_size(); + if (is_shutdown_.load(std::memory_order_acquire)) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << metrics_count << " metric(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), metrics_count, + "metric(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept +{ + auto logs_count = request.resource_logs_size(); + if (is_shutdown_.load(std::memory_order_acquire)) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << logs_count << " log(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), logs_count, + "log(s)"); +} + +std::shared_ptr OtlpGrpcClient::MutableAsyncData( + const OtlpGrpcClientOptions &options) +{ + if (!async_data_) + { + async_data_ = std::make_shared(); + async_data_->export_timeout = options.timeout; + async_data_->max_concurrent_requests = options.max_concurrent_requests; + } + + return async_data_; +} + +bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (!async_data_) + { + return true; + } + + std::size_t request_counter = async_data_->start_request_counter.load(std::memory_order_acquire); + if (request_counter <= async_data_->finished_request_counter.load(std::memory_order_acquire)) + { + return true; + } + + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + // Wait for all the sessions to finish + std::unique_lock lock(async_data_->session_waker_lock); + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + while (timeout_steady > std::chrono::steady_clock::duration::zero() && + request_counter > async_data_->finished_request_counter.load(std::memory_order_acquire)) + { + // When changes of running_sessions_ and notify_one/notify_all happen between predicate + // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon + // as possible to call FinishSession() and cleanup resources. + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::timeout != + async_data_->session_waker.wait_for(lock, async_data_->export_timeout)) + { + break; + } + + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); +} + +bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (!async_data_) + { + return true; + } + + if (false == is_shutdown_.exchange(true, std::memory_order_acq_rel)) + { + is_shutdown_ = true; + + async_data_->cq.Shutdown(); + } + + return ForceFlush(timeout); +} + +#endif + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 30a9b63fe4..607c5ac7c1 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" @@ -23,12 +24,20 @@ namespace otlp OtlpGrpcExporter::OtlpGrpcExporter() : OtlpGrpcExporter(OtlpGrpcExporterOptions()) {} OtlpGrpcExporter::OtlpGrpcExporter(const OtlpGrpcExporterOptions &options) - : options_(options), trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) {} OtlpGrpcExporter::OtlpGrpcExporter( std::unique_ptr stub) - : options_(OtlpGrpcExporterOptions()), trace_service_stub_(std::move(stub)) + : options_(OtlpGrpcExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + trace_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -52,37 +61,87 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::trace::v1::ExportTraceServiceRequest request; - OtlpRecordableUtils::PopulateRequest(spans, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; - auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::trace::v1::ExportTraceServiceResponse response; + proto::collector::trace::v1::ExportTraceServiceRequest *request = + google::protobuf::Arena::Create( + arena.get()); + OtlpRecordableUtils::PopulateRequest(spans, request); - grpc::Status status = - OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), context.get(), request, &response); + auto context = OtlpGrpcClient::MakeClientContext(options_); + proto::collector::trace::v1::ExportTraceServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" - << grpc_utils::grpc_status_code_to_string(status.error_code()) - << "\" error_message: \"" << status.error_message() << "\""); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, trace_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &request, + proto::collector::trace::v1::ExportTraceServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] ERROR: Export " + << request.resource_spans_size() + << " trace span(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " + << request.resource_spans_size() << " trace span(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" + << grpc_utils::grpc_status_code_to_string(status.error_code()) + << "\" error_message: \"" << status.error_message() << "\""); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { - // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the - // running exporting finished here. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } -bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } bool OtlpGrpcExporter::isShutdown() const noexcept diff --git a/exporters/otlp/src/otlp_grpc_exporter_options.cc b/exporters/otlp/src/otlp_grpc_exporter_options.cc index 3099303ac4..c5dc94ec9d 100644 --- a/exporters/otlp/src/otlp_grpc_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcExporterOptions::OtlpGrpcExporterOptions() timeout = GetOtlpDefaultTracesTimeout(); metadata = GetOtlpDefaultTracesHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcExporterOptions::~OtlpGrpcExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index 2b397262b0..f5971228e9 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h" #include "opentelemetry/exporters/otlp/otlp_log_recordable.h" @@ -35,12 +36,20 @@ OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter() OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( const OtlpGrpcLogRecordExporterOptions &options) - : options_(options), log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) {} OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( std::unique_ptr stub) - : options_(OtlpGrpcLogRecordExporterOptions()), log_service_stub_(std::move(stub)) + : options_(OtlpGrpcLogRecordExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -65,35 +74,87 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::logs::v1::ExportLogsServiceRequest request; - OtlpRecordableUtils::PopulateRequest(logs, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; - auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::logs::v1::ExportLogsServiceResponse response; + proto::collector::logs::v1::ExportLogsServiceRequest *request = + google::protobuf::Arena::Create( + arena.get()); + OtlpRecordableUtils::PopulateRequest(logs, request); - grpc::Status status = - OtlpGrpcClient::DelegateExport(log_service_stub_.get(), context.get(), request, &response); + auto context = OtlpGrpcClient::MakeClientContext(options_); + proto::collector::logs::v1::ExportLogsServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, log_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &request, + proto::collector::logs::v1::ExportLogsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] ERROR: Export " + << request.resource_logs_size() + << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG GRPC Exporter] Export " + << request.resource_logs_size() << " log(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(log_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcLogRecordExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } -bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcLogRecordExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { - // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the - // running exporting finished here. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } bool OtlpGrpcLogRecordExporter::isShutdown() const noexcept diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc index 697ef28f2a..5925808abe 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc @@ -27,6 +27,11 @@ OtlpGrpcLogRecordExporterOptions::OtlpGrpcLogRecordExporterOptions() timeout = GetOtlpDefaultLogsTimeout(); metadata = GetOtlpDefaultLogsHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcLogRecordExporterOptions::~OtlpGrpcLogRecordExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index fd503f9836..1e0aeb29f9 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -4,6 +4,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" #include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h" #include "opentelemetry/exporters/otlp/otlp_metric_utils.h" @@ -23,6 +24,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter() OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options) : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(OtlpGrpcClient::MakeMetricsServiceStub(options)) @@ -31,6 +35,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptio OtlpGrpcMetricExporter::OtlpGrpcMetricExporter( std::unique_ptr stub) : options_(OtlpGrpcMetricExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(std::move(stub)) @@ -60,35 +67,87 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( return sdk::common::ExportResult::kSuccess; } - proto::collector::metrics::v1::ExportMetricsServiceRequest request; - OtlpMetricUtils::PopulateRequest(data, &request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; - auto context = OtlpGrpcClient::MakeClientContext(options_); - proto::collector::metrics::v1::ExportMetricsServiceResponse response; + proto::collector::metrics::v1::ExportMetricsServiceRequest *request = + google::protobuf::Arena::Create( + arena.get()); + OtlpMetricUtils::PopulateRequest(data, request); - grpc::Status status = OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), context.get(), - request, &response); + auto context = OtlpGrpcClient::MakeClientContext(options_); + proto::collector::metrics::v1::ExportMetricsServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR( - "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, metrics_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + proto::collector::metrics::v1::ExportMetricsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC GRPC Exporter] ERROR: Export " + << request.resource_metrics_size() + << " metric(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC GRPC Exporter] Export " + << request.resource_metrics_size() << " metric(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return opentelemetry::sdk::common::ExportResult::kSuccess; } -bool OtlpGrpcMetricExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcMetricExporter::ForceFlush( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { - // TODO: OTLP gRPC exporter does not support concurrency exporting now. +#ifdef ENABLE_ASYNC_EXPORT + return client_->ForceFlush(timeout); +#else return true; +#endif } -bool OtlpGrpcMetricExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept +bool OtlpGrpcMetricExporter::Shutdown( + OPENTELEMETRY_MAYBE_UNUSED std::chrono::microseconds timeout) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; +#ifdef ENABLE_ASYNC_EXPORT + return client_->Shutdown(timeout); +#else return true; +#endif } bool OtlpGrpcMetricExporter::isShutdown() const noexcept diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc index cb99d1cc5a..ff8466b0b2 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcMetricExporterOptions::OtlpGrpcMetricExporterOptions() user_agent = GetOtlpDefaultUserAgent(); aggregation_temporality = PreferredAggregationTemporality::kCumulative; + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcMetricExporterOptions::~OtlpGrpcMetricExporterOptions() {} diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 6059db36a4..60f037f07a 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -7,6 +7,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -81,7 +82,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( if (http_client_->IsShutdown()) { std::size_t span_count = spans.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -91,36 +92,47 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::trace::v1::ExportTraceServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(spans, &service_request); + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::trace::v1::ExportTraceServiceRequest *service_request = + google::protobuf::Arena::Create( + &arena); + OtlpRecordableUtils::PopulateRequest(spans, service_request); std::size_t span_count = spans.size(); #ifdef ENABLE_ASYNC_EXPORT http_client_->Export( - service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { + *service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << span_count - << " trace span(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count + << " trace span(s) success"); } return true; }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " << span_count << " trace span(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << span_count << " trace span(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count + << " trace span(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index e16c5d0008..8530421f82 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -7,6 +7,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -85,7 +86,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( if (http_client_->IsShutdown()) { std::size_t log_count = logs.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " << log_count << " log(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -94,34 +95,44 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( { return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::logs::v1::ExportLogsServiceRequest service_request; - OtlpRecordableUtils::PopulateRequest(logs, &service_request); + + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::logs::v1::ExportLogsServiceRequest *service_request = + google::protobuf::Arena::Create(&arena); + OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); #ifdef ENABLE_ASYNC_EXPORT - http_client_->Export( - service_request, [log_count](opentelemetry::sdk::common::ExportResult result) { - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " - << log_count << " log(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << log_count << " log(s) success"); - } - return true; - }); + http_client_->Export(*service_request, [log_count]( + opentelemetry::sdk::common::ExportResult result) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " + << log_count << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); + } + return true; + }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " << log_count << " log(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << log_count << " log(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 816f4eee44..00dbf2c6a3 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -6,6 +6,7 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" +#include "google/protobuf/arena.h" #include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" @@ -88,7 +89,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( if (http_client_->IsShutdown()) { std::size_t metric_count = data.scope_metric_data_.size(); - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) failed, exporter is shutdown"); return opentelemetry::sdk::common::ExportResult::kFailure; } @@ -97,34 +98,47 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( { return opentelemetry::sdk::common::ExportResult::kSuccess; } - proto::collector::metrics::v1::ExportMetricsServiceRequest service_request; - OtlpMetricUtils::PopulateRequest(data, &service_request); + + google::protobuf::ArenaOptions arena_options; + // It's easy to allocate datas larger than 1024 when we populate basic resource and attributes + arena_options.initial_block_size = 1024; + // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager + // block to reduce memory fragments. + arena_options.max_block_size = 65536; + google::protobuf::Arena arena{arena_options}; + + proto::collector::metrics::v1::ExportMetricsServiceRequest *service_request = + google::protobuf::Arena::Create( + &arena); + OtlpMetricUtils::PopulateRequest(data, service_request); std::size_t metric_count = data.scope_metric_data_.size(); #ifdef ENABLE_ASYNC_EXPORT - http_client_->Export(service_request, [metric_count]( - opentelemetry::sdk::common::ExportResult result) { + http_client_->Export(*service_request, [metric_count]( + opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << metric_count << " metric(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC HTTP Exporter] Export " << metric_count + << " metric(s) success"); } return true; }); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(service_request); + opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) error: " << static_cast(result)); } else { - OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << metric_count << " metric(s) success"); + OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC HTTP Exporter] Export " << metric_count + << " metric(s) success"); } return opentelemetry::sdk::common::ExportResult::kSuccess; #endif diff --git a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc index b4a23c04a5..e03985e3ae 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_benchmark.cc @@ -49,6 +49,59 @@ const trace_api::SpanContext kSpanContext{ // Create a fake service stub to avoid dependency on gmock class FakeServiceStub : public proto::collector::trace::v1::TraceService::StubInterface { +public: +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +#endif + + FakeServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(FakeServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + callback(stub_->Export(context, *request, response)); + } + +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + FakeServiceStub *stub_; + }; + + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } + grpc::Status Export(grpc::ClientContext *, const proto::collector::trace::v1::ExportTraceServiceRequest &, proto::collector::trace::v1::ExportTraceServiceResponse *) override @@ -71,6 +124,9 @@ class FakeServiceStub : public proto::collector::trace::v1::TraceService::StubIn { return nullptr; } + +private: + async_interface async_interface_; }; // OtlpGrpcExporterTestPeer is a friend class of OtlpGrpcExporter diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 3be2bcc653..b8c651a8fe 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER) @@ -45,6 +46,74 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +// Some old toolchains can only use gRPC 1.33 and it's experimental. +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +# else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +# endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +// Some old toolchains can only use gRPC 1.33 and it's experimental. +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +# else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +# endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcExporterTestPeer : public ::testing::Test { public: @@ -64,7 +133,7 @@ class OtlpGrpcExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -78,6 +147,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -90,7 +160,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -112,13 +182,19 @@ TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +# if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +# else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +# endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 6a31639219..c0c04c577a 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -25,6 +25,7 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +#include #include #if defined(_MSC_VER) @@ -41,6 +42,135 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +#endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; + +class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServiceStub +{ +public: +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::experimental_async_interface; +#endif + + OtlpMockLogsServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockLogsServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest *request, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockLogsServiceStub *stub_; + }; + + async_interface_base *async() { return &async_interface_; } + async_interface_base *experimental_async() { return &async_interface_; } + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test { public: @@ -68,7 +198,7 @@ class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -77,12 +207,10 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // exporter shuold not be shutdown by default nostd::span> batch_1(&recordable_1, 1); - EXPECT_CALL(*mock_stub, Export(_, _, _)) - .Times(Exactly(1)) - .WillOnce(Return(grpc::Status::OK)) - .WillOnce(Return(grpc::Status::CANCELLED)); + EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -95,7 +223,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -106,6 +234,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); // Test failed RPC @@ -114,13 +243,19 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +#if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +#else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +#endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -149,7 +284,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) '3', '2', '1', '0'}; opentelemetry::trace::SpanId span_id{span_id_bin}; - auto trace_mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto trace_mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr trace_stub_interface( trace_mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc index dc6dde9d79..6689233a9d 100644 --- a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER)