From 0a9414ef3bee8478fe53872a31eb57ed31ad70d5 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 3 Nov 2023 14:40:23 -0500 Subject: [PATCH] Serialize AnyValue body in OTLP LogMarshaler --- .../internal/marshal/JsonSerializer.java | 2 +- .../internal/marshal/ProtoSerializer.java | 2 +- .../exporter/internal/marshal/Serializer.java | 2 +- exporters/otlp/common/build.gradle.kts | 1 + .../internal/otlp/AnyValueMarshaler.java | 44 ++++ .../internal/otlp/ArrayAnyValueMarshaler.java | 84 ++++++++ .../internal/otlp/BoolAnyValueMarshaler.java | 37 ++++ .../internal/otlp/BytesAnyValueMarshaler.java | 40 ++++ .../otlp/DoubleAnyValueMarshaler.java | 37 ++++ .../otlp/InstrumentationScopeMarshaler.java | 3 +- .../internal/otlp/IntAnyValueMarshaler.java | 37 ++++ .../otlp/KeyValueListAnyValueMarshaler.java | 63 ++++++ .../internal/otlp/KeyValueMarshaler.java | 198 +++--------------- .../internal/otlp/ResourceMarshaler.java | 3 +- .../otlp/StringAnyValueMarshaler.java | 9 +- .../internal/otlp/logs/LogMarshaler.java | 28 ++- .../otlp/metrics/ExemplarMarshaler.java | 2 +- ...xponentialHistogramDataPointMarshaler.java | 2 +- .../metrics/HistogramDataPointMarshaler.java | 2 +- .../metrics/NumberDataPointMarshaler.java | 2 +- .../metrics/SummaryDataPointMarshaler.java | 10 +- .../otlp/traces/SpanEventMarshaler.java | 2 +- .../otlp/traces/SpanLinkMarshaler.java | 2 +- .../internal/otlp/traces/SpanMarshaler.java | 2 +- .../internal/otlp/AnyValueMarshalerTest.java | 171 +++++++++++++++ integration-tests/otlp/build.gradle.kts | 1 + .../OtlpExporterIntegrationTest.java | 120 ++++++++++- .../sdk/logs/SdkLogRecordBuilder.java | 2 +- 28 files changed, 711 insertions(+), 197 deletions(-) create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ArrayAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BoolAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BytesAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/DoubleAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/IntAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueListAnyValueMarshaler.java create mode 100644 exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshalerTest.java diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java index ec805fc1fbf..f4745e68cba 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java @@ -109,7 +109,7 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti } @Override - protected void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { + public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { generator.writeBinaryField(field.getJsonName(), value); } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java index f34c5690edd..16015fe6bbf 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java @@ -123,7 +123,7 @@ public void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOExcepti } @Override - protected void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { + public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { output.writeUInt32NoTag(field.getTag()); output.writeByteArrayNoTag(value); } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java index e3f36e6f98a..622201579a9 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java @@ -177,7 +177,7 @@ public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOExceptio writeBytes(field, value); } - protected abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException; + public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException; protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException; diff --git a/exporters/otlp/common/build.gradle.kts b/exporters/otlp/common/build.gradle.kts index 35b7a1b2eb7..1baf390fc34 100644 --- a/exporters/otlp/common/build.gradle.kts +++ b/exporters/otlp/common/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { protoSource("io.opentelemetry.proto:opentelemetry-proto:${versions["io.opentelemetry.proto"]}") api(project(":exporters:common")) + implementation(project(":extensions:incubator")) compileOnly(project(":sdk:metrics")) compileOnly(project(":sdk:trace")) diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshaler.java new file mode 100644 index 00000000000..2bd103a154a --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshaler.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.extension.incubator.logs.AnyValue; +import io.opentelemetry.extension.incubator.logs.KeyAnyValue; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Utility methods for obtaining AnyValue marshaler. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class AnyValueMarshaler { + + private AnyValueMarshaler() {} + + @SuppressWarnings("unchecked") + public static MarshalerWithSize create(AnyValue anyValue) { + switch (anyValue.getType()) { + case STRING: + return StringAnyValueMarshaler.create((String) anyValue.getValue()); + case BOOLEAN: + return BoolAnyValueMarshaler.create((boolean) anyValue.getValue()); + case LONG: + return IntAnyValueMarshaler.create((long) anyValue.getValue()); + case DOUBLE: + return DoubleAnyValueMarshaler.create((double) anyValue.getValue()); + case ARRAY: + return ArrayAnyValueMarshaler.createAnyValue((List>) anyValue.getValue()); + case KEY_VALUE_LIST: + return KeyValueListAnyValueMarshaler.create((List) anyValue.getValue()); + case BYTES: + return BytesAnyValueMarshaler.create((ByteBuffer) anyValue.getValue()); + } + throw new IllegalArgumentException("Unsupported AnyValue type: " + anyValue.getType()); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ArrayAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ArrayAnyValueMarshaler.java new file mode 100644 index 00000000000..e4184086468 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ArrayAnyValueMarshaler.java @@ -0,0 +1,84 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.marshal.MarshalerUtil; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import io.opentelemetry.proto.common.v1.internal.ArrayValue; +import java.io.IOException; +import java.util.List; +import java.util.function.Function; + +final class ArrayAnyValueMarshaler extends MarshalerWithSize { + private final Marshaler value; + + private ArrayAnyValueMarshaler(ArrayValueMarshaler value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize createAnyValue( + List> values) { + return createInternal(values, AnyValueMarshaler::create); + } + + static MarshalerWithSize createString(List values) { + return createInternal(values, StringAnyValueMarshaler::create); + } + + static MarshalerWithSize createBool(List values) { + return createInternal(values, BoolAnyValueMarshaler::create); + } + + static MarshalerWithSize createInt(List values) { + return createInternal(values, IntAnyValueMarshaler::create); + } + + static MarshalerWithSize createDouble(List values) { + return createInternal(values, DoubleAnyValueMarshaler::create); + } + + private static MarshalerWithSize createInternal( + List values, Function initializer) { + int len = values.size(); + Marshaler[] marshalers = new Marshaler[len]; + for (int i = 0; i < len; i++) { + marshalers[i] = initializer.apply(values.get(i)); + } + return new ArrayAnyValueMarshaler(new ArrayValueMarshaler(marshalers)); + } + + @Override + public void writeTo(Serializer output) throws IOException { + output.serializeMessage(AnyValue.ARRAY_VALUE, value); + } + + private static int calculateSize(Marshaler value) { + return MarshalerUtil.sizeMessage(AnyValue.ARRAY_VALUE, value); + } + + private static class ArrayValueMarshaler extends MarshalerWithSize { + + private final Marshaler[] values; + + private ArrayValueMarshaler(Marshaler[] values) { + super(calculateSize(values)); + this.values = values; + } + + @Override + public void writeTo(Serializer output) throws IOException { + output.serializeRepeatedMessage(ArrayValue.VALUES, values); + } + + private static int calculateSize(Marshaler[] values) { + return MarshalerUtil.sizeRepeatedMessage(ArrayValue.VALUES, values); + } + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BoolAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BoolAnyValueMarshaler.java new file mode 100644 index 00000000000..2293c0c0e58 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BoolAnyValueMarshaler.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import java.io.IOException; + +final class BoolAnyValueMarshaler extends MarshalerWithSize { + + private final boolean value; + + private BoolAnyValueMarshaler(boolean value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize create(boolean value) { + return new BoolAnyValueMarshaler(value); + } + + @Override + public void writeTo(Serializer output) throws IOException { + // Do not call serialize* method because we always have to write the message tag even if the + // value is empty since it's a oneof. + output.writeBool(AnyValue.BOOL_VALUE, value); + } + + private static int calculateSize(boolean value) { + return AnyValue.BOOL_VALUE.getTagSize() + CodedOutputStream.computeBoolSizeNoTag(value); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BytesAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BytesAnyValueMarshaler.java new file mode 100644 index 00000000000..d0a781039be --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/BytesAnyValueMarshaler.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import java.io.IOException; +import java.nio.ByteBuffer; + +final class BytesAnyValueMarshaler extends MarshalerWithSize { + + private final byte[] value; + + private BytesAnyValueMarshaler(byte[] value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize create(ByteBuffer value) { + byte[] bytes = new byte[value.remaining()]; + value.get(bytes); + return new BytesAnyValueMarshaler(bytes); + } + + @Override + public void writeTo(Serializer output) throws IOException { + // Do not call serialize* method because we always have to write the message tag even if the + // value is empty since it's a oneof. + output.writeBytes(AnyValue.BYTES_VALUE, value); + } + + private static int calculateSize(byte[] value) { + return AnyValue.BYTES_VALUE.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(value); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/DoubleAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/DoubleAnyValueMarshaler.java new file mode 100644 index 00000000000..5837976c92d --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/DoubleAnyValueMarshaler.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import java.io.IOException; + +final class DoubleAnyValueMarshaler extends MarshalerWithSize { + + private final double value; + + private DoubleAnyValueMarshaler(double value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize create(double value) { + return new DoubleAnyValueMarshaler(value); + } + + @Override + public void writeTo(Serializer output) throws IOException { + // Do not call serialize* method because we always have to write the message tag even if the + // value is empty since it's a oneof. + output.writeDouble(AnyValue.DOUBLE_VALUE, value); + } + + private static int calculateSize(double value) { + return AnyValue.DOUBLE_VALUE.getTagSize() + CodedOutputStream.computeDoubleSizeNoTag(value); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/InstrumentationScopeMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/InstrumentationScopeMarshaler.java index 1d0a1d67349..c5466cad479 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/InstrumentationScopeMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/InstrumentationScopeMarshaler.java @@ -37,7 +37,8 @@ public static InstrumentationScopeMarshaler create(InstrumentationScopeInfo scop // a few times until the cache gets filled which is fine. byte[] name = MarshalerUtil.toBytes(scopeInfo.getName()); byte[] version = MarshalerUtil.toBytes(scopeInfo.getVersion()); - KeyValueMarshaler[] attributes = KeyValueMarshaler.createRepeated(scopeInfo.getAttributes()); + KeyValueMarshaler[] attributes = + KeyValueMarshaler.createForAttributes(scopeInfo.getAttributes()); RealInstrumentationScopeMarshaler realMarshaler = new RealInstrumentationScopeMarshaler(name, version, attributes); diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/IntAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/IntAnyValueMarshaler.java new file mode 100644 index 00000000000..498c60e76bb --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/IntAnyValueMarshaler.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import java.io.IOException; + +final class IntAnyValueMarshaler extends MarshalerWithSize { + + private final long value; + + private IntAnyValueMarshaler(long value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize create(long value) { + return new IntAnyValueMarshaler(value); + } + + @Override + public void writeTo(Serializer output) throws IOException { + // Do not call serialize* method because we always have to write the message tag even if the + // value is empty since it's a oneof. + output.writeInt64(AnyValue.INT_VALUE, value); + } + + private static int calculateSize(long value) { + return AnyValue.INT_VALUE.getTagSize() + CodedOutputStream.computeInt64SizeNoTag(value); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueListAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueListAnyValueMarshaler.java new file mode 100644 index 00000000000..823c28226c8 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueListAnyValueMarshaler.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.marshal.MarshalerUtil; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.extension.incubator.logs.KeyAnyValue; +import io.opentelemetry.proto.common.v1.internal.AnyValue; +import io.opentelemetry.proto.common.v1.internal.KeyValueList; +import java.io.IOException; +import java.util.List; + +final class KeyValueListAnyValueMarshaler extends MarshalerWithSize { + + private final Marshaler value; + + private KeyValueListAnyValueMarshaler(KeyValueListMarshaler value) { + super(calculateSize(value)); + this.value = value; + } + + static MarshalerWithSize create(List values) { + int len = values.size(); + KeyValueMarshaler[] marshalers = new KeyValueMarshaler[values.size()]; + for (int i = 0; i < len; i++) { + marshalers[i] = KeyValueMarshaler.createForKeyAnyValue(values.get(i)); + } + return new KeyValueListAnyValueMarshaler(new KeyValueListMarshaler(marshalers)); + } + + @Override + public void writeTo(Serializer output) throws IOException { + output.serializeMessage(AnyValue.KVLIST_VALUE, value); + } + + private static int calculateSize(Marshaler value) { + return MarshalerUtil.sizeMessage(AnyValue.KVLIST_VALUE, value); + } + + private static class KeyValueListMarshaler extends MarshalerWithSize { + + private final Marshaler[] values; + + private KeyValueListMarshaler(KeyValueMarshaler[] values) { + super(calculateSize(values)); + this.values = values; + } + + @Override + public void writeTo(Serializer output) throws IOException { + output.serializeRepeatedMessage(KeyValueList.VALUES, values); + } + + private static int calculateSize(Marshaler[] values) { + return MarshalerUtil.sizeRepeatedMessage(KeyValueList.VALUES, values); + } + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueMarshaler.java index 18f151a8f0c..1db0dc2edd5 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/KeyValueMarshaler.java @@ -8,13 +8,11 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.internal.InternalAttributeKeyImpl; -import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.marshal.MarshalerUtil; import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; import io.opentelemetry.exporter.internal.marshal.Serializer; -import io.opentelemetry.proto.common.v1.internal.AnyValue; -import io.opentelemetry.proto.common.v1.internal.ArrayValue; +import io.opentelemetry.extension.incubator.logs.KeyAnyValue; import io.opentelemetry.proto.common.v1.internal.KeyValue; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -22,38 +20,52 @@ import java.util.function.BiConsumer; /** - * A Marshaler of {@link Attributes}. + * A Marshaler of key value pairs. * *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ public final class KeyValueMarshaler extends MarshalerWithSize { + private static final byte[] EMPTY_BYTES = new byte[0]; private static final KeyValueMarshaler[] EMPTY_REPEATED = new KeyValueMarshaler[0]; + private final byte[] keyUtf8; + private final Marshaler value; + + private KeyValueMarshaler(byte[] keyUtf8, Marshaler value) { + super(calculateSize(keyUtf8, value)); + this.keyUtf8 = keyUtf8; + this.value = value; + } + + /** Returns Marshaler for the given KeyAnyValue. */ + public static KeyValueMarshaler createForKeyAnyValue(KeyAnyValue keyAnyValue) { + return new KeyValueMarshaler( + keyAnyValue.getKey().getBytes(StandardCharsets.UTF_8), + AnyValueMarshaler.create(keyAnyValue.getAnyValue())); + } + /** Returns Marshalers for the given Attributes. */ @SuppressWarnings("AvoidObjectArrays") - public static KeyValueMarshaler[] createRepeated(Attributes attributes) { + public static KeyValueMarshaler[] createForAttributes(Attributes attributes) { if (attributes.isEmpty()) { return EMPTY_REPEATED; } - KeyValueMarshaler[] attributeMarshalers = new KeyValueMarshaler[attributes.size()]; + KeyValueMarshaler[] marshalers = new KeyValueMarshaler[attributes.size()]; attributes.forEach( new BiConsumer, Object>() { int index = 0; @Override public void accept(AttributeKey attributeKey, Object o) { - attributeMarshalers[index++] = KeyValueMarshaler.create(attributeKey, o); + marshalers[index++] = create(attributeKey, o); } }); - return attributeMarshalers; + return marshalers; } - private final byte[] keyUtf8; - private final Marshaler value; - @SuppressWarnings("unchecked") private static KeyValueMarshaler create(AttributeKey attributeKey, Object value) { byte[] keyUtf8; @@ -66,42 +78,30 @@ private static KeyValueMarshaler create(AttributeKey attributeKey, Object val } switch (attributeKey.getType()) { case STRING: - return new KeyValueMarshaler( - keyUtf8, new StringAnyValueMarshaler(MarshalerUtil.toBytes((String) value))); + return new KeyValueMarshaler(keyUtf8, StringAnyValueMarshaler.create((String) value)); case LONG: - return new KeyValueMarshaler(keyUtf8, new Int64AnyValueMarshaler((long) value)); + return new KeyValueMarshaler(keyUtf8, IntAnyValueMarshaler.create((long) value)); case BOOLEAN: - return new KeyValueMarshaler(keyUtf8, new BoolAnyValueMarshaler((boolean) value)); + return new KeyValueMarshaler(keyUtf8, BoolAnyValueMarshaler.create((boolean) value)); case DOUBLE: - return new KeyValueMarshaler(keyUtf8, new AnyDoubleFieldMarshaler((double) value)); + return new KeyValueMarshaler(keyUtf8, DoubleAnyValueMarshaler.create((double) value)); case STRING_ARRAY: return new KeyValueMarshaler( - keyUtf8, - new ArrayAnyValueMarshaler(ArrayValueMarshaler.createString((List) value))); + keyUtf8, ArrayAnyValueMarshaler.createString((List) value)); case LONG_ARRAY: - return new KeyValueMarshaler( - keyUtf8, - new ArrayAnyValueMarshaler(ArrayValueMarshaler.createInt64((List) value))); + return new KeyValueMarshaler(keyUtf8, ArrayAnyValueMarshaler.createInt((List) value)); case BOOLEAN_ARRAY: return new KeyValueMarshaler( - keyUtf8, - new ArrayAnyValueMarshaler(ArrayValueMarshaler.createBool((List) value))); + keyUtf8, ArrayAnyValueMarshaler.createBool((List) value)); case DOUBLE_ARRAY: return new KeyValueMarshaler( - keyUtf8, - new ArrayAnyValueMarshaler(ArrayValueMarshaler.createDouble((List) value))); + keyUtf8, ArrayAnyValueMarshaler.createDouble((List) value)); } // Error prone ensures the switch statement is complete, otherwise only can happen with // unaligned versions which are not supported. throw new IllegalArgumentException("Unsupported attribute type."); } - private KeyValueMarshaler(byte[] keyUtf8, Marshaler value) { - super(calculateSize(keyUtf8, value)); - this.keyUtf8 = keyUtf8; - this.value = value; - } - @Override public void writeTo(Serializer output) throws IOException { output.serializeString(KeyValue.KEY, keyUtf8); @@ -114,140 +114,4 @@ private static int calculateSize(byte[] keyUtf8, Marshaler value) { size += MarshalerUtil.sizeMessage(KeyValue.VALUE, value); return size; } - - private static class BoolAnyValueMarshaler extends MarshalerWithSize { - - private final boolean value; - - BoolAnyValueMarshaler(boolean value) { - super(calculateSize(value)); - this.value = value; - } - - @Override - public void writeTo(Serializer output) throws IOException { - // Do not call serialize* method because we always have to write the message tag even if the - // value is empty since it's a oneof. - output.writeBool(AnyValue.BOOL_VALUE, value); - } - - private static int calculateSize(boolean value) { - return AnyValue.BOOL_VALUE.getTagSize() + CodedOutputStream.computeBoolSizeNoTag(value); - } - } - - private static class Int64AnyValueMarshaler extends MarshalerWithSize { - - private final long value; - - Int64AnyValueMarshaler(long value) { - super(calculateSize(value)); - this.value = value; - } - - @Override - public void writeTo(Serializer output) throws IOException { - // Do not call serialize* method because we always have to write the message tag even if the - // value is empty since it's a oneof. - output.writeInt64(AnyValue.INT_VALUE, value); - } - - private static int calculateSize(long value) { - return AnyValue.INT_VALUE.getTagSize() + CodedOutputStream.computeInt64SizeNoTag(value); - } - } - - private static class AnyDoubleFieldMarshaler extends MarshalerWithSize { - - private final double value; - - AnyDoubleFieldMarshaler(double value) { - super(calculateSize(value)); - this.value = value; - } - - @Override - public void writeTo(Serializer output) throws IOException { - // Do not call serialize* method because we always have to write the message tag even if the - // value is empty since it's a oneof. - output.writeDouble(AnyValue.DOUBLE_VALUE, value); - } - - private static int calculateSize(double value) { - return AnyValue.DOUBLE_VALUE.getTagSize() + CodedOutputStream.computeDoubleSizeNoTag(value); - } - } - - private static class ArrayAnyValueMarshaler extends MarshalerWithSize { - private final Marshaler value; - - private ArrayAnyValueMarshaler(Marshaler value) { - super(calculateSize(value)); - this.value = value; - } - - @Override - public void writeTo(Serializer output) throws IOException { - output.serializeMessage(AnyValue.ARRAY_VALUE, value); - } - - private static int calculateSize(Marshaler value) { - return MarshalerUtil.sizeMessage(AnyValue.ARRAY_VALUE, value); - } - } - - private static class ArrayValueMarshaler extends MarshalerWithSize { - - static ArrayValueMarshaler createString(List values) { - int len = values.size(); - Marshaler[] marshalers = new StringAnyValueMarshaler[len]; - for (int i = 0; i < len; i++) { - marshalers[i] = new StringAnyValueMarshaler(values.get(i).getBytes(StandardCharsets.UTF_8)); - } - return new ArrayValueMarshaler(marshalers); - } - - static ArrayValueMarshaler createBool(List values) { - int len = values.size(); - Marshaler[] marshalers = new BoolAnyValueMarshaler[len]; - for (int i = 0; i < len; i++) { - marshalers[i] = new BoolAnyValueMarshaler(values.get(i)); - } - return new ArrayValueMarshaler(marshalers); - } - - static ArrayValueMarshaler createInt64(List values) { - int len = values.size(); - Marshaler[] marshalers = new Int64AnyValueMarshaler[len]; - for (int i = 0; i < len; i++) { - marshalers[i] = new Int64AnyValueMarshaler(values.get(i)); - } - return new ArrayValueMarshaler(marshalers); - } - - static ArrayValueMarshaler createDouble(List values) { - int len = values.size(); - Marshaler[] marshalers = new AnyDoubleFieldMarshaler[len]; - for (int i = 0; i < len; i++) { - marshalers[i] = new AnyDoubleFieldMarshaler(values.get(i)); - } - return new ArrayValueMarshaler(marshalers); - } - - private final Marshaler[] values; - - private ArrayValueMarshaler(Marshaler[] values) { - super(calculateSize(values)); - this.values = values; - } - - @Override - public void writeTo(Serializer output) throws IOException { - output.serializeRepeatedMessage(ArrayValue.VALUES, values); - } - - private static int calculateSize(Marshaler[] values) { - return MarshalerUtil.sizeRepeatedMessage(ArrayValue.VALUES, values); - } - } } diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ResourceMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ResourceMarshaler.java index 259f0070000..b3395448a79 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ResourceMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/ResourceMarshaler.java @@ -36,7 +36,8 @@ public static ResourceMarshaler create(io.opentelemetry.sdk.resources.Resource r // a few times until the cache gets filled which is fine. RealResourceMarshaler realMarshaler = - new RealResourceMarshaler(KeyValueMarshaler.createRepeated(resource.getAttributes())); + new RealResourceMarshaler( + KeyValueMarshaler.createForAttributes(resource.getAttributes())); ByteArrayOutputStream binaryBos = new ByteArrayOutputStream(realMarshaler.getBinarySerializedSize()); diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/StringAnyValueMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/StringAnyValueMarshaler.java index 81e1ab2c7be..e62c55d2da1 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/StringAnyValueMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/StringAnyValueMarshaler.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.internal.otlp; import io.opentelemetry.exporter.internal.marshal.CodedOutputStream; +import io.opentelemetry.exporter.internal.marshal.MarshalerUtil; import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.proto.common.v1.internal.AnyValue; @@ -17,15 +18,19 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class StringAnyValueMarshaler extends MarshalerWithSize { +final class StringAnyValueMarshaler extends MarshalerWithSize { private final byte[] valueUtf8; - public StringAnyValueMarshaler(byte[] valueUtf8) { + private StringAnyValueMarshaler(byte[] valueUtf8) { super(calculateSize(valueUtf8)); this.valueUtf8 = valueUtf8; } + static MarshalerWithSize create(String value) { + return new StringAnyValueMarshaler(MarshalerUtil.toBytes(value)); + } + @Override public void writeTo(Serializer output) throws IOException { // Do not call serialize* method because we always have to write the message tag even if the diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/LogMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/LogMarshaler.java index 7a68041e5d1..863b27b6442 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/LogMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/LogMarshaler.java @@ -14,17 +14,22 @@ import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; import io.opentelemetry.exporter.internal.marshal.ProtoEnumInfo; import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler; import io.opentelemetry.exporter.internal.otlp.KeyValueMarshaler; -import io.opentelemetry.exporter.internal.otlp.StringAnyValueMarshaler; +import io.opentelemetry.extension.incubator.logs.AnyValue; import io.opentelemetry.proto.logs.v1.internal.LogRecord; import io.opentelemetry.proto.logs.v1.internal.SeverityNumber; +import io.opentelemetry.sdk.logs.data.Body; import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.internal.AnyValueBody; import java.io.IOException; import javax.annotation.Nullable; final class LogMarshaler extends MarshalerWithSize { private static final String INVALID_TRACE_ID = TraceId.getInvalid(); private static final String INVALID_SPAN_ID = SpanId.getInvalid(); + private static final MarshalerWithSize EMPTY_BODY_MARSHALER = + AnyValueMarshaler.create(AnyValue.of("")); private final long timeUnixNano; private final long observedTimeUnixNano; @@ -39,11 +44,9 @@ final class LogMarshaler extends MarshalerWithSize { static LogMarshaler create(LogRecordData logRecordData) { KeyValueMarshaler[] attributeMarshalers = - KeyValueMarshaler.createRepeated(logRecordData.getAttributes()); + KeyValueMarshaler.createForAttributes(logRecordData.getAttributes()); - // TODO(jack-berg): handle AnyValue log body - StringAnyValueMarshaler anyValueMarshaler = - new StringAnyValueMarshaler(MarshalerUtil.toBytes(logRecordData.getBody().asString())); + MarshalerWithSize bodyMarshaler = body(logRecordData.getBody()); SpanContext spanContext = logRecordData.getSpanContext(); return new LogMarshaler( @@ -51,7 +54,7 @@ static LogMarshaler create(LogRecordData logRecordData) { logRecordData.getObservedTimestampEpochNanos(), toProtoSeverityNumber(logRecordData.getSeverity()), MarshalerUtil.toBytes(logRecordData.getSeverityText()), - anyValueMarshaler, + bodyMarshaler, attributeMarshalers, logRecordData.getTotalAttributeCount() - logRecordData.getAttributes().size(), spanContext.getTraceFlags(), @@ -59,6 +62,19 @@ static LogMarshaler create(LogRecordData logRecordData) { spanContext.getSpanId().equals(INVALID_SPAN_ID) ? null : spanContext.getSpanId()); } + private static MarshalerWithSize body(Body body) { + if (body instanceof AnyValueBody) { + return AnyValueMarshaler.create(((AnyValueBody) body).asAnyValue()); + } + switch (body.getType()) { + case STRING: + return AnyValueMarshaler.create(AnyValue.of(body.asString())); + case EMPTY: + return EMPTY_BODY_MARSHALER; + } + throw new IllegalStateException("Unsupported Body type: " + body.getType()); + } + private LogMarshaler( long timeUnixNano, long observedTimeUnixNano, diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarMarshaler.java index ed4956ec9a8..8d16c9e40d7 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarMarshaler.java @@ -39,7 +39,7 @@ static ExemplarMarshaler[] createRepeated(List exemplars private static ExemplarMarshaler create(ExemplarData exemplar) { KeyValueMarshaler[] attributeMarshalers = - KeyValueMarshaler.createRepeated(exemplar.getFilteredAttributes()); + KeyValueMarshaler.createForAttributes(exemplar.getFilteredAttributes()); ProtoFieldInfo valueField; if (exemplar instanceof LongExemplarData) { diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointMarshaler.java index e7ac8dcd68b..071833954ad 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointMarshaler.java @@ -83,7 +83,7 @@ private ExponentialHistogramDataPointMarshaler( } static ExponentialHistogramDataPointMarshaler create(ExponentialHistogramPointData point) { - KeyValueMarshaler[] attributes = KeyValueMarshaler.createRepeated(point.getAttributes()); + KeyValueMarshaler[] attributes = KeyValueMarshaler.createForAttributes(point.getAttributes()); ExemplarMarshaler[] exemplars = ExemplarMarshaler.createRepeated(point.getExemplars()); ExponentialHistogramBucketsMarshaler positiveBuckets = diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointMarshaler.java index b1a027a3455..8022635e868 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointMarshaler.java @@ -41,7 +41,7 @@ static HistogramDataPointMarshaler[] createRepeated(Collection static NumberDataPointMarshaler create(PointData point) { ExemplarMarshaler[] exemplarMarshalers = ExemplarMarshaler.createRepeated(point.getExemplars()); KeyValueMarshaler[] attributeMarshalers = - KeyValueMarshaler.createRepeated(point.getAttributes()); + KeyValueMarshaler.createForAttributes(point.getAttributes()); ProtoFieldInfo valueField; if (point instanceof LongPointData) { diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointMarshaler.java index cb38c86ed13..c0b6a1dcded 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointMarshaler.java @@ -20,7 +20,7 @@ final class SummaryDataPointMarshaler extends MarshalerWithSize { private final long count; private final double sum; private final ValueAtQuantileMarshaler[] quantileValues; - private final KeyValueMarshaler[] attributes; + private final MarshalerWithSize[] attributes; static SummaryDataPointMarshaler[] createRepeated(Collection points) { SummaryDataPointMarshaler[] marshalers = new SummaryDataPointMarshaler[points.size()]; @@ -34,8 +34,8 @@ static SummaryDataPointMarshaler[] createRepeated(Collection p static SummaryDataPointMarshaler create(SummaryPointData point) { ValueAtQuantileMarshaler[] quantileMarshalers = ValueAtQuantileMarshaler.createRepeated(point.getValues()); - KeyValueMarshaler[] attributeMarshalers = - KeyValueMarshaler.createRepeated(point.getAttributes()); + MarshalerWithSize[] attributeMarshalers = + KeyValueMarshaler.createForAttributes(point.getAttributes()); return new SummaryDataPointMarshaler( point.getStartEpochNanos(), @@ -52,7 +52,7 @@ private SummaryDataPointMarshaler( long count, double sum, ValueAtQuantileMarshaler[] quantileValues, - KeyValueMarshaler[] attributes) { + MarshalerWithSize[] attributes) { super(calculateSize(startTimeUnixNano, timeUnixNano, count, sum, quantileValues, attributes)); this.startTimeUnixNano = startTimeUnixNano; this.timeUnixNano = timeUnixNano; @@ -78,7 +78,7 @@ private static int calculateSize( long count, double sum, ValueAtQuantileMarshaler[] quantileValues, - KeyValueMarshaler[] attributes) { + MarshalerWithSize[] attributes) { int size = 0; size += MarshalerUtil.sizeFixed64(SummaryDataPoint.START_TIME_UNIX_NANO, startTimeUnixNano); size += MarshalerUtil.sizeFixed64(SummaryDataPoint.TIME_UNIX_NANO, timeUnixNano); diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanEventMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanEventMarshaler.java index 03b208f3df0..2b2cc1d7775 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanEventMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanEventMarshaler.java @@ -40,7 +40,7 @@ static SpanEventMarshaler create(EventData event) { return new SpanEventMarshaler( event.getEpochNanos(), MarshalerUtil.toBytes(event.getName()), - KeyValueMarshaler.createRepeated(event.getAttributes()), + KeyValueMarshaler.createForAttributes(event.getAttributes()), event.getTotalAttributeCount() - event.getAttributes().size()); } diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanLinkMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanLinkMarshaler.java index 0579fb56e10..6f08b257dce 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanLinkMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanLinkMarshaler.java @@ -52,7 +52,7 @@ static SpanLinkMarshaler create(LinkData link) { link.getSpanContext().getTraceId(), link.getSpanContext().getSpanId(), traceStateUtf8, - KeyValueMarshaler.createRepeated(link.getAttributes()), + KeyValueMarshaler.createForAttributes(link.getAttributes()), link.getTotalAttributeCount() - link.getAttributes().size()); } diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanMarshaler.java index b487f262a3c..7eae7c7b25c 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanMarshaler.java @@ -41,7 +41,7 @@ final class SpanMarshaler extends MarshalerWithSize { // Because SpanMarshaler is always part of a repeated field, it cannot return "null". static SpanMarshaler create(SpanData spanData) { KeyValueMarshaler[] attributeMarshalers = - KeyValueMarshaler.createRepeated(spanData.getAttributes()); + KeyValueMarshaler.createForAttributes(spanData.getAttributes()); SpanEventMarshaler[] spanEventMarshalers = SpanEventMarshaler.createRepeated(spanData.getEvents()); SpanLinkMarshaler[] spanLinkMarshalers = SpanLinkMarshaler.createRepeated(spanData.getLinks()); diff --git a/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshalerTest.java b/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshalerTest.java new file mode 100644 index 00000000000..d52d504e9c4 --- /dev/null +++ b/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/AnyValueMarshalerTest.java @@ -0,0 +1,171 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp; + +import static io.opentelemetry.extension.incubator.logs.AnyValue.of; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.extension.incubator.logs.KeyAnyValue; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@SuppressWarnings("BadImport") +class AnyValueMarshalerTest { + + @ParameterizedTest + @MethodSource("serializeAnyValueArgs") + void anyValueString( + io.opentelemetry.extension.incubator.logs.AnyValue anyValue, + AnyValue expectedSerializedValue) { + MarshalerWithSize marshaler = AnyValueMarshaler.create(anyValue); + AnyValue serializedValue = parse(AnyValue.getDefaultInstance(), marshaler); + assertThat(serializedValue).isEqualTo(expectedSerializedValue); + } + + private static Stream serializeAnyValueArgs() { + return Stream.of( + // primitives + arguments(of("str"), AnyValue.newBuilder().setStringValue("str").build()), + arguments(of(true), AnyValue.newBuilder().setBoolValue(true).build()), + arguments(of(1), AnyValue.newBuilder().setIntValue(1).build()), + arguments(of(1.1), AnyValue.newBuilder().setDoubleValue(1.1).build()), + // heterogeneous array + arguments( + of(of("str"), of(true), of(1), of(1.1)), + AnyValue.newBuilder() + .setArrayValue( + ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("str").build()) + .addValues(AnyValue.newBuilder().setBoolValue(true).build()) + .addValues(AnyValue.newBuilder().setIntValue(1).build()) + .addValues(AnyValue.newBuilder().setDoubleValue(1.1).build()) + .build()) + .build()), + // map + arguments( + of(KeyAnyValue.of("key1", of("val1")), KeyAnyValue.of("key2", of(2))), + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("key1") + .setValue(AnyValue.newBuilder().setStringValue("val1").build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("key2") + .setValue(AnyValue.newBuilder().setIntValue(2).build()) + .build()) + .build()) + .build()), + // map of maps + arguments( + of( + Collections.singletonMap( + "child", of(Collections.singletonMap("grandchild", of("str"))))), + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("child") + .setValue( + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("grandchild") + .setValue( + AnyValue.newBuilder() + .setStringValue("str") + .build()) + .build()) + .build()) + .build()) + .build()) + .build()) + .build()), + // bytes + arguments( + of("hello world".getBytes(StandardCharsets.UTF_8)), + AnyValue.newBuilder() + .setBytesValue(ByteString.copyFrom("hello world".getBytes(StandardCharsets.UTF_8))) + .build())); + } + + @SuppressWarnings("unchecked") + private static T parse(T prototype, Marshaler marshaler) { + byte[] serialized = toByteArray(marshaler); + T result; + try { + result = (T) prototype.newBuilderForType().mergeFrom(serialized).build(); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + // Our marshaler should produce the exact same length of serialized output (for example, field + // default values are not outputted), so we check that here. The output itself may have slightly + // different ordering, mostly due to the way we don't output oneof values in field order all the + // tieme. If the lengths are equal and the resulting protos are equal, the marshaling is + // guaranteed to be valid. + assertThat(result.getSerializedSize()).isEqualTo(serialized.length); + + // We don't compare JSON strings due to some differences (particularly serializing enums as + // numbers instead of names). This may improve in the future but what matters is what we produce + // can be parsed. + String json = toJson(marshaler); + Message.Builder builder = prototype.newBuilderForType(); + try { + JsonFormat.parser().merge(json, builder); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + + assertThat(builder.build()).isEqualTo(result); + + return result; + } + + private static byte[] toByteArray(Marshaler marshaler) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + marshaler.writeBinaryTo(bos); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return bos.toByteArray(); + } + + private static String toJson(Marshaler marshaler) { + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + marshaler.writeJsonTo(bos); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new String(bos.toByteArray(), StandardCharsets.UTF_8); + } +} diff --git a/integration-tests/otlp/build.gradle.kts b/integration-tests/otlp/build.gradle.kts index 6b01f55ac9e..bbd9bc85c4c 100644 --- a/integration-tests/otlp/build.gradle.kts +++ b/integration-tests/otlp/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { implementation(project(":exporters:otlp:all")) implementation(project(":api:events")) + implementation(project(":extensions:incubator")) compileOnly("com.google.errorprone:error_prone_annotations") diff --git a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java index 7bfa82a3876..271409e8ea3 100644 --- a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java +++ b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java @@ -5,11 +5,13 @@ package io.opentelemetry.integrationtest; +import static io.opentelemetry.extension.incubator.logs.AnyValue.of; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.testcontainers.Testcontainers.exposeHostPorts; +import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -38,6 +40,8 @@ import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.extension.incubator.logs.ExtendedLogRecordBuilder; +import io.opentelemetry.extension.incubator.logs.KeyAnyValue; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; @@ -45,7 +49,9 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; @@ -73,6 +79,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -514,6 +521,7 @@ void testOtlpHttpLogExport_mtls() throws Exception { testLogRecordExporter(exporter); } + @SuppressWarnings("BadImport") private static void testLogRecordExporter(LogRecordExporter logRecordExporter) { SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() @@ -539,10 +547,23 @@ private static void testLogRecordExporter(LogRecordExporter logRecordExporter) { TraceState.getDefault()); try (Scope unused = Span.wrap(spanContext).makeCurrent()) { - logger - .logRecordBuilder() + ((ExtendedLogRecordBuilder) logger.logRecordBuilder()) + .setBody( + of( + KeyAnyValue.of("str_key", of("value")), + KeyAnyValue.of("bool_key", of(true)), + KeyAnyValue.of("int_key", of(1L)), + KeyAnyValue.of("double_key", of(1.1)), + KeyAnyValue.of("bytes_key", of("value".getBytes(StandardCharsets.UTF_8))), + KeyAnyValue.of("arr_key", of(of("value"), of(1L))), + KeyAnyValue.of( + "kv_list", + of( + KeyAnyValue.of("child_str_key", of("value")), + KeyAnyValue.of( + "child_kv_list", + of(KeyAnyValue.of("grandchild_str_key", of("value")))))))) .setTimestamp(100, TimeUnit.NANOSECONDS) - .setBody("log body") .setAllAttributes(Attributes.builder().put("key", "value").build()) .setSeverity(Severity.DEBUG) .setSeverityText("DEBUG") @@ -576,7 +597,98 @@ private static void testLogRecordExporter(LogRecordExporter logRecordExporter) { // LogRecord via Logger.logRecordBuilder()...emit() io.opentelemetry.proto.logs.v1.LogRecord protoLog1 = ilLogs.getLogRecords(0); - assertThat(protoLog1.getBody().getStringValue()).isEqualTo("log body"); + assertThat(protoLog1.getBody()) + .isEqualTo( + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("str_key") + .setValue(AnyValue.newBuilder().setStringValue("value").build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("bool_key") + .setValue(AnyValue.newBuilder().setBoolValue(true).build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("int_key") + .setValue(AnyValue.newBuilder().setIntValue(1).build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("double_key") + .setValue(AnyValue.newBuilder().setDoubleValue(1.1).build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("bytes_key") + .setValue( + AnyValue.newBuilder() + .setBytesValue( + ByteString.copyFrom( + "value".getBytes(StandardCharsets.UTF_8))) + .build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("arr_key") + .setValue( + AnyValue.newBuilder() + .setArrayValue( + ArrayValue.newBuilder() + .addValues( + AnyValue.newBuilder() + .setStringValue("value") + .build()) + .addValues( + AnyValue.newBuilder().setIntValue(1).build()) + .build()) + .build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("kv_list") + .setValue( + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("child_str_key") + .setValue( + AnyValue.newBuilder() + .setStringValue("value") + .build()) + .build()) + .addValues( + KeyValue.newBuilder() + .setKey("child_kv_list") + .setValue( + AnyValue.newBuilder() + .setKvlistValue( + KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey( + "grandchild_str_key") + .setValue( + AnyValue + .newBuilder() + .setStringValue( + "value") + .build()) + .build()) + .build()) + .build()) + .build()) + .build()) + .build()) + .build()) + .build()) + .build()); assertThat(protoLog1.getAttributesList()) .isEqualTo( Collections.singletonList( diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java index 6bdd0407aa5..70683a0d22b 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java @@ -88,7 +88,7 @@ public SdkLogRecordBuilder setSeverityText(String severityText) { @Override public SdkLogRecordBuilder setBody(String body) { - this.body = Body.string(body); + this.body = AnyValueBody.create(AnyValue.of(body)); return this; }