From 60ae0af7f683af11588bc540c052d8f3bc40ef74 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 2 Jan 2024 15:00:54 -0600 Subject: [PATCH] Add compressor SPI to support additional compression algos (#5990) --- .../internal/compression/Compressor.java | 32 +++++++ .../compression/CompressorProvider.java | 18 ++++ .../internal/compression/CompressorUtil.java | 59 ++++++++++++ .../internal/compression/GzipCompressor.java | 37 ++++++++ .../internal/http/HttpExporterBuilder.java | 16 ++-- .../internal/http/HttpSenderProvider.java | 3 +- .../OtlpHttpLogRecordExporterBuilder.java | 15 ++- .../OtlpHttpMetricExporterBuilder.java | 15 ++- .../trace/OtlpHttpSpanExporterBuilder.java | 15 ++- .../AbstractHttpTelemetryExporterTest.java | 63 +++++++++---- .../internal/compressor/Base64Compressor.java | 35 +++++++ .../compressor/Base64CompressorProvider.java | 17 ++++ ...er.internal.compression.CompressorProvider | 1 + .../sender/jdk/internal/JdkHttpSender.java | 20 ++-- .../jdk/internal/JdkHttpSenderProvider.java | 5 +- .../jdk/internal/JdkHttpSenderTest.java | 4 +- .../okhttp/internal/OkHttpHttpSender.java | 27 +++--- .../internal/OkHttpHttpSenderProvider.java | 5 +- .../internal/HttpExporterBuilderTest.java | 92 ------------------- .../internal/OkHttpHttpSuppressionTest.java | 2 +- 20 files changed, 327 insertions(+), 154 deletions(-) create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java create mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java create mode 100644 exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java create mode 100644 exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider delete mode 100644 exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java new file mode 100644 index 00000000000..71894cc9d4a --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.io.IOException; +import java.io.OutputStream; +import javax.annotation.concurrent.ThreadSafe; + +/** + * An abstraction for compressing messages. Implementation MUST be thread safe as the same instance + * is expected to be used many times and concurrently. Instances are usually singletons. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@ThreadSafe +public interface Compressor { + + /** + * The name of the compressor encoding. + * + *

Used to identify the compressor during configuration and to populate the {@code + * Content-Encoding} header. + */ + String getEncoding(); + + /** Wrap the {@code outputStream} with a compressing output stream. */ + OutputStream compress(OutputStream outputStream) throws IOException; +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java new file mode 100644 index 00000000000..6b4518f1ea0 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +/** + * A service provider interface (SPI) for providing {@link Compressor}s. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface CompressorProvider { + + /** Return the {@link Compressor}. */ + Compressor getInstance(); +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java new file mode 100644 index 00000000000..6a777f759ba --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; + +/** + * Utilities for resolving SPI {@link Compressor}s. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + * @see CompressorProvider + */ +public final class CompressorUtil { + + private static final Map compressorRegistry = buildCompressorRegistry(); + + private CompressorUtil() {} + + /** Get list of loaded compressors, named according to {@link Compressor#getEncoding()}. */ + public static Set supportedCompressors() { + return Collections.unmodifiableSet(compressorRegistry.keySet()); + } + + /** + * Resolve the {@link Compressor} with the {@link Compressor#getEncoding()} equal to the {@code + * encoding}. + * + * @throws IllegalArgumentException if no match is found + */ + public static Compressor resolveCompressor(String encoding) { + Compressor compressor = compressorRegistry.get(encoding); + if (compressor == null) { + throw new IllegalArgumentException( + "Could not resolve compressor for encoding \"" + encoding + "\"."); + } + return compressor; + } + + private static Map buildCompressorRegistry() { + Map compressors = new HashMap<>(); + for (CompressorProvider spi : + ServiceLoader.load(CompressorProvider.class, CompressorUtil.class.getClassLoader())) { + Compressor compressor = spi.getInstance(); + compressors.put(compressor.getEncoding(), compressor); + } + // Hardcode gzip compressor + compressors.put(GzipCompressor.getInstance().getEncoding(), GzipCompressor.getInstance()); + return compressors; + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java new file mode 100644 index 00000000000..7395fdb41b1 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/GzipCompressor.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.compression; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Gzip {@link Compressor}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class GzipCompressor implements Compressor { + + private static final GzipCompressor INSTANCE = new GzipCompressor(); + + private GzipCompressor() {} + + public static GzipCompressor getInstance() { + return INSTANCE; + } + + @Override + public String getEncoding() { + return "gzip"; + } + + @Override + public OutputStream compress(OutputStream outputStream) throws IOException { + return new GZIPOutputStream(outputStream); + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index ad46956a702..2285cc97729 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -11,6 +11,7 @@ import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; @@ -19,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.StringJoiner; import java.util.concurrent.TimeUnit; @@ -48,8 +50,8 @@ public final class HttpExporterBuilder { private String endpoint; private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); + @Nullable private Compressor compressor; private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS); - private boolean compressionEnabled = false; private boolean exportAsJson = false; private final Map constantHeaders = new HashMap<>(); private Supplier> headerSupplier = Collections::emptyMap; @@ -82,8 +84,8 @@ public HttpExporterBuilder setEndpoint(String endpoint) { return this; } - public HttpExporterBuilder setCompression(String compressionMethod) { - this.compressionEnabled = compressionMethod.equals("gzip"); + public HttpExporterBuilder setCompression(@Nullable Compressor compressor) { + this.compressor = compressor; return this; } @@ -141,7 +143,7 @@ public HttpExporterBuilder copy() { copy.timeoutNanos = timeoutNanos; copy.connectTimeoutNanos = connectTimeoutNanos; copy.exportAsJson = exportAsJson; - copy.compressionEnabled = compressionEnabled; + copy.compressor = compressor; copy.constantHeaders.putAll(constantHeaders); copy.headerSupplier = headerSupplier; copy.tlsConfigHelper = tlsConfigHelper.copy(); @@ -179,7 +181,7 @@ public HttpExporter build() { HttpSender httpSender = httpSenderProvider.createSender( endpoint, - compressionEnabled, + compressor, exportAsJson ? "application/json" : "application/x-protobuf", timeoutNanos, connectTimeoutNanos, @@ -202,8 +204,10 @@ public String toString(boolean includePrefixAndSuffix) { joiner.add("type=" + type); joiner.add("endpoint=" + endpoint); joiner.add("timeoutNanos=" + timeoutNanos); + joiner.add( + "compressorEncoding=" + + Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null)); joiner.add("connectTimeoutNanos=" + connectTimeoutNanos); - joiner.add("compressionEnabled=" + compressionEnabled); joiner.add("exportAsJson=" + exportAsJson); StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}"); constantHeaders.forEach((key, value) -> headersJoiner.add(key + "=OBFUSCATED")); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java index 6ab06f31d98..20d4322ae67 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.internal.http; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.util.List; import java.util.Map; @@ -27,7 +28,7 @@ public interface HttpSenderProvider { @SuppressWarnings("TooManyParameters") HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, long connectTimeout, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index 59a84e76e72..5861cd15832 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -7,15 +7,18 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -101,10 +104,16 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpLogRecordExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 5d573da557e..9f97d7b3f9e 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -7,8 +7,10 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -19,6 +21,7 @@ import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -113,10 +116,16 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index 58768c98881..ed5975aa4b2 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -7,15 +7,18 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -101,10 +104,16 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) { */ public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); + if (compressionMethod.equals("none")) { + delegate.setCompression(null); + return this; + } + Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + supportedCompressionMethods.contains(compressionMethod), + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); + delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); return this; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index a29b9f0b5b2..3b903ba0c32 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -27,8 +27,10 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.exporter.internal.TlsUtil; +import io.opentelemetry.exporter.internal.compression.GzipCompressor; import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64Compressor; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; @@ -49,6 +51,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -66,6 +69,7 @@ import okio.Buffer; import okio.GzipSource; import okio.Okio; +import okio.Source; import org.assertj.core.api.iterable.ThrowingExtractor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -163,8 +167,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { aggReq -> { T request; try { - byte[] requestBody = - maybeGzipInflate(aggReq.headers(), aggReq.content().array()); + byte[] requestBody = maybeInflate(aggReq.headers(), aggReq.content().array()); request = parse.extractThrows(requestBody); } catch (IOException e) { throw new UncheckedIOException(e); @@ -181,15 +184,22 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { return HttpResponse.of(responseFuture); } - private static byte[] maybeGzipInflate(RequestHeaders requestHeaders, byte[] content) + private static byte[] maybeInflate(RequestHeaders requestHeaders, byte[] content) throws IOException { - if (!requestHeaders.contains("content-encoding", "gzip")) { - return content; + if (requestHeaders.contains("content-encoding", "gzip")) { + Buffer buffer = new Buffer(); + GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(content))); + gzipSource.read(buffer, Integer.MAX_VALUE); + return buffer.readByteArray(); } - Buffer buffer = new Buffer(); - GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(content))); - gzipSource.read(buffer, Integer.MAX_VALUE); - return buffer.readByteArray(); + if (requestHeaders.contains("content-encoding", "base64")) { + Buffer buffer = new Buffer(); + Source base64Source = + Okio.source(Base64.getDecoder().wrap(new ByteArrayInputStream(content))); + base64Source.read(buffer, Integer.MAX_VALUE); + return buffer.readByteArray(); + } + return content; } } @@ -275,9 +285,7 @@ void multipleItems() { void compressionWithNone() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("none").build(); - assertThat(exporter.unwrap()) - .extracting("delegate.httpSender.compressionEnabled") - .isEqualTo(false); + assertThat(exporter.unwrap()).extracting("delegate.httpSender.compressor").isNull(); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -295,8 +303,8 @@ void compressionWithGzip() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("gzip").build(); assertThat(exporter.unwrap()) - .extracting("delegate.httpSender.compressionEnabled") - .isEqualTo(true); + .extracting("delegate.httpSender.compressor") + .isEqualTo(GzipCompressor.getInstance()); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -309,6 +317,25 @@ void compressionWithGzip() { } } + @Test + void compressionWithSpiCompressor() { + TelemetryExporter exporter = + exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("base64").build(); + assertThat(exporter.unwrap()) + .extracting("delegate.httpSender.compressor") + .isEqualTo(Base64Compressor.getInstance()); + try { + CompletableResultCode result = + exporter.export(Collections.singletonList(generateFakeTelemetry())); + assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + assertThat(httpRequests) + .singleElement() + .satisfies(req -> assertThat(req.headers().get("content-encoding")).isEqualTo("base64")); + } finally { + exporter.shutdown(); + } + } + @Test void authorityWithAuth() { TelemetryExporter exporter = @@ -659,6 +686,8 @@ void validConfig() { .doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException(); + // SPI compressor available for this test but not packaged with OTLP exporter + assertThatCode(() -> exporterBuilder().setCompression("base64")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux")) @@ -711,7 +740,7 @@ void invalidConfig() { assertThatThrownBy(() -> exporterBuilder().setCompression("foo")) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Unsupported compression method. Supported compression methods include: gzip, none."); + "Unsupported compressionMethod. Compression method must be \"none\" or one of: [base64,gzip]"); } @Test @@ -786,10 +815,10 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(10) + ", " + + "compressorEncoding=null, " + "connectTimeoutNanos=" + TimeUnit.SECONDS.toNanos(10) + ", " - + "compressionEnabled=false, " + "exportAsJson=false, " + "headers=Headers\\{User-Agent=OBFUSCATED\\}" + "\\}"); @@ -826,10 +855,10 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(5) + ", " + + "compressorEncoding=gzip, " + "connectTimeoutNanos=" + TimeUnit.SECONDS.toNanos(4) + ", " - + "compressionEnabled=true, " + "exportAsJson=false, " + "headers=Headers\\{.*foo=OBFUSCATED.*\\}, " + "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}" diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java new file mode 100644 index 00000000000..b0b3121d635 --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64Compressor.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.testing.internal.compressor; + +import io.opentelemetry.exporter.internal.compression.Compressor; +import java.io.OutputStream; +import java.util.Base64; + +/** + * This exists to test the compressor SPI mechanism but does not actually compress data in any + * useful way. + */ +public class Base64Compressor implements Compressor { + + private static final Base64Compressor INSTANCE = new Base64Compressor(); + + private Base64Compressor() {} + + public static Base64Compressor getInstance() { + return INSTANCE; + } + + @Override + public String getEncoding() { + return "base64"; + } + + @Override + public OutputStream compress(OutputStream outputStream) { + return Base64.getEncoder().wrap(outputStream); + } +} diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java new file mode 100644 index 00000000000..8d4b4a6cdbc --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/compressor/Base64CompressorProvider.java @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.testing.internal.compressor; + +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; + +public class Base64CompressorProvider implements CompressorProvider { + + @Override + public Compressor getInstance() { + return Base64Compressor.getInstance(); + } +} diff --git a/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider new file mode 100644 index 00000000000..6fac487c249 --- /dev/null +++ b/exporters/otlp/testing-internal/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.compression.CompressorProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64CompressorProvider diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 673b315d230..73d895a4883 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.sender.jdk.internal; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -29,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -52,7 +52,7 @@ public final class JdkHttpSender implements HttpSender { private final ExecutorService executorService = Executors.newFixedThreadPool(5); private final HttpClient client; private final URI uri; - private final boolean compressionEnabled; + @Nullable private final Compressor compressor; private final String contentType; private final long timeoutNanos; private final Supplier>> headerSupplier; @@ -62,7 +62,7 @@ public final class JdkHttpSender implements HttpSender { JdkHttpSender( HttpClient client, String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, Supplier>> headerSupplier, @@ -73,7 +73,7 @@ public final class JdkHttpSender implements HttpSender { } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } - this.compressionEnabled = compressionEnabled; + this.compressor = compressor; this.contentType = contentType; this.timeoutNanos = timeoutNanos; this.headerSupplier = headerSupplier; @@ -82,7 +82,7 @@ public final class JdkHttpSender implements HttpSender { JdkHttpSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, long connectTimeoutNanos, @@ -92,7 +92,7 @@ public final class JdkHttpSender implements HttpSender { this( configureClient(sslContext, connectTimeoutNanos), endpoint, - compressionEnabled, + compressor, contentType, timeoutNanos, headerSupplier, @@ -148,10 +148,10 @@ HttpResponse sendInternal(Consumer marshaler) throws IOExc NoCopyByteArrayOutputStream os = threadLocalBaos.get(); os.reset(); - if (compressionEnabled) { - requestBuilder.header("Content-Encoding", "gzip"); - try (GZIPOutputStream gzos = new GZIPOutputStream(os)) { - marshaler.accept(gzos); + if (compressor != null) { + requestBuilder.header("Content-Encoding", compressor.getEncoding()); + try (OutputStream compressed = compressor.compress(os)) { + marshaler.accept(compressed); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index 2478079dce6..1891cf27c85 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.jdk.internal; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.exporter.internal.http.HttpSenderProvider; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -27,7 +28,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider { @Override public HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, long connectTimeout, @@ -38,7 +39,7 @@ public HttpSender createSender( @Nullable X509TrustManager trustManager) { return new JdkHttpSender( endpoint, - compressionEnabled, + compressor, contentType, timeoutNanos, connectTimeout, diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 0bf99ab9525..1e2ee939863 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -53,7 +53,7 @@ void setup() throws IOException, InterruptedException { mockHttpClient, "http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection // timeout - false, + null, "text/plain", Duration.ofSeconds(10).toNanos(), Collections::emptyMap, @@ -98,7 +98,7 @@ void connectTimeout() { sender = new JdkHttpSender( "http://localhost", - true, + null, "text/plain", 1, TimeUnit.SECONDS.toNanos(10), diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 337a722a06d..7724416c7fe 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -8,6 +8,7 @@ import io.opentelemetry.exporter.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -30,7 +31,6 @@ import okhttp3.RequestBody; import okhttp3.ResponseBody; import okio.BufferedSink; -import okio.GzipSink; import okio.Okio; /** @@ -43,7 +43,7 @@ public final class OkHttpHttpSender implements HttpSender { private final OkHttpClient client; private final HttpUrl url; - private final boolean compressionEnabled; + @Nullable private final Compressor compressor; private final Supplier>> headerSupplier; private final MediaType mediaType; @@ -51,7 +51,7 @@ public final class OkHttpHttpSender implements HttpSender { @SuppressWarnings("TooManyParameters") public OkHttpHttpSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, long connectionTimeoutNanos, @@ -85,7 +85,7 @@ public OkHttpHttpSender( } this.client = builder.build(); this.url = HttpUrl.get(endpoint); - this.compressionEnabled = compressionEnabled; + this.compressor = compressor; this.mediaType = MediaType.parse(contentType); this.headerSupplier = headerSupplier; } @@ -104,9 +104,9 @@ public void send( (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); } RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType); - if (compressionEnabled) { - requestBuilder.addHeader("Content-Encoding", "gzip"); - requestBuilder.post(new GzipRequestBody(body)); + if (compressor != null) { + requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); + requestBuilder.post(new CompressedRequestBody(compressor, body)); } else { requestBuilder.post(body); } @@ -188,10 +188,12 @@ public void writeTo(BufferedSink bufferedSink) { } } - private static class GzipRequestBody extends RequestBody { + private static class CompressedRequestBody extends RequestBody { + private final Compressor compressor; private final RequestBody requestBody; - private GzipRequestBody(RequestBody requestBody) { + private CompressedRequestBody(Compressor compressor, RequestBody requestBody) { + this.compressor = compressor; this.requestBody = requestBody; } @@ -207,9 +209,10 @@ public long contentLength() { @Override public void writeTo(BufferedSink bufferedSink) throws IOException { - BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink)); - requestBody.writeTo(gzipSink); - gzipSink.close(); + BufferedSink compressedSink = + Okio.buffer(Okio.sink(compressor.compress(bufferedSink.outputStream()))); + requestBody.writeTo(compressedSink); + compressedSink.close(); } } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index 18573049082..d969a1ee23d 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; import io.opentelemetry.exporter.internal.http.HttpSenderProvider; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -27,7 +28,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider { @Override public HttpSender createSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, String contentType, long timeoutNanos, long connectTimeout, @@ -38,7 +39,7 @@ public HttpSender createSender( @Nullable X509TrustManager trustManager) { return new OkHttpHttpSender( endpoint, - compressionEnabled, + compressor, contentType, timeoutNanos, connectTimeout, diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java deleted file mode 100644 index b7b80453be0..00000000000 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/HttpExporterBuilderTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.sender.okhttp.internal; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.opentelemetry.exporter.internal.http.HttpExporter; -import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import org.junit.jupiter.api.Test; - -class HttpExporterBuilderTest { - - private final HttpExporterBuilder builder = - new HttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces"); - - @Test - void compressionDefault() { - HttpExporter exporter = builder.build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionNone() { - HttpExporter exporter = builder.setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionGzip() { - HttpExporter exporter = builder.setCompression("gzip").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(true)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionEnabledAndDisabled() { - HttpExporter exporter = - builder.setCompression("gzip").setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - HttpExporter.class, - otlp -> - assertThat(otlp) - .extracting("httpSender") - .isInstanceOf(OkHttpHttpSender.class) - .extracting("compressionEnabled") - .isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } -} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 87da2a18e84..0227826d4cd 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -34,6 +34,6 @@ void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) { @Override OkHttpHttpSender createSender(String endpoint) { return new OkHttpHttpSender( - endpoint, false, "text/plain", 10L, 10L, Collections::emptyMap, null, null, null, null); + endpoint, null, "text/plain", 10L, 10L, Collections::emptyMap, null, null, null, null); } }