diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroBinaryDecoders.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroBinaryDecoders.java index 4c7d397651..724ae88037 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroBinaryDecoders.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroBinaryDecoders.java @@ -1,8 +1,6 @@ package pl.allegro.tech.hermes.common.message.converter; -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.InputStream; +import org.apache.avro.Conversion; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -11,6 +9,11 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import tech.allegro.schema.json2avro.converter.AvroConversionException; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.InputStream; +import java.util.List; + public class AvroBinaryDecoders { private static ThreadLocal threadLocalEmptyInputStream = @@ -20,11 +23,13 @@ public class AvroBinaryDecoders { ThreadLocal.withInitial( () -> DecoderFactory.get().binaryDecoder(threadLocalEmptyInputStream.get(), null)); - static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema) { + static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema, List> logicalTypeConversions) { try (FlushableBinaryDecoderHolder holder = new FlushableBinaryDecoderHolder()) { BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, holder.getBinaryDecoder()); - return new GenericDatumReader(schema).read(null, binaryDecoder); + GenericDatumReader genericDatumWriter = new GenericDatumReader<>(schema); + logicalTypeConversions.forEach(conversion -> genericDatumWriter.getData().addLogicalTypeConversion(conversion)); + return genericDatumWriter.read(null, binaryDecoder); } catch (Exception e) { String reason = e.getMessage() == null ? ExceptionUtils.getRootCauseMessage(e) : e.getMessage(); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroRecordToBytesConverter.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroRecordToBytesConverter.java index 64762fcccc..765c5f113c 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroRecordToBytesConverter.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/message/converter/AvroRecordToBytesConverter.java @@ -1,17 +1,25 @@ package pl.allegro.tech.hermes.common.message.converter; -import java.io.ByteArrayOutputStream; -import java.io.IOException; +import org.apache.avro.Conversion; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + public interface AvroRecordToBytesConverter { static GenericRecord bytesToRecord(byte[] data, Schema schema) { - return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema); + return bytesToRecord(data, schema, Collections.emptyList()); + } + + static GenericRecord bytesToRecord(byte[] data, Schema schema, List> logicalTypeConversions) { + return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema, logicalTypeConversions); } static byte[] recordToBytes(GenericRecord genericRecord, Schema schema) throws IOException { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/AvroToJsonMessageConverter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/AvroToJsonMessageConverter.java index fe9e4345c5..c38e231fa6 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/AvroToJsonMessageConverter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/AvroToJsonMessageConverter.java @@ -13,6 +13,8 @@ import pl.allegro.tech.hermes.consumers.consumer.Message; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; +import java.util.List; + public class AvroToJsonMessageConverter implements MessageConverter { private final JsonAvroConverter converter; @@ -35,7 +37,7 @@ public Message convert(Message message, Topic topic) { } private GenericRecord recordWithoutMetadata(byte[] data, Schema schema) { - GenericRecord original = bytesToRecord(data, schema); + GenericRecord original = bytesToRecord(data, schema, List.of(new DecimalToStringConversion())); Schema schemaWithoutMetadata = removeMetadataField(schema); GenericRecordBuilder builder = new GenericRecordBuilder(schemaWithoutMetadata); schemaWithoutMetadata diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversion.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversion.java new file mode 100644 index 0000000000..70244ba7a6 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversion.java @@ -0,0 +1,33 @@ +package pl.allegro.tech.hermes.consumers.consumer.converter; + +import org.apache.avro.Conversion; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +class DecimalToStringConversion extends Conversion { + private final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion(); + + @Override + public Class getConvertedType() { + return String.class; + } + + @Override + public String fromBytes(ByteBuffer value, Schema schema, LogicalType type) { + return decimalConversion.fromBytes(value, schema, type).toString(); + } + + @Override + public ByteBuffer toBytes(String value, Schema schema, LogicalType type) { + return decimalConversion.toBytes(new BigDecimal(value), schema, type); + } + + @Override + public String getLogicalTypeName() { + return "decimal"; + } +} diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversionTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversionTest.java new file mode 100644 index 0000000000..02ebbaf4ef --- /dev/null +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/converter/DecimalToStringConversionTest.java @@ -0,0 +1,58 @@ +package pl.allegro.tech.hermes.consumers.consumer.converter; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +@RunWith(MockitoJUnitRunner.class) +public class DecimalToStringConversionTest { + + private Schema schema; + private LogicalType logicalType; + + @Before + public void setup() { + schema = Schema.create(Schema.Type.BYTES); + schema.addProp("logicalType", "decimal"); + schema.addProp("precision", 10); + schema.addProp("scale", 2); + logicalType = LogicalTypes.fromSchema(schema); + } + + @Test + public void toFromBytes() { + // given + final String value = "19.91"; + final DecimalToStringConversion conversion = new DecimalToStringConversion(); + + //when + final ByteBuffer byteBuffer = conversion.toBytes(value, schema, logicalType); + final String result = conversion.fromBytes(byteBuffer, schema, logicalType); + + //then + Assert.assertEquals(result, value); + } + + @Test + public void fromToBytes() { + // given + final ByteBuffer value = ByteBuffer.wrap(new BigDecimal("19.91").unscaledValue().toByteArray()); + final DecimalToStringConversion conversion = new DecimalToStringConversion(); + + //when + final String decimal = conversion.fromBytes(value, schema, logicalType); + final ByteBuffer result = conversion.toBytes(decimal, schema, logicalType); + + //then + Assert.assertEquals(result, value); + } + +} diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/PublishingAvroTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/PublishingAvroTest.java index 9dd6e576f3..57f13639e4 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/PublishingAvroTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/PublishingAvroTest.java @@ -20,8 +20,13 @@ import java.time.Clock; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.UUID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import net.javacrumbs.jsonunit.core.Option; import org.apache.avro.Schema; import org.junit.jupiter.api.Test; @@ -55,6 +60,63 @@ public class PublishingAvroTest { private static final AvroUser user = new AvroUser("Bob", 50, "blue"); + private static final ObjectMapper objMapper = new ObjectMapper(); + + @Test + public void shouldConsumeJsonMessageWithDecimalFromAvroTopic() throws JsonProcessingException { + // given + String schema = """ + { + "namespace": "pl.allegro", + "type": "record", + "name": "User", + "fields": [ + { + "name": "__metadata", + "type": ["null", {"type": "map", "values": "string"}], + "default": null + }, + {"name": "name", "type": "string"}, + {"name": "balance","type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 2 + } + } + ] + }"""; + + + TopicWithSchema topicWithSchema = topicWithSchema(topicWithRandomName() + .withContentType(AVRO) + .build(), schema); + Topic topic = hermes.initHelper().createTopicWithSchema(topicWithSchema); + + TestSubscriber subscriber = subscribers.createSubscriber(); + + hermes.initHelper().createSubscription( + subscription(topic.getQualifiedName(), "subscription", subscriber.getEndpoint()).build() + ); + + Map map = Map.of( + "name", "Bob", "balance", "1.20" + ); + + String userWithBalance = objMapper.writeValueAsString(map); + + // when + hermes.api().publishJSONUntilSuccess(topic.getQualifiedName(), userWithBalance); + + // then + subscriber.waitUntilAnyMessageReceived(); + + Map actual = objMapper.readValue( + subscriber.getLastReceivedRequest().getBodyAsString(), + new TypeReference>() {}); + assertThat(actual.get("balance")).isEqualTo("1.20"); + } + @Test public void shouldPublishAvroAndConsumeJsonMessage() { // given