Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON Field Typecast Support #39

Merged
merged 39 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
cc6eb6e
Add typecast functionality
ekawinataa Jun 13, 2024
f896b71
Add default value
ekawinataa Jun 13, 2024
7688ecf
directly use map function
ekawinataa Jun 13, 2024
698ca06
Rename class
ekawinataa Jun 13, 2024
2bac0d7
Add unit test for newly added class
ekawinataa Jun 14, 2024
66c7571
Update unit test
ekawinataa Jun 14, 2024
7971ae1
Merge branch 'main' of github.com:goto/firehose into add-typecast-fun…
ekawinataa Jun 14, 2024
cf33578
cleanup
ekawinataa Jun 14, 2024
199ca9d
Update serializer to handle exception for not found path
ekawinataa Jun 14, 2024
c1744c8
Add double type casting
ekawinataa Jun 14, 2024
713290c
Rename converter class
ekawinataa Jun 14, 2024
6672736
Rename enum
ekawinataa Jun 14, 2024
998c366
Change method name
ekawinataa Jun 14, 2024
6d65e89
Update test
ekawinataa Jun 14, 2024
17c30dd
rethrow as illegal argument exception
ekawinataa Jun 14, 2024
ca31eac
Rename classes and methods
ekawinataa Jun 14, 2024
7684284
Add comment
ekawinataa Jun 14, 2024
8277258
Add comment and SerializerConfig default value
ekawinataa Jun 14, 2024
11c2a6a
Rename field
ekawinataa Jun 14, 2024
f99175b
Apply null check before mapping
ekawinataa Jun 17, 2024
eb30be4
Use formatter
ekawinataa Jun 17, 2024
0f7447a
Add configuration option to ignore unknown path instead of throwing e…
ekawinataa Jun 17, 2024
40d1fc9
Rename test to match the rule and remove unused import
ekawinataa Jun 19, 2024
b40bd2a
update styling
ekawinataa Jun 19, 2024
a9ee36b
Add more testcases and separate testcases
ekawinataa Jun 19, 2024
c615d69
Add test when message is unparseable
ekawinataa Jun 19, 2024
2b0a72e
Remove separate config, use existing HttpConfig
ekawinataa Jun 19, 2024
03a76d7
Handle edge cases for SerializerConfigConverter
ekawinataa Jun 19, 2024
09af216
Add additional unit test for TypecastedJsonSerializerTest
ekawinataa Jun 19, 2024
40b3203
bump version
ekawinataa Jun 19, 2024
9900d48
add example for SINK_HTTP_SERIALIZER_JSON_TYPECAST
ekawinataa Jun 19, 2024
ad51bb7
only bump version
ekawinataa Jun 19, 2024
92189ab
Move typecasting docs from generic.md to http-sink.md
ekawinataa Jun 19, 2024
0b7e275
Rename converter to be more fitting
ekawinataa Jun 19, 2024
8ad587a
rename config method name
ekawinataa Jun 19, 2024
b4f9041
Update the loop to use lambda
ekawinataa Jun 19, 2024
eca5e7a
Remove unused import
ekawinataa Jun 19, 2024
1793e7c
Add more test
ekawinataa Jun 19, 2024
9b6f860
Add test for Numeric to String typecast
ekawinataa Jun 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/docs/advance/generic.md
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.SerializerConfigConverter;
import org.aeonbits.owner.Config;

import java.util.Map;
import java.util.function.Function;

