diff --git a/pom.xml b/pom.xml
index cf35050..b00e4c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
     <artifactId>kafka-connect-transform-common</artifactId>
     <version>0.1.0-SNAPSHOT</version>
-    <name>afka-connect-transform-common</name>
+    <name>kafka-connect-transform-common</name>
     <url>https://github.com/jcustenborder/kafka-connect-transform-common</url>
     <inceptionYear>2017</inceptionYear>
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BaseTransformation.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BaseTransformation.java
index 7cf6812..1cd1e7b 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BaseTransformation.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BaseTransformation.java
@@ -15,28 +15,160 @@
  */
 package com.github.jcustenborder.kafka.connect.transform.common;
 
+import com.github.jcustenborder.kafka.connect.utils.data.SchemaHelper;
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
 import java.util.Map;
 
 public abstract class BaseTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+  private static final Logger log = LoggerFactory.getLogger(BaseTransformation.class);
 
-  protected abstract SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue);
+  protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+    throw new UnsupportedOperationException("MAP is not a supported type.");
+  }
+
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
+    throw new UnsupportedOperationException("STRUCT is not a supported type.");
+  }
+
+  protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
+    throw new UnsupportedOperationException("STRING is not a supported type.");
+  }
+
+  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
+    throw new UnsupportedOperationException("BYTES is not a supported type.");
+  }
+
+  protected SchemaAndValue processInt8(R record, Schema inputSchema, byte input) {
+    throw new UnsupportedOperationException("INT8 is not a supported type.");
+  }
+
+  protected SchemaAndValue processInt16(R record, Schema inputSchema, short input) {
+    throw new UnsupportedOperationException("INT16 is not a supported type.");
+  }
 
-  protected abstract SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue);
+  protected SchemaAndValue processInt32(R record, Schema inputSchema, int input) {
+    throw new UnsupportedOperationException("INT32 is not a supported type.");
+  }
+
+  protected SchemaAndValue processInt64(R record, Schema inputSchema, long input) {
+    throw new UnsupportedOperationException("INT64 is not a supported type.");
+  }
+
+  protected SchemaAndValue processBoolean(R record, Schema inputSchema, boolean input) {
+    throw new UnsupportedOperationException("BOOLEAN is not a supported type.");
+  }
+
+  protected SchemaAndValue processTimestamp(R record, Schema inputSchema, Date input) {
+    throw new UnsupportedOperationException("Timestamp is not a supported type.");
+  }
+
+  protected SchemaAndValue processDate(R record, Schema inputSchema, Date input) {
+    throw new UnsupportedOperationException("Date is not a supported type.");
+  }
+
+  protected SchemaAndValue processTime(R record, Schema inputSchema, Date input) {
+    throw new UnsupportedOperationException("Time is not a supported type.");
+  }
 
-  protected SchemaAndValue process(R record, SchemaAndValue schemaAndValue) {
+  protected SchemaAndValue processDecimal(R record, Schema inputSchema, BigDecimal input) {
+    throw new UnsupportedOperationException("Decimal is not a supported type.");
+  }
+
+  protected SchemaAndValue processFloat64(R record, Schema inputSchema, double input) {
+    throw new UnsupportedOperationException("FLOAT64 is not a supported type.");
+  }
+
+  protected SchemaAndValue processFloat32(R record, Schema inputSchema, float input) {
+    throw new UnsupportedOperationException("FLOAT32 is not a supported type.");
+  }
+
+  protected SchemaAndValue processArray(R record, Schema inputSchema, List<Object> input) {
+    throw new UnsupportedOperationException("ARRAY is not a supported type.");
+  }
+
+  protected SchemaAndValue processMap(R record, Schema inputSchema, Map<Object, Object> input) {
+    throw new UnsupportedOperationException("MAP is not a supported type.");
+  }
+
+  private static final Schema OPTIONAL_TIMESTAMP = Timestamp.builder().optional().build();
+
+  protected SchemaAndValue process(R record, Schema inputSchema, Object input) {
     final SchemaAndValue result;
-    if (schemaAndValue.value() instanceof Struct) {
-      result = processStruct(record, schemaAndValue);
-    } else if (schemaAndValue.value() instanceof Map) {
-      result = processMap(record, schemaAndValue);
+
+    if (null == inputSchema && null == input) {
+      return new SchemaAndValue(
+          null,
+          null
+      );
+    }
+
+    if (input instanceof Map) {
+      log.trace("process() - Processing as map");
+      result = processMap(record, (Map<String, Object>) input);
+      return result;
+    }
+
+    if (null == inputSchema) {
+      log.trace("process() - Determining schema");
+      inputSchema = SchemaHelper.schema(input);
+    }
+
+    log.trace("process() - Input has a schema. schema = {}", inputSchema);
+    if (Schema.Type.STRUCT == inputSchema.type()) {
+      result = processStruct(record, inputSchema, (Struct) input);
+    } else if (Timestamp.LOGICAL_NAME.equals(inputSchema.name())) {
+      result = processTimestamp(record, inputSchema, (Date) input);
+    } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(inputSchema.name())) {
+      result = processDate(record, inputSchema, (Date) input);
+    } else if (Time.LOGICAL_NAME.equals(inputSchema.name())) {
+      result = processTime(record, inputSchema, (Date) input);
+    } else if (Decimal.LOGICAL_NAME.equals(inputSchema.name())) {
+      result = processDecimal(record, inputSchema, (BigDecimal) input);
+    } else if (Schema.Type.STRING == inputSchema.type()) {
+      result = processString(record, inputSchema, (String) input);
+    } else if (Schema.Type.BYTES == inputSchema.type()) {
+      result = processBytes(record, inputSchema, (byte[]) input);
+    } else if (Schema.Type.INT8 == inputSchema.type()) {
+      result = processInt8(record, inputSchema, (byte) input);
+    } else if (Schema.Type.INT16 == inputSchema.type()) {
+      result = processInt16(record, inputSchema, (short) input);
+    } else if (Schema.Type.INT32 == inputSchema.type()) {
+      result = processInt32(record, inputSchema, (int) input);
+    } else if (Schema.Type.INT64 == inputSchema.type()) {
+      result = processInt64(record, inputSchema, (long) input);
+    } else if (Schema.Type.FLOAT32 == inputSchema.type()) {
+      result = processFloat32(record, inputSchema, (float) input);
+    } else if (Schema.Type.FLOAT64 == inputSchema.type()) {
+      result = processFloat64(record, inputSchema, (double) input);
+    } else if (Schema.Type.ARRAY == inputSchema.type()) {
+      result = processArray(record, inputSchema, (List<Object>) input);
+    } else if (Schema.Type.MAP == inputSchema.type()) {
+      result = processMap(record, inputSchema, (Map<Object, Object>) input);
     } else {
-      throw new UnsupportedOperationException();
+      throw new UnsupportedOperationException(
+          String.format(
+              "Schema is not supported. type='%s' name='%s'",
+              inputSchema.type(),
+              inputSchema.name()
+          )
+      );
     }
+
     return result;
   }
+
+
 }
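
Note: the base class now dispatches on schema type, so subclasses override only the hooks they support and inherit an `UnsupportedOperationException` for everything else. A minimal sketch of a subclass under the new contract — the `UpperCaseExample` transform is hypothetical, not part of this change, and assumes the same imports the classes below use (`ConfigDef`, `ConnectRecord`, `Schema`, `SchemaAndValue`):

```java
// Hypothetical subclass, for illustration only. It handles STRING inputs and
// lets every other type fall through to BaseTransformation's default hooks.
// Concrete Key/Value wrappers (as in BytesToString below) are omitted.
public abstract class UpperCaseExample<R extends ConnectRecord<R>> extends BaseTransformation<R> {
  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> settings) {
  }

  @Override
  public void close() {
  }

  @Override
  protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
    // The schema is unchanged; only the value is rewritten.
    return new SchemaAndValue(inputSchema, input.toUpperCase());
  }
}
```
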
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToString.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToString.java
new file mode 100644
index 0000000..3bd088c
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToString.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.config.Description;
+import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
+import com.github.jcustenborder.kafka.connect.utils.config.Title;
+import com.google.common.base.Strings;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class BytesToString<R extends ConnectRecord<R>> extends BaseTransformation<R> {
+  private static final Logger log = LoggerFactory.getLogger(BytesToString.class);
+
+  @Override
+  public ConfigDef config() {
+    return BytesToStringConfig.config();
+  }
+
+  BytesToStringConfig config;
+
+  @Override
+  public void configure(Map<String, ?> settings) {
+    this.config = new BytesToStringConfig(settings);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
+    final Schema outputSchema = inputSchema.isOptional() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
+    final String output = new String(input, this.config.charset);
+    return new SchemaAndValue(outputSchema, output);
+  }
+
+  Map<Schema, Schema> schemaCache = new HashMap<>();
+
+  @Override
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
+    final Schema schema = this.schemaCache.computeIfAbsent(inputSchema, s -> {
+      final SchemaBuilder builder = SchemaBuilder.struct();
+      if (!Strings.isNullOrEmpty(inputSchema.name())) {
+        builder.name(inputSchema.name());
+      }
+      if (inputSchema.isOptional()) {
+        builder.optional();
+      }
+
+      for (Field field : inputSchema.fields()) {
+        log.trace("processStruct() - processing '{}'", field.name());
+        final Schema fieldSchema;
+        if (this.config.fields.contains(field.name())) {
+          fieldSchema = field.schema().isOptional() ?
+              Schema.OPTIONAL_STRING_SCHEMA :
+              Schema.STRING_SCHEMA;
+        } else {
+          fieldSchema = field.schema();
+        }
+        builder.field(field.name(), fieldSchema);
+      }
+      return builder.build();
+    });
+
+    Struct struct = new Struct(schema);
+    for (Field field : schema.fields()) {
+      if (this.config.fields.contains(field.name())) {
+        byte[] buffer = input.getBytes(field.name());
+        struct.put(field.name(), new String(buffer, this.config.charset));
+      } else {
+        struct.put(field.name(), input.get(field.name()));
+      }
+    }
+    return new SchemaAndValue(schema, struct);
+  }
+
+  @Title("BytesToString(Key)")
+  @Description("This transformation is used to convert a byte array to a string.")
+  @DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")
+  public static class Key<R extends ConnectRecord<R>> extends BytesToString<R> {
+
+    @Override
+    public R apply(R r) {
+      final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
+
+      return r.newRecord(
+          r.topic(),
+          r.kafkaPartition(),
+          transformed.schema(),
+          transformed.value(),
+          r.valueSchema(),
+          r.value(),
+          r.timestamp()
+      );
+    }
+  }
+
+  @Title("BytesToString(Value)")
+  @Description("This transformation is used to convert a byte array to a string.")
+  public static class Value<R extends ConnectRecord<R>> extends BytesToString<R> {
+    @Override
+    public R apply(R r) {
+      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
+
+      return r.newRecord(
+          r.topic(),
+          r.kafkaPartition(),
+          r.keySchema(),
+          r.key(),
+          transformed.schema(),
+          transformed.value(),
+          r.timestamp()
+      );
+    }
+  }
+}
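
Note: a sketch of driving the new transform directly, the way a test might. The topic name and payload are illustrative, and the snippet assumes the imports already used in this PR (`ImmutableMap`, `SinkRecord`, `Schema`) plus `java.nio.charset.StandardCharsets`:

```java
BytesToString.Value<SinkRecord> transform = new BytesToString.Value<>();
transform.configure(ImmutableMap.of(BytesToStringConfig.CHARSET_CONFIG, "UTF-8"));

SinkRecord input = new SinkRecord(
    "example-topic",   // hypothetical topic
    0,
    null,
    null,
    Schema.BYTES_SCHEMA,
    "hello".getBytes(StandardCharsets.UTF_8),
    42L
);

SinkRecord output = transform.apply(input);
// output.value() is the String "hello"; the output schema is STRING,
// optional only if the input BYTES schema was optional.
transform.close();
```
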
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringConfig.java
new file mode 100644
index 0000000..932804d
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class BytesToStringConfig extends AbstractConfig {
+  public final Charset charset;
+  public final Set<String> fields;
+
+  public static final String CHARSET_CONFIG = "charset";
+  public static final String CHARSET_DOC = "The charset to use when creating the output string.";
+
+  public static final String FIELD_CONFIG = "fields";
+  public static final String FIELD_DOC = "The fields to transform.";
+
+
+  public BytesToStringConfig(Map<String, ?> settings) {
+    super(config(), settings);
+    String charset = getString(CHARSET_CONFIG);
+    this.charset = Charset.forName(charset);
+    List<String> fields = getList(FIELD_CONFIG);
+    this.fields = new HashSet<>(fields);
+  }
+
+  public static ConfigDef config() {
+    return new ConfigDef()
+        .define(
+            ConfigKeyBuilder.of(CHARSET_CONFIG, ConfigDef.Type.STRING)
+                .documentation(CHARSET_DOC)
+                .defaultValue("UTF-8")
+                .importance(ConfigDef.Importance.HIGH)
+                .build()
+        ).define(
+            ConfigKeyBuilder.of(FIELD_CONFIG, ConfigDef.Type.LIST)
+                .documentation(FIELD_DOC)
+                .defaultValue(Collections.emptyList())
+                .importance(ConfigDef.Importance.HIGH)
+                .build()
+        );
+  }
+
+}
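
Note: the config parses `charset` into a `java.nio.charset.Charset` and `fields` into a `Set<String>`; both keys have defaults, so an empty settings map is valid. A quick sketch with illustrative values (assertion style assumed from the tests below):

```java
BytesToStringConfig config = new BytesToStringConfig(
    ImmutableMap.of(
        BytesToStringConfig.CHARSET_CONFIG, "ISO-8859-1",
        BytesToStringConfig.FIELD_CONFIG, "first_bytes_field,second_bytes_field"
    )
);
// ConfigDef splits the LIST value on commas before the constructor runs.
assertEquals("ISO-8859-1", config.charset.name());
assertTrue(config.fields.contains("second_bytes_field"));
```
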
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java
index c58b36b..df4ea16 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeCase.java
@@ -66,9 +66,7 @@ public void configure(Map<String, ?> map) {
   Map<Schema, State> schemaState = new HashMap<>();
 
   @Override
-  protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
-    final Schema inputSchema = schemaAndValue.schema();
-    final Struct inputStruct = (Struct) schemaAndValue.value();
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
     final State state = this.schemaState.computeIfAbsent(inputSchema, schema -> {
       final SchemaBuilder builder = SchemaBuilder.struct();
       if (!Strings.isNullOrEmpty(schema.name())) {
@@ -93,18 +91,13 @@ protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue)
 
     final Struct outputStruct = new Struct(state.schema);
     for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
-      final Object value = inputStruct.get(kvp.getKey());
+      final Object value = input.get(kvp.getKey());
       outputStruct.put(kvp.getValue(), value);
     }
 
     return new SchemaAndValue(state.schema, outputStruct);
   }
 
-  @Override
-  protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
-    throw new UnsupportedOperationException();
-  }
-
   @Title("ChangeCase(Key)")
   @Description("This transformation is used to change the case of fields in an input struct.")
   @DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")
@@ -112,7 +105,7 @@ public static class Key<R extends ConnectRecord<R>> extends ChangeCase<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.keySchema(), r.key()));
+      final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
 
       return r.newRecord(
           r.topic(),
@@ -132,7 +125,7 @@ public static class Value<R extends ConnectRecord<R>> extends ChangeCase<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.valueSchema(), r.value()));
+      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
 
       return r.newRecord(
           r.topic(),
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractNestedField.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractNestedField.java
index 542b4be..acf2d8e 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractNestedField.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractNestedField.java
@@ -55,12 +55,9 @@ public void configure(Map<String, ?> map) {
     this.schemaCache = new HashMap<>();
   }
 
-  @Override
-  protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
-    final Schema inputSchema = schemaAndValue.schema();
-    final Struct inputStruct = (Struct) schemaAndValue.value();
-    final Struct innerStruct = inputStruct.getStruct(this.config.outerFieldName);
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
+    final Struct innerStruct = input.getStruct(this.config.outerFieldName);
 
     final Schema outputSchema = this.schemaCache.computeIfAbsent(inputSchema, s -> {
       final Field innerField = innerStruct.schema().field(this.config.innerFieldName);
@@ -79,20 +76,17 @@ protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue)
     });
     final Struct outputStruct = new Struct(outputSchema);
     for (Field inputField : inputSchema.fields()) {
-      final Object value = inputStruct.get(inputField);
+      final Object value = input.get(inputField);
       outputStruct.put(inputField.name(), value);
     }
     final Object innerFieldValue = innerStruct.get(this.config.innerFieldName);
     outputStruct.put(this.config.innerFieldName, innerFieldValue);
     return new SchemaAndValue(outputSchema, outputStruct);
-  }
 
-  @Override
-  protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
-    throw new UnsupportedOperationException();
   }
 
+
   @Title("ExtractNestedField(Key)")
   @Description("This transformation is used to extract a field from a nested struct and append it "
       + "to the parent struct.")
@@ -101,7 +95,7 @@ public static class Key<R extends ConnectRecord<R>> extends ExtractNestedField<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.keySchema(), r.key()));
+      final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
 
       return r.newRecord(
          r.topic(),
@@ -120,7 +114,7 @@ public static class Value<R extends ConnectRecord<R>> extends ExtractNestedField<R> {
 
    @Override
    public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.valueSchema(), r.value()));
+      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
 
       return r.newRecord(
           r.topic(),
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java
index 4cb0c0b..201f329 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternRename.java
@@ -54,9 +54,7 @@ public void close() {
   }
 
   @Override
-  protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
-    final Schema inputSchema = schemaAndValue.schema();
-    final Struct inputStruct = (Struct) schemaAndValue.value();
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inputStruct) {
     final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
     outputSchemaBuilder.name(inputSchema.name());
     outputSchemaBuilder.doc(inputSchema.doc());
@@ -95,11 +93,10 @@ protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue)
   }
 
   @Override
-  protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
-    final Map<String, Object> inputMap = (Map<String, Object>) schemaAndValue.value();
-    final Map<String, Object> outputMap = new LinkedHashMap<>(inputMap.size());
+  protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+    final Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
 
-    for (final String inputFieldName : inputMap.keySet()) {
+    for (final String inputFieldName : input.keySet()) {
       log.trace("process() - Processing field '{}'", inputFieldName);
       final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
       final String outputFieldName;
@@ -108,10 +105,9 @@ protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
       } else {
         outputFieldName = inputFieldName;
       }
-      final Object value = inputMap.get(inputFieldName);
+      final Object value = input.get(inputFieldName);
       outputMap.put(outputFieldName, value);
     }
-
     return new SchemaAndValue(null, outputMap);
   }
 
@@ -122,7 +118,7 @@ public static class Key<R extends ConnectRecord<R>> extends PatternRename<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.keySchema(), r.key()));
+      final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
 
       return r.newRecord(
           r.topic(),
@@ -141,7 +137,7 @@ public R apply(R r) {
   public static class Value<R extends ConnectRecord<R>> extends PatternRename<R> {
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.valueSchema(), r.value()));
+      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
 
       return r.newRecord(
           r.topic(),
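
Note: in the schemaless path, `PatternRename.processMap` now receives the map directly. A standalone illustration of the rename loop, with a made-up pattern and replacement standing in for `this.config.pattern` and the configured replacement (the config class is not part of this diff); assumes `java.util.regex` and `java.util` imports:

```java
Pattern pattern = Pattern.compile("^prefixed_");  // hypothetical config value
String replacement = "";                          // hypothetical config value

Map<String, Object> input = new LinkedHashMap<>();
input.put("prefixed_first_name", "example");

Map<String, Object> output = new LinkedHashMap<>(input.size());
for (String inputFieldName : input.keySet()) {
  Matcher fieldMatcher = pattern.matcher(inputFieldName);
  String outputFieldName = fieldMatcher.find()
      ? fieldMatcher.replaceAll(replacement)
      : inputFieldName;
  output.put(outputFieldName, input.get(inputFieldName));
}
// output now maps "first_name" -> "example"
```
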
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ToJSON.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ToJSON.java
index dcc33d1..6a5a072 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ToJSON.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ToJSON.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,10 +57,9 @@ public void configure(Map<String, ?> settings) {
     this.converter.configure(settingsClone, false);
   }
 
-  @Override
-  protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue) {
-    final byte[] buffer = this.converter.fromConnectData("dummy", schemaAndValue.schema(), schemaAndValue.value());
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
+    final byte[] buffer = this.converter.fromConnectData("dummy", inputSchema, input);
 
     final Schema schema;
     final Object value;
@@ -85,11 +85,6 @@ protected SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue)
     return new SchemaAndValue(schema, value);
   }
 
-  @Override
-  protected SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue) {
-    throw new UnsupportedOperationException();
-  }
-
   @Title("ToJson(Key)")
   @Description("This transformation is used to take structured data such as AVRO and output it as "
       + "JSON by way of the JsonConverter built into Kafka Connect.")
@@ -98,7 +93,7 @@ public static class Key<R extends ConnectRecord<R>> extends ToJSON<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.keySchema(), r.key()));
+      final SchemaAndValue transformed = process(r, r.keySchema(), r.key());
 
       return r.newRecord(
           r.topic(),
@@ -119,7 +114,7 @@ public static class Value<R extends ConnectRecord<R>> extends ToJSON<R> {
 
     @Override
     public R apply(R r) {
-      final SchemaAndValue transformed = process(r, new SchemaAndValue(r.valueSchema(), r.value()));
+      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
 
       return r.newRecord(
           r.topic(),
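
Note: `processStruct` here leans entirely on Kafka's `JsonConverter`, and the hunk that picks the output `schema`/`value` is elided. A standalone sketch of just the conversion step it wraps — the topic string mirrors the `"dummy"` placeholder in the diff, and the schema/value are illustrative:

```java
JsonConverter converter = new JsonConverter();
Map<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);  // false = value converter

Schema schema = SchemaBuilder.struct()
    .field("name", Schema.STRING_SCHEMA)
    .build();
Struct struct = new Struct(schema).put("name", "example");

byte[] json = converter.fromConnectData("dummy", schema, struct);
// json holds {"name":"example"} when schemas.enable=false
```
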
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringTest.java
new file mode 100644
index 0000000..d13eb7c
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringTest.java
@@ -0,0 +1,81 @@
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.jupiter.api.Test;
+
+import java.io.UnsupportedEncodingException;
+
+import static com.github.jcustenborder.kafka.connect.utils.AssertSchema.assertSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public abstract class BytesToStringTest extends TransformationTest {
+  protected BytesToStringTest(boolean isKey) {
+    super(isKey);
+  }
+
+  @Test
+  public void struct() throws UnsupportedEncodingException {
+    this.transformation.configure(
+        ImmutableMap.of(BytesToStringConfig.FIELD_CONFIG, "bytes")
+    );
+    final String expected = "this is a test";
+    Schema schema = SchemaBuilder.struct()
+        .field("bytes", Schema.BYTES_SCHEMA)
+        .build();
+    Struct struct = new Struct(schema)
+        .put("bytes", expected.getBytes("UTF-8"));
+
+    final SinkRecord inputRecord = new SinkRecord(
+        "topic",
+        1,
+        null,
+        null,
+        schema,
+        struct,
+        1L
+    );
+
+    SinkRecord outputRecord = this.transformation.apply(inputRecord);
+    // Assumption: the hunk ends without assertions; these mirror the bytes() test.
+    final Struct outputStruct = (Struct) outputRecord.value();
+    assertEquals(expected, outputStruct.getString("bytes"));
+    assertSchema(Schema.STRING_SCHEMA, outputRecord.valueSchema().field("bytes").schema());
+  }
+
+  @Test
+  public void bytes() throws UnsupportedEncodingException {
+    this.transformation.configure(
+        ImmutableMap.of()
+    );
+    final String expected = "this is a test";
+    final SinkRecord inputRecord = new SinkRecord(
+        "topic",
+        1,
+        null,
+        null,
+        Schema.BYTES_SCHEMA,
+        expected.getBytes("UTF-8"),
+        1L
+    );
+
+    SinkRecord outputRecord = this.transformation.apply(inputRecord);
+    assertEquals(expected, outputRecord.value());
+    assertSchema(Schema.STRING_SCHEMA, outputRecord.valueSchema());
+  }
+
+  public static class ValueTest<R extends ConnectRecord<R>> extends BytesToStringTest {
+    protected ValueTest() {
+      super(false);
+    }
+
+    @Override
+    protected Transformation<SinkRecord> create() {
+      return new BytesToString.Value<>();
+    }
+  }
+}
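
Note: the suite only wires up a `Value` variant. A matching `Key` variant would presumably look like the sketch below — it is not part of this diff, and the `struct()`/`bytes()` fixtures above build value-side records, so key-side fixtures (or routing via the `isKey` flag in `TransformationTest`) would also be needed:

```java
// Hypothetical companion to ValueTest, for illustration only.
public static class KeyTest<R extends ConnectRecord<R>> extends BytesToStringTest {
  protected KeyTest() {
    super(true);
  }

  @Override
  protected Transformation<SinkRecord> create() {
    return new BytesToString.Key<>();
  }
}
```
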