Added more documentation to HeaderToField. Fixes #66. (#67)
jcustenborder authored Jan 14, 2021
1 parent 1f794b1 commit ccf9778
Showing 6 changed files with 264 additions and 2 deletions.
pom.xml (1 addition & 1 deletion)
@@ -23,7 +23,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
-<version>2.2.1-cp1</version>
+<version>2.6.0</version>
</parent>
<artifactId>kafka-connect-transform-common</artifactId>
<version>0.1.0-SNAPSHOT</version>

src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Debug.java
@@ -0,0 +1,96 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.fasterxml.jackson.databind.SerializationFeature;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

public class Debug<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(Debug.class);

void addConnectRecord(Map<String, Object> debugContent, ConnectRecord<?> r) {
debugContent.put("topic", r.topic());
debugContent.put("kafkaPartition", r.kafkaPartition());
debugContent.put("timestamp", r.timestamp());
}

void addSinkRecord(Map<String, Object> debugContent, SinkRecord r) {
debugContent.put("timestampType", r.timestampType());
debugContent.put("kafkaOffset", r.kafkaOffset());
}

void addSourceRecord(Map<String, Object> debugContent, SourceRecord r) {
debugContent.put("sourcePartition", r.sourcePartition());
debugContent.put("sourceOffset", r.sourceOffset());
}

void addKey(Map<String, Object> debugContent, R record) {
Object result = record.key();
debugContent.put("key", result);
}

void addValue(Map<String, Object> debugContent, R record) {
Object result = record.value();
debugContent.put("value", result);
}

@Override
public R apply(R r) {
try {
Map<String, Object> debugContent = new LinkedHashMap<>();
addConnectRecord(debugContent, r);
if (r instanceof SinkRecord) {
SinkRecord sinkRecord = (SinkRecord) r;
addSinkRecord(debugContent, sinkRecord);
} else if (r instanceof SourceRecord) {
SourceRecord sourceRecord = (SourceRecord) r;
addSourceRecord(debugContent, sourceRecord);
}
addKey(debugContent, r);
addValue(debugContent, r);
log.info("\n{}", ObjectMapperFactory.INSTANCE.writeValueAsString(debugContent));
} catch (Exception ex) {
log.error("Exception while generating debug content.", ex);
}

return r;
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> settings) {
ObjectMapperFactory.INSTANCE.configure(SerializationFeature.INDENT_OUTPUT, true);
}
}
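
Editor's note, not part of the commit: a minimal sketch of exercising the new transform against a source record, assuming the snippet lives in the same package as Debug. It shows the sourcePartition/sourceOffset branch that sink records never take; all names here are illustrative.

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class DebugSourceExample {
  public static void main(String[] args) {
    Debug<SourceRecord> transform = new Debug<>();
    transform.configure(Collections.emptyMap()); // enables INDENT_OUTPUT

    // Source records carry sourcePartition/sourceOffset rather than a Kafka offset.
    SourceRecord input = new SourceRecord(
        Collections.singletonMap("file", "test1.csv"), // sourcePartition
        Collections.singletonMap("line", 42L),         // sourceOffset
        "testing",                                     // topic
        null,                                          // partition: let Connect assign
        Schema.STRING_SCHEMA,
        "example value");

    // apply() logs the record as pretty-printed JSON and returns it unchanged.
    SourceRecord output = transform.apply(input);
    System.out.println(output == input); // true
  }
}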

src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToField.java

@@ -15,6 +15,8 @@
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.github.jcustenborder.kafka.connect.utils.data.SchemaBuilders;
import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import org.apache.kafka.common.config.ConfigDef;
@@ -32,6 +34,9 @@
import java.util.List;
import java.util.Map;

@Title("HeaderToField")
@Description("This transformation is used to copy the value of a header to a field in the key or " +
"value of the record.")
public class HeaderToField<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
private static final Logger log = LoggerFactory.getLogger(HeaderToField.class);


src/main/java/com/github/jcustenborder/kafka/connect/transform/common/HeaderToFieldConfig.java

@@ -45,8 +45,28 @@
class HeaderToFieldConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(HeaderToFieldConfig.class);

static String supportedHeaderTypes() {
StringBuilder builder = new StringBuilder();
HeaderToFieldMapping.SCHEMA_TYPE_LOOKUP.keySet()
.stream()
.sorted()
.forEach(key -> {
builder.append("* ");
builder.append(key);
builder.append("\n");
});

return builder.toString();
}

public static final String HEADER_MAPPINGS_CONF = "header.mappings";
-static final String HEADER_MAPPINGS_DOC = "The mapping of the header to the field in the message.";
+static final String HEADER_MAPPINGS_DOC = "The mapping of the header to the field in the message. " +
+    "More than one mapping can be specified, separated by a comma. " +
+    "The format is `<header name>:<header type>[:field name]`. Supported header types are:\n\n" +
+    supportedHeaderTypes();

public final List<HeaderToFieldMapping> mappings;
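
To make the `<header name>:<header type>[:field name]` format concrete, this is the three-mapping value used by the Spooldir example at the end of this commit, split across lines for readability; the `(Timestamp)` qualifier on INT64 appears to select the Timestamp logical type for the target field.

// Three comma-separated mappings: header name, header type, target field name.
String mappings =
    "file.path:STRING:file_path,"
        + "file.name:STRING:file_name,"
        + "file.last.modified:INT64(Timestamp):file_last_modified";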

src/test/java/com/github/jcustenborder/kafka/connect/transform/common/DebugTest.java
@@ -0,0 +1,51 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class DebugTest {
Debug<SinkRecord> transform;

@BeforeEach
public void before() {
this.transform = new Debug<>();
this.transform.configure(ImmutableMap.of());
}

@Test
public void apply() {
Schema valueSchema = SchemaBuilder.struct()
.name("foo")
.field("firstName", Schema.STRING_SCHEMA)
.field("lastName", Schema.STRING_SCHEMA)
.build();

SinkRecord input = SinkRecordHelper.write("test", Schema.STRING_SCHEMA, "1234", valueSchema,
    new Struct(valueSchema).put("firstName", "adfs").put("lastName", "asdfas"));
SinkRecord output = this.transform.apply(input);

}
}

HeaderToField documentation example (JSON, new file)
@@ -0,0 +1,90 @@
{
"input": {
"topic": "testing",
"kafkaPartition": 1,
"valueSchema": {
"type": "STRUCT",
"isOptional": false,
"fieldSchemas": {
"firstName": {
"type": "STRING",
"isOptional": true
},
"lastName": {
"type": "STRING",
"isOptional": true
}
}
},
"value": {
"schema": {
"type": "STRUCT",
"isOptional": false,
"fieldSchemas": {
"firstName": {
"type": "STRING",
"isOptional": true
},
"lastName": {
"type": "STRING",
"isOptional": true
}
}
},
"fieldValues": [
{
"name": "firstName",
"schema": {
"type": "STRING",
"isOptional": true
},
"storage": "example"
},
{
"name": "lastName",
"schema": {
"type": "STRING",
"isOptional": true
},
"storage": "user"
}
]
},
"timestamp": 123412351,
"timestampType": "NO_TIMESTAMP_TYPE",
"offset": 12345,
"headers": [
{
"name": "file.path",
"schema": {
"type": "STRING",
"isOptional": false
},
"storage": "/tmp/input/test1.csv"
},
{
"name": "file.name",
"schema": {
"type": "STRING",
"isOptional": false
},
"storage": "test1.csv"
},
{
"name": "file.last.modified",
"schema": {
"type": "INT64",
"isOptional": false,
"name": "org.apache.kafka.connect.data.Timestamp"
},
"storage": 1610656447123
}
]
},
"description": "The following example takes the output from the Spooldir connector copies headers for the metadata to fields in the value.",
"name": "Spooldir metadata",
"config": {
"header.mappings": "file.path:STRING:file_path,file.name:STRING:file_name,file.last.modified:INT64(Timestamp):file_last_modified"
},
"childClass": "Value"
}
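
Not part of the commit, but to spell out what that example config should produce: the transform adds one value field per mapping. A hedged reconstruction of the resulting value, using the storage values from the JSON above; field order and optionality may differ, and Connect surfaces the Timestamp logical type as java.util.Date.

import java.util.Date;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

public class ExpectedOutputSketch {
  public static void main(String[] args) {
    // Original fields plus one field per header mapping.
    Schema expected = SchemaBuilder.struct()
        .field("firstName", Schema.OPTIONAL_STRING_SCHEMA)
        .field("lastName", Schema.OPTIONAL_STRING_SCHEMA)
        .field("file_path", Schema.STRING_SCHEMA)
        .field("file_name", Schema.STRING_SCHEMA)
        .field("file_last_modified", Timestamp.SCHEMA)
        .build();

    Struct row = new Struct(expected)
        .put("firstName", "example")
        .put("lastName", "user")
        .put("file_path", "/tmp/input/test1.csv")
        .put("file_name", "test1.csv")
        .put("file_last_modified", new Date(1610656447123L));
    System.out.println(row);
  }
}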
