
Refactored BaseTransformation to make schema-based conversions easier. Added BytesToString conversion. (#13)

fixes #12.
jcustenborder authored Mar 20, 2018
1 parent 0f46bdb commit a2b19cd
Showing 9 changed files with 450 additions and 53 deletions.
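The net effect of the change, as a minimal sketch (standalone Java; the topic name, record contents, and example class name are illustrative, not part of this commit):

import com.github.jcustenborder.kafka.connect.transform.common.BytesToString;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class BytesToStringExample {
  public static void main(String[] args) {
    // The Value variant converts the record's value from bytes to a string.
    BytesToString.Value<SinkRecord> transform = new BytesToString.Value<>();
    transform.configure(Collections.singletonMap("charset", "UTF-8"));

    SinkRecord input = new SinkRecord(
        "example-topic", 0,
        null, null,
        Schema.BYTES_SCHEMA, "hello".getBytes(StandardCharsets.UTF_8),
        42L);

    SinkRecord output = transform.apply(input);
    System.out.println(output.value()); // prints: hello
    transform.close();
  }
}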
pom.xml: 2 changes (1 addition & 1 deletion)
@@ -27,7 +27,7 @@
</parent>
<artifactId>kafka-connect-transform-common</artifactId>
<version>0.1.0-SNAPSHOT</version>
-  <name>afka-connect-transform-common</name>
+  <name>kafka-connect-transform-common</name>
<url>https://github.com/jcustenborder/kafka-connect-transform-common</url>
<inceptionYear>2017</inceptionYear>
<licenses>
src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BaseTransformation.java
@@ -15,28 +15,160 @@
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.data.SchemaHelper;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;

public abstract class BaseTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(BaseTransformation.class);

-  protected abstract SchemaAndValue processStruct(R record, SchemaAndValue schemaAndValue);
protected SchemaAndValue processMap(R record, Map<String, Object> input) {
throw new UnsupportedOperationException("MAP is not a supported type.");
}

protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
throw new UnsupportedOperationException("STRUCT is not a supported type.");
}

protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
throw new UnsupportedOperationException("STRING is not a supported type.");
}

protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
throw new UnsupportedOperationException("BYTES is not a supported type.");
}

protected SchemaAndValue processInt8(R record, Schema inputSchema, byte input) {
throw new UnsupportedOperationException("INT8 is not a supported type.");
}

protected SchemaAndValue processInt16(R record, Schema inputSchema, short input) {
throw new UnsupportedOperationException("INT16 is not a supported type.");
}

-  protected abstract SchemaAndValue processMap(R record, SchemaAndValue schemaAndValue);
protected SchemaAndValue processInt32(R record, Schema inputSchema, int input) {
throw new UnsupportedOperationException("INT32 is not a supported type.");
}

protected SchemaAndValue processInt64(R record, Schema inputSchema, long input) {
throw new UnsupportedOperationException("INT64 is not a supported type.");
}

protected SchemaAndValue processBoolean(R record, Schema inputSchema, boolean input) {
throw new UnsupportedOperationException("BOOLEAN is not a supported type.");
}

protected SchemaAndValue processTimestamp(R record, Schema inputSchema, Date input) {
throw new UnsupportedOperationException("Timestamp is not a supported type.");
}

protected SchemaAndValue processDate(R record, Schema inputSchema, Date input) {
throw new UnsupportedOperationException("Date is not a supported type.");
}

protected SchemaAndValue processTime(R record, Schema inputSchema, Date input) {
throw new UnsupportedOperationException("Time is not a supported type.");
}

-  protected SchemaAndValue process(R record, SchemaAndValue schemaAndValue) {
protected SchemaAndValue processDecimal(R record, Schema inputSchema, BigDecimal input) {
throw new UnsupportedOperationException("Decimal is not a supported type.");
}

protected SchemaAndValue processFloat64(R record, Schema inputSchema, double input) {
throw new UnsupportedOperationException("FLOAT64 is not a supported type.");
}

protected SchemaAndValue processFloat32(R record, Schema inputSchema, float input) {
throw new UnsupportedOperationException("FLOAT32 is not a supported type.");
}

protected SchemaAndValue processArray(R record, Schema inputSchema, List<Object> input) {
throw new UnsupportedOperationException("ARRAY is not a supported type.");
}

protected SchemaAndValue processMap(R record, Schema inputSchema, Map<Object, Object> input) {
throw new UnsupportedOperationException("MAP is not a supported type.");
}

private static final Schema OPTIONAL_TIMESTAMP = Timestamp.builder().optional().build();

protected SchemaAndValue process(R record, Schema inputSchema, Object input) {
final SchemaAndValue result;
-    if (schemaAndValue.value() instanceof Struct) {
-      result = processStruct(record, schemaAndValue);
-    } else if (schemaAndValue.value() instanceof Map) {
-      result = processMap(record, schemaAndValue);

if (null == inputSchema && null == input) {
return new SchemaAndValue(
null,
null
);
}

if (input instanceof Map) {
log.trace("process() - Processing as map");
result = processMap(record, (Map<String, Object>) input);
return result;
}

if (null == inputSchema) {
log.trace("process() - Determining schema");
inputSchema = SchemaHelper.schema(input);
}

log.trace("process() - Input has as schema. schema = {}", inputSchema);
if (Schema.Type.STRUCT == inputSchema.type()) {
result = processStruct(record, inputSchema, (Struct) input);
} else if (Timestamp.LOGICAL_NAME.equals(inputSchema.name())) {
result = processTimestamp(record, inputSchema, (Date) input);
} else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(inputSchema.name())) {
result = processDate(record, inputSchema, (Date) input);
} else if (Time.LOGICAL_NAME.equals(inputSchema.name())) {
result = processTime(record, inputSchema, (Date) input);
} else if (Decimal.LOGICAL_NAME.equals(inputSchema.name())) {
result = processDecimal(record, inputSchema, (BigDecimal) input);
} else if (Schema.Type.STRING == inputSchema.type()) {
result = processString(record, inputSchema, (String) input);
} else if (Schema.Type.BYTES == inputSchema.type()) {
result = processBytes(record, inputSchema, (byte[]) input);
} else if (Schema.Type.INT8 == inputSchema.type()) {
result = processInt8(record, inputSchema, (byte) input);
} else if (Schema.Type.INT16 == inputSchema.type()) {
result = processInt16(record, inputSchema, (short) input);
} else if (Schema.Type.INT32 == inputSchema.type()) {
result = processInt32(record, inputSchema, (int) input);
} else if (Schema.Type.INT64 == inputSchema.type()) {
result = processInt64(record, inputSchema, (long) input);
} else if (Schema.Type.FLOAT32 == inputSchema.type()) {
result = processFloat32(record, inputSchema, (float) input);
} else if (Schema.Type.FLOAT64 == inputSchema.type()) {
result = processFloat64(record, inputSchema, (double) input);
} else if (Schema.Type.ARRAY == inputSchema.type()) {
result = processArray(record, inputSchema, (List<Object>) input);
} else if (Schema.Type.MAP == inputSchema.type()) {
result = processMap(record, inputSchema, (Map<Object, Object>) input);
} else {
-      throw new UnsupportedOperationException();
throw new UnsupportedOperationException(
String.format(
"Schema is not supported. type='%s' name='%s'",
inputSchema.type(),
inputSchema.name()
)
);
}

return result;
}


}
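The refactor's intent, sketched with a hypothetical subclass (not in this commit): a concrete transform overrides only the process*() hooks for the schema types it supports, and process() inspects the value's schema (inferring one via SchemaHelper when it is null), checking logical types such as Timestamp and Decimal before the base types, then routes the record to the matching hook.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.util.Map;

public abstract class UpperCase<R extends ConnectRecord<R>> extends BaseTransformation<R> {
  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> settings) {
  }

  @Override
  public void close() {
  }

  @Override
  protected SchemaAndValue processString(R record, Schema inputSchema, String input) {
    // Only STRING is handled; any other schema type falls through to the
    // inherited hooks, which throw UnsupportedOperationException.
    return new SchemaAndValue(inputSchema, input.toUpperCase());
  }

  public static class Value<R extends ConnectRecord<R>> extends UpperCase<R> {
    @Override
    public R apply(R r) {
      final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());
      return r.newRecord(
          r.topic(), r.kafkaPartition(),
          r.keySchema(), r.key(),
          transformed.schema(), transformed.value(),
          r.timestamp());
    }
  }
}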
src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToString.java
@@ -0,0 +1,141 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.google.common.base.Strings;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public abstract class BytesToString<R extends ConnectRecord<R>> extends BaseTransformation<R> {
private static final Logger log = LoggerFactory.getLogger(BytesToString.class);

@Override
public ConfigDef config() {
return BytesToStringConfig.config();
}

BytesToStringConfig config;

@Override
public void configure(Map<String, ?> settings) {
this.config = new BytesToStringConfig(settings);
}

@Override
public void close() {

}

@Override
protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
final Schema outputSchema = inputSchema.isOptional() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
final String output = new String(input, this.config.charset);
return new SchemaAndValue(outputSchema, output);
}

Map<Schema, Schema> schemaCache = new HashMap<>();

@Override
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
final Schema schema = this.schemaCache.computeIfAbsent(inputSchema, s -> {
final SchemaBuilder builder = SchemaBuilder.struct();
if (!Strings.isNullOrEmpty(inputSchema.name())) {
builder.name(inputSchema.name());
}
if (inputSchema.isOptional()) {
builder.optional();
}

for (Field field : inputSchema.fields()) {
log.trace("processStruct() - processing '{}'", field.name());
final Schema fieldSchema;
if (this.config.fields.contains(field.name())) {
fieldSchema = field.schema().isOptional() ?
Schema.OPTIONAL_STRING_SCHEMA :
Schema.STRING_SCHEMA;
} else {
fieldSchema = field.schema();
}
builder.field(field.name(), fieldSchema);
}
return builder.build();
});

Struct struct = new Struct(schema);
for (Field field : schema.fields()) {
if (this.config.fields.contains(field.name())) {
byte[] buffer = input.getBytes(field.name());
struct.put(field.name(), new String(buffer, this.config.charset));
} else {
struct.put(field.name(), input.get(field.name()));
}
}
return new SchemaAndValue(schema, struct);
}

@Title("BytesToString(Key)")
@Description("This transformation is used to convert a byte array to a string.")
@DocumentationTip("This transformation is used to manipulate fields in the Key of the record.")
public static class Key<R extends ConnectRecord<R>> extends BytesToString<R> {

@Override
public R apply(R r) {
final SchemaAndValue transformed = process(r, r.keySchema(), r.key());

return r.newRecord(
r.topic(),
r.kafkaPartition(),
transformed.schema(),
transformed.value(),
r.valueSchema(),
r.value(),
r.timestamp()
);
}
}

@Title("BytesToString(Value)")
@Description("This transformation is used to convert a byte array to a string.")
public static class Value<R extends ConnectRecord<R>> extends BytesToString<R> {
@Override
public R apply(R r) {
final SchemaAndValue transformed = process(r, r.valueSchema(), r.value());

return r.newRecord(
r.topic(),
r.kafkaPartition(),
r.keySchema(),
r.key(),
transformed.schema(),
transformed.value(),
r.timestamp()
);
}
}
}
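A quick sketch of the struct path above (field names, topic, and example class are illustrative): a BYTES field listed in the fields setting comes back re-typed as STRING, while fields not listed pass through with their original schema and value.

import com.github.jcustenborder.kafka.connect.transform.common.BytesToString;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class BytesToStringStructExample {
  public static void main(String[] args) {
    Schema valueSchema = SchemaBuilder.struct()
        .field("id", Schema.INT64_SCHEMA)
        .field("payload", Schema.BYTES_SCHEMA)
        .build();
    Struct value = new Struct(valueSchema)
        .put("id", 1L)
        .put("payload", "raw bytes".getBytes(StandardCharsets.UTF_8));

    Map<String, String> settings = new HashMap<>();
    settings.put("fields", "payload"); // only this field is converted; charset defaults to UTF-8

    BytesToString.Value<SinkRecord> transform = new BytesToString.Value<>();
    transform.configure(settings);

    SinkRecord output = transform.apply(
        new SinkRecord("example-topic", 0, null, null, valueSchema, value, 0L));
    Struct converted = (Struct) output.value();
    System.out.println(converted.getString("payload")); // prints: raw bytes
    transform.close();
  }
}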
src/main/java/com/github/jcustenborder/kafka/connect/transform/common/BytesToStringConfig.java
@@ -0,0 +1,65 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BytesToStringConfig extends AbstractConfig {
public final Charset charset;
public final Set<String> fields;

public static final String CHARSET_CONFIG = "charset";
public static final String CHARSET_DOC = "The charset to use when creating the output string.";

public static final String FIELD_CONFIG = "fields";
public static final String FIELD_DOC = "The fields to transform.";


public BytesToStringConfig(Map<String, ?> settings) {
super(config(), settings);
String charset = getString(CHARSET_CONFIG);
this.charset = Charset.forName(charset);
List<String> fields = getList(FIELD_CONFIG);
this.fields = new HashSet<>(fields);
}

public static ConfigDef config() {
return new ConfigDef()
.define(
ConfigKeyBuilder.of(CHARSET_CONFIG, ConfigDef.Type.STRING)
.documentation(CHARSET_DOC)
.defaultValue("UTF-8")
.importance(ConfigDef.Importance.HIGH)
.build()
).define(
ConfigKeyBuilder.of(FIELD_CONFIG, ConfigDef.Type.LIST)
.documentation(FIELD_DOC)
.defaultValue(Collections.emptyList())
.importance(ConfigDef.Importance.HIGH)
.build()
);
}

}
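Parsing behavior of the two settings, in a short sketch (the field names are made up): charset accepts any name java.nio.charset.Charset.forName() understands, and fields is a Connect LIST type, so a comma-separated string splits into the set consulted by processStruct().

import com.github.jcustenborder.kafka.connect.transform.common.BytesToStringConfig;

import java.util.HashMap;
import java.util.Map;

public class BytesToStringConfigExample {
  public static void main(String[] args) {
    Map<String, String> settings = new HashMap<>();
    settings.put("charset", "ISO-8859-1");        // default is UTF-8
    settings.put("fields", "payload,raw_header"); // LIST: comma-separated names

    BytesToStringConfig config = new BytesToStringConfig(settings);
    System.out.println(config.charset); // ISO-8859-1
    System.out.println(config.fields);  // contains payload and raw_header (HashSet, unordered)
  }
}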