Skip to content

Commit

Permalink
emit header such that we can detect loops in the proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
malkia committed Nov 21, 2024
1 parent c85dcbe commit 7060fd9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
32 changes: 31 additions & 1 deletion exporters/otlp/src/otlp_grpc_forward_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include <memory>
#include <list>
#include <string>
#include <random>

#include <grpcpp/ext/proto_server_reflection_plugin.h>

#include "opentelemetry/version.h"
Expand All @@ -24,6 +27,10 @@ namespace exporter
namespace otlp
{

// Send this header along with unique random number, which we'll reject if received back.
// This is to avoid loops due to misconfugration
static std::string kFwProxyIdHeader{ "otelcpp-otlp-grpc-forward-proxy-id" };

using namespace opentelemetry;

using namespace opentelemetry::exporter::otlp;
Expand Down Expand Up @@ -80,6 +87,15 @@ struct OtlpGrpcForwardProxy::Impl
std::unique_ptr<TraceExporterProxy> traceExporterProxy;
std::unique_ptr<MetricExporterProxy> metricExporterProxy;

std::string fw_proxy_id;

bool LoopDetected( const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata ) const
{
const auto key{ grpc::string_ref( kFwProxyIdHeader ) };
const auto it{ client_metadata.find(key) };
return it != client_metadata.cend() && it->second.compare( grpc::string_ref( fw_proxy_id ) ) == 0;
}

Impl(const OtlpGrpcClientOptions&);
#if defined(ENABLE_ASYNC_EXPORT)
bool HandleExportResult(sdk::common::ExportResult result) const;
Expand All @@ -93,6 +109,14 @@ OtlpGrpcForwardProxy::Impl::Impl(const OtlpGrpcClientOptions& options)
arenaOptions.initial_block_size = 65536;
arenaOptions.max_block_size = 128 * 1024 * 1024;
client = OtlpGrpcClientFactory::Create(clientOptions);

char buf[32]{};
std::random_device rd;
uint64_t p0 = (uint64_t(rd())<<32) | uint64_t(rd());
uint64_t p1 = (uint64_t(rd())<<32) | uint64_t(rd());
sprintf_s(buf, "%08.8lx%08.8lx", p0, p1);
fw_proxy_id = buf;

}

#if defined(ENABLE_ASYNC_EXPORT)
Expand Down Expand Up @@ -168,10 +192,13 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe
explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \
grpc::ServerUnaryReactor* Export(grpc::CallbackServerContext* cbServerContext, const REQUEST* req, RESPONSE* resp) override { \
auto exportResult{ sdk::common::ExportResult::kFailure }; \
if( impl.LoopDetected(cbServerContext->client_metadata()) ) \
return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy detected loop.")); \
if( impl.active ) { \
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \
auto syncStatus{ grpc::Status(grpc::Status(grpc::StatusCode::DO_NOT_USE, "")) }; \
auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \
context->AddMetadata(kFwProxyIdHeader, impl.fw_proxy_id); \
auto arena{ std::make_unique<google::protobuf::Arena>(impl.arenaOptions) }; \
auto request{ *req }; \
exportResult = impl.client->DelegateAsyncExport( impl.clientOptions, impl.STUB.get(), std::move(context), std::move(arena), std::move(request), \
Expand All @@ -193,10 +220,13 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe
struct OtlpGrpcForwardProxy::Impl::NAME final : public SERVICE::Service { \
OtlpGrpcForwardProxy::Impl& impl; \
explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \
grpc::Status Export(grpc::ServerContext*, const REQUEST* req, RESPONSE* resp) override { \
grpc::Status Export(grpc::ServerContext* serverContext, const REQUEST* req, RESPONSE* resp) override { \
if( impl.LoopDetected(serverContext->client_metadata()) ) \
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy detected loop."); \
if( impl.active ) { \
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \
auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \
context->AddMetadata(kFwProxyIdHeader, impl.fw_proxy_id); \
return impl.STUB->Export( context.get(), *req, resp ); \
} \
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active."); \
Expand Down
5 changes: 5 additions & 0 deletions x/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ cc_binary(
"@opentelemetry-proto//:trace_service_grpc_cc",
],
)

cc_binary(
name = "z",
srcs = ["z.cpp"],
)
6 changes: 3 additions & 3 deletions x/x.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,13 @@ struct proxy_thread

OtlpGrpcClientOptions clientOptions;
clientOptions.endpoint = GetOtlpDefaultGrpcEndpoint();
clientOptions.max_concurrent_requests = 16384;
clientOptions.max_threads = 64;
clientOptions.max_concurrent_requests = 10; //16384;
clientOptions.max_threads = 10;

ctx->proxy = std::make_unique<OtlpGrpcForwardProxy>(clientOptions);
ctx->proxy->SetActive(true);

ctx->proxy->AddListenAddress("127.0.0.1:4317");
ctx->proxy->AddListenAddress("localhost:4317");
proxy->RegisterMetricExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
proxy->RegisterTraceExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
proxy->RegisterLogRecordExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
Expand Down

0 comments on commit 7060fd9

Please sign in to comment.