From 7060fd9951e041c3efcf408fa986a57b0a284a1a Mon Sep 17 00:00:00 2001 From: Dimiter 'malkia' Stanev Date: Thu, 21 Nov 2024 10:58:10 -0800 Subject: [PATCH] emit header such that we can detect loops in the proxy --- exporters/otlp/src/otlp_grpc_forward_proxy.cc | 32 ++++++++++++++++++- x/BUILD.bazel | 5 +++ x/x.cpp | 6 ++-- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_forward_proxy.cc b/exporters/otlp/src/otlp_grpc_forward_proxy.cc index 7eea760090..ff293eea12 100644 --- a/exporters/otlp/src/otlp_grpc_forward_proxy.cc +++ b/exporters/otlp/src/otlp_grpc_forward_proxy.cc @@ -3,6 +3,9 @@ #include #include +#include +#include + #include #include "opentelemetry/version.h" @@ -24,6 +27,10 @@ namespace exporter namespace otlp { +// Send this header along with unique random number, which we'll reject if received back. +// This is to avoid loops due to misconfugration +static std::string kFwProxyIdHeader{ "otelcpp-otlp-grpc-forward-proxy-id" }; + using namespace opentelemetry; using namespace opentelemetry::exporter::otlp; @@ -80,6 +87,15 @@ struct OtlpGrpcForwardProxy::Impl std::unique_ptr traceExporterProxy; std::unique_ptr metricExporterProxy; + std::string fw_proxy_id; + + bool LoopDetected( const std::multimap& client_metadata ) const + { + const auto key{ grpc::string_ref( kFwProxyIdHeader ) }; + const auto it{ client_metadata.find(key) }; + return it != client_metadata.cend() && it->second.compare( grpc::string_ref( fw_proxy_id ) ) == 0; + } + Impl(const OtlpGrpcClientOptions&); #if defined(ENABLE_ASYNC_EXPORT) bool HandleExportResult(sdk::common::ExportResult result) const; @@ -93,6 +109,14 @@ OtlpGrpcForwardProxy::Impl::Impl(const OtlpGrpcClientOptions& options) arenaOptions.initial_block_size = 65536; arenaOptions.max_block_size = 128 * 1024 * 1024; client = OtlpGrpcClientFactory::Create(clientOptions); + + char buf[32]{}; + std::random_device rd; + uint64_t p0 = (uint64_t(rd())<<32) | uint64_t(rd()); + uint64_t p1 = (uint64_t(rd())<<32) | uint64_t(rd()); + sprintf_s(buf, "%08.8lx%08.8lx", p0, p1); + fw_proxy_id = buf; + } #if defined(ENABLE_ASYNC_EXPORT) @@ -168,10 +192,13 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \ grpc::ServerUnaryReactor* Export(grpc::CallbackServerContext* cbServerContext, const REQUEST* req, RESPONSE* resp) override { \ auto exportResult{ sdk::common::ExportResult::kFailure }; \ + if( impl.LoopDetected(cbServerContext->client_metadata()) ) \ + return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy detected loop.")); \ if( impl.active ) { \ OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \ auto syncStatus{ grpc::Status(grpc::Status(grpc::StatusCode::DO_NOT_USE, "")) }; \ auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \ + context->AddMetadata(kFwProxyIdHeader, impl.fw_proxy_id); \ auto arena{ std::make_unique(impl.arenaOptions) }; \ auto request{ *req }; \ exportResult = impl.client->DelegateAsyncExport( impl.clientOptions, impl.STUB.get(), std::move(context), std::move(arena), std::move(request), \ @@ -193,10 +220,13 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe struct OtlpGrpcForwardProxy::Impl::NAME final : public SERVICE::Service { \ OtlpGrpcForwardProxy::Impl& impl; \ explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \ - grpc::Status Export(grpc::ServerContext*, const REQUEST* req, RESPONSE* resp) override { \ + grpc::Status Export(grpc::ServerContext* serverContext, const REQUEST* req, RESPONSE* resp) override { \ + if( impl.LoopDetected(serverContext->client_metadata()) ) \ + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy detected loop."); \ if( impl.active ) { \ OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \ auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \ + context->AddMetadata(kFwProxyIdHeader, impl.fw_proxy_id); \ return impl.STUB->Export( context.get(), *req, resp ); \ } \ return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active."); \ diff --git a/x/BUILD.bazel b/x/BUILD.bazel index 62ff481782..cf708fe592 100644 --- a/x/BUILD.bazel +++ b/x/BUILD.bazel @@ -65,3 +65,8 @@ cc_binary( "@opentelemetry-proto//:trace_service_grpc_cc", ], ) + +cc_binary( + name = "z", + srcs = ["z.cpp"], +) \ No newline at end of file diff --git a/x/x.cpp b/x/x.cpp index 4b2dc5d44a..be984d6d16 100644 --- a/x/x.cpp +++ b/x/x.cpp @@ -389,13 +389,13 @@ struct proxy_thread OtlpGrpcClientOptions clientOptions; clientOptions.endpoint = GetOtlpDefaultGrpcEndpoint(); - clientOptions.max_concurrent_requests = 16384; - clientOptions.max_threads = 64; + clientOptions.max_concurrent_requests = 10; //16384; + clientOptions.max_threads = 10; ctx->proxy = std::make_unique(clientOptions); ctx->proxy->SetActive(true); - ctx->proxy->AddListenAddress("127.0.0.1:4317"); + ctx->proxy->AddListenAddress("localhost:4317"); proxy->RegisterMetricExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull); proxy->RegisterTraceExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull); proxy->RegisterLogRecordExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);