public interface SerializerConfig extends Config {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"There is no need for this separate interface. Can you move this configuration to HttpSinkConfig?"

@Config.Key("SERIALIZER_JSON_TYPECAST")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after moving you can rename this config and prefix "SINK_HTTP_" to it

@Config.ConverterClass(SerializerConfigConverter.class)
@DefaultValue("[]")
Map<String, Function<String, Object>> getJsonTypecastMapping();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.firehose.serializer.constant.TypecastTarget;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.aeonbits.owner.Converter;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class SerializerConfigConverter implements Converter<Map<String, Function<String, Object>>> {

private final ObjectMapper objectMapper;

public SerializerConfigConverter() {
this.objectMapper = new ObjectMapper();
}

@Override
public Map<String, Function<String, Object>> convert(Method method, String input) {
try {
List<JsonTypecast> jsonTypecasts =
objectMapper.readValue(input, new TypeReference<List<JsonTypecast>>(){});
return jsonTypecasts.stream()
.collect(Collectors.toMap(JsonTypecast::getJsonPath, jsonTypecast -> jsonTypecast.getType()::cast));
} catch (IOException e) {
log.error("Error when parsing serializer json config", e);
throw new IllegalArgumentException(e.getMessage(), e.getCause());
}
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
private static class JsonTypecast {
private String jsonPath;
private TypecastTarget type;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.gotocompany.firehose.serializer;

import com.gotocompany.firehose.config.SerializerConfig;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/***
* MessageSerializer wrapping other JSON MessageSerializer which add capability to typecast
* some of the fields from the inner serializer
*/
@Slf4j
public class TypecastedJsonSerializer implements MessageSerializer {

private final MessageSerializer messageSerializer;
private final SerializerConfig serializerConfig;
private final Configuration configuration;

/**
* Constructor for TypecastedJsonSerializer.
*
* @param messageSerializer the inner serializer to be wrapped
* @param serializerConfig the configuration for typecasting, where each map contains
* a JSON path and the desired type
*/
public TypecastedJsonSerializer(MessageSerializer messageSerializer,
SerializerConfig serializerConfig) {
this.messageSerializer = messageSerializer;
this.serializerConfig = serializerConfig;
this.configuration = Configuration.builder()
.options(Option.SUPPRESS_EXCEPTIONS)
.build();
}

/**
* Serializes the given message, then applies typecasting to specified fields in the resulting JSON.
*
* @param message the message to be serialized
* @return the serialized and typecasted JSON string
* @throws DeserializerException if an error occurs during serialization or typecasting
*/
@Override
public String serialize(Message message) throws DeserializerException {
String jsonString = messageSerializer.serialize(message);
DocumentContext documentContext = JsonPath
.using(configuration)
.parse(jsonString);

for (Map.Entry<String, Function<String, Object>> entry : serializerConfig.getJsonTypecastMapping()
.entrySet()) {
documentContext.map(entry.getKey(), (currentValue, configuration) -> Optional.ofNullable(currentValue)
.map(v -> entry.getValue().apply(v.toString()))
.orElse(null)
);
}
return documentContext.jsonString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.gotocompany.firehose.serializer.constant;

public enum TypecastTarget {
INTEGER {
@Override
public Object cast(String input) {
return Integer.valueOf(input);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add try-catch not to allow invalid input to make it future-proof

            try {
                return Integer.valueOf(input);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid input for INTEGER: " + input, e);
            }

}
}, LONG {
@Override
public Object cast(String input) {
return Long.valueOf(input);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add try-catch not to allow invalid input to make it future-proof

            try {
                return Long.valueOf(input);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid input for LONG: " + input, e);
            }

}
}, DOUBLE {
@Override
public Object cast(String input) {
return Double.valueOf(input);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add try-catch not to allow invalid input to make it future-proof

            try {
                return Double.valueOf(input);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid input for DOUBLE: " + input, e);
            }

}
};

public abstract Object cast(String input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.config.SerializerConfig;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.http.auth.OAuth2Credential;
import com.gotocompany.firehose.sink.http.request.RequestFactory;
Expand Down Expand Up @@ -37,15 +38,15 @@ public class HttpSinkFactory {
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
HttpSinkConfig httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration);

SerializerConfig serializerConfig = ConfigFactory.create(SerializerConfig.class, configuration);
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class);

CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter);
firehoseInstrumentation.logInfo("HTTP connection established");

UriParser uriParser = new UriParser(stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()), httpSinkConfig.getKafkaRecordParserMode());

Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser).createRequest();
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser, serializerConfig).createRequest();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this serializerConfig parameter after you move the config to the HttpSinkConfig


return new HttpSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), request, closeableHttpClient, stencilClient, httpSinkConfig.getSinkHttpRetryStatusCodeRanges(), httpSinkConfig.getSinkHttpRequestLogStatusCodeRanges());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.gotocompany.firehose.sink.http.factory;

import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.config.SerializerConfig;
import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.serializer.JsonWrappedProtoByte;
import com.gotocompany.firehose.serializer.MessageSerializer;
import com.gotocompany.firehose.serializer.MessageToJson;
import com.gotocompany.firehose.serializer.MessageToTemplatizedJson;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.serializer.TypecastedJsonSerializer;
import com.gotocompany.stencil.client.StencilClient;
import com.gotocompany.stencil.Parser;
import lombok.AllArgsConstructor;
Expand All @@ -19,6 +21,7 @@
public class SerializerFactory {

private HttpSinkConfig httpSinkConfig;
private SerializerConfig serializerConfig;
private StencilClient stencilClient;
private StatsDReporter statsDReporter;

Expand All @@ -34,10 +37,11 @@ public MessageSerializer build() {
Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass());
if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToJson", HttpSinkDataFormatType.JSON);
return new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable());
return getTypecastedJsonSerializer(new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable()));
} else {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToTemplatizedJson");
return MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser);
return getTypecastedJsonSerializer(
MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser));
}
}

