diff --git a/README.md b/README.md index ed5e1df..a66f048 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,14 @@ for a list of available versions. mvn archetype:generate \ -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \ -DarchetypeArtifactId=kafka-connect-quickstart \ - -DarchetypeVersion=2.0.0-cp1 + -DarchetypeVersion=2.4.0 ``` ``` mvn archetype:generate \ -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \ -DarchetypeArtifactId=kafka-connect-quickstart \ - -DarchetypeVersion=2.0.0-cp1 \ + -DarchetypeVersion=2.4.0 \ -Dpackage=com.github.jcustenborder.kafka.connect.test \ -DgroupId=com.github.jcustenborder.kafka.connect \ -DartifactId=testconnect \ diff --git a/pom.xml b/pom.xml index 9fec137..a8cdf3f 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ com.github.jcustenborder.kafka.connect kafka-connect-quickstart - 2.0.0-cp1 + 2.4.0 kafka-connect-archtype A quickstart for building Kafka Connect connectors. https://github.com/jcustenborder/kafka-connect-archtype diff --git a/src/main/resources/META-INF/maven/archetype.xml b/src/main/resources/META-INF/maven/archetype.xml index 863425e..30b9e8a 100644 --- a/src/main/resources/META-INF/maven/archetype.xml +++ b/src/main/resources/META-INF/maven/archetype.xml @@ -3,6 +3,10 @@ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd"> quickstart + src/main/java/package-info.java + src/main/java/MyConverter.java + src/main/java/MyKeyValueTransformation.java + src/main/java/MyKeyValueTransformationConfig.java src/main/java/MySourceConnector.java src/main/java/MySourceConnectorConfig.java src/main/java/MySourceTask.java @@ -21,6 +25,8 @@ src/test/resources/logback.xml + src/test/java/MyConverterTest.java + src/test/java/MyKeyValueTransformationTest.java src/test/java/MySinkConnectorTest.java src/test/java/MySinkTaskTest.java src/test/java/MySinkTaskIT.java diff --git a/src/main/resources/archetype-resources/README.md b/src/main/resources/archetype-resources/README.md index 441f53e..ea28653 100644 --- a/src/main/resources/archetype-resources/README.md +++ b/src/main/resources/archetype-resources/README.md @@ -1,6 +1,6 @@ # Introduction -Welcome to your new Kafka Connect connector! +Welcome to your new Kafka Connect plugin! # Running in development diff --git a/src/main/resources/archetype-resources/docker-compose.yml b/src/main/resources/archetype-resources/docker-compose.yml index 510cc33..a96bfbc 100644 --- a/src/main/resources/archetype-resources/docker-compose.yml +++ b/src/main/resources/archetype-resources/docker-compose.yml @@ -17,7 +17,7 @@ version: "2" services: zookeeper: - image: confluentinc/cp-zookeeper:5.0.0 + image: confluentinc/cp-zookeeper:5.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 zk_id: "1" @@ -25,7 +25,7 @@ services: - "2181:2181" kafka: hostname: kafka - image: confluentinc/cp-enterprise-kafka:5.0.0 + image: confluentinc/cp-enterprise-kafka:5.4.0 links: - zookeeper ports: @@ -37,7 +37,7 @@ services: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 schema-registry: - image: confluentinc/cp-schema-registry:5.0.0 + image: confluentinc/cp-schema-registry:5.4.0 links: - kafka - zookeeper @@ -47,7 +47,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181" SCHEMA_REGISTRY_HOST_NAME: schema-registry control-center: - image: confluentinc/cp-enterprise-control-center:5.0.0 + image: confluentinc/cp-enterprise-control-center:5.4.0 depends_on: - zookeeper - kafka diff --git a/src/main/resources/archetype-resources/pom.xml b/src/main/resources/archetype-resources/pom.xml index 924d21a..8364476 100644 --- a/src/main/resources/archetype-resources/pom.xml +++ b/src/main/resources/archetype-resources/pom.xml @@ -14,7 +14,7 @@ com.github.jcustenborder.kafka.connect kafka-connect-parent - 2.0.0-cp1 + 2.4.0 @@ -36,6 +36,32 @@ true + diff --git a/src/main/resources/archetype-resources/src/main/java/MyConverter.java b/src/main/resources/archetype-resources/src/main/java/MyConverter.java new file mode 100644 index 0000000..d47e225 --- /dev/null +++ b/src/main/resources/archetype-resources/src/main/java/MyConverter.java @@ -0,0 +1,58 @@ +package ${package}; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import java.util.Map; +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Description("This is a description of this connector and will show up in the documentation") +@DocumentationImportant("This is a important information that will show up in the documentation.") +@DocumentationTip("This is a tip that will show up in the documentation.") +@Title("Super Converter") //This is the display name that will show up in the documentation. +@DocumentationNote("This is a note that will show up in the documentation") +public class MyConverter implements Converter { + private static Logger log = LoggerFactory.getLogger(MyConverter.class); + + @Override + public void configure(Map settings, boolean isKey) { + //TODO: Do your setup here. + } + + @Override + public byte[] fromConnectData(String s, Schema schema, Object o) { + throw new UnsupportedOperationException( + "This needs to be completed" + ); + } + + @Override + public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) { + throw new UnsupportedOperationException( + "This converter requires Kafka 2.4.0 or higher with header support." + ); + } + + @Override + public SchemaAndValue toConnectData(String s, byte[] bytes) { + throw new UnsupportedOperationException( + "This needs to be completed" + ); + } + + @Override + public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) { + throw new UnsupportedOperationException( + "This converter requires Kafka 2.4.0 or higher with header support." + ); + } +} \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformation.java b/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformation.java new file mode 100644 index 0000000..8f5a02a --- /dev/null +++ b/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformation.java @@ -0,0 +1,70 @@ +package ${package}; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +@Title("Super Cool Transformation") +@Description("This transformation will change one record to another record.") +public class MyKeyValueTransformation> extends BaseKeyValueTransformation { + MyKeyValueTransformationConfig config; + + protected MyKeyValueTransformation(boolean isKey) { + super(isKey); + } + + @Override + public ConfigDef config() { + return MyKeyValueTransformationConfig.config(); + } + + @Override + public void close() { + + } + + @Override + protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) { + throw new UnsupportedOperationException("This method will execute against byte arrays."); + } + + @Override + protected SchemaAndValue processString(R record, Schema inputSchema, String input) { + throw new UnsupportedOperationException("This method will execute against Strings."); + } + + @Override + public void configure(Map map) { + this.config = new MyKeyValueTransformationConfig(map); + } + + /** + * This implementation works against the key of the record. + * @param + */ + public static class Key> extends MyKeyValueTransformation { + public Key() { + super(true); + } + } + + /** + * This implementation works against the value of the record. + * @param + */ + public static class Value> extends MyKeyValueTransformation { + public Value() { + super(false); + } + } +} \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformationConfig.java b/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformationConfig.java new file mode 100644 index 0000000..3adc513 --- /dev/null +++ b/src/main/resources/archetype-resources/src/main/java/MyKeyValueTransformationConfig.java @@ -0,0 +1,33 @@ +package ${package}; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Importance; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; + +import java.util.Map; + + +public class MyKeyValueTransformationConfig extends AbstractConfig { + + public static final String MY_SETTING_CONFIG = "my.setting"; + private static final String MY_SETTING_DOC = "This is a setting important to my connector."; + + public final String mySetting; + + public MyKeyValueTransformationConfig(Map originals) { + super(config(), originals); + this.mySetting = this.getString(MY_SETTING_CONFIG); + } + + public static ConfigDef config() { + return new ConfigDef() + .define( + ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING) + .documentation(MY_SETTING_DOC) + .importance(Importance.HIGH) + .build() + ); + } +} diff --git a/src/main/resources/archetype-resources/src/main/java/package-info.java b/src/main/resources/archetype-resources/src/main/java/package-info.java index f8d1797..90e91e5 100644 --- a/src/main/resources/archetype-resources/src/main/java/package-info.java +++ b/src/main/resources/archetype-resources/src/main/java/package-info.java @@ -1,6 +1,22 @@ -@Introduction("This is the high level introduction section for your plugin") -@Title("This is the title of your plugin") +/** + * This attribute is used during documentation generation to write the introduction section. + */ +@Introduction("This plugin is used to add additional JSON parsing functionality to Kafka Connect.") +/** + * This attribute is used as the display name during documentation generation. + */ +@Title("${artifactId}") +/** + * This attribute is used to provide the owner on the connect hub. For example jcustenborder. + */ +@PluginOwner("${groupId}") +/** + * This attribute is used to provide the name of the plugin on the connect hub. + */ +@PluginName("${artifactId}") package ${package}; import com.github.jcustenborder.kafka.connect.utils.config.Introduction; +import com.github.jcustenborder.kafka.connect.utils.config.PluginName; +import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner; import com.github.jcustenborder.kafka.connect.utils.config.Title; \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/test/java/DocumentationTest.java b/src/main/resources/archetype-resources/src/test/java/DocumentationTest.java index f1d6621..1a21ff1 100644 --- a/src/main/resources/archetype-resources/src/test/java/DocumentationTest.java +++ b/src/main/resources/archetype-resources/src/test/java/DocumentationTest.java @@ -3,8 +3,5 @@ import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest; public class DocumentationTest extends BaseDocumentationTest { - @Override - protected String[] packages() { - return new String[0]; - } + } \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/test/java/MyConverterTest.java b/src/main/resources/archetype-resources/src/test/java/MyConverterTest.java new file mode 100644 index 0000000..5507b86 --- /dev/null +++ b/src/main/resources/archetype-resources/src/test/java/MyConverterTest.java @@ -0,0 +1,13 @@ +package ${package}; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class MyConverterTest { + @Test + public void test() { + // Congrats on a passing test! + } +} \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/test/java/MyKeyValueTransformationTest.java b/src/main/resources/archetype-resources/src/test/java/MyKeyValueTransformationTest.java new file mode 100644 index 0000000..29905fc --- /dev/null +++ b/src/main/resources/archetype-resources/src/test/java/MyKeyValueTransformationTest.java @@ -0,0 +1,13 @@ +package ${package}; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class MyKeyValueTransformationTest { + @Test + public void test() { + // Congrats on a passing test! + } +} \ No newline at end of file diff --git a/src/main/resources/archetype-resources/src/test/resources/logback.xml b/src/main/resources/archetype-resources/src/test/resources/logback.xml index 71c7bf0..afdb398 100644 --- a/src/main/resources/archetype-resources/src/test/resources/logback.xml +++ b/src/main/resources/archetype-resources/src/test/resources/logback.xml @@ -8,4 +8,5 @@ + \ No newline at end of file diff --git a/test-archtype.sh b/test-archtype.sh index 625de7d..9238424 100755 --- a/test-archtype.sh +++ b/test-archtype.sh @@ -2,7 +2,7 @@ mvn -B clean install ARTIFACT_ID=testconnect -ARCHETYPE_VERSION='2.0.0-cp1' +ARCHETYPE_VERSION='2.4.0' ARCHETYPE_TEMP=`mktemp -d 2>/dev/null || mktemp -d -t 'mytmpdir'` PROJECT_DIR="${ARCHETYPE_TEMP}/${ARTIFACT_ID}" @@ -11,4 +11,5 @@ cd "${ARCHETYPE_TEMP}" mvn -B archetype:generate -DarchetypeGroupId=com.github.jcustenborder.kafka.connect -DarchetypeArtifactId=kafka-connect-quickstart -DarchetypeVersion=$ARCHETYPE_VERSION -Dpackage=io.confluent.examples -DgroupId=io.confluent.examples -DartifactId=$ARTIFACT_ID -DpackageName=io.confluent.examples -Dversion=1.0-SNAPSHOT cd "${PROJECT_DIR}" mvn clean package -rm -rf "${ARCHETYPE_TEMP}" +#rm -rf "${ARCHETYPE_TEMP}" +echo "${ARCHETYPE_TEMP}" \ No newline at end of file