-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add JSON Field Typecast Support #39
Changes from 26 commits
cc6eb6e
f896b71
7688ecf
698ca06
2bac0d7
66c7571
7971ae1
cf33578
199ca9d
c1744c8
713290c
6672736
998c366
6d65e89
17c30dd
ca31eac
7684284
8277258
11c2a6a
f99175b
eb30be4
0f7447a
40d1fc9
b40bd2a
a9ee36b
c615d69
2b0a72e
03a76d7
09af216
40b3203
9900d48
ad51bb7
92189ab
0b7e275
8ad587a
b4f9041
eca5e7a
1793e7c
9b6f860
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.gotocompany.firehose.config; | ||
|
||
import com.gotocompany.firehose.config.converter.SerializerConfigConverter; | ||
import org.aeonbits.owner.Config; | ||
|
||
import java.util.Map; | ||
import java.util.function.Function; | ||
|
||
public interface SerializerConfig extends Config { | ||
@Config.Key("SERIALIZER_JSON_TYPECAST") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after moving you can rename this config and prefix "SINK_HTTP_" to it |
||
@Config.ConverterClass(SerializerConfigConverter.class) | ||
@DefaultValue("[]") | ||
Map<String, Function<String, Object>> getJsonTypecastMapping(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package com.gotocompany.firehose.config.converter; | ||
|
||
import com.gotocompany.firehose.serializer.constant.TypecastTarget; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.aeonbits.owner.Converter; | ||
import org.codehaus.jackson.map.ObjectMapper; | ||
import org.codehaus.jackson.type.TypeReference; | ||
|
||
import java.io.IOException; | ||
import java.lang.reflect.Method; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
||
@Slf4j | ||
public class SerializerConfigConverter implements Converter<Map<String, Function<String, Object>>> { | ||
|
||
private final ObjectMapper objectMapper; | ||
|
||
public SerializerConfigConverter() { | ||
this.objectMapper = new ObjectMapper(); | ||
} | ||
|
||
@Override | ||
public Map<String, Function<String, Object>> convert(Method method, String input) { | ||
try { | ||
List<JsonTypecast> jsonTypecasts = | ||
objectMapper.readValue(input, new TypeReference<List<JsonTypecast>>() { | ||
}); | ||
return jsonTypecasts.stream() | ||
.collect(Collectors.toMap(JsonTypecast::getJsonPath, jsonTypecast -> jsonTypecast.getType()::cast)); | ||
} catch (IOException e) { | ||
log.error("Error when parsing serializer json config", e); | ||
throw new IllegalArgumentException(e.getMessage(), e.getCause()); | ||
} | ||
} | ||
|
||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
@Data | ||
@Builder | ||
private static class JsonTypecast { | ||
private String jsonPath; | ||
private TypecastTarget type; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package com.gotocompany.firehose.serializer; | ||
|
||
import com.gotocompany.firehose.config.SerializerConfig; | ||
import com.gotocompany.firehose.exception.DeserializerException; | ||
import com.gotocompany.firehose.message.Message; | ||
import com.jayway.jsonpath.Configuration; | ||
import com.jayway.jsonpath.DocumentContext; | ||
import com.jayway.jsonpath.JsonPath; | ||
import com.jayway.jsonpath.Option; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
|
||
/*** | ||
* MessageSerializer wrapping other JSON MessageSerializer which add capability to typecast some of the fields from the inner serializer. | ||
*/ | ||
@Slf4j | ||
public class TypecastedJsonSerializer implements MessageSerializer { | ||
|
||
private final MessageSerializer messageSerializer; | ||
private final SerializerConfig serializerConfig; | ||
private final Configuration jsonPathConfiguration; | ||
|
||
/** | ||
* Constructor for TypecastedJsonSerializer. | ||
* | ||
* @param messageSerializer the inner serializer to be wrapped | ||
* @param serializerConfig the configuration for typecasting, where each map contains | ||
* a JSON path and the desired type | ||
*/ | ||
public TypecastedJsonSerializer(MessageSerializer messageSerializer, | ||
SerializerConfig serializerConfig) { | ||
this.messageSerializer = messageSerializer; | ||
this.serializerConfig = serializerConfig; | ||
this.jsonPathConfiguration = Configuration.builder() | ||
.options(Option.SUPPRESS_EXCEPTIONS) | ||
.build(); | ||
} | ||
|
||
/** | ||
* Serializes the given message, then applies typecasting to specified fields in the resulting JSON. | ||
* | ||
* @param message the message to be serialized | ||
* @return the serialized and typecasted JSON string | ||
* @throws DeserializerException if an error occurs during serialization or typecasting | ||
*/ | ||
@Override | ||
public String serialize(Message message) throws DeserializerException { | ||
String jsonString = messageSerializer.serialize(message); | ||
DocumentContext documentContext = JsonPath | ||
.using(jsonPathConfiguration) | ||
.parse(jsonString); | ||
|
||
for (Map.Entry<String, Function<String, Object>> entry : serializerConfig.getJsonTypecastMapping() | ||
.entrySet()) { | ||
documentContext.map(entry.getKey(), (currentValue, configuration) -> Optional.ofNullable(currentValue) | ||
.map(v -> entry.getValue().apply(v.toString())) | ||
.orElse(null) | ||
); | ||
} | ||
return documentContext.jsonString(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package com.gotocompany.firehose.serializer.constant; | ||
|
||
public enum TypecastTarget { | ||
INTEGER { | ||
@Override | ||
public Object cast(String input) { | ||
return Integer.valueOf(input); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add try-catch not to allow invalid input to make it future-proof
|
||
} | ||
}, LONG { | ||
@Override | ||
public Object cast(String input) { | ||
return Long.valueOf(input); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add try-catch not to allow invalid input to make it future-proof
|
||
} | ||
}, DOUBLE { | ||
@Override | ||
public Object cast(String input) { | ||
return Double.valueOf(input); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add try-catch not to allow invalid input to make it future-proof
|
||
} | ||
}; | ||
|
||
public abstract Object cast(String input); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
|
||
import com.gotocompany.firehose.config.HttpSinkConfig; | ||
import com.gotocompany.firehose.config.SerializerConfig; | ||
import com.gotocompany.firehose.metrics.FirehoseInstrumentation; | ||
import com.gotocompany.firehose.sink.http.auth.OAuth2Credential; | ||
import com.gotocompany.firehose.sink.http.request.RequestFactory; | ||
|
@@ -37,15 +38,15 @@ public class HttpSinkFactory { | |
*/ | ||
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) { | ||
HttpSinkConfig httpSinkConfig = ConfigFactory.create(HttpSinkConfig.class, configuration); | ||
|
||
SerializerConfig serializerConfig = ConfigFactory.create(SerializerConfig.class, configuration); | ||
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, HttpSinkFactory.class); | ||
|
||
CloseableHttpClient closeableHttpClient = newHttpClient(httpSinkConfig, statsDReporter); | ||
firehoseInstrumentation.logInfo("HTTP connection established"); | ||
|
||
UriParser uriParser = new UriParser(stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()), httpSinkConfig.getKafkaRecordParserMode()); | ||
|
||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser).createRequest(); | ||
Request request = new RequestFactory(statsDReporter, httpSinkConfig, stencilClient, uriParser, serializerConfig).createRequest(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this serializerConfig parameter after you move the config to the HttpSinkConfig |
||
|
||
return new HttpSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), request, closeableHttpClient, stencilClient, httpSinkConfig.getSinkHttpRetryStatusCodeRanges(), httpSinkConfig.getSinkHttpRequestLogStatusCodeRanges()); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package com.gotocompany.firehose.converter; | ||
|
||
import com.gotocompany.firehose.config.converter.SerializerConfigConverter; | ||
import org.junit.Test; | ||
import org.junit.jupiter.api.Assertions; | ||
|
||
import java.util.Map; | ||
import java.util.function.Function; | ||
|
||
public class SerializerConfigConverterTest { | ||
|
||
private final SerializerConfigConverter serializerConfigConverter = new SerializerConfigConverter(); | ||
|
||
@Test | ||
public void convertShouldConvertToPropertyMapWhenValidJsonConfig() { | ||
String configJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"LONG\"}]"; | ||
String expectedPropertyMapKey = "$.root.field"; | ||
|
||
Map<String, Function<String, Object>> result = serializerConfigConverter.convert(null, configJson); | ||
Function<String, Object> mapper = result.get(expectedPropertyMapKey); | ||
Object mapperResult = mapper.apply("4"); | ||
|
||
Assertions.assertNotNull(mapper); | ||
Assertions.assertTrue(mapperResult instanceof Long); | ||
Assertions.assertEquals(4L, mapperResult); | ||
} | ||
|
||
@Test | ||
public void convertShouldThrowJsonParseExceptionWhenInvalidJsonFormatProvided() { | ||
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\" \"type\": \"LONG\""; | ||
|
||
Assertions.assertThrows(IllegalArgumentException.class, | ||
() -> serializerConfigConverter.convert(null, malformedConfigJson)); | ||
} | ||
|
||
@Test | ||
public void convertShouldThrowJsonParseExceptionWhenUnregisteredTypecastingProvided() { | ||
String malformedConfigJson = "[{\"jsonPath\": \"$.root.field\", \"type\": \"BIG_INTEGER\"}]"; | ||
|
||
Assertions.assertThrows(IllegalArgumentException.class, | ||
() -> serializerConfigConverter.convert(null, malformedConfigJson)); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add these additional unit tests
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"There is no need for this separate interface. Can you move this configuration to HttpSinkConfig?"