diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index dd40c63bfa..c3b9fb82c0 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -7,9 +7,7 @@ // GCC 9 has likely attribute but do not support declare it at the beginning of statement # if defined(__has_cpp_attribute) && (defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 9) # if __has_cpp_attribute(likely) -# define OPENTELEMETRY_LIKELY_IF(...) \ - if (__VA_ARGS__) \ - [[likely]] +# define OPENTELEMETRY_LIKELY_IF(...) if (__VA_ARGS__) [[likely]] # endif # endif @@ -164,16 +162,19 @@ point. #if defined(__clang__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #elif defined(__GNUC__) # define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default"))) +# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden"))) #else /* Add support for other compilers here. */ # define OPENTELEMETRY_API_SINGLETON +# define OPENTELEMETRY_LOCAL_SYMBOL #endif diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 73447ce123..924a94e466 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -3,10 +3,13 @@ #pragma once +#include #include #include +#include "opentelemetry/sdk/common/exporter_utils.h" + #include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h" #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" @@ -26,12 +29,21 @@ namespace otlp struct OtlpGrpcClientOptions; +#ifdef ENABLE_ASYNC_EXPORT +struct OtlpGrpcClientAsyncData; +#endif + /** * The OTLP gRPC client contains utility functions of gRPC. 
*/ class OtlpGrpcClient { public: +#ifdef ENABLE_ASYNC_EXPORT + OtlpGrpcClient(); + ~OtlpGrpcClient(); +#endif + /** * Create gRPC channel from the exporter options. */ @@ -68,21 +80,109 @@ class OtlpGrpcClient static grpc::Status DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response); static grpc::Status DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response); static grpc::Status DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response); + +#ifdef ENABLE_ASYNC_EXPORT + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + 
proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ + sdk::common::ExportResult DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept; + + /** + * Force flush the gRPC client. 
+ */ + bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + + /** + * Shut down the gRPC client. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return the status of this operation + */ + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + + std::shared_ptr MutableAsyncData(const OtlpGrpcClientOptions &options); + +private: + // Stores if this gRPC client had its Shutdown() method called + bool is_shutdown_; + + // Stores shared data between threads of this gRPC client + std::shared_ptr async_data_; +#endif }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h index 310fc94d4d..1191c2118a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client_options.h @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions /** User agent. */ std::string user_agent; + + /** max number of threads that can be allocated from this */ + std::size_t max_threads; + +#ifdef ENABLE_ASYNC_EXPORT + // Concurrent requests + std::size_t max_concurrent_requests; +#endif }; } // namespace otlp diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index 7aff1e24a5..393b54f6f4 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -23,6 +23,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format. 
*/ @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter // The configuration options associated with this exporter. const OtlpGrpcExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcExporterTestPeer; friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h index 29333703b1..149234f9fa 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo // Configuration options for the exporter const OtlpGrpcLogRecordExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // For testing friend class OtlpGrpcLogRecordExporterTestPeer; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h index 5f975c8ce3..1385c381ec 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h @@ -22,6 +22,8 @@ namespace exporter namespace otlp { +class OtlpGrpcClient; + /** * The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC. */ @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp // The configuration options associated with this exporter. 
const OtlpGrpcMetricExporterOptions options_; +#ifdef ENABLE_ASYNC_EXPORT + std::shared_ptr client_; +#endif + // Aggregation Temporality selector const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_; diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index c0436eddaf..7a6cf4ccc1 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -9,12 +9,19 @@ # include #endif +#include +#include #include #include #include +#include #include +#include +#include +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/ext/http/common/url_parser.h" +#include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/sdk/common/global_log_handler.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -23,6 +30,77 @@ namespace exporter namespace otlp { +#ifdef ENABLE_ASYNC_EXPORT + +namespace +{ +struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallDataBase +{ + using GrpcAsyncCallback = bool (*)(OtlpGrpcAsyncCallDataBase *); + + std::unique_ptr arena; + grpc::Status grpc_status; + std::unique_ptr grpc_context; + + opentelemetry::sdk::common::ExportResult export_result = + opentelemetry::sdk::common::ExportResult::kFailure; + GrpcAsyncCallback grpc_async_callback = nullptr; + + OtlpGrpcAsyncCallDataBase() {} + virtual ~OtlpGrpcAsyncCallDataBase() {} +}; + +template +struct OPENTELEMETRY_LOCAL_SYMBOL OtlpGrpcAsyncCallData : public OtlpGrpcAsyncCallDataBase +{ + using RequestType = GrpcRequestType; + using ResponseType = GrpcResponseType; + + RequestType *request = nullptr; + ResponseType *response = nullptr; + std::unique_ptr> response_reader; + + std::function &&, + const RequestType &, + ResponseType *)> + result_callback; + + OtlpGrpcAsyncCallData() {} + virtual ~OtlpGrpcAsyncCallData() {} +}; +} // namespace + +struct OtlpGrpcClientAsyncData +{ + std::chrono::system_clock::duration export_timeout = std::chrono::seconds{10}; + + // The best performance trade-off of 
gRPC is having numcpu's threads and one completion queue + // per thread, but this exporter should not cost a lot resource and we don't want to create + // too many threads in the process. So we use one completion queue. + grpc::CompletionQueue cq; + + // Running requests, this is used to limit the number of concurrent requests. + std::atomic running_requests{0}; + // Request counter is used to record ForceFlush. + std::atomic start_request_counter{0}; + std::atomic finished_request_counter{0}; + std::size_t max_concurrent_requests = 64; + + // Condition variable and mutex to control the concurrency count of running requests. + std::mutex session_waker_lock; + std::condition_variable session_waker; + + std::mutex background_thread_m; + std::unique_ptr background_thread; + + // Do not use OtlpGrpcClientAsyncData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + OtlpGrpcClientAsyncData() {} +}; +#endif + namespace { // ----------------------------- Helper functions ------------------------------ @@ -46,16 +124,216 @@ static std::string GetFileContentsOrInMemoryContents(const std::string &file_pat return contents; } +static void MaybeSpawnBackgroundThread(std::shared_ptr async_data) +{ + std::lock_guard lock_guard{async_data->background_thread_m}; + if (async_data->background_thread) + { + return; + } + + async_data->background_thread.reset(new std::thread([async_data]() { + bool running = true; + while (running) + { + void *tag = nullptr; + bool ok = false; + // If there is no job in exporting_timeout+5 minutes, we can exit this thread and start + // another thread later when new datas arrived. 
+ auto got_status = async_data->cq.AsyncNext( + &tag, &ok, + std::chrono::system_clock::now() + std::chrono::minutes{5} + async_data->export_timeout); + if (grpc::CompletionQueue::SHUTDOWN == got_status) + { + std::lock_guard internal_lock_guard{async_data->background_thread_m}; + if (async_data->background_thread) + { + async_data->background_thread->detach(); + async_data->background_thread.reset(); + } + break; + } + if (grpc::CompletionQueue::TIMEOUT == got_status) + { + std::lock_guard internal_lock_guard{async_data->background_thread_m}; + running = async_data->running_requests.load(std::memory_order_acquire) > 0; + + if (!running) + { + if (async_data->background_thread) + { + async_data->background_thread->detach(); + async_data->background_thread.reset(); + } + break; + } + } + + if (nullptr == tag) + { + continue; + } + + auto callback_data = reinterpret_cast(tag); + --async_data->running_requests; + ++async_data->finished_request_counter; + + if (callback_data->grpc_status.ok()) + { + callback_data->export_result = opentelemetry::sdk::common::ExportResult::kSuccess; + } + + if (callback_data->grpc_async_callback) + { + callback_data->grpc_async_callback(callback_data); + } + // The tag was allocated with new in InternalDelegateAsyncExport() and nothing else frees it + // after it leaves the completion queue, so release it even when no callback is installed. + delete callback_data; + + // Maybe wake up blocking DelegateAsyncExport() call + async_data->session_waker.notify_all(); + } + })); +} + +template +static sdk::common::ExportResult InternalDelegateAsyncExport( + std::shared_ptr async_data, + StubType *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + RequestType &&request, + std::function &&, + const RequestType &, + ResponseType *)> &&result_callback, + int32_t export_data_count, + const char *export_data_name) noexcept +{ + if (async_data->running_requests.load(std::memory_order_acquire) >= + async_data->max_concurrent_requests) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, exporter queue is full"); + if (result_callback) + { + 
result_callback(opentelemetry::sdk::common::ExportResult::kFailureFull, std::move(arena), + request, nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailureFull; + } + + OtlpGrpcAsyncCallData *call_data = + new OtlpGrpcAsyncCallData(); + call_data->arena.swap(arena); + call_data->result_callback.swap(result_callback); + + call_data->request = + google::protobuf::Arena::Create(call_data->arena.get(), std::move(request)); + call_data->response = google::protobuf::Arena::Create(call_data->arena.get()); + + if (call_data->request == nullptr || call_data->response == nullptr) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, create gRPC request/response failed"); + + if (call_data->result_callback) + { + call_data->result_callback( + opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), + nullptr == call_data->request ? request : *call_data->request, call_data->response); + } + + delete call_data; + return opentelemetry::sdk::common::ExportResult::kFailure; + } + call_data->grpc_context.swap(context); + + call_data->grpc_async_callback = [](OtlpGrpcAsyncCallDataBase *base_call_data) { + OtlpGrpcAsyncCallData *real_call_data = + static_cast *>(base_call_data); + if (real_call_data->result_callback) + { + return real_call_data->result_callback(real_call_data->export_result, + std::move(real_call_data->arena), + *real_call_data->request, real_call_data->response); + } + + return true; + }; + + call_data->response_reader = + stub->AsyncExport(call_data->grpc_context.get(), *call_data->request, &async_data->cq); + if (!call_data->response_reader) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << export_data_count << " " << export_data_name + << " failed, call AsyncExport failed"); + + if (call_data->result_callback) + { + call_data->result_callback( + opentelemetry::sdk::common::ExportResult::kFailure, std::move(call_data->arena), 
+ nullptr == call_data->request ? request : *call_data->request, call_data->response); + } + delete call_data; + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + // Count the request before Finish() registers the tag: the background thread decrements + // running_requests as soon as the tag completes, so incrementing afterwards would let the + // decrement run ahead of the increment. + { + ++async_data->start_request_counter; + ++async_data->running_requests; + } + + call_data->response_reader->Finish(call_data->response, &call_data->grpc_status, + reinterpret_cast(call_data)); + + // Maybe spawn background thread to handle the completion queue + MaybeSpawnBackgroundThread(async_data); + + // Can not cancel when start the request + { + std::unique_lock lock{async_data->session_waker_lock}; + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) <= + async_data->max_concurrent_requests; + }); + } + + return opentelemetry::sdk::common::ExportResult::kSuccess; +} + } // namespace +#ifdef ENABLE_ASYNC_EXPORT +OtlpGrpcClient::OtlpGrpcClient() : is_shutdown_{false} {} + +// Blocks until every in-flight asynchronous request has completed. +OtlpGrpcClient::~OtlpGrpcClient() +{ + std::shared_ptr async_data; + async_data.swap(async_data_); + + // async_data_ is only created lazily by MutableAsyncData(), so it is still null when no + // asynchronous export was ever started; there is nothing to wait for in that case. + while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0) + { + std::unique_lock lock{async_data->session_waker_lock}; + // Sleep until all running requests are done; the timeout bounds a missed notification. + async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { + return async_data->running_requests.load(std::memory_order_acquire) == 0; + }); + } +} + +#endif + std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientOptions &options) { std::shared_ptr channel; // - // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC channel. - // Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some unexpected - // address. + // Scheme is allowed in OTLP endpoint definition, but is not allowed for creating gRPC + // channel. Passing URI with scheme to grpc::CreateChannel could resolve the endpoint to some + // unexpected address. 
// ext::http::common::UrlParser url(options.endpoint); @@ -70,6 +348,13 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO grpc::ChannelArguments grpc_arguments; grpc_arguments.SetUserAgentPrefix(options.user_agent); + if (options.max_threads > 0) + { + grpc::ResourceQuota quota; + quota.SetMaxThreads(static_cast(options.max_threads)); + grpc_arguments.SetResourceQuota(quota); + } + if (options.use_ssl_credentials) { grpc::SslCredentialsOptions ssl_opts; @@ -79,7 +364,7 @@ std::shared_ptr OtlpGrpcClient::MakeChannel(const OtlpGrpcClientO ssl_opts.pem_private_key = GetFileContentsOrInMemoryContents(options.ssl_client_key_path, options.ssl_client_key_string); ssl_opts.pem_cert_chain = GetFileContentsOrInMemoryContents(options.ssl_client_cert_path, - options.ssl_client_cert_string); + options.ssl_client_cert_string); #endif channel = grpc::CreateCustomChannel(grpc_target, grpc::SslCredentials(ssl_opts), grpc_arguments); @@ -140,31 +425,209 @@ OtlpGrpcClient::MakeLogsServiceStub(const OtlpGrpcClientOptions &options) grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::trace::v1::TraceService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::trace::v1::ExportTraceServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, proto::collector::trace::v1::ExportTraceServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, proto::collector::metrics::v1::ExportMetricsServiceResponse *response) { - 
return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); } grpc::Status OtlpGrpcClient::DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, - grpc::ClientContext *context, - const proto::collector::logs::v1::ExportLogsServiceRequest &request, + std::unique_ptr &&context, + std::unique_ptr && /*arena*/, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, proto::collector::logs::v1::ExportLogsServiceResponse *response) { - return stub->Export(context, request, response); + return stub->Export(context.get(), request, response); +} + +#ifdef ENABLE_ASYNC_EXPORT + +/** + * Async export + * @param options Options used to message to create gRPC context and stub(if necessary) + * @param arena Protobuf arena to hold lifetime of all messages + * @param request Request for this RPC + * @param result_callback callback to call when the exporting is done + * @return return the status of this operation + */ +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::trace::v1::TraceService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceRequest &&request, + std::function &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *)> + &&result_callback) noexcept +{ + auto span_count = request.resource_spans_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << span_count << " trace span(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, 
std::move(context), std::move(arena), + std::move(request), std::move(result_callback), span_count, + "trace span(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::metrics::v1::MetricsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceRequest &&request, + std::function &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&result_callback) noexcept +{ + auto metrics_count = request.resource_metrics_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << metrics_count << " metric(s) failed, exporter is shutdown"); + if (result_callback) + { + result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), metrics_count, + "metric(s)"); +} + +sdk::common::ExportResult OtlpGrpcClient::DelegateAsyncExport( + const OtlpGrpcClientOptions &options, + proto::collector::logs::v1::LogsService::StubInterface *stub, + std::unique_ptr &&context, + std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceRequest &&request, + std::function &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *)> + &&result_callback) noexcept +{ + auto logs_count = request.resource_logs_size(); + if (is_shutdown_) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC Client] ERROR: Export " + << logs_count << " log(s) failed, exporter is shutdown"); + if (result_callback) + { + 
result_callback(opentelemetry::sdk::common::ExportResult::kFailure, std::move(arena), request, + nullptr); + } + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + std::shared_ptr async_data = MutableAsyncData(options); + return InternalDelegateAsyncExport(async_data, stub, std::move(context), std::move(arena), + std::move(request), std::move(result_callback), logs_count, + "log(s)"); +} + +std::shared_ptr OtlpGrpcClient::MutableAsyncData( + const OtlpGrpcClientOptions &options) +{ + if (!async_data_) + { + async_data_ = std::make_shared(); + async_data_->export_timeout = options.timeout; + async_data_->max_concurrent_requests = options.max_concurrent_requests; + } + + return async_data_; +} + +bool OtlpGrpcClient::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (!async_data_) + { + return true; + } + + std::size_t request_counter = async_data_->start_request_counter.load(std::memory_order_acquire); + if (request_counter <= async_data_->finished_request_counter.load(std::memory_order_acquire)) + { + return true; + } + + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. 
+ timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + // Wait for all the sessions to finish + std::unique_lock lock(async_data_->session_waker_lock); + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + // Keep waiting until every request started before this call has finished or the time budget + // is used up. A single wake-up only means that some request completed, not that all did. + while (timeout_steady > std::chrono::steady_clock::duration::zero() && + request_counter > async_data_->finished_request_counter.load(std::memory_order_acquire)) + { + // The counters change without session_waker_lock being held, so a notification can slip in + // between the check above and the wait below. Waiting at most export_timeout per round + // makes sure the condition is re-checked instead of blocking forever. + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + async_data_->session_waker.wait_for(lock, async_data_->export_timeout); + + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + // Report whether the flush really completed, not merely whether there is time left. + return request_counter <= async_data_->finished_request_counter.load(std::memory_order_acquire); +} + +bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept +{ + // Record the shutdown so that DelegateAsyncExport() rejects requests from now on; nothing + // else in this file sets the flag, and new work must not reach a shut down completion queue. + const bool first_call = !is_shutdown_; + is_shutdown_ = true; + + if (!async_data_) + { + return true; + } + + // Shut the completion queue down only on the first call. + if (first_call) + { + async_data_->cq.Shutdown(); + } + return ForceFlush(timeout); } +#endif + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 8fc4ce9272..fcf4b884fd 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -23,12 +23,20 @@ namespace otlp OtlpGrpcExporter::OtlpGrpcExporter() : OtlpGrpcExporter(OtlpGrpcExporterOptions()) {} OtlpGrpcExporter::OtlpGrpcExporter(const OtlpGrpcExporterOptions &options) - : options_(options), trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + 
client_(std::make_shared()), +#endif + trace_service_stub_(OtlpGrpcClient::MakeTraceServiceStub(options)) {} OtlpGrpcExporter::OtlpGrpcExporter( std::unique_ptr stub) - : options_(OtlpGrpcExporterOptions()), trace_service_stub_(std::move(stub)) + : options_(OtlpGrpcExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + trace_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -58,28 +66,58 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::trace::v1::ExportTraceServiceRequest *request = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + google::protobuf::Arena::Create( + arena.get()); OtlpRecordableUtils::PopulateRequest(spans, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::trace::v1::ExportTraceServiceResponse *response = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceResponse>(&arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = - OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), context.get(), *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" - << grpc_utils::grpc_status_code_to_string(status.error_code()) - << "\" error_message: \"" << status.error_message() << "\""); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, trace_service_stub_.get(), std::move(context), 
+ std::move(arena), + std::move(*request), + // Completion callback: it only logs the outcome of the export. + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::trace::v1::ExportTraceServiceRequest &request, + proto::collector::trace::v1::ExportTraceServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] ERROR: Export " + << request.resource_spans_size() + << " trace span(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " + << request.resource_spans_size() << " trace span(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(trace_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" + << grpc_utils::grpc_status_code_to_string(status.error_code()) + << "\" error_message: \"" << status.error_message() << "\""); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_exporter_options.cc b/exporters/otlp/src/otlp_grpc_exporter_options.cc index 3099303ac4..c5dc94ec9d 100644 --- a/exporters/otlp/src/otlp_grpc_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcExporterOptions::OtlpGrpcExporterOptions() timeout = GetOtlpDefaultTracesTimeout(); metadata = GetOtlpDefaultTracesHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcExporterOptions::~OtlpGrpcExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index b0da987c43..50fcd1d491 100644 --- 
a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -35,12 +35,20 @@ OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter() OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( const OtlpGrpcLogRecordExporterOptions &options) - : options_(options), log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) + : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(OtlpGrpcClient::MakeLogsServiceStub(options)) {} OtlpGrpcLogRecordExporter::OtlpGrpcLogRecordExporter( std::unique_ptr stub) - : options_(OtlpGrpcLogRecordExporterOptions()), log_service_stub_(std::move(stub)) + : options_(OtlpGrpcLogRecordExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif + log_service_stub_(std::move(stub)) {} // ----------------------------- Exporter methods ------------------------------ @@ -71,26 +79,58 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. 
arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::logs::v1::ExportLogsServiceRequest *request = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create( + arena.get()); OtlpRecordableUtils::PopulateRequest(logs, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::logs::v1::ExportLogsServiceResponse *response = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = - OtlpGrpcClient::DelegateExport(log_service_stub_.get(), context.get(), *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, log_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::logs::v1::ExportLogsServiceRequest &request, + proto::collector::logs::v1::ExportLogsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << request.resource_logs_size() + << " log(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_logs_size() + << " log(s) success"); + } + return true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(log_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP LOG GRPC Exporter] Export() failed: " << status.error_message()); + return 
sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc index 697ef28f2a..5925808abe 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter_options.cc @@ -27,6 +27,11 @@ OtlpGrpcLogRecordExporterOptions::OtlpGrpcLogRecordExporterOptions() timeout = GetOtlpDefaultLogsTimeout(); metadata = GetOtlpDefaultLogsHeaders(); user_agent = GetOtlpDefaultUserAgent(); + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcLogRecordExporterOptions::~OtlpGrpcLogRecordExporterOptions() {} diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index 921eb4c481..8d87d20a38 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -23,6 +23,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter() OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options) : options_(options), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(OtlpGrpcClient::MakeMetricsServiceStub(options)) @@ -31,6 +34,9 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptio OtlpGrpcMetricExporter::OtlpGrpcMetricExporter( std::unique_ptr stub) : options_(OtlpGrpcMetricExporterOptions()), +#ifdef ENABLE_ASYNC_EXPORT + client_(std::make_shared()), +#endif aggregation_temporality_selector_{ OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)}, metrics_service_stub_(std::move(stub)) @@ -66,27 +72,58 @@ opentelemetry::sdk::common::ExportResult 
OtlpGrpcMetricExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::metrics::v1::ExportMetricsServiceRequest *request = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + google::protobuf::Arena::Create( + arena.get()); OtlpMetricUtils::PopulateRequest(data, request); auto context = OtlpGrpcClient::MakeClientContext(options_); proto::collector::metrics::v1::ExportMetricsServiceResponse *response = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceResponse>(&arena); + google::protobuf::Arena::Create( + arena.get()); - grpc::Status status = OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), context.get(), - *request, response); - - if (!status.ok()) +#ifdef ENABLE_ASYNC_EXPORT + if (options_.max_concurrent_requests > 1) { - OTEL_INTERNAL_LOG_ERROR( - "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); - return sdk::common::ExportResult::kFailure; + return client_->DelegateAsyncExport( + options_, metrics_service_stub_.get(), std::move(context), std::move(arena), + std::move(*request), + [](opentelemetry::sdk::common::ExportResult result, + std::unique_ptr &&, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, + proto::collector::metrics::v1::ExportMetricsServiceResponse *) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] ERROR: Export " + << request.resource_metrics_size() + << " metric(s) error: " << static_cast(result)); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Export " << request.resource_metrics_size() + << " metric(s) success"); + } + return 
true; + }); + } + else + { +#endif + grpc::Status status = + OtlpGrpcClient::DelegateExport(metrics_service_stub_.get(), std::move(context), + std::move(arena), std::move(*request), response); + + if (!status.ok()) + { + OTEL_INTERNAL_LOG_ERROR( + "[OTLP METRIC GRPC Exporter] Export() failed: " << status.error_message()); + return sdk::common::ExportResult::kFailure; + } +#ifdef ENABLE_ASYNC_EXPORT } +#endif return opentelemetry::sdk::common::ExportResult::kSuccess; } diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc index cb99d1cc5a..ff8466b0b2 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc @@ -29,6 +29,11 @@ OtlpGrpcMetricExporterOptions::OtlpGrpcMetricExporterOptions() user_agent = GetOtlpDefaultUserAgent(); aggregation_temporality = PreferredAggregationTemporality::kCumulative; + + max_threads = 0; +#ifdef ENABLE_ASYNC_EXPORT + max_concurrent_requests = 64; +#endif } OtlpGrpcMetricExporterOptions::~OtlpGrpcMetricExporterOptions() {} diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 977ea2a0cb..3ba0d19424 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -105,8 +105,8 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::trace::v1::ExportTraceServiceRequest *service_request = - google::protobuf::Arena::CreateMessage< - proto::collector::trace::v1::ExportTraceServiceRequest>(&arena); + google::protobuf::Arena::Create( + &arena); OtlpRecordableUtils::PopulateRequest(spans, service_request); std::size_t span_count = spans.size(); #ifdef ENABLE_ASYNC_EXPORT diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index db4b813c02..6e046f888c 100644 --- 
a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -109,8 +109,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::logs::v1::ExportLogsServiceRequest *service_request = - google::protobuf::Arena::CreateMessage( - &arena); + google::protobuf::Arena::Create(&arena); OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); #ifdef ENABLE_ASYNC_EXPORT diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 52c25b3a34..a4a3e3b586 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -112,8 +112,8 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( google::protobuf::Arena arena{arena_options}; proto::collector::metrics::v1::ExportMetricsServiceRequest *service_request = - google::protobuf::Arena::CreateMessage< - proto::collector::metrics::v1::ExportMetricsServiceRequest>(&arena); + google::protobuf::Arena::Create( + &arena); OtlpMetricUtils::PopulateRequest(data, service_request); std::size_t metric_count = data.scope_metric_data_.size(); #ifdef ENABLE_ASYNC_EXPORT