diff --git a/pom.xml b/pom.xml index 08db37c..0627d78 100644 --- a/pom.xml +++ b/pom.xml @@ -16,14 +16,15 @@ limitations under the License. --> - + 4.0.0 com.github.jcustenborder.kafka.connect kafka-connect-parent - 2.6.1 + 3.3.1-1 kafka-connect-transform-common 0.1.0-SNAPSHOT @@ -49,8 +50,11 @@ - scm:git:https://github.com/jcustenborder/kafka-connect-transform-common.git - scm:git:git@github.com:jcustenborder/kafka-connect-transform-common.git + scm:git:https://github.com/jcustenborder/kafka-connect-transform-common.git + + + scm:git:git@github.com:jcustenborder/kafka-connect-transform-common.git + https://github.com/jcustenborder/kafka-connect-transform-common @@ -61,8 +65,10 @@ org.apache.kafka connect-json - ${kafka.version} - provided + + + com.github.jcustenborder.kafka.connect + connect-utils-jackson org.reflections @@ -73,7 +79,7 @@ com.github.jcustenborder.kafka.connect connect-utils-testing-data - [0.3.33,0.3.1000) + ${connect-utils.version} test @@ -85,7 +91,12 @@ xerces xercesImpl 2.12.1 - + + + xml-apis + xml-apis + 1.4.01 + @@ -94,7 +105,8 @@ kafka-connect-maven-plugin true - https://jcustenborder.github.io/kafka-connect-documentation/ + https://jcustenborder.github.io/kafka-connect-documentation/ + transform diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic.java new file mode 100644 index 0000000..267a391 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Map; + +@Title("LowerCaseTopic") +@Description("This transformation is used to change a topic name to be all lower case.") +public class LowerCaseTopic> implements Transformation { + Time time = SystemTime.SYSTEM; + + @Override + public R apply(R record) { + return record.newRecord( + record.topic().toLowerCase(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + record.timestamp(), + record.headers() + ); + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopicTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopicTest.java new file mode 100644 index 0000000..3b74655 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopicTest.java @@ -0,0 +1,35 @@ +package com.github.jcustenborder.kafka.connect.transform.common; + +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class LowerCaseTopicTest { + + @Test + public void test() { + final SinkRecord input = new SinkRecord( + "TeSt", + 1, + null, + "", + null, + "", + 1234123L, + 12341312L, + TimestampType.NO_TIMESTAMP_TYPE + ); + LowerCaseTopic transform = new LowerCaseTopic<>(); + final SinkRecord actual = transform.apply(input); + assertEquals("test", actual.topic(), "Topic should match."); + } + + +} diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic/example.json b/src/test/resources/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic/example.json new file mode 100644 index 0000000..7c97dff --- /dev/null +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/transform/common/LowerCaseTopic/example.json @@ -0,0 +1,15 @@ +{ + "input" : { + "topic" : "TestTopic", + "kafkaPartition" : 1, + "key" : "", + "value" : "", + "timestamp" : 12341312, + "timestampType" : "NO_TIMESTAMP_TYPE", + "offset" : 1234123, + "headers" : [ ] + }, + "description" : "This example will change the topic name to be all lower case.", + "name" : "Example", + "config" : { } +} \ No newline at end of file