From 1320ffa764d894236b1449e417fc5789d6cd9300 Mon Sep 17 00:00:00 2001 From: Malthe Borch Date: Thu, 3 Aug 2023 14:36:50 +0000 Subject: [PATCH] Recurse into structs and arrays when changing case (#102) --- .../connect/transform/common/ChangeCase.java | 111 ++++++++++++------ .../transform/common/ChangeCaseTest.java | 67 ++++++----- 2 files changed, 115 insertions(+), 63 deletions(-) diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java index df4ea16..0adf819 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java @@ -30,22 +30,13 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public abstract class ChangeCase> extends BaseTransformation { private static final Logger log = LoggerFactory.getLogger(ChangeCase.class); - class State { - public final Map columnMapping; - public final Schema schema; - - State(Map columnMapping, Schema schema) { - this.columnMapping = columnMapping; - this.schema = schema; - } - } - private ChangeCaseConfig config; @Override @@ -63,39 +54,89 @@ public void configure(Map map) { this.config = new ChangeCaseConfig(map); } - Map schemaState = new HashMap<>(); + Map schemaState = new HashMap<>(); @Override protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) { - final State state = this.schemaState.computeIfAbsent(inputSchema, schema -> { - final SchemaBuilder builder = SchemaBuilder.struct(); - if (!Strings.isNullOrEmpty(schema.name())) { - builder.name(schema.name()); - } - if (schema.isOptional()) { - builder.optional(); - } + final Schema outputSchema = this.schemaState.computeIfAbsent(inputSchema, schema -> convertSchema(schema)); + final Struct outputStruct = convertStruct(inputSchema, outputSchema, input); + return new SchemaAndValue(outputSchema, outputStruct); + } - final Map columnMapping = new LinkedHashMap<>(); + private Struct convertStruct(Schema inputSchema, Schema outputSchema, Struct input) { + final Struct struct = new Struct(outputSchema); + for (Field inputField : inputSchema.fields()) { + final int index = inputField.index(); + final Field outputField = outputSchema.fields().get(index); + final Schema inputFieldSchema = inputField.schema(); + final Schema outputFieldSchema = outputField.schema(); + final Object value = convertValue(inputFieldSchema, outputFieldSchema, input.get(inputField)); + struct.put(outputField, value); + } + return struct; + } - for (Field field : schema.fields()) { - final String newFieldName = this.config.from.to(this.config.to, field.name()); - log.trace("processStruct() - Mapped '{}' to '{}'", field.name(), newFieldName); - columnMapping.put(field.name(), newFieldName); - builder.field(newFieldName, field.schema()); + private Object convertValue(Schema inputFieldSchema, Schema outputFieldSchema, Object value) { + switch (outputFieldSchema.type()) { + case STRUCT: { + return convertStruct(inputFieldSchema, outputFieldSchema, (Struct) value); } + case ARRAY: { + return convertArray(inputFieldSchema, outputFieldSchema, (List) value); + } + } + return value; + } - return new State(columnMapping, builder.build()); - }); - - final Struct outputStruct = new Struct(state.schema); - - for (Map.Entry kvp : state.columnMapping.entrySet()) { - final Object value = input.get(kvp.getKey()); - outputStruct.put(kvp.getValue(), value); + private Object convertArray(Schema inputFieldSchema, Schema outputFieldSchema, List value) { + final Schema inputSchema = inputFieldSchema.valueSchema(); + final Schema outputSchema = outputFieldSchema.valueSchema(); + switch (outputSchema.type()) { + case STRUCT: { + return value.stream().map(entry -> convertStruct( + inputSchema, + outputSchema, + (Struct) entry + )).collect(Collectors.toList()); + } + case ARRAY: { + return value.stream().map(entry -> convertArray( + inputSchema, + outputSchema, + (List) entry + )).collect(Collectors.toList()); + } } + return value; + } - return new SchemaAndValue(state.schema, outputStruct); + private Schema convertSchema(Schema inputSchema) { + switch (inputSchema.type()) { + case ARRAY: { + log.trace("convertSchema() - Recurse into array"); + final SchemaBuilder builder = SchemaBuilder.array(convertSchema(inputSchema.valueSchema())); + if (inputSchema.isOptional()) { + builder.optional(); + } + return builder.build(); + } + case STRUCT: { + final SchemaBuilder builder = SchemaBuilder.struct(); + if (!Strings.isNullOrEmpty(inputSchema.name())) { + builder.name(inputSchema.name()); + } + if (inputSchema.isOptional()) { + builder.optional(); + } + for (Field field : inputSchema.fields()) { + final String newFieldName = this.config.from.to(this.config.to, field.name()); + log.trace("convertSchema() - Mapped '{}' to '{}'", field.name(), newFieldName); + builder.field(newFieldName, convertSchema(field.schema())); + } + return builder.build(); + } + } + return inputSchema; } @Title("ChangeCase(Key)") diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java index 37b3dce..78fedbf 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCaseTest.java @@ -25,6 +25,10 @@ import org.apache.kafka.connect.transforms.Transformation; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Collections; +import java.util.function.Function; + import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema; import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -37,44 +41,51 @@ protected ChangeCaseTest(boolean isKey) { @Test public void test() { this.transformation.configure( - ImmutableMap.of( - ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_UNDERSCORE.toString(), - ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString() - ) - ); - final Schema inputSchema = SchemaBuilder.struct() - .field("FIRST_NAME", Schema.STRING_SCHEMA) - .field("LAST_NAME", Schema.STRING_SCHEMA) - .build(); - final Schema expectedSchema = SchemaBuilder.struct() - .field("first_name", Schema.STRING_SCHEMA) - .field("last_name", Schema.STRING_SCHEMA) - .build(); - final Struct inputStruct = new Struct(inputSchema) - .put("FIRST_NAME", "test") - .put("LAST_NAME", "user"); - final Struct expectedStruct = new Struct(expectedSchema) - .put("first_name", "test") - .put("last_name", "user"); + ImmutableMap.of(ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_UNDERSCORE.toString(), + ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString())); + final Schema inputSchema = makeSchema(CaseFormat.UPPER_UNDERSCORE); + final Schema expectedSchema = makeSchema(CaseFormat.LOWER_UNDERSCORE); - final SinkRecord inputRecord = new SinkRecord( - "topic", - 1, - null, - null, - inputSchema, - inputStruct, - 1L - ); + final Struct inputStruct = makeStruct(inputSchema, CaseFormat.UPPER_UNDERSCORE); + final Struct expectedStruct = makeStruct(expectedSchema, CaseFormat.LOWER_UNDERSCORE); + + final SinkRecord inputRecord = new SinkRecord("topic", 1, null, null, inputSchema, inputStruct, 1L); for (int i = 0; i < 50; i++) { final SinkRecord transformedRecord = this.transformation.apply(inputRecord); assertNotNull(transformedRecord, "transformedRecord should not be null."); assertSchema(expectedSchema, transformedRecord.valueSchema()); assertStruct(expectedStruct, (Struct) transformedRecord.value()); } + } + private Schema makeSchema(CaseFormat caseFormat) { + final Function convert = s -> CaseFormat.LOWER_UNDERSCORE.to(caseFormat, s); + return SchemaBuilder.struct().field(convert.apply("contacts"), + SchemaBuilder.array(SchemaBuilder.struct() + .field(convert.apply("contact"), + SchemaBuilder.struct() + .field(convert.apply("first_name"), Schema.STRING_SCHEMA) + .field(convert.apply("last_name"), Schema.STRING_SCHEMA) + .build() + ).build()) + ).build(); } + private Struct makeStruct(Schema schema, CaseFormat caseFormat) { + final Function convert = s -> CaseFormat.LOWER_UNDERSCORE.to(caseFormat, s); + final Schema contacts = schema.fields().get(0).schema().valueSchema(); + final Schema contact = contacts.fields().get(0).schema(); + return new Struct(schema).put(convert.apply("contacts"), + new ArrayList<>( + Collections.singletonList( + new Struct(contacts).put(convert.apply("contact"), + new Struct(contact) + .put(convert.apply("first_name"), "test") + .put(convert.apply("last_name"), "user")) + ) + ) + ); + } public static class ValueTest> extends ChangeCaseTest { protected ValueTest() {