From 53718d7b5c6f8dceef762c670501a4d53d5a637a Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Wed, 13 Nov 2019 17:50:22 -0700 Subject: [PATCH] Added transformation to copy the schema name to the topic. Fixes #48. (#49) --- .../transform/common/SchemaNameToTopic.java | 85 +++++++++++++++++++ .../common/SchemaNameToTopicTest.java | 64 ++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopic.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopicTest.java diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopic.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopic.java new file mode 100644 index 0000000..6933427 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopic.java @@ -0,0 +1,85 @@ +/** + * Copyright © 2019 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +@Title("SchemaNameToTopic") +@Description("This transformation is used to take the name from the schema for the key or value and" + + " replace the topic with this value.") +public abstract class SchemaNameToTopic> implements Transformation { + private static final Logger log = LoggerFactory.getLogger(SchemaNameToTopic.class); + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } + + public static class Key> extends SchemaNameToTopic { + @Override + public R apply(R r) { + + return r.newRecord( + r.keySchema().name(), + r.kafkaPartition(), + r.keySchema(), + r.key(), + r.valueSchema(), + r.value(), + r.timestamp(), + r.headers() + ); + } + } + + + public static class Value> extends SchemaNameToTopic { + @Override + public R apply(R r) { + + return r.newRecord( + r.valueSchema().name(), + r.kafkaPartition(), + r.keySchema(), + r.key(), + r.valueSchema(), + r.value(), + r.timestamp(), + r.headers() + ); + } + } + + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopicTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopicTest.java new file mode 100644 index 0000000..23d138b --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SchemaNameToTopicTest.java @@ -0,0 +1,64 @@ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.google.common.base.Strings; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class SchemaNameToTopicTest { + Transformation transformation = new SchemaNameToTopic.Value<>(); + SinkRecord exampleRecord(Schema schema) { + Struct struct = new Struct(schema); + for (Field field : schema.fields()) { + struct.put(field, Strings.repeat("x", 50)); + } + return new SinkRecord( + "test", + 0, + null, + null, + schema, + struct, + 1234L + ); + + } + + Schema exampleSchema(List fieldNames, final int version) { + SchemaBuilder builder = SchemaBuilder.struct() + .name(this.getClass().getName()); + for (String fieldName : fieldNames) { + builder.field(fieldName, Schema.STRING_SCHEMA); + } + builder.version(version); + return builder.build(); + } + + @Test + public void apply() { + Schema schema = SchemaBuilder.struct() + .name("com.foo.bar.whatever.ASDF") + .field("firstName", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + SinkRecord input = exampleRecord(schema); + SinkRecord actual = this.transformation.apply(input); + assertNotNull(actual); + assertEquals(schema.name(), actual.topic()); + + + } + + +}