Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 1808 logical type decimal to json as string #1920

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package pl.allegro.tech.hermes.common.message.converter;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -11,6 +9,11 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.allegro.schema.json2avro.converter.AvroConversionException;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.List;

public class AvroBinaryDecoders {

private static ThreadLocal<InputStream> threadLocalEmptyInputStream =
Expand All @@ -20,11 +23,13 @@ public class AvroBinaryDecoders {
ThreadLocal.withInitial(
() -> DecoderFactory.get().binaryDecoder(threadLocalEmptyInputStream.get(), null));

static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema) {
static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema, List<Conversion<?>> logicalTypeConversions) {
try (FlushableBinaryDecoderHolder holder = new FlushableBinaryDecoderHolder()) {
BinaryDecoder binaryDecoder =
DecoderFactory.get().binaryDecoder(message, holder.getBinaryDecoder());
return new GenericDatumReader<GenericRecord>(schema).read(null, binaryDecoder);
GenericDatumReader<GenericRecord> genericDatumWriter = new GenericDatumReader<>(schema);
logicalTypeConversions.forEach(conversion -> genericDatumWriter.getData().addLogicalTypeConversion(conversion));
return genericDatumWriter.read(null, binaryDecoder);
} catch (Exception e) {
String reason =
e.getMessage() == null ? ExceptionUtils.getRootCauseMessage(e) : e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package pl.allegro.tech.hermes.common.message.converter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

public interface AvroRecordToBytesConverter {

static GenericRecord bytesToRecord(byte[] data, Schema schema) {
return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema);
return bytesToRecord(data, schema, Collections.emptyList());
}

static GenericRecord bytesToRecord(byte[] data, Schema schema, List<Conversion<?>> logicalTypeConversions) {
return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema, logicalTypeConversions);
}

static byte[] recordToBytes(GenericRecord genericRecord, Schema schema) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import pl.allegro.tech.hermes.consumers.consumer.Message;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.util.List;

public class AvroToJsonMessageConverter implements MessageConverter {

private final JsonAvroConverter converter;
Expand All @@ -35,7 +37,7 @@ public Message convert(Message message, Topic topic) {
}

private GenericRecord recordWithoutMetadata(byte[] data, Schema schema) {
GenericRecord original = bytesToRecord(data, schema);
GenericRecord original = bytesToRecord(data, schema, List.of(new DecimalToStringConversion()));
Schema schemaWithoutMetadata = removeMetadataField(schema);
GenericRecordBuilder builder = new GenericRecordBuilder(schemaWithoutMetadata);
schemaWithoutMetadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pl.allegro.tech.hermes.consumers.consumer.converter;

import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

class DecimalToStringConversion extends Conversion<String> {
private final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();

@Override
public Class<String> getConvertedType() {
return String.class;
}

@Override
public String fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
return decimalConversion.fromBytes(value, schema, type).toString();
}

@Override
public ByteBuffer toBytes(String value, Schema schema, LogicalType type) {
return decimalConversion.toBytes(new BigDecimal(value), schema, type);
}

@Override
public String getLogicalTypeName() {
return "decimal";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package pl.allegro.tech.hermes.consumers.consumer.converter;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

@RunWith(MockitoJUnitRunner.class)
public class DecimalToStringConversionTest {

private Schema schema;
private LogicalType logicalType;

@Before
public void setup() {
schema = Schema.create(Schema.Type.BYTES);
schema.addProp("logicalType", "decimal");
schema.addProp("precision", 10);
schema.addProp("scale", 2);
logicalType = LogicalTypes.fromSchema(schema);
}

@Test
public void toFromBytes() {
// given
final String value = "19.91";
final DecimalToStringConversion conversion = new DecimalToStringConversion();

//when
final ByteBuffer byteBuffer = conversion.toBytes(value, schema, logicalType);
final String result = conversion.fromBytes(byteBuffer, schema, logicalType);

//then
Assert.assertEquals(result, value);
}

@Test
public void fromToBytes() {
// given
final ByteBuffer value = ByteBuffer.wrap(new BigDecimal("19.91").unscaledValue().toByteArray());
final DecimalToStringConversion conversion = new DecimalToStringConversion();

//when
final String decimal = conversion.fromBytes(value, schema, logicalType);
final ByteBuffer result = conversion.toBytes(decimal, schema, logicalType);

//then
Assert.assertEquals(result, value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.javacrumbs.jsonunit.core.Option;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -55,6 +60,63 @@ public class PublishingAvroTest {

private static final AvroUser user = new AvroUser("Bob", 50, "blue");

private static final ObjectMapper objMapper = new ObjectMapper();

@Test
public void shouldConsumeJsonMessageWithDecimalFromAvroTopic() throws JsonProcessingException {
// given
String schema = """
{
"namespace": "pl.allegro",
"type": "record",
"name": "User",
"fields": [
{
"name": "__metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
},
{"name": "name", "type": "string"},
{"name": "balance","type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
}
]
}""";


TopicWithSchema topicWithSchema = topicWithSchema(topicWithRandomName()
.withContentType(AVRO)
.build(), schema);
Topic topic = hermes.initHelper().createTopicWithSchema(topicWithSchema);

TestSubscriber subscriber = subscribers.createSubscriber();

hermes.initHelper().createSubscription(
subscription(topic.getQualifiedName(), "subscription", subscriber.getEndpoint()).build()
);

Map<String, Object> map = Map.of(
"name", "Bob", "balance", "1.20"
);

String userWithBalance = objMapper.writeValueAsString(map);

// when
hermes.api().publishJSONUntilSuccess(topic.getQualifiedName(), userWithBalance);

// then
subscriber.waitUntilAnyMessageReceived();

Map<String, Object> actual = objMapper.readValue(
subscriber.getLastReceivedRequest().getBodyAsString(),
new TypeReference<HashMap<String,Object>>() {});
assertThat(actual.get("balance")).isEqualTo("1.20");
}

@Test
public void shouldPublishAvroAndConsumeJsonMessage() {
// given
Expand Down
Loading