Add JSON Field Typecast Support #39

Merged
merged 39 commits into from
Jun 20, 2024
Commits
cc6eb6e
Add typecast functionality
ekawinataa Jun 13, 2024
f896b71
Add default value
ekawinataa Jun 13, 2024
7688ecf
directly use map function
ekawinataa Jun 13, 2024
698ca06
Rename class
ekawinataa Jun 13, 2024
2bac0d7
Add unit test for newly added class
ekawinataa Jun 14, 2024
66c7571
Update unit test
ekawinataa Jun 14, 2024
7971ae1
Merge branch 'main' of github.com:goto/firehose into add-typecast-fun…
ekawinataa Jun 14, 2024
cf33578
cleanup
ekawinataa Jun 14, 2024
199ca9d
Update serializer to handle exception for not found path
ekawinataa Jun 14, 2024
c1744c8
Add double type casting
ekawinataa Jun 14, 2024
713290c
Rename converter class
ekawinataa Jun 14, 2024
6672736
Rename enum
ekawinataa Jun 14, 2024
998c366
Change method name
ekawinataa Jun 14, 2024
6d65e89
Update test
ekawinataa Jun 14, 2024
17c30dd
rethrow as illegal argument exception
ekawinataa Jun 14, 2024
ca31eac
Rename classes and methods
ekawinataa Jun 14, 2024
7684284
Add comment
ekawinataa Jun 14, 2024
8277258
Add comment and SerializerConfig default value
ekawinataa Jun 14, 2024
11c2a6a
Rename field
ekawinataa Jun 14, 2024
f99175b
Apply null check before mapping
ekawinataa Jun 17, 2024
eb30be4
Use formatter
ekawinataa Jun 17, 2024
0f7447a
Add configuration option to ignore unknown path instead of throwing e…
ekawinataa Jun 17, 2024
40d1fc9
Rename test to match the rule and remove unused import
ekawinataa Jun 19, 2024
b40bd2a
update styling
ekawinataa Jun 19, 2024
a9ee36b
Add more testcases and separate testcases
ekawinataa Jun 19, 2024
c615d69
Add test when message is unparseable
ekawinataa Jun 19, 2024
2b0a72e
Remove separate config, use existing HttpConfig
ekawinataa Jun 19, 2024
03a76d7
Handle edge cases for SerializerConfigConverter
ekawinataa Jun 19, 2024
09af216
Add additional unit test for TypecastedJsonSerializerTest
ekawinataa Jun 19, 2024
40b3203
bump version
ekawinataa Jun 19, 2024
9900d48
add example for SINK_HTTP_SERIALIZER_JSON_TYPECAST
ekawinataa Jun 19, 2024
ad51bb7
only bump version
ekawinataa Jun 19, 2024
92189ab
Move typecasting docs from generic.md to http-sink.md
ekawinataa Jun 19, 2024
0b7e275
Rename converter to be more fitting
ekawinataa Jun 19, 2024
8ad587a
rename config method name
ekawinataa Jun 19, 2024
b4f9041
Update the loop to use lambda
ekawinataa Jun 19, 2024
eca5e7a
Remove unused import
ekawinataa Jun 19, 2024
1793e7c
Add more test
ekawinataa Jun 19, 2024
9b6f860
Add test for Numeric to String typecast
ekawinataa Jun 19, 2024
2 changes: 1 addition & 1 deletion build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
-version '0.10.1'
+version '0.10.2'

def projName = "firehose"

Binary file modified docs/docs/advance/generic.md
Binary file not shown.
9 changes: 9 additions & 0 deletions docs/docs/sinks/http-sink.md
@@ -151,3 +151,12 @@ This config if set to `true` will enable the simple date format (Eg. - `May 3, 2
- Example value: `false`
- Type: `optional`
- Default value: `true`

### `SINK_HTTP_SERIALIZER_JSON_TYPECAST`

Defines the typecast mappings applied to fields of the serialized JSON. The configuration accepts multiple mappings, one per JSON path.
Currently supported typecast targets: `DOUBLE`, `INTEGER`, `LONG`, `STRING`.

- Example value: `[{"jsonPath": "$.root.someIntegerField", "type": "INTEGER"}, {"jsonPath": "$..[*].doubleField", "type": "DOUBLE"}]`
- Type: `optional`
- Default value: `[]`
1 change: 1 addition & 0 deletions env/local.properties
@@ -121,6 +121,7 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_HTTP_OAUTH2_CLIENT_NAME=client-name
# SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret
# SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info
# SINK_HTTP_SERIALIZER_JSON_TYPECAST=[{"jsonPath": "$.root.field1", "type": "INTEGER"},{"jsonPath": "$.root.field2", "type": "INTEGER"},{"jsonPath": "$..field3", "type": "INTEGER"},{"jsonPath": "$..[*].field4", "type": "DOUBLE"}]
#
#
#############################################
@@ -1,5 +1,6 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter;
import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterPlacementType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterSourceType;
@@ -11,6 +12,7 @@
import com.gotocompany.firehose.config.converter.RangeToHashMapConverter;

import java.util.Map;
import java.util.function.Function;

public interface HttpSinkConfig extends AppConfig {

@@ -94,4 +96,9 @@ public interface HttpSinkConfig extends AppConfig {
@DefaultValue("true")
Boolean getSinkHttpSimpleDateFormatEnable();

@Key("SINK_HTTP_SERIALIZER_JSON_TYPECAST")
@ConverterClass(HttpSinkSerializerJsonTypecastConfigConverter.class)
@DefaultValue("[]")
Map<String, Function<String, Object>> getSinkHttpSerializerJsonTypecast();

}
@@ -0,0 +1,63 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.firehose.serializer.constant.TypecastTarget;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.aeonbits.owner.Converter;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HttpSinkSerializerJsonTypecastConfigConverter implements Converter<Map<String, Function<String, Object>>> {

private final ObjectMapper objectMapper;

public HttpSinkSerializerJsonTypecastConfigConverter() {
this.objectMapper = new ObjectMapper();
}

@Override
public Map<String, Function<String, Object>> convert(Method method, String input) {
if (StringUtils.isBlank(input)) {
return Collections.emptyMap();
}
try {
List<JsonTypecast> jsonTypecasts = objectMapper.readValue(input, new TypeReference<List<JsonTypecast>>() {
});
validate(jsonTypecasts);
return jsonTypecasts.stream()
.collect(Collectors.toMap(JsonTypecast::getJsonPath, jsonTypecast -> jsonTypecast.getType()::cast));
} catch (IOException e) {
throw new IllegalArgumentException("Error when parsing serializer json config: " + e.getMessage(), e);
}
}

private void validate(List<JsonTypecast> jsonTypecasts) {
boolean invalidConfigurationExist = jsonTypecasts.stream()
.anyMatch(jt -> Objects.isNull(jt.getJsonPath()) || Objects.isNull(jt.getType()));
if (invalidConfigurationExist) {
throw new IllegalArgumentException("Invalid configuration: jsonPath or type should not be null");
}
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
private static class JsonTypecast {
private String jsonPath;
private TypecastTarget type;
}

}
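The final step of `convert` can be sketched with the JDK alone. The block below is a minimal illustration under stated assumptions, not the PR's code: `TypecastMapSketch` and `Mapping` are hypothetical names standing in for the converter and its private `JsonTypecast` holder, and the JSON parsing step is skipped. It also shows a property of `Collectors.toMap` that may matter here: the same `jsonPath` listed twice raises `IllegalStateException`, which a `catch (IOException e)` block would not translate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class TypecastMapSketch {

    // Hypothetical stand-in for the converter's private JsonTypecast holder.
    static final class Mapping {
        final String jsonPath;
        final Function<String, Object> cast;

        Mapping(String jsonPath, Function<String, Object> cast) {
            this.jsonPath = jsonPath;
            this.cast = cast;
        }
    }

    // Keys the cast functions by JSON path, as the converter's stream pipeline does.
    static Map<String, Function<String, Object>> toMap(List<Mapping> mappings) {
        return mappings.stream()
                .collect(Collectors.toMap(m -> m.jsonPath, m -> m.cast));
    }

    public static void main(String[] args) {
        Map<String, Function<String, Object>> casts = toMap(Arrays.asList(
                new Mapping("$.root.field1", Long::valueOf),
                new Mapping("$.root.field2", String::valueOf)));
        System.out.println(casts.get("$.root.field1").apply("4")); // prints 4

        try {
            toMap(Arrays.asList(
                    new Mapping("$.root.field1", Long::valueOf),
                    new Mapping("$.root.field1", Double::valueOf)));
        } catch (IllegalStateException e) {
            // Collectors.toMap rejects duplicate keys.
            System.out.println("duplicate jsonPath rejected");
        }
    }
}
```

If duplicate paths should be tolerated or reported as a configuration error, one option would be the three-argument `Collectors.toMap` overload with an explicit merge function; whether that is desirable here is a design decision for the maintainers.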
@@ -0,0 +1,61 @@
package com.gotocompany.firehose.serializer;

import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

/**
* A MessageSerializer that wraps another JSON MessageSerializer and typecasts selected fields in the inner serializer's output.
*/
@Slf4j
public class TypecastedJsonSerializer implements MessageSerializer {

private final MessageSerializer messageSerializer;
private final HttpSinkConfig httpSinkConfig;
private final Configuration jsonPathConfiguration;

/**
* Constructor for TypecastedJsonSerializer.
*
* @param messageSerializer the inner serializer to be wrapped
* @param httpSinkConfig the HTTP sink configuration containing the typecasting parameters,
* where each map entry pairs a JSON path with the function that casts to the desired type
*/
public TypecastedJsonSerializer(MessageSerializer messageSerializer,
HttpSinkConfig httpSinkConfig) {
this.messageSerializer = messageSerializer;
this.httpSinkConfig = httpSinkConfig;
this.jsonPathConfiguration = Configuration.builder()
.options(Option.SUPPRESS_EXCEPTIONS)
.build();
}

/**
* Serializes the given message, then applies typecasting to specified fields in the resulting JSON.
*
* @param message the message to be serialized
* @return the serialized and typecasted JSON string
* @throws DeserializerException if an error occurs during serialization or typecasting
*/
@Override
public String serialize(Message message) throws DeserializerException {
String jsonString = messageSerializer.serialize(message);
DocumentContext documentContext = JsonPath
.using(jsonPathConfiguration)
.parse(jsonString);
httpSinkConfig.getSinkHttpSerializerJsonTypecast()
.forEach((jsonPath, typecastFunction) -> documentContext.map(jsonPath,
(currentValue, configuration) -> Optional.ofNullable(currentValue)
.map(v -> typecastFunction.apply(v.toString()))
.orElse(null)
));
return documentContext.jsonString();
}
}
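The per-value step inside `serialize` can be looked at in isolation. The following is a small JDK-only sketch of that step, assuming nothing about JsonPath itself: `NullSafeCastSketch` and `castOrNull` are hypothetical names, and the method body has the same shape as the lambda passed to `documentContext.map(...)` above. It shows that a null value passes through unchanged and that non-string values are stringified before the cast function sees them.

```java
import java.util.Optional;
import java.util.function.Function;

class NullSafeCastSketch {

    // Same shape as the mapping lambda in serialize(): skip nulls, stringify, then cast.
    static Object castOrNull(Object currentValue, Function<String, Object> typecast) {
        return Optional.ofNullable(currentValue)
                .map(v -> typecast.apply(v.toString()))
                .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(castOrNull("42", Integer::valueOf)); // prints 42
        System.out.println(castOrNull(null, Integer::valueOf)); // prints null
        System.out.println(castOrNull(12.5, String::valueOf));  // prints 12.5
    }
}
```

Because the cast function only ever receives `toString()` output, the result depends on how the parsed JSON value renders as a string, not on its original JSON type.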
@@ -0,0 +1,39 @@
package com.gotocompany.firehose.serializer.constant;

public enum TypecastTarget {
INTEGER {
@Override
public Object cast(String input) {
try {
return Integer.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for INTEGER: " + input, e);
}
}
}, LONG {
@Override
public Object cast(String input) {
try {
return Long.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for LONG: " + input, e);
}
}
}, DOUBLE {
@Override
public Object cast(String input) {
try {
return Double.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for DOUBLE: " + input, e);
}
}
}, STRING {
@Override
public Object cast(String input) {
return String.valueOf(input);
}
};

public abstract Object cast(String input);
}
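Since `TypecastedJsonSerializer` hands `currentValue.toString()` to `cast`, the parsing rules of `Integer.valueOf`, `Long.valueOf` and `Double.valueOf` decide which inputs are accepted. The sketch below is a condensed, hypothetical restatement of the enum (`CastSketch` is not a name from the PR) that keeps the same error wrapping and makes one such rule visible: a value whose string form is `4.0` is rejected by `INTEGER`.

```java
import java.util.function.Function;

// Hypothetical, condensed restatement of TypecastTarget for illustration only.
enum CastSketch {
    INTEGER(Integer::valueOf),
    LONG(Long::valueOf),
    DOUBLE(Double::valueOf),
    STRING(String::valueOf);

    private final Function<String, Object> parser;

    CastSketch(Function<String, Object> parser) {
        this.parser = parser;
    }

    // Wraps parse failures the same way the PR's enum does.
    Object cast(String input) {
        try {
            return parser.apply(input);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid input for " + name() + ": " + input, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(INTEGER.cast("4")); // prints 4
        System.out.println(DOUBLE.cast("4"));  // prints 4.0
        try {
            INTEGER.cast("4.0");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Invalid input for INTEGER: 4.0
        }
    }
}
```

Passing the parser through the constructor is only a way to keep the sketch short; the per-constant `cast` overrides in the PR behave the same for these inputs.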
@@ -37,7 +37,6 @@ public class HttpSinkFactory {
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
HttpSinkConfig httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration);

FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class);

CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter);
@@ -8,6 +8,7 @@
import com.gotocompany.firehose.serializer.MessageToJson;
import com.gotocompany.firehose.serializer.MessageToTemplatizedJson;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.serializer.TypecastedJsonSerializer;
import com.gotocompany.stencil.client.StencilClient;
import com.gotocompany.stencil.Parser;
import lombok.AllArgsConstructor;
@@ -34,10 +35,11 @@ public MessageSerializer build() {
Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass());
if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToJson", HttpSinkDataFormatType.JSON);
-return new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable());
+return getTypecastedJsonSerializer(new MessageToJson(protoParser, false, httpSinkConfig.getSinkHttpSimpleDateFormatEnable()));
} else {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToTemplatizedJson");
-return MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser);
+return getTypecastedJsonSerializer(
+MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser));
}
}

@@ -48,6 +50,10 @@ public MessageSerializer build() {
return new JsonWrappedProtoByte();
}

private MessageSerializer getTypecastedJsonSerializer(MessageSerializer messageSerializer) {
return new TypecastedJsonSerializer(messageSerializer, httpSinkConfig);
}

private boolean isProtoSchemaEmpty() {
return httpSinkConfig.getInputSchemaProtoClass() == null || httpSinkConfig.getInputSchemaProtoClass().equals("");
}
@@ -0,0 +1,91 @@
package com.gotocompany.firehose.converter;

import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.Map;
import java.util.function.Function;

public class HttpSinkSerializerJsonTypecastConfigConverterTest {

private final HttpSinkSerializerJsonTypecastConfigConverter httpSinkSerializerJsonTypecastConfigConverter = new HttpSinkSerializerJsonTypecastConfigConverter();

@Test
public void convertShouldConvertToPropertyMapWhenValidJsonConfig() {
String configJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"LONG\"}]";
String expectedPropertyMapKey = "$.root.field";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, configJson);
Function<String, Object> mapper = result.get(expectedPropertyMapKey);

// Check the mapper exists before invoking it, so a missing key fails the assertion instead of throwing a NullPointerException.
Assertions.assertNotNull(mapper);
Object mapperResult = mapper.apply("4");
Assertions.assertTrue(mapperResult instanceof Long);
Assertions.assertEquals(4L, mapperResult);
}

@Test
public void convertShouldThrowIllegalArgumentExceptionWhenInvalidJsonFormatProvided() {
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\" \"type\": \"LONG\"";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson));
}

@Test
public void convertShouldThrowIllegalArgumentExceptionWhenUnregisteredTypecastingProvided() {
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"BIG_INTEGER\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, malformedConfigJson));
}

@Test
public void convertShouldHandleEmptyJsonConfig() {
String emptyConfigJson = "[]";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, emptyConfigJson);

Assertions.assertTrue(result.isEmpty());
}

@Test
public void convertShouldHandleNullJsonConfig() {
String nullConfigJson = null;

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, nullConfigJson);

Assertions.assertTrue(result.isEmpty());
}

@Test
public void convertShouldThrowExceptionForUnsupportedDataType() {
String unsupportedTypeConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"UNSUPPORTED_TYPE\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, unsupportedTypeConfigJson));
}

@Test
public void convertShouldHandleMultipleValidConfigs() {
String multipleConfigJson = "[{\"jsonPath\": \"$.root.field1\", \"type\": \"LONG\"}, {\"jsonPath\": \"$.root.field2\", \"type\": \"STRING\"}]";

Map<String, Function<String, Object>> result = httpSinkSerializerJsonTypecastConfigConverter.convert(null, multipleConfigJson);
Function<String, Object> mapper1 = result.get("$.root.field1");
Function<String, Object> mapper2 = result.get("$.root.field2");

Assertions.assertNotNull(mapper1);
Assertions.assertNotNull(mapper2);
Assertions.assertTrue(mapper1.apply("4") instanceof Long);
Assertions.assertTrue(mapper2.apply("test") instanceof String);
}

@Test
public void convertShouldThrowExceptionForMissingFieldsInConfig() {
String missingFieldsConfigJson = "[{\"jsonPath\": \"$.root.field\"}]";

Assertions.assertThrows(IllegalArgumentException.class,
() -> httpSinkSerializerJsonTypecastConfigConverter.convert(null, missingFieldsConfigJson));
}
}