diff --git a/README.md b/README.md index 53489a9..ad8d126 100644 --- a/README.md +++ b/README.md @@ -1,401 +1,1033 @@ + # Introduction -This project provides some common transformation functionality for Kafka Connect. + +This project contains common transformations for everyday use cases with Kafka Connect. + + + # Transformations + ## BytesToString(Key) This transformation is used to convert a byte array to a string. +### Tip + +This transformation is used to manipulate fields in the Key of the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ------- | ------ | ---------- | ------------- | --------- | ---------------------------------------------------| -| charset | String | High | UTF-8 | | The charset to use when creating the output string.| -| fields | List | High | [] | | The fields to transform. | +#### General + + +##### `charset` + +The charset to use when creating the output string. + +*Importance:* High + +*Type:* String + +*Default Value:* UTF-8 + + + +##### `fields` + +The fields to transform. + +*Importance:* High + +*Type:* List + +*Default Value:* [] + -#### Standalone Example +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Key -transforms.Key.type=com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key -# The following values must be configured. +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). 
+Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one of the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key", - "transforms": "Key", - "transforms.Key.type": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` to the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## BytesToString(Value) This transformation is used to convert a byte array to a string. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ------- | ------ | ---------- | ------------- | --------- | ---------------------------------------------------| -| charset | String | High | UTF-8 | | The charset to use when creating the output string.| -| fields | List | High | [] | | The fields to transform. | +#### General + + +##### `charset` + +The charset to use when creating the output string. + +*Importance:* High + +*Type:* String + +*Default Value:* UTF-8 + + + +##### `fields` + +The fields to transform. 
+ +*Importance:* High +*Type:* List -#### Standalone Example +*Default Value:* [] + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value -# The following values must be configured. +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. 
+```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ChangeCase(Key) This transformation is used to change the case of fields in an input struct. +### Tip + +This transformation is used to manipulate fields in the Key of the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ---- | ------ | ---------- | ------------- | ---------------------------------------------------------------------------------------------------------------- | ------------------------| -| from | String | High | | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | The format to move from | -| to | String | High | | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | | +#### General + + +##### `from` + +The format to move from + +*Importance:* High + +*Type:* String + +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + -#### Standalone Example +##### `to` + + + +*Importance:* High + +*Type:* String + +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Key -transforms.Key.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key -# The following values must be configured. 
-transforms.Key.from= -transforms.Key.to= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key +transforms.tran.from=< Required Configuration > +transforms.tran.to=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key", - "transforms": "Key", - "transforms.Key.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key", - "transforms.Key.from":"", - "transforms.Key.to":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key", + "transforms.tran.from" : "< Required Configuration >", + "transforms.tran.to" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ChangeCase(Value) This transformation is used to change the case of fields in an input struct. 
+ + +### Configuration + +#### General + + +##### `from` + +The format to move from + +*Importance:* High + +*Type:* String + +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + + + +##### `to` + + + +*Importance:* High + +*Type:* String + +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). + +```properties +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value +transforms.tran.from=< Required Configuration > +transforms.tran.to=< Required Configuration > +``` + +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). + +```json +{ + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value", + "transforms.tran.from" : "< Required Configuration >", + "transforms.tran.to" : "< Required Configuration >" +} +``` + +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. 
+```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + +## ChangeTopicCase + +This transformation is used to change the case of a topic. + +### Tip + +This transformation will convert a topic name like 'TOPIC_NAME' to `topicName`, or `topic_name`. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ---- | ------ | ---------- | ------------- | ---------------------------------------------------------------------------------------------------------------- | ------------------------| -| from | String | High | | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | The format to move from | -| to | String | High | | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | | +#### General + + +##### `from` + +The format of the incoming topic name. `LOWER_CAMEL` = Java variable naming convention, e.g., "lowerCamel". `LOWER_HYPHEN` = Hyphenated variable naming convention, e.g., "lower-hyphen". `LOWER_UNDERSCORE` = C++ variable naming convention, e.g., "lower_underscore". `UPPER_CAMEL` = Java and C++ class naming convention, e.g., "UpperCamel". `UPPER_UNDERSCORE` = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE". + +*Importance:* High +*Type:* String -#### Standalone Example +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + + + +##### `to` + +The format of the outgoing topic name. `LOWER_CAMEL` = Java variable naming convention, e.g., "lowerCamel". `LOWER_HYPHEN` = Hyphenated variable naming convention, e.g., "lower-hyphen". 
`LOWER_UNDERSCORE` = C++ variable naming convention, e.g., "lower_underscore". `UPPER_CAMEL` = Java and C++ class naming convention, e.g., "UpperCamel". `UPPER_UNDERSCORE` = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE". + +*Importance:* High + +*Type:* String + +*Validator:* ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value -# The following values must be configured. -transforms.Value.from= -transforms.Value.to= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase +transforms.tran.from=< Required Configuration > +transforms.tran.to=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). 
```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value", - "transforms.Value.from":"", - "transforms.Value.to":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase", + "transforms.tran.from" : "< Required Configuration >", + "transforms.tran.to" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ExtractNestedField(Key) This transformation is used to extract a field from a nested struct and append it to the parent struct. +### Tip + +This transformation is used to manipulate fields in the Key of the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ---------------------- | ------ | ---------- | ------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------| -| input.inner.field.name | String | High | | | The field on the child struct containing the field to be extracted. 
For example if you wanted the extract `address.state` you would use `state`.| -| input.outer.field.name | String | High | | | The field on the parent struct containing the child struct. For example if you wanted the extract `address.state` you would use `address`. | -| output.field.name | String | High | | | The field to place the extracted value into. | +#### General -#### Standalone Example +##### `input.inner.field.name` + +The field on the child struct containing the field to be extracted. For example if you wanted the extract `address.state` you would use `state`. + +*Importance:* High + +*Type:* String + + + +##### `input.outer.field.name` + +The field on the parent struct containing the child struct. For example if you wanted the extract `address.state` you would use `address`. + +*Importance:* High + +*Type:* String + + + +##### `output.field.name` + +The field to place the extracted value into. + +*Importance:* High + +*Type:* String + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Key -transforms.Key.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key -# The following values must be configured. 
-transforms.Key.input.inner.field.name= -transforms.Key.input.outer.field.name= -transforms.Key.output.field.name= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key +transforms.tran.input.inner.field.name=< Required Configuration > +transforms.tran.input.outer.field.name=< Required Configuration > +transforms.tran.output.field.name=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key", - "transforms": "Key", - "transforms.Key.type": "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key", - "transforms.Key.input.inner.field.name":"", - "transforms.Key.input.outer.field.name":"", - "transforms.Key.output.field.name":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key", + "transforms.tran.input.inner.field.name" : "< Required Configuration >", + "transforms.tran.input.outer.field.name" : "< Required Configuration >", + "transforms.tran.output.field.name" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. 
+```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ExtractNestedField(Value) This transformation is used to extract a field from a nested struct and append it to the parent struct. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ---------------------- | ------ | ---------- | ------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------| -| input.inner.field.name | String | High | | | The field on the child struct containing the field to be extracted. For example if you wanted the extract `address.state` you would use `state`.| -| input.outer.field.name | String | High | | | The field on the parent struct containing the child struct. For example if you wanted the extract `address.state` you would use `address`. | -| output.field.name | String | High | | | The field to place the extracted value into. | +#### General + + +##### `input.inner.field.name` + +The field on the child struct containing the field to be extracted. For example if you wanted the extract `address.state` you would use `state`. + +*Importance:* High + +*Type:* String + + + +##### `input.outer.field.name` + +The field on the parent struct containing the child struct. For example if you wanted the extract `address.state` you would use `address`. + +*Importance:* High + +*Type:* String + + + +##### `output.field.name` + +The field to place the extracted value into. 
+*Importance:* High -#### Standalone Example +*Type:* String + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value -# The following values must be configured. -transforms.Value.input.inner.field.name= -transforms.Value.input.outer.field.name= -transforms.Value.output.field.name= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value +transforms.tran.input.inner.field.name=< Required Configuration > +transforms.tran.input.outer.field.name=< Required Configuration > +transforms.tran.output.field.name=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). 
```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value", - "transforms.Value.input.inner.field.name":"", - "transforms.Value.input.outer.field.name":"", - "transforms.Value.output.field.name":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value", + "transforms.tran.input.inner.field.name" : "< Required Configuration >", + "transforms.tran.input.outer.field.name" : "< Required Configuration >", + "transforms.tran.output.field.name" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ExtractTimestamp(Value) This transformation is used to use a field from the input data to override the timestamp for the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ---------- | ------ | ---------- | ------------- | --------- | ---------------------------------------------------------------------------| -| field.name | String | High | | | The field to pull the timestamp from. This must be an int64 or a timestamp.| +#### General + + +##### `field.name` + +The field to pull the timestamp from. This must be an int64 or a timestamp. 
+ +*Importance:* High + +*Type:* String + + + +#### Examples +##### Standalone Example -#### Standalone Example +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value -# The following values must be configured. -transforms.Value.field.name= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value +transforms.tran.field.name=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value", - "transforms.Value.field.name":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value", + "transforms.tran.field.name" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. 
+```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## PatternRename(Key) This transformation is used to rename fields in the key of an input struct based on a regular expression and a replacement string. +### Tip + +This transformation is used to manipulate fields in the Key of the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation| -| ------------------- | ------ | ---------- | ------------------ | --------------------------------------------------------------------------------------------------------------------- | -------------| -| field.pattern | String | High | | | | -| field.replacement | String | High | | | | -| field.pattern.flags | List | Low | [CASE_INSENSITIVE] | [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] | | +#### General + + +##### `field.pattern` + + + +*Importance:* High + +*Type:* String + + + +##### `field.replacement` + + + +*Importance:* High +*Type:* String -#### Standalone Example + + +##### `field.pattern.flags` + + + +*Importance:* Low + +*Type:* List + +*Default Value:* [CASE_INSENSITIVE] + +*Validator:* [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Key -transforms.Key.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key -# The following values must be configured. 
-transforms.Key.field.pattern= -transforms.Key.field.replacement= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key +transforms.tran.field.pattern=< Required Configuration > +transforms.tran.field.replacement=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key", - "transforms": "Key", - "transforms.Key.type": "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key", - "transforms.Key.field.pattern":"", - "transforms.Key.field.replacement":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key", + "transforms.tran.field.pattern" : "< Required Configuration >", + "transforms.tran.field.replacement" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. 
+```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## PatternRename(Value) This transformation is used to rename fields in the value of an input struct based on a regular expression and a replacement string. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation| -| ------------------- | ------ | ---------- | ------------------ | --------------------------------------------------------------------------------------------------------------------- | -------------| -| field.pattern | String | High | | | | -| field.replacement | String | High | | | | -| field.pattern.flags | List | Low | [CASE_INSENSITIVE] | [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] | | +#### General + + +##### `field.pattern` + + + +*Importance:* High + +*Type:* String + + + +##### `field.replacement` + + + +*Importance:* High + +*Type:* String + + + +##### `field.pattern.flags` + + + +*Importance:* Low + +*Type:* List + +*Default Value:* [CASE_INSENSITIVE] + +*Validator:* [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES] -#### Standalone Example + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value -# The following values must be configured. 
-transforms.Value.field.pattern= -transforms.Value.field.replacement= +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value +transforms.tran.field.pattern=< Required Configuration > +transforms.tran.field.replacement=< Required Configuration > ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one of the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value", - "transforms.Value.field.pattern":"", - "transforms.Value.field.replacement":"", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value", + "transforms.tran.field.pattern" : "< Required Configuration >", + "transforms.tran.field.replacement" : "< Required Configuration >" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` to the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. 
+```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ToJson(Key) This transformation is used to take structured data such as AVRO and output it as JSON by way of the JsonConverter built into Kafka Connect. +### Tip + +This transformation is used to manipulate fields in the Key of the record. + + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ------------------ | ------- | ---------- | ------------- | --------------- | -------------------------------------------------------------| -| output.schema.type | String | Medium | STRING | [STRING, BYTES] | The connect schema type to output the converted JSON as. | -| schemas.enable | Boolean | Medium | false | | Flag to determine if the JSON data should include the schema.| +#### General + + +##### `output.schema.type` + +The connect schema type to output the converted JSON as. + +*Importance:* Medium + +*Type:* String + +*Default Value:* STRING + +*Validator:* [STRING, BYTES] + -#### Standalone Example +##### `schemas.enable` + +Flag to determine if the JSON data should include the schema. + +*Importance:* Medium + +*Type:* Boolean + +*Default Value:* false + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Key -transforms.Key.type=com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key -# The following values must be configured. 
+name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one of the distributed connect worker(s). ```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key", - "transforms": "Key", - "transforms.Key.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key" } ``` +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` to the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + + ## ToJson(Value) This transformation is used to take structured data such as AVRO and output it as JSON by way of the JsonConverter built into Kafka Connect. 
+ + ### Configuration -| Name | Type | Importance | Default Value | Validator | Documentation | -| ------------------ | ------- | ---------- | ------------- | --------------- | -------------------------------------------------------------| -| output.schema.type | String | Medium | STRING | [STRING, BYTES] | The connect schema type to output the converted JSON as. | -| schemas.enable | Boolean | Medium | false | | Flag to determine if the JSON data should include the schema.| +#### General + + +##### `output.schema.type` + +The connect schema type to output the converted JSON as. + +*Importance:* Medium +*Type:* String -#### Standalone Example +*Default Value:* STRING + +*Validator:* [STRING, BYTES] + + + +##### `schemas.enable` + +Flag to determine if the JSON data should include the schema. + +*Importance:* Medium + +*Type:* Boolean + +*Default Value:* false + + + +#### Examples + +##### Standalone Example + +This configuration is used typically along with [standalone mode](http://docs.confluent.io/current/connect/concepts.html#standalone-workers). ```properties -transforms=Value -transforms.Value.type=com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value -# The following values must be configured. +name=Connector1 +connector.class=org.apache.kafka.some.SourceConnector +tasks.max=1 +transforms=tran +transforms.tran.type=com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value ``` -#### Distributed Example +##### Distributed Example + +This configuration is used typically along with [distributed mode](http://docs.confluent.io/current/connect/concepts.html#distributed-workers). +Write the following json to `connector.json`, configure all of the required values, and use the command below to +post the configuration to one of the distributed connect worker(s). 
```json { -"name": "connector1", - "config": { - "connector.class": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", - "transforms": "Value", - "transforms.Value.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", - } + "name" : "Connector1", + "connector.class" : "org.apache.kafka.some.SourceConnector", + "transforms" : "tran", + "transforms.tran.type" : "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value" } -``` \ No newline at end of file +``` + +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` to the endpoint of +one of your Kafka Connect worker(s). + +Create a new instance. +```bash +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors +``` + +Update an existing instance. +```bash +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config +``` + + diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCase.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCase.java new file mode 100644 index 0000000..f687e45 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCase.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Map; + +@Title("ChangeTopicCase") +@Description("This transformation is used to change the case of a topic.") +@DocumentationTip("This transformation will convert a topic name like 'TOPIC_NAME' to `topicName`, " + + "or `topic_name`.") +public class ChangeTopicCase> implements Transformation { + + @Override + public R apply(R record) { + final String newTopic = this.config.from.to(this.config.to, record.topic()); + + return record.newRecord( + newTopic, + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + record.timestamp() + ); + } + + ChangeTopicCaseConfig config; + + @Override + public ConfigDef config() { + return ChangeTopicCaseConfig.config(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map settings) { + this.config = new ChangeTopicCaseConfig(settings); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseConfig.java new file mode 100644 index 0000000..3baee7f --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseConfig.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * 
you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; +import com.github.jcustenborder.kafka.connect.utils.config.ValidEnum; +import com.google.common.base.CaseFormat; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +class ChangeTopicCaseConfig extends AbstractConfig { + public final CaseFormat from; + public final CaseFormat to; + + static final String CONSTANTS = "`LOWER_CAMEL` = Java variable naming convention, e.g., \"lowerCamel\". " + + "`LOWER_HYPHEN` = Hyphenated variable naming convention, e.g., \"lower-hyphen\". " + + "`LOWER_UNDERSCORE` = C++ variable naming convention, e.g., \"lower_underscore\". " + + "`UPPER_CAMEL` = Java and C++ class naming convention, e.g., \"UpperCamel\". " + + "`UPPER_UNDERSCORE` = Java and C++ constant naming convention, e.g., \"UPPER_UNDERSCORE\"."; + + public static final String FROM_CONFIG = "from"; + static final String FROM_DOC = "The format of the incoming topic name. " + CONSTANTS; + public static final String TO_CONFIG = "to"; + static final String TO_DOC = "The format of the outgoing topic name. 
" + CONSTANTS; + + public ChangeTopicCaseConfig(Map originals) { + super(config(), originals); + this.from = ConfigUtils.getEnum(CaseFormat.class, this, FROM_CONFIG); + this.to = ConfigUtils.getEnum(CaseFormat.class, this, TO_CONFIG); + } + + public static ConfigDef config() { + return new ConfigDef() + .define(FROM_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidEnum.of(CaseFormat.class), ConfigDef.Importance.HIGH, FROM_DOC) + .define(TO_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidEnum.of(CaseFormat.class), ConfigDef.Importance.HIGH, TO_DOC); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java new file mode 100644 index 0000000..fcb3fa9 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@Introduction("\n" + + "This project contains common transformations for every day use cases with Kafka Connect.") + @Title("Common Transformations") +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.github.jcustenborder.kafka.connect.utils.config.Introduction; +import com.github.jcustenborder.kafka.connect.utils.config.Title; \ No newline at end of file diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseTest.java new file mode 100644 index 0000000..1c436ef --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/ChangeTopicCaseTest.java @@ -0,0 +1,89 @@ +package com.github.jcustenborder.kafka.connect.transform.common; + +import com.google.common.base.CaseFormat; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class ChangeTopicCaseTest extends TransformationTest { + protected ChangeTopicCaseTest() { + super(false); + } + + @Override + protected Transformation create() { + return new ChangeTopicCase<>(); + } + + SinkRecord record(String topic) { + return new SinkRecord( + topic, + 1, + null, + null, + null, + null, + 12345L + ); + + } + + static class TestCase { + final CaseFormat from; + final String input; + final CaseFormat to; + final String expected; + + TestCase(CaseFormat from, String input, CaseFormat to, String expected) { + this.from = from; + this.input = 
input; + this.to = to; + this.expected = expected; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("from", this.from) + .add("to", this.to) + .add("input", this.input) + .add("expected", this.expected) + .toString(); + } + } + + static TestCase of(CaseFormat from, String input, CaseFormat to, String expected) { + return new TestCase(from, input, to, expected); + } + + @TestFactory + public Stream apply() { + return Arrays.asList( + of(CaseFormat.UPPER_UNDERSCORE, "TOPIC_NAME", CaseFormat.LOWER_CAMEL, "topicName"), + of(CaseFormat.LOWER_CAMEL, "topicName", CaseFormat.UPPER_UNDERSCORE, "TOPIC_NAME"), + of(CaseFormat.LOWER_HYPHEN, "topic-name", CaseFormat.LOWER_UNDERSCORE, "topic_name") + ).stream() + .map(t -> DynamicTest.dynamicTest(t.toString(), () -> { + final Map settings = ImmutableMap.of( + ChangeTopicCaseConfig.FROM_CONFIG, t.from.toString(), + ChangeTopicCaseConfig.TO_CONFIG, t.to.toString() + ); + this.transformation.configure(settings); + final SinkRecord input = record(t.input); + final SinkRecord actual = this.transformation.apply(input); + assertNotNull(actual, "actual should not be null."); + assertEquals(t.expected, actual.topic(), "topic does not match."); + })); + } +}