Skip to content

Commit

Permalink
Added example for a converter. Added example for a transformation wit…
Browse files Browse the repository at this point in the history
…h a Key and Value implementation. Fixes #11. (#12)
  • Loading branch information
jcustenborder authored Mar 6, 2020
1 parent a690082 commit cf9d9d1
Show file tree
Hide file tree
Showing 15 changed files with 251 additions and 17 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ for a list of available versions.
mvn archetype:generate \
-DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
-DarchetypeArtifactId=kafka-connect-quickstart \
-DarchetypeVersion=2.0.0-cp1
-DarchetypeVersion=2.4.0
```

```
mvn archetype:generate \
-DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
-DarchetypeArtifactId=kafka-connect-quickstart \
-DarchetypeVersion=2.0.0-cp1 \
-DarchetypeVersion=2.4.0 \
-Dpackage=com.github.jcustenborder.kafka.connect.test \
-DgroupId=com.github.jcustenborder.kafka.connect \
-DartifactId=testconnect \
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-quickstart</artifactId>
<version>2.0.0-cp1</version>
<version>2.4.0</version>
<name>kafka-connect-archtype</name>
<description>A quickstart for building Kafka Connect connectors.</description>
<url>https://github.com/jcustenborder/kafka-connect-archtype</url>
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/META-INF/maven/archetype.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype/1.0.0 http://maven.apache.org/xsd/archetype-1.0.0.xsd">
<id>quickstart</id>
<sources>
<source>src/main/java/package-info.java</source>
<source>src/main/java/MyConverter.java</source>
<source>src/main/java/MyKeyValueTransformation.java</source>
<source>src/main/java/MyKeyValueTransformationConfig.java</source>
<source>src/main/java/MySourceConnector.java</source>
<source>src/main/java/MySourceConnectorConfig.java</source>
<source>src/main/java/MySourceTask.java</source>
Expand All @@ -21,6 +25,8 @@
<resource>src/test/resources/logback.xml</resource>
</resources>
<testSources>
<source>src/test/java/MyConverterTest.java</source>
<source>src/test/java/MyKeyValueTransformationTest.java</source>
<source>src/test/java/MySinkConnectorTest.java</source>
<source>src/test/java/MySinkTaskTest.java</source>
<source>src/test/java/MySinkTaskIT.java</source>
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/archetype-resources/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Introduction

Welcome to your new Kafka Connect connector!
Welcome to your new Kafka Connect plugin!

# Running in development

Expand Down
8 changes: 4 additions & 4 deletions src/main/resources/archetype-resources/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.0.0
image: confluentinc/cp-zookeeper:5.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
zk_id: "1"
ports:
- "2181:2181"
kafka:
hostname: kafka
image: confluentinc/cp-enterprise-kafka:5.0.0
image: confluentinc/cp-enterprise-kafka:5.4.0
links:
- zookeeper
ports:
Expand All @@ -37,7 +37,7 @@ services:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
schema-registry:
image: confluentinc/cp-schema-registry:5.0.0
image: confluentinc/cp-schema-registry:5.4.0
links:
- kafka
- zookeeper
Expand All @@ -47,7 +47,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
SCHEMA_REGISTRY_HOST_NAME: schema-registry
control-center:
image: confluentinc/cp-enterprise-control-center:5.0.0
image: confluentinc/cp-enterprise-control-center:5.4.0
depends_on:
- zookeeper
- kafka
Expand Down
28 changes: 27 additions & 1 deletion src/main/resources/archetype-resources/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
<version>2.0.0-cp1</version>
<version>2.4.0</version>
</parent>

<scm>
Expand All @@ -36,6 +36,32 @@
<skip>true</skip>
</configuration>
</plugin>
<!--
Uncomment this second to generate a package to deploy to the Confluent Hub.
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-maven-plugin</artifactId>
<configuration>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<documentationUrl>https://jcustenborder.github.io/kafka-connect-documentation/</documentationUrl>
<componentTypes>
<componentType>source</componentType>
<componentType>sink</componentType>
<componentType>transform</componentType>
<componentType>converter</componentType>
</componentTypes>
<tags>
<tag>${artifactId}</tag>
</tags>
<title>Kafka Connect ${artifactId}</title>
<supportUrl>${pom.issueManagement.url}</supportUrl>
<supportSummary>Support provided through community involvement.</supportSummary>
<excludes>
<exclude>org.reflections:reflections</exclude>
</excludes>
</configuration>
</plugin>
-->
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ${package};

import org.apache.kafka.common.header.Header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import java.util.Map;
import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Description("This is a description of this connector and will show up in the documentation")
@DocumentationImportant("This is a important information that will show up in the documentation.")
@DocumentationTip("This is a tip that will show up in the documentation.")
@Title("Super Converter") //This is the display name that will show up in the documentation.
@DocumentationNote("This is a note that will show up in the documentation")
public class MyConverter implements Converter {
private static Logger log = LoggerFactory.getLogger(MyConverter.class);

@Override
public void configure(Map<String, ?> settings, boolean isKey) {
//TODO: Do your setup here.
}

@Override
public byte[] fromConnectData(String s, Schema schema, Object o) {
throw new UnsupportedOperationException(
"This needs to be completed"
);
}

@Override
public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
throw new UnsupportedOperationException(
"This converter requires Kafka 2.4.0 or higher with header support."
);
}

