From da7796b3b53a550a632bf206cc3da0d42c05b680 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Thu, 4 Jan 2024 09:36:03 -0600 Subject: [PATCH] Use minimal fallback managed channel when none is specified (#6110) --- .../otlp/trace/OltpExporterBenchmark.java | 1 + .../AbstractGrpcTelemetryExporterTest.java | 21 ++++++++++++ ...anagedChannelTelemetryExporterBuilder.java | 21 +++++++----- .../internal/UpstreamGrpcSender.java | 10 +++++- .../internal/UpstreamGrpcSenderProvider.java | 32 ++++++++++++++++++- 5 files changed, 75 insertions(+), 10 deletions(-) diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 1ad124271a3..ef620333474 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -85,6 +85,7 @@ public void setUp() { "span", new UpstreamGrpcSender<>( MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null), + /* shutdownChannel= */ false, 10, Collections::emptyMap), MeterProvider::noop); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index c2490029aac..0b3d79a2690 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.otlp.testing.internal; +import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -23,8 +24,10 @@ import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension; import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.github.netmikey.logunit.api.LogCapturer; +import io.grpc.ManagedChannel; import io.opentelemetry.exporter.internal.TlsUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; @@ -61,6 +64,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; +import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.iterable.ThrowingExtractor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -215,6 +219,23 @@ void reset() { httpRequests.clear(); } + @Test + void minimalChannel() { + // Test that UpstreamGrpcSender uses minimal fallback managed channel, so skip for + // OkHttpGrpcSender + assumeThat(exporter.unwrap()) + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("UpstreamGrpcSender")); + // When no channel is explicitly set, should fall back to a minimally configured managed channel + TelemetryExporter exporter = exporterBuilder().build(); + assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + assertThat(exporter.unwrap()) + .extracting( + "delegate.grpcSender.stub", + as(InstanceOfAssertFactories.type(MarshalerServiceStub.class))) + .satisfies(stub -> assertThat(((ManagedChannel) stub.getChannel()).isShutdown()).isTrue()); + } + @Test void export() { List telemetry = Collections.singletonList(generateFakeTelemetry()); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index be7d97b9678..e5545aa41bd 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -158,16 +158,21 @@ public TelemetryExporterBuilder setChannel(Object channel) { @Override public TelemetryExporter build() { - requireNonNull(channelBuilder, "channel"); + Runnable shutdownCallback; + if (channelBuilder != null) { + try { + setSslContext(channelBuilder, tlsConfigHelper); + } catch (SSLException e) { + throw new IllegalStateException(e); + } - try { - setSslContext(channelBuilder, tlsConfigHelper); - } catch (SSLException e) { - throw new IllegalStateException(e); + ManagedChannel channel = channelBuilder.build(); + delegate.setChannel(channel); + shutdownCallback = channel::shutdownNow; + } else { + shutdownCallback = () -> {}; } - ManagedChannel channel = channelBuilder.build(); - delegate.setChannel(channel); TelemetryExporter delegateExporter = delegate.build(); return new TelemetryExporter() { @Override @@ -182,7 +187,7 @@ public CompletableResultCode export(Collection items) { @Override public CompletableResultCode shutdown() { - channel.shutdownNow(); + shutdownCallback.run(); return delegateExporter.shutdown(); } }; diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java index 28f813accc7..f8fa234e4e6 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -8,6 +8,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.MetadataUtils; @@ -32,16 +33,19 @@ public final class UpstreamGrpcSender implements GrpcSender { private final MarshalerServiceStub stub; + private final boolean shutdownChannel; private final long timeoutNanos; private final Supplier>> headersSupplier; /** Creates a new {@link UpstreamGrpcSender}. */ public UpstreamGrpcSender( MarshalerServiceStub stub, + boolean shutdownChannel, long timeoutNanos, Supplier>> headersSupplier) { - this.timeoutNanos = timeoutNanos; this.stub = stub; + this.shutdownChannel = shutdownChannel; + this.timeoutNanos = timeoutNanos; this.headersSupplier = headersSupplier; } @@ -82,6 +86,10 @@ public void onFailure(Throwable t) { @Override public CompletableResultCode shutdown() { + if (shutdownChannel) { + ManagedChannel channel = (ManagedChannel) stub.getChannel(); + channel.shutdownNow(); + } return CompletableResultCode.ofSuccess(); } } diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java index 34200681980..a4effdbd09c 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -7,6 +7,8 @@ import io.grpc.Channel; import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; @@ -41,6 +43,13 @@ public GrpcSender createSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager) { + boolean shutdownChannel = false; + if (managedChannel == null) { + // Shutdown the channel as part of the exporter shutdown sequence if + shutdownChannel = true; + managedChannel = minimalFallbackManagedChannel(endpoint); + } + String authorityOverride = null; Map> headers = headersSupplier.get(); if (headers != null) { @@ -58,6 +67,27 @@ public GrpcSender createSender( .apply((Channel) managedChannel, authorityOverride) .withCompression(codec.getMessageEncoding()); - return new UpstreamGrpcSender<>(stub, timeoutNanos, headersSupplier); + return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier); + } + + /** + * If {@link ManagedChannel} is not explicitly set, provide a minimally configured fallback + * channel to avoid failing initialization. + * + *

This is required to accommodate autoconfigure with {@code + * opentelemetry-exporter-sender-grpc-managed-channel} which will always fail to initialize + * without a fallback channel since there isn't an opportunity to explicitly set the channel. + * + *

This only incorporates the target address, port, and whether to use plain text. All + * additional settings are intentionally ignored and must be configured with an explicitly set + * {@link ManagedChannel}. + */ + private static ManagedChannel minimalFallbackManagedChannel(URI endpoint) { + ManagedChannelBuilder channelBuilder = + ManagedChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()); + if (!endpoint.getScheme().equals("https")) { + channelBuilder.usePlaintext(); + } + return channelBuilder.build(); } }