From 1db4e3effbea642d4f1b0992f1b2fd46f814263e Mon Sep 17 00:00:00 2001 From: Dimiter 'malkia' Stanev Date: Mon, 18 Nov 2024 21:47:40 -0800 Subject: [PATCH] fixes --- .../exporters/otlp/otlp_grpc_forward_proxy.h | 18 +- exporters/otlp/src/otlp_grpc_forward_proxy.cc | 263 ++++++++++-------- x/x.cpp | 20 +- 3 files changed, 173 insertions(+), 128 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h index 3e98852abc..a116af904a 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h @@ -22,17 +22,23 @@ class OPENTELEMETRY_EXPORT_TYPE OtlpGrpcForwardProxy { struct Impl; std::unique_ptr impl; + OtlpGrpcForwardProxy() = delete; public: - explicit OtlpGrpcForwardProxy(); + enum class ExportMode { + Unknown = -1, + Sync = 0, + // For async to work, options.max_concurrent_requests > 1. + AsyncBlockOnFull = 1, + AsyncDropOnFull = 2, + }; + explicit OtlpGrpcForwardProxy(const OtlpGrpcClientOptions& options); ~OtlpGrpcForwardProxy(); void SetActive(bool active); bool IsActive() const; - void SetAsync(bool async); - bool IsAsync() const; void AddListenAddress(const std::string& listenAddress); - void RegisterTraceExporter(const OtlpGrpcExporterOptions& options); - void RegisterMetricExporter(const OtlpGrpcMetricExporterOptions& options); - void RegisterLogRecordExporter(const OtlpGrpcLogRecordExporterOptions& options); + void RegisterTraceExporter( ExportMode exportMode ); + void RegisterMetricExporter( ExportMode exportMode ); + void RegisterLogRecordExporter( ExportMode exportMode ); void Start(); void Wait(); void Shutdown(); diff --git a/exporters/otlp/src/otlp_grpc_forward_proxy.cc b/exporters/otlp/src/otlp_grpc_forward_proxy.cc index de7c8f93bb..7eea760090 100644 --- a/exporters/otlp/src/otlp_grpc_forward_proxy.cc +++ b/exporters/otlp/src/otlp_grpc_forward_proxy.cc @@ -6,12 +6,17 @@ #include #include "opentelemetry/version.h" + +#include "opentelemetry/sdk/common/global_log_handler.h" + #include "opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h" #include "opentelemetry/exporters/otlp/otlp_grpc_client.h" +#include "opentelemetry/exporters/otlp/otlp_grpc_client_factory.h" + #include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" -#include "opentelemetry/sdk/common/global_log_handler.h" + OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -20,21 +25,23 @@ namespace otlp { using namespace opentelemetry; + using namespace opentelemetry::exporter::otlp; -using namespace opentelemetry::proto::collector::metrics::v1; + using namespace opentelemetry::proto::collector::logs::v1; using namespace opentelemetry::proto::collector::trace::v1; +using namespace opentelemetry::proto::collector::metrics::v1; struct OtlpGrpcForwardProxy::Impl { bool active{ false }; -#if defined(ENABLE_ASYNC_EXPORT) && ENABLE_ASYNC_EXPORT - bool async{ false }; -#endif + + OtlpGrpcClientOptions clientOptions; + std::shared_ptr client; grpc::ServerBuilder serverBuilder; std::unique_ptr server; - OtlpGrpcClient client; + google::protobuf::ArenaOptions arenaOptions; struct AddressPortPair @@ -47,40 +54,55 @@ struct OtlpGrpcForwardProxy::Impl std::list listenAddresses; - OtlpGrpcExporterOptions traceExporterOptions; - OtlpGrpcMetricExporterOptions metricExporterOptions; - OtlpGrpcLogRecordExporterOptions logExporterOptions; - + std::unique_ptr logExporterStub; std::unique_ptr traceExporterStub; std::unique_ptr metricExporterStub; - std::unique_ptr logExporterStub; +#if defined(ENABLE_ASYNC_EXPORT) + ExportMode logExporterMode{ ExportMode::Unknown }; + ExportMode traceExporterMode{ ExportMode::Unknown };; + ExportMode metricExporterMode{ ExportMode::Unknown };; + + struct LogExporterProxyAsync; + struct TraceExporterProxyAsync; + struct MetricExporterProxyAsync; + + std::unique_ptr logExporterProxyAsync; + std::unique_ptr traceExporterProxyAsync; + std::unique_ptr metricExporterProxyAsync; +#endif + + struct LogExporterProxy; struct TraceExporterProxy; struct MetricExporterProxy; - struct LogExporterProxy; + std::unique_ptr logExporterProxy; std::unique_ptr traceExporterProxy; std::unique_ptr metricExporterProxy; - std::unique_ptr logExporterProxy; - Impl(); + Impl(const OtlpGrpcClientOptions&); +#if defined(ENABLE_ASYNC_EXPORT) bool HandleExportResult(sdk::common::ExportResult result) const; - grpc::ServerUnaryReactor* Finish(grpc::CallbackServerContext *cbServerContext, opentelemetry::sdk::common::ExportResult exportResult) const; + grpc::ServerUnaryReactor* Finish(grpc::CallbackServerContext *cbServerContext, opentelemetry::sdk::common::ExportResult exportResult, const grpc::Status& syncStatus) const; +#endif }; -OtlpGrpcForwardProxy::Impl::Impl() +OtlpGrpcForwardProxy::Impl::Impl(const OtlpGrpcClientOptions& options) + : clientOptions(options) { - arenaOptions.initial_block_size = 1024; - arenaOptions.max_block_size = 1024 * 1024; + arenaOptions.initial_block_size = 65536; + arenaOptions.max_block_size = 128 * 1024 * 1024; + client = OtlpGrpcClientFactory::Create(clientOptions); } +#if defined(ENABLE_ASYNC_EXPORT) bool OtlpGrpcForwardProxy::Impl::HandleExportResult(sdk::common::ExportResult result) const { if( active ) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { - OTEL_INTERNAL_LOG_ERROR("[otlp_grpc_forward_proxy] AsyncExport failed");// << result); + OTEL_INTERNAL_LOG_ERROR("[otlp_grpc_forward_proxy] AsyncExport failed"); } else { @@ -90,100 +112,108 @@ bool OtlpGrpcForwardProxy::Impl::HandleExportResult(sdk::common::ExportResult re return true; } -grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServerContext *cbServerContext, opentelemetry::sdk::common::ExportResult exportResult) const +grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServerContext *cbServerContext, opentelemetry::sdk::common::ExportResult exportResult, const grpc::Status& syncStatus ) const { grpc::Status grpcStatus; if( active ) { - switch( exportResult ) + if( syncStatus.error_code() != grpc::DO_NOT_USE ) { - // Batch was exported successfully. - case sdk::common::ExportResult::kSuccess: - grpcStatus = grpc::Status(grpc::StatusCode::OK, "ok"); - break; - - // Batch exporting failed, caller must not retry exporting the same batch and the batch must be dropped. - case sdk::common::ExportResult::kFailure: - grpcStatus = grpc::Status(grpc::StatusCode::ABORTED, "ExportResult::kFailure"); - break; - - // The collection does not have enough space to receive the export batch. - case sdk::common::ExportResult::kFailureFull: - grpcStatus = grpc::Status(grpc::StatusCode::UNAVAILABLE, "ExportResult::kFailureFull"); - break; - - case sdk::common::ExportResult::kFailureInvalidArgument: - grpcStatus = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "ExportResult::kFailureInvalidArgument"); - break; - - default: - grpcStatus = grpc::Status(grpc::StatusCode::UNKNOWN, "ExportResult::???", std::to_string(int(exportResult))); - break; + grpcStatus = syncStatus; } - - if( exportResult != sdk::common::ExportResult::kSuccess ) + else { - OTEL_INTERNAL_LOG_ERROR("[OTLP GRPC FORWARD PROXY] Finish failed with result "); // << exportResult << " and status code " << grpcStatus); + switch( exportResult ) + { + // Batch was exported successfully. + case sdk::common::ExportResult::kSuccess: + grpcStatus = grpc::Status(grpc::StatusCode::OK, ""); + break; + + // Batch exporting failed, caller must not retry exporting the same batch and the batch must be dropped. + case sdk::common::ExportResult::kFailure: + grpcStatus = grpc::Status(grpc::StatusCode::ABORTED, "ExportResult::kFailure"); + break; + + // The collection does not have enough space to receive the export batch. + case sdk::common::ExportResult::kFailureFull: + grpcStatus = grpc::Status(grpc::StatusCode::UNAVAILABLE, "ExportResult::kFailureFull"); + break; + + case sdk::common::ExportResult::kFailureInvalidArgument: + grpcStatus = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "ExportResult::kFailureInvalidArgument"); + break; + + default: + grpcStatus = grpc::Status(grpc::StatusCode::UNKNOWN, "ExportResult::???"); + break; + } } } else { - grpcStatus = grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active.", std::to_string(int(exportResult))); + grpcStatus = grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active."); } const auto reactor{ cbServerContext->DefaultReactor() }; reactor->Finish(grpcStatus); return reactor; } +#endif // This is bit ugly, but still easier to understand (at least by me) than template<>'s -#if defined(ENABLE_ASYNC_EXPORT) && ENABLE_ASYNC_EXPORT -#define MAKE_PROXY(NAME, OPTIONS, STUB, SERVICE, REQUEST, RESPONSE, TEXT) \ - struct OtlpGrpcForwardProxy::Impl::NAME : public SERVICE::CallbackService { \ +#if defined(ENABLE_ASYNC_EXPORT) +#define MAKE_PROXY_ASYNC(NAME, STUB, SERVICE, REQUEST, RESPONSE, TEXT, MODE) \ + struct OtlpGrpcForwardProxy::Impl::NAME final : public SERVICE::CallbackService { \ OtlpGrpcForwardProxy::Impl& impl; \ explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \ grpc::ServerUnaryReactor* Export(grpc::CallbackServerContext* cbServerContext, const REQUEST* req, RESPONSE* resp) override { \ auto exportResult{ sdk::common::ExportResult::kFailure }; \ if( impl.active ) { \ - auto context{ impl.client.MakeClientContext(impl.OPTIONS) }; \ - if( impl.async ) { \ - OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] Async " TEXT " export"); \ - auto arena{ std::make_unique(impl.arenaOptions) }; \ - auto request{ *req }; \ - exportResult = impl.client.DelegateAsyncExport( impl.OPTIONS, impl.STUB.get(), std::move(context), std::move(arena), std::move(request), \ - [implPtr = &impl] (sdk::common::ExportResult r, std::unique_ptr&&, const REQUEST&, RESPONSE* ) -> bool { \ - return implPtr->HandleExportResult(r); \ - }); \ - } else { \ - OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] Sync " TEXT " export"); \ - const auto status{ impl.STUB->Export( context.get(), *req, resp ) }; \ - exportResult = status.ok() ? sdk::common::ExportResult::kSuccess : sdk::common::ExportResult::kFailure; \ + OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \ + auto syncStatus{ grpc::Status(grpc::Status(grpc::StatusCode::DO_NOT_USE, "")) }; \ + auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \ + auto arena{ std::make_unique(impl.arenaOptions) }; \ + auto request{ *req }; \ + exportResult = impl.client->DelegateAsyncExport( impl.clientOptions, impl.STUB.get(), std::move(context), std::move(arena), std::move(request), \ + [implPtr = &impl] (sdk::common::ExportResult r, std::unique_ptr&&, const REQUEST&, RESPONSE* ) -> bool { \ + return implPtr->HandleExportResult(r); \ + }); \ + if( exportResult == sdk::common::ExportResult::kFailureFull && impl.MODE == OtlpGrpcForwardProxy::ExportMode::AsyncBlockOnFull ) { \ + OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export (blocking)"); \ + auto syncContext{ impl.client->MakeClientContext(impl.clientOptions ) }; \ + syncStatus = impl.STUB->Export( syncContext.get(), *req, resp ); \ } \ + return impl.Finish(cbServerContext, exportResult, syncStatus); \ } \ - return impl.Finish(cbServerContext, exportResult); \ + return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active.")); \ }}; -#else -#define MAKE_PROXY(NAME, OPTIONS, REQUEST, RESPONSE, TEXT) \ - struct OtlpGrpcForwardProxy::Impl::NAME : public SERVICE::CallbackService { \ +#endif + +#define MAKE_PROXY(NAME, STUB, SERVICE, REQUEST, RESPONSE, TEXT) \ + struct OtlpGrpcForwardProxy::Impl::NAME final : public SERVICE::Service { \ OtlpGrpcForwardProxy::Impl& impl; \ explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \ - grpc::ServerUnaryReactor* Export(grpc::CallbackServerContext* cbServerContext, const REQUEST* req, RESPONSE* resp) override { \ - auto exportResult{ sdk::common::ExportResult::kFailure }; \ + grpc::Status Export(grpc::ServerContext*, const REQUEST* req, RESPONSE* resp) override { \ if( impl.active ) { \ - auto context{ impl.client.MakeClientContext(impl.OPTIONS) }; \ - OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] Sync " TEXT " export"); \ - const auto status{ impl.STUB->Export( context.get(), *req, resp ) }; \ - exportResult = status.ok() ? sdk::common::ExportResult::kSuccess : sdk::common::ExportResult::kFailure; \ + OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \ + auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \ + return impl.STUB->Export( context.get(), *req, resp ); \ } \ - return impl.Finish(cbServerContext, exportResult); \ + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active."); \ }}; + +#if defined(ENABLE_ASYNC_EXPORT) +MAKE_PROXY_ASYNC(MetricExporterProxyAsync, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "Async Metric", metricExporterMode) +MAKE_PROXY_ASYNC(TraceExporterProxyAsync, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "Async Trace", traceExporterMode) +MAKE_PROXY_ASYNC(LogExporterProxyAsync, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "Async Log", logExporterMode) #endif -MAKE_PROXY(MetricExporterProxy, metricExporterOptions, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "metrics") -MAKE_PROXY(TraceExporterProxy, traceExporterOptions, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "traces") -MAKE_PROXY(LogExporterProxy, logExporterOptions, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "logs") +MAKE_PROXY(MetricExporterProxy, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "Metric") +MAKE_PROXY(TraceExporterProxy, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "Trace") +MAKE_PROXY(LogExporterProxy, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "Log") -OtlpGrpcForwardProxy::OtlpGrpcForwardProxy() - : impl(std::make_unique()) +OtlpGrpcForwardProxy::OtlpGrpcForwardProxy(const OtlpGrpcClientOptions& options_) + : impl(std::make_unique(options_)) { } @@ -203,24 +233,6 @@ bool OtlpGrpcForwardProxy::IsActive() const return impl->active; } -void OtlpGrpcForwardProxy::SetAsync(bool async_) -{ - assert(impl != nullptr); -#if defined(ENABLE_ASYNC_EXPORT) && ENABLE_ASYNC_EXPORT - impl->async = async_; -#endif -} - -bool OtlpGrpcForwardProxy::IsAsync() const -{ - assert(impl != nullptr); -#if defined(ENABLE_ASYNC_EXPORT) && ENABLE_ASYNC_EXPORT - return impl->async; -#else - return false; -#endif -} - void OtlpGrpcForwardProxy::AddListenAddress(const std::string& listenAddress) { assert(impl != nullptr); @@ -228,35 +240,68 @@ void OtlpGrpcForwardProxy::AddListenAddress(const std::string& listenAddress) impl->serverBuilder.AddListeningPort(listenAddress, grpc::InsecureServerCredentials(), &( pair.port )); } -void OtlpGrpcForwardProxy::RegisterMetricExporter(const OtlpGrpcMetricExporterOptions& options) +void OtlpGrpcForwardProxy::RegisterMetricExporter( ExportMode exportMode ) { assert(impl != nullptr); - assert(impl->metricExporterProxy == nullptr); assert(impl->metricExporterStub == nullptr); - impl->metricExporterOptions = options; - impl->metricExporterStub = impl->client.MakeMetricsServiceStub(options); + assert(impl->metricExporterProxy == nullptr); + impl->metricExporterStub = impl->client->MakeMetricsServiceStub(); +#if defined(ENABLE_ASYNC_EXPORT) + assert(impl->metricExporterProxyAsync == nullptr); + assert(impl->metricExporterMode == ExportMode::Unknown); + if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 ) + { + impl->metricExporterMode = exportMode; + impl->metricExporterProxyAsync = std::make_unique(*impl); + impl->serverBuilder.RegisterService(impl->metricExporterProxyAsync.get()); + return; + } + impl->metricExporterMode = ExportMode::Sync; +#endif impl->metricExporterProxy = std::make_unique(*impl); impl->serverBuilder.RegisterService(impl->metricExporterProxy.get()); } -void OtlpGrpcForwardProxy::RegisterTraceExporter(const OtlpGrpcExporterOptions& options) +void OtlpGrpcForwardProxy::RegisterTraceExporter( ExportMode exportMode ) { assert(impl != nullptr); - assert(impl->traceExporterProxy == nullptr); assert(impl->traceExporterStub == nullptr); - impl->traceExporterOptions = options; - impl->traceExporterStub = impl->client.MakeTraceServiceStub(options); + assert(impl->traceExporterProxy == nullptr); + impl->traceExporterStub = impl->client->MakeTraceServiceStub(); +#if defined(ENABLE_ASYNC_EXPORT) + assert(impl->traceExporterProxyAsync == nullptr); + assert(impl->traceExporterMode == ExportMode::Unknown); + if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 ) + { + impl->traceExporterMode = exportMode; + impl->traceExporterProxyAsync = std::make_unique(*impl); + impl->serverBuilder.RegisterService(impl->traceExporterProxyAsync.get()); + return; + } + impl->traceExporterMode = ExportMode::Sync; +#endif impl->traceExporterProxy = std::make_unique(*impl); impl->serverBuilder.RegisterService(impl->traceExporterProxy.get()); } -void OtlpGrpcForwardProxy::RegisterLogRecordExporter(const OtlpGrpcLogRecordExporterOptions& options) +void OtlpGrpcForwardProxy::RegisterLogRecordExporter( ExportMode exportMode ) { assert(impl != nullptr); - assert(impl->logExporterProxy == nullptr); assert(impl->logExporterStub == nullptr); - impl->logExporterOptions = options; - impl->logExporterStub = impl->client.MakeLogsServiceStub(options); + assert(impl->logExporterProxy == nullptr); + impl->logExporterStub = impl->client->MakeLogsServiceStub(); +#if defined(ENABLE_ASYNC_EXPORT) + assert(impl->logExporterProxyAsync == nullptr); + assert(impl->logExporterMode == ExportMode::Unknown); + if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 ) + { + impl->logExporterMode = exportMode; + impl->logExporterProxyAsync = std::make_unique(*impl); + impl->serverBuilder.RegisterService(impl->logExporterProxyAsync.get()); + return; + } + impl->logExporterMode = ExportMode::Sync; +#endif impl->logExporterProxy = std::make_unique(*impl); impl->serverBuilder.RegisterService(impl->logExporterProxy.get()); } diff --git a/x/x.cpp b/x/x.cpp index 10ab19e451..87f3404b33 100644 --- a/x/x.cpp +++ b/x/x.cpp @@ -391,23 +391,17 @@ struct proxy_thread { using namespace opentelemetry::exporter::otlp; - OtlpGrpcExporterOptions traceExporterOptions; - traceExporterOptions.endpoint = GetOtlpDefaultGrpcEndpoint(); + OtlpGrpcClientOptions clientOptions; + clientOptions.endpoint = GetOtlpDefaultGrpcEndpoint(); + clientOptions.max_concurrent_requests = 1024; - OtlpGrpcMetricExporterOptions metricExporterOptions; - metricExporterOptions.endpoint = GetOtlpDefaultGrpcEndpoint(); - - OtlpGrpcLogRecordExporterOptions logExporterOptions; - logExporterOptions.endpoint = GetOtlpDefaultGrpcEndpoint(); - - ctx->proxy = std::make_unique(); + ctx->proxy = std::make_unique(clientOptions); ctx->proxy->SetActive(true); - ctx->proxy->SetAsync(true); ctx->proxy->AddListenAddress("127.0.0.1:4317"); - proxy->RegisterMetricExporter(metricExporterOptions); - proxy->RegisterTraceExporter(traceExporterOptions); - proxy->RegisterLogRecordExporter(logExporterOptions); + proxy->RegisterMetricExporter(OtlpGrpcForwardProxy::ExportMode::AsyncBlockOnFull); + proxy->RegisterTraceExporter(OtlpGrpcForwardProxy::ExportMode::AsyncBlockOnFull); + proxy->RegisterLogRecordExporter(OtlpGrpcForwardProxy::ExportMode::AsyncBlockOnFull); printf("Start\n"); ctx->proxy->Start(); {