Expand All @@ -48,6 +52,10 @@ public MessageSerializer build() {
return new JsonWrappedProtoByte();
}

private MessageSerializer getTypecastedJsonSerializer(MessageSerializer messageSerializer) {
return new TypecastedJsonSerializer(messageSerializer, serializerConfig);
}

private boolean isProtoSchemaEmpty() {
return httpSinkConfig.getInputSchemaProtoClass() == null || httpSinkConfig.getInputSchemaProtoClass().equals("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.config.SerializerConfig;
import com.gotocompany.firehose.config.enums.HttpSinkRequestMethodType;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.proto.ProtoToFieldMapper;
Expand Down Expand Up @@ -30,6 +31,7 @@
public class RequestFactory {

private HttpSinkConfig httpSinkConfig;
private SerializerConfig serializerConfig;
private UriParser uriParser;
private StencilClient stencilClient;
private StatsDReporter statsDReporter;
Expand All @@ -43,11 +45,12 @@ public class RequestFactory {
* @param stencilClient the stencil client
* @param uriParser the uri parser
*/
public RequestFactory(StatsDReporter statsDReporter, HttpSinkConfig httpSinkConfig, StencilClient stencilClient, UriParser uriParser) {
public RequestFactory(StatsDReporter statsDReporter, HttpSinkConfig httpSinkConfig, StencilClient stencilClient, UriParser uriParser, SerializerConfig serializerConfig) {
this.statsDReporter = statsDReporter;
this.stencilClient = stencilClient;
this.httpSinkConfig = httpSinkConfig;
this.uriParser = uriParser;
this.serializerConfig = serializerConfig;
firehoseInstrumentation = new FirehoseInstrumentation(this.statsDReporter, RequestFactory.class);
}

Expand Down Expand Up @@ -81,6 +84,7 @@ private ProtoToFieldMapper getProtoToFieldMapper() {
private JsonBody createBody() {
MessageSerializer messageSerializer = new SerializerFactory(
httpSinkConfig,
serializerConfig,
stencilClient,
statsDReporter)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.firehose.config.converter.SerializerConfigConverter;
import com.gotocompany.firehose.exception.JsonParseException;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Map;
import java.util.function.Function;

public class SerializerConfigConverterTest {

private final SerializerConfigConverter serializerConfigConverter = new SerializerConfigConverter();

@Test
public void convert_GivenValidJsonConfig_ShouldConvertToPropertyMap() {
String configJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"LONG\"}]";
String expectedPropertyMapKey = "$.root.field";

Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, configJson);
Function<String, Object> mapper = result.get(expectedPropertyMapKey);
Object mapperResult = mapper.apply("4");

Assertions.assertNotNull(mapper);
Assertions.assertTrue(mapperResult instanceof Long);
Assertions.assertEquals(4L, mapperResult);
}

@Test
public void convert_GivenInvalidJsonFormat_ShouldThrowJsonParseException() {
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\" \"type\": \"LONG\"";

Assertions.assertThrows(IllegalArgumentException.class,
() -> serializerConfigConverter.convert(null, malformedConfigJson));
}
}
Copy link

@sumitaich1998 sumitaich1998 Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add these additional unit tests

@Test
    public void convertShouldHandleEmptyJsonConfig() {
        String emptyConfigJson = "[]";

        Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, emptyConfigJson);

        Assertions.assertTrue(result.isEmpty());
    }

    @Test
    public void convertShouldHandleNullJsonConfig() {
        String nullConfigJson = null;

        Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, nullConfigJson);

        Assertions.assertTrue(result.isEmpty());
    }

    @Test
    public void convertShouldThrowExceptionForUnsupportedDataType() {
        String unsupportedTypeConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"UNSUPPORTED_TYPE\"}]";

        Assertions.assertThrows(IllegalArgumentException.class,
                () -> serializerConfigConverter.convert(null, unsupportedTypeConfigJson));
    }

    @Test
    public void convertShouldHandleMultipleValidConfigs() {
        String multipleConfigJson = "[{\"jsonPath\": \"$.root.field1\", \"type\": \"LONG\"}, {\"jsonPath\": \"$.root.field2\", \"type\": \"STRING\"}]";

        Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, multipleConfigJson);

        Function<String, Object> mapper1 = result.get("$.root.field1");
        Function<String, Object> mapper2 = result.get("$.root.field2");

        Assertions.assertNotNull(mapper1);
        Assertions.assertNotNull(mapper2);
        Assertions.assertTrue(mapper1.apply("4") instanceof Long);
        Assertions.assertTrue(mapper2.apply("test") instanceof String);
    }

    @Test
    public void convertShouldThrowExceptionForMissingFieldsInConfig() {
        String missingFieldsConfigJson = "[{\"jsonPath\": \"$.root.field\"}]";

        Assertions.assertThrows(IllegalArgumentException.class,
                () -> serializerConfigConverter.convert(null, missingFieldsConfigJson));
    }

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.gotocompany.firehose.serializer;

import com.gotocompany.firehose.config.SerializerConfig;
import com.gotocompany.firehose.config.converter.SerializerConfigConverter;
import com.gotocompany.firehose.message.Message;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

public class TypecastedJsonSerializerTest {

private static final String DEFAULT_JSON_MESSAGE = "{\"key\": \"value\", \"long\":\"1234568129012312\",\"nested\": {\"int\": \"1234\"}, \"double\": \"12.1\"}";
private static final String DEFAULT_PARAMETERS = "[{\"jsonPath\": \"$..int\", \"type\": \"INTEGER\"}, {\"jsonPath\": \"$..long\", \"type\": \"LONG\"}, {\"jsonPath\": \"$..double\", \"type\": \"DOUBLE\"}, {\"jsonPath\": \"$..unrecognizedPath\", \"type\": \"INTEGER\"}]";

private TypecastedJsonSerializer typecastedJsonSerializer;

@Mock
private MessageSerializer messageSerializer;

@Mock
private SerializerConfig serializerConfig;

private SerializerConfigConverter serializerConfigConverter = new SerializerConfigConverter();

@Before
public void setup() {
messageSerializer = Mockito.mock(MessageSerializer.class);
serializerConfig = Mockito.mock(SerializerConfig.class);
Map<String, Function<String, Object>> property = serializerConfigConverter.convert(null, DEFAULT_PARAMETERS);
Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(DEFAULT_JSON_MESSAGE);
Mockito.when(serializerConfig.getJsonTypecastMapping()).thenReturn(property);
typecastedJsonSerializer = new TypecastedJsonSerializer(
messageSerializer, serializerConfig
);
}

@Test
public void serialize_GivenMessageWithQuoteWrappedNumber_ShouldCastToNumber() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change test method name to camelcase like shouldCastToNumberWhenGivenMessageWithQuoteWrappedNumber

String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", DEFAULT_JSON_MESSAGE));
DocumentContext jsonPath = JsonPath.parse(processedJsonString);
JSONArray integerJsonArray = jsonPath.read("$..int");
JSONArray longJsonArray = jsonPath.read("$..long");
JSONArray doubleJsonArray = jsonPath.read("$..double");

Assertions.assertTrue(integerJsonArray.get(0) instanceof Integer);
Assertions.assertTrue(longJsonArray.get(0) instanceof Long);
Assertions.assertTrue(doubleJsonArray.get(0) instanceof Double);
Assertions.assertEquals(integerJsonArray.get(0), 1234);
Assertions.assertEquals(longJsonArray.get(0), 1234568129012312L);
Assertions.assertEquals(doubleJsonArray.get(0), 12.1);
}

@Test
public void serialize_GivenMessageWithNullMessage_ShouldIgnore() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change test method name to camelCase like shouldIgnoreWhenGivenMessageWithNullMessage

String jsonWithNullMappedValue = "{\"key\": \"value\", \"long\":null}";
Mockito.when(messageSerializer.serialize(Mockito.any())).thenReturn(jsonWithNullMappedValue);
String processedJsonString = typecastedJsonSerializer.serialize(buildMessage("key", jsonWithNullMappedValue));
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
DocumentContext jsonPath = JsonPath.parse(processedJsonString);
JSONArray fieldWithValue = jsonPath.read("$..key");
JSONArray integerJsonArray = jsonPath.read("$..long");

Assertions.assertEquals("value", fieldWithValue.get(0));
Assertions.assertNull(integerJsonArray.get(0));
}


ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
private Message buildMessage(String key, String payload) {
return new Message(
key.getBytes(StandardCharsets.UTF_8),
payload.getBytes(StandardCharsets.UTF_8),
"topic",
1,
1
);
}

}
Loading
Loading