Skip to content

Commit

Permalink
Add JSON Field Typecast Support (#39)
Browse files Browse the repository at this point in the history
* Add typecast functionality

* Add default value

* directly use map function

* Rename class

* Add unit test for newly added class

* Update unit test

* cleanup

* Update serializer to handle exception for not found path

* Add double type casting

* Rename converter class

* Rename enum

* Change method name

* Update test

* rethrow as illegal argument exception

* Rename classes and methods

* Add comment

* Add comment and SerializerConfig default value

* Rename field

* Apply null check before mapping

* Use formatter

* Add configuration option to ignore unknown path instead of throwing error

* Rename test to match the rule and remove unused import

* update styling

* Add more testcases and separate testcases

* Add test when message is unparseable

* Remove separate config, use existing HttpConfig

* Handle edge cases for SerializerConfigConverter

* Add additional unit test for TypecastedJsonSerializerTest

* bump version

* add example for SINK_HTTP_SERIALIZER_JSON_TYPECAST

* only bump version

* Move typecasting docs from generic.md to http-sink.md

* Rename converter to be more fitting

* rename config method name

* Update the loop to use lambda

* Remove unused import

* Add more test

* Add test for Numeric to String typecast
  • Loading branch information
ekawinataa authored Jun 20, 2024
1 parent 7d3ac95 commit ff9e2a9
Show file tree
Hide file tree
Showing 12 changed files with 535 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.1'
version '0.10.2'

def projName = "firehose"

Expand Down
Binary file modified docs/docs/advance/generic.md
Binary file not shown.
9 changes: 9 additions & 0 deletions docs/docs/sinks/http-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,12 @@ This config if set to `true` will enable the simple date format (Eg. - `May 3, 2
- Example value: `false`
- Type: `optional`
- Default value: `true`

### `SINK_HTTP_SERIALIZER_JSON_TYPECAST`

Defines the mapping for field typecasting of the resulting JSON Serialization. This configuration could accept multiple mappings for multiple JSON Path
Currently supported typecasting target: DOUBLE, INTEGER, LONG, STRING

- Example value: `[{"jsonPath": "$.root.someIntegerField", "type": "INTEGER"}, {"jsonPath": "$..[*].doubleField", "type": "DOUBLE"}]`
- Type: `optional`
- Default Value: `[]`
1 change: 1 addition & 0 deletions env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_HTTP_OAUTH2_CLIENT_NAME=client-name
# SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret
# SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info
# SINK_HTTP_SERIALIZER_JSON_TYPECAST=[{"jsonPath": "$.root.field1", "type": "INTEGER"},{"jsonPath": "$.root.field2", "type": "INTEGER"},{"jsonPath": "$..field3", "type": "INTEGER"},{"jsonPath": "$..[*].field4", "type": "DOUBLE"}]
#
#
#############################################
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter;
import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterPlacementType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterSourceType;
Expand All @@ -11,6 +12,7 @@
import com.gotocompany.firehose.config.converter.RangeToHashMapConverter;

import java.util.Map;
import java.util.function.Function;

public interface HttpSinkConfig extends AppConfig {

Expand Down Expand Up @@ -94,4 +96,9 @@ public interface HttpSinkConfig extends AppConfig {
@DefaultValue("true")
Boolean getSinkHttpSimpleDateFormatEnable();

@Key("SINK_HTTP_SERIALIZER_JSON_TYPECAST")
@ConverterClass(HttpSinkSerializerJsonTypecastConfigConverter.class)
@DefaultValue("[]")
Map<String, Function<String, Object>> getSinkHttpSerializerJsonTypecast();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.firehose.serializer.constant.TypecastTarget;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.aeonbits.owner.Converter;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HttpSinkSerializerJsonTypecastConfigConverter implements Converter<Map<String, Function<String, Object>>> {

private final ObjectMapper objectMapper;

public HttpSinkSerializerJsonTypecastConfigConverter() {
this.objectMapper = new ObjectMapper();
}

@Override
public Map<String, Function<String, Object>> convert(Method method, String input) {
if (StringUtils.isBlank(input)) {
return Collections.emptyMap();
}
try {
List<JsonTypecast> jsonTypecasts = objectMapper.readValue(input, new TypeReference<List<JsonTypecast>>() {
});
validate(jsonTypecasts);
return jsonTypecasts.stream()
.collect(Collectors.toMap(JsonTypecast::getJsonPath, jsonTypecast -> jsonTypecast.getType()::cast));
} catch (IOException e) {
throw new IllegalArgumentException("Error when parsing serializer json config: " + e.getMessage(), e);
}
}

private void validate(List<JsonTypecast> jsonTypecasts) {
boolean invalidConfigurationExist = jsonTypecasts.stream()
.anyMatch(jt -> Objects.isNull(jt.getJsonPath()) || Objects.isNull(jt.getType()));
if (invalidConfigurationExist) {
throw new IllegalArgumentException("Invalid configuration: jsonPath or type should not be null");
}
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
private static class JsonTypecast {
private String jsonPath;
private TypecastTarget type;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.gotocompany.firehose.serializer;

import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

/***
* MessageSerializer wrapping other JSON MessageSerializer which add capability to typecast some of the fields from the inner serializer.
*/
@Slf4j
public class TypecastedJsonSerializer implements MessageSerializer {

private final MessageSerializer messageSerializer;
private final HttpSinkConfig httpSinkConfig;
private final Configuration jsonPathConfiguration;

/**
* Constructor for TypecastedJsonSerializer.
*
* @param messageSerializer the inner serializer to be wrapped
* @param httpSinkConfig the HTTP Sink config configuration containing typecasting parameters,
* where each map entry contains a JSON path and the desired type
*/
public TypecastedJsonSerializer(MessageSerializer messageSerializer,
HttpSinkConfig httpSinkConfig) {
this.messageSerializer = messageSerializer;
this.httpSinkConfig = httpSinkConfig;
this.jsonPathConfiguration = Configuration.builder()
.options(Option.SUPPRESS_EXCEPTIONS)
.build();
}

/**
* Serializes the given message, then applies typecasting to specified fields in the resulting JSON.
*
* @param message the message to be serialized
* @return the serialized and typecasted JSON string
* @throws DeserializerException if an error occurs during serialization or typecasting
*/
@Override
public String serialize(Message message) throws DeserializerException {
String jsonString = messageSerializer.serialize(message);
DocumentContext documentContext = JsonPath
.using(jsonPathConfiguration)
.parse(jsonString);
httpSinkConfig.getSinkHttpSerializerJsonTypecast()
.forEach((jsonPath, typecastFunction) -> documentContext.map(jsonPath,
(currentValue, configuration) -> Optional.ofNullable(currentValue)
.map(v -> typecastFunction.apply(v.toString()))
.orElse(null)
));
return documentContext.jsonString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.gotocompany.firehose.serializer.constant;

public enum TypecastTarget {
INTEGER {
@Override
public Object cast(String input) {
try {
return Integer.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for INTEGER: " + input, e);
}
}
}, LONG {
@Override
public Object cast(String input) {
try {
return Long.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for LONG: " + input, e);
}
}
}, DOUBLE {
@Override
public Object cast(String input) {
try {
return Double.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for DOUBLE: " + input, e);
}
}
}, STRING {
@Override
public Object cast(String input) {
return String.valueOf(input);
}
};

public abstract Object cast(String input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class HttpSinkFactory {
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
HttpSinkConfig httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration);

FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class);

CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.gotocompany.firehose.serializer.MessageToJson;
import com.gotocompany.firehose.serializer.MessageToTemplatizedJson;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.serializer.TypecastedJsonSerializer;
import com.gotocompany.stencil.client.StencilClient;
import com.gotocompany.stencil.Parser;
import lombok.AllArgsConstructor;
Expand All @@ -34,10 +35,11 @@ public MessageSerializer build() {
Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass());
if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToJson", HttpSinkDataFormatType.JSON);
return new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable());
return getTypecastedJsonSerializer(new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable()));
} else {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToTemplatizedJson");
return MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser);
return getTypecastedJsonSerializer(
MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser));
}
}

Expand All @@ -48,6 +50,10 @@ public MessageSerializer build() {
return new JsonWrappedProtoByte();
}

private MessageSerializer getTypecastedJsonSerializer(MessageSerializer messageSerializer) {
return new TypecastedJsonSerializer(messageSerializer, httpSinkConfig);
}

private boolean isProtoSchemaEmpty() {
return httpSinkConfig.getInputSchemaProtoClass() == null || httpSinkConfig.getInputSchemaProtoClass().equals("");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Map;
import java.util.function.Function;

public class HttpSinkSerializerJsonTypecastConfigConverterTest {

private final HttpSinkSerializerJsonTypecastConfigConverter httpSinkSerializerJsonTypecastConfigConverter = new HttpSinkSerializerJsonTypecastConfigConverter();

@Test
public void convertShouldConvertToPropertyMapWhenValidJsonConfig() {
String configJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"LONG\"}]";
String expectedPropertyMapKey = "$.root.field";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, configJson);
Function<String, Object> mapper = result.get(expectedPropertyMapKey);
Object mapperResult = mapper.apply("4");

Assertions.assertNotNull(mapper);
Assertions.assertTrue(mapperResult instanceof Long);
Assertions.assertEquals(4L, mapperResult);
}

@Test
public void convertShouldThrowJsonParseExceptionWhenInvalidJsonFormatProvided() {
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\" \"type\": \"LONG\"";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson));
}

@Test
public void convertShouldThrowJsonParseExceptionWhenUnregisteredTypecastingProvided() {
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"BIG_INTEGER\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson));
}

@Test
public void convertShouldHandleEmptyJsonConfig() {
String emptyConfigJson = "[]";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, emptyConfigJson);

Assertions.assertTrue(result.isEmpty());
}

@Test
public void convertShouldHandleNullJsonConfig() {
String nullConfigJson = null;

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, nullConfigJson);

Assertions.assertTrue(result.isEmpty());
}

@Test
public void convertShouldThrowExceptionForUnsupportedDataType() {
String unsupportedTypeConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"UNSUPPORTED_TYPE\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, unsupportedTypeConfigJson));
}

@Test
public void convertShouldHandleMultipleValidConfigs() {
String multipleConfigJson = "[{\"jsonPath\": \"$.root.field1\", \"type\": \"LONG\"}, {\"jsonPath\": \"$.root.field2\", \"type\": \"STRING\"}]";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, multipleConfigJson);
Function<String, Object> mapper1 = result.get("$.root.field1");
Function<String, Object> mapper2 = result.get("$.root.field2");

Assertions.assertNotNull(mapper1);
Assertions.assertNotNull(mapper2);
Assertions.assertTrue(mapper1.apply("4") instanceof Long);
Assertions.assertTrue(mapper2.apply("test") instanceof String);
}

@Test
public void convertShouldThrowExceptionForMissingFieldsInConfig() {
String missingFieldsConfigJson = "[{\"jsonPath\": \"$.root.field\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, missingFieldsConfigJson));
}
}
Loading

0 comments on commit ff9e2a9

Please sign in to comment.