-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add JSON Field Typecast Support #39
Add JSON Field Typecast Support #39
Conversation
} | ||
|
||
@Test | ||
public void serialize_GivenMessageWithNullMessage_ShouldIgnore() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change test method name to camelCase like shouldIgnoreWhenGivenMessageWithNullMessage
} | ||
|
||
@Test | ||
public void serialize_GivenMessageWithQuoteWrappedNumber_ShouldCastToNumber() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change test method name to camelcase like shouldCastToNumberWhenGivenMessageWithQuoteWrappedNumber
src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
build is failing due to checkstyle errors
Assertions.assertThrows(IllegalArgumentException.class, | ||
() -> serializerConfigConverter.convert(null, malformedConfigJson)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add these additional unit tests
@Test
public void convertShouldHandleEmptyJsonConfig() {
String emptyConfigJson = "[]";
Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, emptyConfigJson);
Assertions.assertTrue(result.isEmpty());
}
@Test
public void convertShouldHandleNullJsonConfig() {
String nullConfigJson = null;
Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, nullConfigJson);
Assertions.assertTrue(result.isEmpty());
}
@Test
public void convertShouldThrowExceptionForUnsupportedDataType() {
String unsupportedTypeConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"UNSUPPORTED_TYPE\"}]";
Assertions.assertThrows(IllegalArgumentException.class,
() -> serializerConfigConverter.convert(null, unsupportedTypeConfigJson));
}
@Test
public void convertShouldHandleMultipleValidConfigs() {
String multipleConfigJson = "[{\"jsonPath\": \"$.root.field1\", \"type\": \"LONG\"}, {\"jsonPath\": \"$.root.field2\", \"type\": \"STRING\"}]";
Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, multipleConfigJson);
Function<String, Object> mapper1 = result.get("$.root.field1");
Function<String, Object> mapper2 = result.get("$.root.field2");
Assertions.assertNotNull(mapper1);
Assertions.assertNotNull(mapper2);
Assertions.assertTrue(mapper1.apply("4") instanceof Long);
Assertions.assertTrue(mapper2.apply("test") instanceof String);
}
@Test
public void convertShouldThrowExceptionForMissingFieldsInConfig() {
String missingFieldsConfigJson = "[{\"jsonPath\": \"$.root.field\"}]";
Assertions.assertThrows(IllegalArgumentException.class,
() -> serializerConfigConverter.convert(null, missingFieldsConfigJson));
}
src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java
Show resolved
Hide resolved
src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void serializeShouldIgnoreWhenGivenNullMessageValue() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove serialize from the starting of the test name , test name should with "should"
} | ||
|
||
@Test | ||
public void serializeShouldCastToNumberWhenGivenMessageWithQuoteWrappedNumberAndMatchingJsonPathConfiguration() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove serialize from the starting of the test name , test name should with "should"
} | ||
|
||
@Test | ||
public void serializeShouldReturnMessageAsItIsWhenNoJsonPathConfigurationGiven() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove serialize from the starting of the test name , test name should with "should"
import java.util.Map; | ||
import java.util.function.Function; | ||
|
||
public interface SerializerConfig extends Config { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"There is no need for this separate interface. Can you move this configuration to HttpSinkConfig?"
import java.util.function.Function; | ||
|
||
public interface SerializerConfig extends Config { | ||
@Config.Key("SERIALIZER_JSON_TYPECAST") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after moving you can rename this config and prefix "SINK_HTTP_" to it
src/test/java/com/gotocompany/firehose/serializer/TypecastedJsonSerializerTest.java
Show resolved
Hide resolved
@@ -65,7 +69,7 @@ public void shouldReturnParameterizedRequstWhenParameterSourceIsNotDisableAndPla | |||
configuration.put("SINK_HTTP_SERVICE_URL", "http://127.0.0.1:1080/api,%s"); | |||
httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration); | |||
|
|||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser).createRequest(); | |||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser, serializerConfig).createRequest(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this serializerConfig after you move the config to the HttpSinkConfig class
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class); | ||
|
||
CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter); | ||
firehoseInstrumentation.logInfo("HTTP connection established"); | ||
|
||
UriParser uriParser = new UriParser(stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()), httpSinkConfig.getKafkaRecordParserMode()); | ||
|
||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser).createRequest(); | ||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser, serializerConfig).createRequest(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this serializerConfig parameter after you move the config to the HttpSinkConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add add the new config to the firehose documentation here https://github.com/goto/firehose/blob/main/docs/docs/sinks/http-sink.md and give an example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also bump up Firehose version, just a minor version bump
@Key("SINK_HTTP_SERIALIZER_JSON_TYPECAST") | ||
@ConverterClass(HttpSinkSerializerJsonTypecastConfigConverter.class) | ||
@DefaultValue("[]") | ||
Map<String, Function<String, Object>> getJsonTypecastMapping(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename method to getSinkHttpSerializerJsonTypecast()
.using(jsonPathConfiguration) | ||
.parse(jsonString); | ||
|
||
for (Map.Entry<String, Function<String, Object>> entry : httpSinkConfig.getJsonTypecastMapping() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor this code to the following
serializerConfig.getJsonTypecastMapping().forEach((jsonPath, typecastFunction) -> {
documentContext.map(jsonPath, (currentValue, configuration) ->
Optional.ofNullable(currentValue)
.map(value -> typecastFunction.apply(value.toString()))
.orElse(null)
INTEGER { | ||
@Override | ||
public Object cast(String input) { | ||
return Integer.valueOf(input); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add try-catch not to allow invalid input to make it future-proof
try {
return Integer.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for INTEGER: " + input, e);
}
}, LONG { | ||
@Override | ||
public Object cast(String input) { | ||
return Long.valueOf(input); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add try-catch not to allow invalid input to make it future-proof
try {
return Long.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for LONG: " + input, e);
}
}, DOUBLE { | ||
@Override | ||
public Object cast(String input) { | ||
return Double.valueOf(input); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add try-catch not to allow invalid input to make it future-proof
try {
return Double.valueOf(input);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid input for DOUBLE: " + input, e);
}
throw new IllegalArgumentException(e.getMessage(), e.getCause()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new IllegalArgumentException("Error parsing serializer json config: " + e.getMessage(), e);
New Classes:
Purpose:
TypecastedJsonSerializer
is a decorator forMessageToTemplatizedJson
andMessageToJson
serializers.It enables typecasting for the resulting JSON from the wrapped serializer.
Addresses the issue where default Protobuf to JSON conversion(using
JSONFormat
) treatsing int64 as a String.Introduces the capability for users to specify JSON paths to be typecasted to INTEGER, LONG, or DOUBLE.
This issue specifically aimed to provide backward compability for users that want to migrate from EGLC to Firehose
Example Parameter:
SERIALIZER_JSON_TYPECAST=[{"jsonPath": "$.someField.nominal", "type": "INTEGER"}]
it should cast from :
{"someField": {"nominal": "12345"}}
to
{"someField": {"nominal":12345}}
Manually casts the result into a specific type.
Reason for the PR
Current Issue: The JsonFormat library converts int64 to String.
User Need: Some users need int64 to be serialized as a JSON integer/number due to strict schema contracts in their Sink.
Solution: A decorator to typecast the JSON output to meet the user's requirements for the HTTP Sink.
Impact
This change should only impact the HTTP Sink, specifically the parts which utilize
MessageToTemplatizedJson
andMessageToJson
serializerReference :