diff --git a/build.gradle b/build.gradle index f9a332010..592aab0ca 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.10.1' +version '0.10.2' def projName = "firehose" diff --git a/docs/docs/advance/generic.md b/docs/docs/advance/generic.md index b18238575..fe6353f5d 100644 Binary files a/docs/docs/advance/generic.md and b/docs/docs/advance/generic.md differ diff --git a/docs/docs/sinks/http-sink.md b/docs/docs/sinks/http-sink.md index 845254b57..953c1410a 100644 --- a/docs/docs/sinks/http-sink.md +++ b/docs/docs/sinks/http-sink.md @@ -151,3 +151,12 @@ This config if set to `true` will enable the simple date format (Eg. - `May 3, 2 - Example value: `false` - Type: `optional` - Default value: `true` + +### `SINK_HTTP_SERIALIZER_JSON_TYPECAST` + +Defines the mapping for field typecasting of the resulting JSON Serialization. This configuration could accept multiple mappings for multiple JSON Path +Currently supported typecasting target: DOUBLE, INTEGER, LONG, STRING + +- Example value: `[{"jsonPath": "$.root.someIntegerField", "type": "INTEGER"}, {"jsonPath": "$..[*].doubleField", "type": "DOUBLE"}]` +- Type: `optional` +- Default Value: `[]` \ No newline at end of file diff --git a/env/local.properties b/env/local.properties index 1908b7b76..dbb2a05e7 100644 --- a/env/local.properties +++ b/env/local.properties @@ -121,6 +121,7 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id # SINK_HTTP_OAUTH2_CLIENT_NAME=client-name # SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret # SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info +# SINK_HTTP_SERIALIZER_JSON_TYPECAST=[{"jsonPath": "$.root.field1", "type": "INTEGER"},{"jsonPath": "$.root.field2", "type": "INTEGER"},{"jsonPath": "$..field3", "type": "INTEGER"},{"jsonPath": "$..[*].field4", "type": "DOUBLE"}] # # ############################################# diff --git a/src/main/java/com/gotocompany/firehose/config/HttpSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/HttpSinkConfig.java index 8d16a0257..e59a05196 100644 --- a/src/main/java/com/gotocompany/firehose/config/HttpSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/HttpSinkConfig.java @@ -1,5 +1,6 @@ package com.gotocompany.firehose.config; +import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter; import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType; import com.gotocompany.firehose.config.enums.HttpSinkParameterPlacementType; import com.gotocompany.firehose.config.enums.HttpSinkParameterSourceType; @@ -11,6 +12,7 @@ import com.gotocompany.firehose.config.converter.RangeToHashMapConverter; import java.util.Map; +import java.util.function.Function; public interface HttpSinkConfig extends AppConfig { @@ -94,4 +96,9 @@ public interface HttpSinkConfig extends AppConfig { @DefaultValue("true") Boolean getSinkHttpSimpleDateFormatEnable(); + @Key("SINK_HTTP_SERIALIZER_JSON_TYPECAST") + @ConverterClass(HttpSinkSerializerJsonTypecastConfigConverter.class) + @DefaultValue("[]") + Map> getSinkHttpSerializerJsonTypecast(); + } diff --git a/src/main/java/com/gotocompany/firehose/config/converter/HttpSinkSerializerJsonTypecastConfigConverter.java b/src/main/java/com/gotocompany/firehose/config/converter/HttpSinkSerializerJsonTypecastConfigConverter.java new file mode 100644 index 000000000..8cfd77800 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/converter/HttpSinkSerializerJsonTypecastConfigConverter.java @@ -0,0 +1,63 @@ +package com.gotocompany.firehose.config.converter; + +import com.gotocompany.firehose.serializer.constant.TypecastTarget; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.aeonbits.owner.Converter; +import org.apache.commons.lang3.StringUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class HttpSinkSerializerJsonTypecastConfigConverter implements Converter>> { + + private final ObjectMapper objectMapper; + + public HttpSinkSerializerJsonTypecastConfigConverter() { + this.objectMapper = new ObjectMapper(); + } + + @Override + public Map> convert(Method method, String input) { + if (StringUtils.isBlank(input)) { + return Collections.emptyMap(); + } + try { + List jsonTypecasts = objectMapper.readValue(input, new TypeReference>() { + }); + validate(jsonTypecasts); + return jsonTypecasts.stream() + .collect(Collectors.toMap(JsonTypecast::getJsonPath, jsonTypecast -> jsonTypecast.getType()::cast)); + } catch (IOException e) { + throw new IllegalArgumentException("Error when parsing serializer json config: " + e.getMessage(), e); + } + } + + private void validate(List jsonTypecasts) { + boolean invalidConfigurationExist = jsonTypecasts.stream() + .anyMatch(jt -> Objects.isNull(jt.getJsonPath()) || Objects.isNull(jt.getType())); + if (invalidConfigurationExist) { + throw new IllegalArgumentException("Invalid configuration: jsonPath or type should not be null"); + } + } + + @AllArgsConstructor + @NoArgsConstructor + @Data + @Builder + private static class JsonTypecast { + private String jsonPath; + private TypecastTarget type; + } + +} diff --git a/src/main/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializer.java b/src/main/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializer.java new file mode 100644 index 000000000..e70b4a317 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializer.java @@ -0,0 +1,61 @@ +package com.gotocompany.firehose.serializer; + +import com.gotocompany.firehose.config.HttpSinkConfig; +import com.gotocompany.firehose.exception.DeserializerException; +import com.gotocompany.firehose.message.Message; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; + +/*** + * MessageSerializer wrapping other JSON MessageSerializer which add capability to typecast some of the fields from the inner serializer. + */ +@Slf4j +public class TypecastedJsonSerializer implements MessageSerializer { + + private final MessageSerializer messageSerializer; + private final HttpSinkConfig httpSinkConfig; + private final Configuration jsonPathConfiguration; + + /** + * Constructor for TypecastedJsonSerializer. + * + * @param messageSerializer the inner serializer to be wrapped + * @param httpSinkConfig the HTTP Sink config configuration containing typecasting parameters, + * where each map entry contains a JSON path and the desired type + */ + public TypecastedJsonSerializer(MessageSerializer messageSerializer, + HttpSinkConfig httpSinkConfig) { + this.messageSerializer = messageSerializer; + this.httpSinkConfig = httpSinkConfig; + this.jsonPathConfiguration = Configuration.builder() + .options(Option.SUPPRESS_EXCEPTIONS) + .build(); + } + + /** + * Serializes the given message, then applies typecasting to specified fields in the resulting JSON. + * + * @param message the message to be serialized + * @return the serialized and typecasted JSON string + * @throws DeserializerException if an error occurs during serialization or typecasting + */ + @Override + public String serialize(Message message) throws DeserializerException { + String jsonString = messageSerializer.serialize(message); + DocumentContext documentContext = JsonPath + .using(jsonPathConfiguration) + .parse(jsonString); + httpSinkConfig.getSinkHttpSerializerJsonTypecast() + .forEach((jsonPath, typecastFunction) -> documentContext.map(jsonPath, + (currentValue, configuration) -> Optional.ofNullable(currentValue) + .map(v -> typecastFunction.apply(v.toString())) + .orElse(null) + )); + return documentContext.jsonString(); + } +} diff --git a/src/main/java/com/gotocompany/firehose/serializer/constant/TypecastTarget.java b/src/main/java/com/gotocompany/firehose/serializer/constant/TypecastTarget.java new file mode 100644 index 000000000..42f5e31f8 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/serializer/constant/TypecastTarget.java @@ -0,0 +1,39 @@ +package com.gotocompany.firehose.serializer.constant; + +public enum TypecastTarget { + INTEGER { + @Override + public Object cast(String input) { + try { + return Integer.valueOf(input); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid input for INTEGER: " + input, e); + } + } + }, LONG { + @Override + public Object cast(String input) { + try { + return Long.valueOf(input); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid input for LONG: " + input, e); + } + } + }, DOUBLE { + @Override + public Object cast(String input) { + try { + return Double.valueOf(input); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid input for DOUBLE: " + input, e); + } + } + }, STRING { + @Override + public Object cast(String input) { + return String.valueOf(input); + } + }; + + public abstract Object cast(String input); +} diff --git a/src/main/java/com/gotocompany/firehose/sink/http/HttpSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/http/HttpSinkFactory.java index 569e650b0..2f2c78fa5 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/HttpSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/HttpSinkFactory.java @@ -37,7 +37,6 @@ public class HttpSinkFactory { */ public static AbstractSink create(Map configuration, StatsDReporter statsDReporter, StencilClient stencilClient) { HttpSinkConfig httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration); - FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class); CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter); diff --git a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java index 923d08471..8de6a8c44 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java @@ -8,6 +8,7 @@ import com.gotocompany.firehose.serializer.MessageToJson; import com.gotocompany.firehose.serializer.MessageToTemplatizedJson; import com.gotocompany.depot.metrics.StatsDReporter; +import com.gotocompany.firehose.serializer.TypecastedJsonSerializer; import com.gotocompany.stencil.client.StencilClient; import com.gotocompany.stencil.Parser; import lombok.AllArgsConstructor; @@ -34,10 +35,11 @@ public MessageSerializer build() { Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()); if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) { firehoseInstrumentation.logDebug("Serializer type: EsbMessageToJson", HttpSinkDataFormatType.JSON); - return new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable()); + return getTypecastedJsonSerializer(new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable())); } else { firehoseInstrumentation.logDebug("Serializer type: EsbMessageToTemplatizedJson"); - return MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser); + return getTypecastedJsonSerializer( + MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser)); } } @@ -48,6 +50,10 @@ public MessageSerializer build() { return new JsonWrappedProtoByte(); } + private MessageSerializer getTypecastedJsonSerializer(MessageSerializer messageSerializer) { + return new TypecastedJsonSerializer(messageSerializer, httpSinkConfig); + } + private boolean isProtoSchemaEmpty() { return httpSinkConfig.getInputSchemaProtoClass() == null || httpSinkConfig.getInputSchemaProtoClass().equals(""); } diff --git a/src/test/java/com/gotocompany/firehose/converter/HttpSinkSerializerJsonTypecastConfigConverterTest.java b/src/test/java/com/gotocompany/firehose/converter/HttpSinkSerializerJsonTypecastConfigConverterTest.java new file mode 100644 index 000000000..6564625df --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/converter/HttpSinkSerializerJsonTypecastConfigConverterTest.java @@ -0,0 +1,91 @@ +package com.gotocompany.firehose.converter; + +import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.Map; +import java.util.function.Function; + +public class HttpSinkSerializerJsonTypecastConfigConverterTest { + + private final HttpSinkSerializerJsonTypecastConfigConverter httpSinkSerializerJsonTypecastConfigConverter = new HttpSinkSerializerJsonTypecastConfigConverter(); + + @Test + public void convertShouldConvertToPropertyMapWhenValidJsonConfig() { + String configJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"LONG\"}]"; + String expectedPropertyMapKey = "$.root.field"; + + Map> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, configJson); + Function mapper = result.get(expectedPropertyMapKey); + Object mapperResult = mapper.apply("4"); + + Assertions.assertNotNull(mapper); + Assertions.assertTrue(mapperResult instanceof Long); + Assertions.assertEquals(4L, mapperResult); + } + + @Test + public void convertShouldThrowJsonParseExceptionWhenInvalidJsonFormatProvided() { + String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\" \"type\": \"LONG\""; + + Assertions.assertThrows(IllegalArgumentException.class, + () -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson)); + } + + @Test + public void convertShouldThrowJsonParseExceptionWhenUnregisteredTypecastingProvided() { + String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"BIG_INTEGER\"}]"; + + Assertions.assertThrows(IllegalArgumentException.class, + () -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson)); + } + + @Test + public void convertShouldHandleEmptyJsonConfig() { + String emptyConfigJson = "[]"; + + Map> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, emptyConfigJson); + + Assertions.assertTrue(result.isEmpty()); + } + + @Test + public void convertShouldHandleNullJsonConfig() { + String nullConfigJson = null; + + Map> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, nullConfigJson); + + Assertions.assertTrue(result.isEmpty()); + } + + @Test + public void convertShouldThrowExceptionForUnsupportedDataType() { + String unsupportedTypeConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"UNSUPPORTED_TYPE\"}]"; + + Assertions.assertThrows(IllegalArgumentException.class, + () -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, unsupportedTypeConfigJson)); + } + + @Test + public void convertShouldHandleMultipleValidConfigs() { + String multipleConfigJson = "[{\"jsonPath\": \"$.root.field1\", \"type\": \"LONG\"}, {\"jsonPath\": \"$.root.field2\", \"type\": \"STRING\"}]"; + + Map> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, multipleConfigJson); + Function mapper1 = result.get("$.root.field1"); + Function mapper2 = result.get("$.root.field2"); + + Assertions.assertNotNull(mapper1); + Assertions.assertNotNull(mapper2); + Assertions.assertTrue(mapper1.apply("4") instanceof Long); + Assertions.assertTrue(mapper2.apply("test") instanceof String); + } + + @Test + public void convertShouldThrowExceptionForMissingFieldsInConfig() { + String missingFieldsConfigJson = "[{\"jsonPath\": \"$.root.field\"}]"; + + Assertions.assertThrows(IllegalArgumentException.class, + () -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, missingFieldsConfigJson)); + } +} diff --git a/src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java b/src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java new file mode 100644 index 000000000..03219c54e --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java @@ -0,0 +1,255 @@ +package com.gotocompany.firehose.serializer; + +import com.gotocompany.firehose.config.HttpSinkConfig; +import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter; +import com.gotocompany.firehose.message.Message; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidJsonException; +import com.jayway.jsonpath.JsonPath; +import net.minidev.json.JSONArray; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +public class TypecastedJsonSerializerTest { + + private static final String DEFAULT_JSON_MESSAGE = "{\"key\": \"value\", \"long\":\"1234568129012312\",\"nested\": {\"int\": \"1234\"}, \"double\": \"12.1\", \"numeric\": 10}"; + private static final String DEFAULT_PARAMETERS = "[{\"jsonPath\": \"$..int\", \"type\": \"INTEGER\"}, {\"jsonPath\": \"$..long\", \"type\": \"LONG\"}, {\"jsonPath\": \"$..double\", \"type\": \"DOUBLE\"}, {\"jsonPath\": \"$.numeric\", \"type\": \"STRING\"}]"; + + private TypecastedJsonSerializer typecastedJsonSerializer; + + @Mock + private MessageSerializer messageSerializer; + + @Mock + private HttpSinkConfig httpSinkConfig; + + private HttpSinkSerializerJsonTypecastConfigConverter httpSinkSerializerJsonTypecastConfigConverter = new HttpSinkSerializerJsonTypecastConfigConverter(); + + @Before + public void setup() { + messageSerializer = Mockito.mock(MessageSerializer.class); + httpSinkConfig = Mockito.mock(HttpSinkConfig.class); + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, DEFAULT_PARAMETERS); + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(DEFAULT_JSON_MESSAGE); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + typecastedJsonSerializer = new TypecastedJsonSerializer( + messageSerializer, httpSinkConfig + ); + } + + @Test + public void shouldCastToNumberWhenGivenMessageWithQuoteWrappedNumberAndMatchingJsonPathConfiguration() { + String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + DocumentContext jsonPath = JsonPath.parse(processedJsonString); + JSONArray integerJsonArray = jsonPath.read("$..int"); + JSONArray longJsonArray = jsonPath.read("$..long"); + JSONArray doubleJsonArray = jsonPath.read("$..double"); + JSONArray castedNumericArray = jsonPath.read("$..numeric"); + + Assertions.assertTrue(integerJsonArray.get(0) instanceof Integer); + Assertions.assertTrue(longJsonArray.get(0) instanceof Long); + Assertions.assertTrue(doubleJsonArray.get(0) instanceof Double); + Assertions.assertTrue(castedNumericArray.get(0) instanceof String); + Assertions.assertEquals(integerJsonArray.get(0), 1234); + Assertions.assertEquals(longJsonArray.get(0), 1234568129012312L); + Assertions.assertEquals(doubleJsonArray.get(0), 12.1); + Assertions.assertEquals(castedNumericArray.get(0), "10"); + } + + @Test + public void shouldIgnoreWhenGivenNullMessageValue() { + String jsonWithNullMappedValue = "{\"key\": \"value\", \"long\":null}"; + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(jsonWithNullMappedValue); + String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", jsonWithNullMappedValue)); + DocumentContext jsonPath = JsonPath.parse(processedJsonString); + JSONArray fieldWithValue = jsonPath.read("$..key"); + JSONArray integerJsonArray = jsonPath.read("$..long"); + + Assertions.assertEquals("value", fieldWithValue.get(0)); + Assertions.assertNull(integerJsonArray.get(0)); + } + + @Test + public void serializeShouldHandleEmptyJsonMessage() { + String emptyJsonMessage = "{}"; + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(emptyJsonMessage); + String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", emptyJsonMessage)); + DocumentContext jsonPath = JsonPath.parse(processedJsonString); + + Assertions.assertEquals(JsonPath.parse(emptyJsonMessage).jsonString(), jsonPath.jsonString()); + Assertions.assertEquals(emptyJsonMessage, processedJsonString); + } + + @Test + public void serializeShouldThrowExceptionForInvalidJsonMessage() { + String invalidJsonMessage = "{key value}"; + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(invalidJsonMessage); + + Assertions.assertThrows(InvalidJsonException.class, + () -> typecastedJsonSerializer.serialize(buildMessage("key", invalidJsonMessage))); + } + + @Test + public void serializeShouldHandleNestedJsonPathConfiguration() { + String nestedJsonMessage = "{\"key\": \"value\", \"nested\": {\"int\": \"1234\"}}"; + String parameters = "[{\"jsonPath\": \"$.nested.int\", \"type\": \"INTEGER\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(nestedJsonMessage); + + String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", nestedJsonMessage)); + DocumentContext jsonPath = JsonPath.parse(processedJsonString); + Object integerJsonArray = jsonPath.read("$.nested.int"); + + Assertions.assertTrue(integerJsonArray instanceof Integer); + Assertions.assertEquals(integerJsonArray, 1234); + Assertions.assertEquals("{\"key\":\"value\",\"nested\":{\"int\":1234}}", processedJsonString); + } + + @Test + public void shouldReturnMessageAsItIsWhenNoJsonPathConfigurationGiven() { + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(new HashMap<>()); + typecastedJsonSerializer = new TypecastedJsonSerializer( + messageSerializer, httpSinkConfig + ); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + + Assertions.assertEquals(JsonPath.parse(DEFAULT_JSON_MESSAGE).jsonString(), JsonPath.parse(result).jsonString()); + } + + @Test + public void shouldReturnMessageAsItIsWhenJsonPathConfigurationDoesNotMatch() { + String parameters = "[{\"jsonPath\": \"$..unrecognizedPath\", \"type\": \"INTEGER\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + + Assertions.assertEquals(JsonPath.parse(DEFAULT_JSON_MESSAGE).jsonString(), JsonPath.parse(result).jsonString()); + } + + @Test + public void shouldThrowNumberFormatExceptionWhenPayloadTypecastIntegerIsUnparseable() { + String payload = "{\"key\": \"value\", \"long\":\"1234568129012312\",\"nested\": {\"int\": \"1234\"}, \"double\": \"12.1\"}"; + String parameters = "[{\"jsonPath\": \"$.key\", \"type\": \"INTEGER\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(payload); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE))); + } + + @Test + public void shouldThrowNumberFormatExceptionWhenPayloadTypecastDoubleIsUnparseable() { + String payload = "{\"key\": \"value\", \"long\":\"1234568129012312\",\"nested\": {\"int\": \"1234\"}, \"double\": \"12.1\"}"; + String parameters = "[{\"jsonPath\": \"$.key\", \"type\": \"DOUBLE\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(payload); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE))); + } + + @Test + public void shouldThrowNumberFormatExceptionWhenPayloadTypecastLongIsUnparseable() { + String payload = "{\"key\": \"value\", \"long\":\"1234568129012312\",\"nested\": {\"int\": \"1234\"}, \"double\": \"12.1\"}"; + String parameters = "[{\"jsonPath\": \"$.key\", \"type\": \"LONG\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(payload); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE))); + } + + @Test + public void shouldHandleEmptyJsonMessage() { + String emptyJsonMessage = "{}"; + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(emptyJsonMessage); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", emptyJsonMessage)); + + Assertions.assertEquals(JsonPath.parse(emptyJsonMessage).jsonString(), JsonPath.parse(result).jsonString()); + } + + @Test + public void shouldHandleEmptyJsonPathConfiguration() { + String parameters = "[]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + + Assertions.assertEquals(JsonPath.parse(DEFAULT_JSON_MESSAGE).jsonString(), JsonPath.parse(result).jsonString()); + } + + @Test + public void shouldHandleInvalidJsonMessage() { + String invalidJsonMessage = "{\"key\": \"value\", \"long\":}"; + Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(invalidJsonMessage); + + Assertions.assertThrows(InvalidJsonException.class, + () -> typecastedJsonSerializer.serialize(buildMessage("key", invalidJsonMessage))); + } + + @Test + public void shouldHandleNonMatchingJsonPathConfiguration() { + String parameters = "[{\"jsonPath\": \"$..nonExistentField\", \"type\": \"INTEGER\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + + Assertions.assertEquals(JsonPath.parse(DEFAULT_JSON_MESSAGE).jsonString(), JsonPath.parse(result).jsonString()); + } + + @Test + public void shouldHandleNestedJsonPathConfiguration() { + String parameters = "[{\"jsonPath\": \"$..nested.int\", \"type\": \"INTEGER\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + DocumentContext jsonPath = JsonPath.parse(result); + JSONArray nestedIntJsonArray = jsonPath.read("$..nested.int"); + + Assertions.assertEquals(nestedIntJsonArray.get(0), 1234); + } + + @Test + public void shouldHandleMultipleJsonPathConfigurations() { + String parameters = "[{\"jsonPath\": \"$..int\", \"type\": \"INTEGER\"}, {\"jsonPath\": \"$..double\", \"type\": \"DOUBLE\"}]"; + Map> property = httpSinkSerializerJsonTypecastConfigConverter.convert(null, parameters); + Mockito.when(httpSinkConfig.getSinkHttpSerializerJsonTypecast()).thenReturn(property); + + String result = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE)); + DocumentContext jsonPath = JsonPath.parse(result); + JSONArray intJsonArray = jsonPath.read("$..int"); + JSONArray doubleJsonArray = jsonPath.read("$..double"); + + Assertions.assertEquals(intJsonArray.get(0), 1234); + Assertions.assertEquals(doubleJsonArray.get(0), 12.1); + } + + + private Message buildMessage(String key, String payload) { + return new Message( + key.getBytes(StandardCharsets.UTF_8), + payload.getBytes(StandardCharsets.UTF_8), + "topic", + 1, + 1 + ); + } +}