Skip to content

Commit

Permalink
Add compressor SPI to support additional compression algos (#5990)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Jan 2, 2024
1 parent e1b1963 commit 60ae0af
Show file tree
Hide file tree
Showing 20 changed files with 327 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.ThreadSafe;

/**
* An abstraction for compressing messages. Implementation MUST be thread safe as the same instance
* is expected to be used many times and concurrently. Instances are usually singletons.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@ThreadSafe
public interface Compressor {

/**
* The name of the compressor encoding.
*
* <p>Used to identify the compressor during configuration and to populate the {@code
* Content-Encoding} header.
*/
String getEncoding();

/** Wrap the {@code outputStream} with a compressing output stream. */
OutputStream compress(OutputStream outputStream) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

/**
* A service provider interface (SPI) for providing {@link Compressor}s.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface CompressorProvider {

/** Return the {@link Compressor}. */
Compressor getInstance();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;

/**
* Utilities for resolving SPI {@link Compressor}s.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*
* @see CompressorProvider
*/
public final class CompressorUtil {

private static final Map<String, Compressor> compressorRegistry = buildCompressorRegistry();

private CompressorUtil() {}

/** Get list of loaded compressors, named according to {@link Compressor#getEncoding()}. */
public static Set<String> supportedCompressors() {
return Collections.unmodifiableSet(compressorRegistry.keySet());
}

/**
* Resolve the {@link Compressor} with the {@link Compressor#getEncoding()} equal to the {@code
* encoding}.
*
* @throws IllegalArgumentException if no match is found
*/
public static Compressor resolveCompressor(String encoding) {
Compressor compressor = compressorRegistry.get(encoding);
if (compressor == null) {
throw new IllegalArgumentException(
"Could not resolve compressor for encoding \"" + encoding + "\".");
}
return compressor;
}

private static Map<String, Compressor> buildCompressorRegistry() {
Map<String, Compressor> compressors = new HashMap<>();
for (CompressorProvider spi :
ServiceLoader.load(CompressorProvider.class, CompressorUtil.class.getClassLoader())) {
Compressor compressor = spi.getInstance();
compressors.put(compressor.getEncoding(), compressor);
}
// Hardcode gzip compressor
compressors.put(GzipCompressor.getInstance().getEncoding(), GzipCompressor.getInstance());
return compressors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

/**
* Gzip {@link Compressor}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class GzipCompressor implements Compressor {

private static final GzipCompressor INSTANCE = new GzipCompressor();

private GzipCompressor() {}

public static GzipCompressor getInstance() {
return INSTANCE;
}

@Override
public String getEncoding() {
return "gzip";
}

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new GZIPOutputStream(outputStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.net.URI;
Expand All @@ -19,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -48,8 +50,8 @@ public final class HttpExporterBuilder<T extends Marshaler> {
private String endpoint;

private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
@Nullable private Compressor compressor;
private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS);
private boolean compressionEnabled = false;
private boolean exportAsJson = false;
private final Map<String, String> constantHeaders = new HashMap<>();
private Supplier<Map<String, String>> headerSupplier = Collections::emptyMap;
Expand Down Expand Up @@ -82,8 +84,8 @@ public HttpExporterBuilder<T> setEndpoint(String endpoint) {
return this;
}

public HttpExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = compressionMethod.equals("gzip");
public HttpExporterBuilder<T> setCompression(@Nullable Compressor compressor) {
this.compressor = compressor;
return this;
}

Expand Down Expand Up @@ -141,7 +143,7 @@ public HttpExporterBuilder<T> copy() {
copy.timeoutNanos = timeoutNanos;
copy.connectTimeoutNanos = connectTimeoutNanos;
copy.exportAsJson = exportAsJson;
copy.compressionEnabled = compressionEnabled;
copy.compressor = compressor;
copy.constantHeaders.putAll(constantHeaders);
copy.headerSupplier = headerSupplier;
copy.tlsConfigHelper = tlsConfigHelper.copy();
Expand Down Expand Up @@ -179,7 +181,7 @@ public HttpExporter<T> build() {
HttpSender httpSender =
httpSenderProvider.createSender(
endpoint,
compressionEnabled,
compressor,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
Expand All @@ -202,8 +204,10 @@ public String toString(boolean includePrefixAndSuffix) {
joiner.add("type=" + type);
joiner.add("endpoint=" + endpoint);
joiner.add("timeoutNanos=" + timeoutNanos);
joiner.add(
"compressorEncoding="
+ Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null));
joiner.add("connectTimeoutNanos=" + connectTimeoutNanos);
joiner.add("compressionEnabled=" + compressionEnabled);
joiner.add("exportAsJson=" + exportAsJson);
StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}");
constantHeaders.forEach((key, value) -> headersJoiner.add(key + "=OBFUSCATED"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.util.List;
import java.util.Map;
Expand All @@ -27,7 +28,7 @@ public interface HttpSenderProvider {
@SuppressWarnings("TooManyParameters")
HttpSender createSender(
String endpoint,
boolean compressionEnabled,
@Nullable Compressor compressor,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -101,10 +104,16 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpLogRecordExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
Expand All @@ -19,6 +21,7 @@
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -113,10 +116,16 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -101,10 +104,16 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Loading

0 comments on commit 60ae0af

Please sign in to comment.