@Override
public SchemaAndValue toConnectData(String s, byte[] bytes) {
throw new UnsupportedOperationException(
"This needs to be completed"
);
}

@Override
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
throw new UnsupportedOperationException(
"This converter requires Kafka 2.4.0 or higher with header support."
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package ${package};

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

@Title("Super Cool Transformation")
@Description("This transformation will change one record to another record.")
public class MyKeyValueTransformation<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
MyKeyValueTransformationConfig config;

protected MyKeyValueTransformation(boolean isKey) {
super(isKey);
}

@Override
public ConfigDef config() {
return MyKeyValueTransformationConfig.config();
}

@Override
public void close() {

}

@Override
protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
throw new UnsupportedOperationException("This method will execute against byte arrays.");
}

@Override
protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
throw new UnsupportedOperationException("This method will execute against Strings.");
}

@Override
public void configure(Map<String, ?> map) {
this.config = new MyKeyValueTransformationConfig(map);
}

/**
* This implementation works against the key of the record.
* @param <R>
*/
public static class Key<R extends ConnectRecord<R>> extends MyKeyValueTransformation<R> {
public Key() {
super(true);
}
}

/**
* This implementation works against the value of the record.
* @param <R>
*/
public static class Value<R extends ConnectRecord<R>> extends MyKeyValueTransformation<R> {
public Value() {
super(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package ${package};

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;

import java.util.Map;


public class MyKeyValueTransformationConfig extends AbstractConfig {

public static final String MY_SETTING_CONFIG = "my.setting";
private static final String MY_SETTING_DOC = "This is a setting important to my connector.";

public final String mySetting;

public MyKeyValueTransformationConfig(Map<?, ?> originals) {
super(config(), originals);
this.mySetting = this.getString(MY_SETTING_CONFIG);
}

public static ConfigDef config() {
return new ConfigDef()
.define(
ConfigKeyBuilder.of(MY_SETTING_CONFIG, Type.STRING)
.documentation(MY_SETTING_DOC)
.importance(Importance.HIGH)
.build()
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
@Introduction("This is the high level introduction section for your plugin")
@Title("This is the title of your plugin")
/**
* This attribute is used during documentation generation to write the introduction section.
*/
@Introduction("This plugin is used to add additional JSON parsing functionality to Kafka Connect.")
/**
* This attribute is used as the display name during documentation generation.
*/
@Title("${artifactId}")
/**
* This attribute is used to provide the owner on the connect hub. For example jcustenborder.
*/
@PluginOwner("${groupId}")
/**
* This attribute is used to provide the name of the plugin on the connect hub.
*/
@PluginName("${artifactId}")
package ${package};

import com.github.jcustenborder.kafka.connect.utils.config.Introduction;
import com.github.jcustenborder.kafka.connect.utils.config.PluginName;
import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@
import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest;

public class DocumentationTest extends BaseDocumentationTest {
@Override
protected String[] packages() {
return new String[0];
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ${package};

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MyConverterTest {
@Test
public void test() {
// Congrats on a passing test!
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ${package};

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MyKeyValueTransformationTest {
@Test
public void test() {
// Congrats on a passing test!
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
<appender-ref ref="STDOUT" />
</root>
<logger name="${package}" level="TRACE" />
<logger name="org.reflections" level="ERROR"/>
</configuration>
5 changes: 3 additions & 2 deletions test-archtype.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mvn -B clean install

ARTIFACT_ID=testconnect
ARCHETYPE_VERSION='2.0.0-cp1'
ARCHETYPE_VERSION='2.4.0'
ARCHETYPE_TEMP=`mktemp -d 2>/dev/null || mktemp -d -t 'mytmpdir'`
PROJECT_DIR="${ARCHETYPE_TEMP}/${ARTIFACT_ID}"

Expand All @@ -11,4 +11,5 @@ cd "${ARCHETYPE_TEMP}"
mvn -B archetype:generate -DarchetypeGroupId=com.github.jcustenborder.kafka.connect -DarchetypeArtifactId=kafka-connect-quickstart -DarchetypeVersion=$ARCHETYPE_VERSION -Dpackage=io.confluent.examples -DgroupId=io.confluent.examples -DartifactId=$ARTIFACT_ID -DpackageName=io.confluent.examples -Dversion=1.0-SNAPSHOT
cd "${PROJECT_DIR}"
mvn clean package
rm -rf "${ARCHETYPE_TEMP}"
#rm -rf "${ARCHETYPE_TEMP}"
echo "${ARCHETYPE_TEMP}"

0 comments on commit cf9d9d1

Please sign in to comment.