From e301ee1e7174437581e18656bf44350e39a978c2 Mon Sep 17 00:00:00 2001 From: Patitapaban Mohapatra <52992475+patitapaban19@users.noreply.github.com> Date: Mon, 2 Mar 2020 13:09:59 +0530 Subject: [PATCH] support for record to table mapping config (#6) Added support for the below configs: "topic.my_topic.my_ks.my_table.mapping": "col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f1" "topic.my_topic.my_ks.my_table.consistencyLevel": "LOCAL_ONE" "topic.my_topic.my_ks.my_table.ttlSeconds": 1 "topic.my_topic.my_ks.my_table.deletesEnabled": "true" --- documentation/QUICKSTART.md | 4 +- .../io/connect/scylladb/RecordConverter.java | 225 ++++++++++------- .../scylladb/ScyllaDbSchemaBuilder.java | 200 +++++++++------ .../io/connect/scylladb/ScyllaDbSession.java | 12 +- .../connect/scylladb/ScyllaDbSessionImpl.java | 46 ++-- .../scylladb/ScyllaDbSinkConnector.java | 1 + .../scylladb/ScyllaDbSinkConnectorConfig.java | 33 ++- .../io/connect/scylladb/ScyllaDbSinkTask.java | 16 +- .../scylladb/ScyllaDbSinkTaskHelper.java | 38 ++- .../scylladb/topictotable/TopicConfigs.java | 236 ++++++++++++++++++ .../scylladb/utils/ScyllaDbConstants.java | 9 + .../scylladb/{ => utils}/VersionUtil.java | 2 +- .../integration/ScyllaDbSinkConnectorIT.java | 56 ++++- 13 files changed, 673 insertions(+), 205 deletions(-) create mode 100644 src/main/java/io/connect/scylladb/topictotable/TopicConfigs.java create mode 100644 src/main/java/io/connect/scylladb/utils/ScyllaDbConstants.java rename src/main/java/io/connect/scylladb/{ => utils}/VersionUtil.java (87%) diff --git a/documentation/QUICKSTART.md b/documentation/QUICKSTART.md index 611456a..e2724b2 100644 --- a/documentation/QUICKSTART.md +++ b/documentation/QUICKSTART.md @@ -122,7 +122,7 @@ Example: ``` kafka-avro-console-producer --broker-list localhost:9092 ---topic example +--topic topic1 --property parse.key=true --property key.schema='{"type":"record",name":"key_schema","fields":[{"name":"id","type":"int"}]}' --property "key.separator=$" @@ -132,7 +132,7 @@ kafka-avro-console-producer ``` Output upon running the select query in ScyllaDB: -select * from demo.example; +select * from test.topic1; ``` id | firstname | lastname diff --git a/src/main/java/io/connect/scylladb/RecordConverter.java b/src/main/java/io/connect/scylladb/RecordConverter.java index e2e3cef..ec449b2 100644 --- a/src/main/java/io/connect/scylladb/RecordConverter.java +++ b/src/main/java/io/connect/scylladb/RecordConverter.java @@ -1,10 +1,14 @@ package io.connect.scylladb; import com.google.common.base.Preconditions; +import io.connect.scylladb.topictotable.TopicConfigs; +import io.connect.scylladb.utils.ScyllaDbConstants; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,23 +59,44 @@ public abstract class RecordConverter { protected abstract void setNullField(T result, String name); - public T convert(Object value) { - Preconditions.checkNotNull(value, "value cannot be null."); + public T convert(SinkRecord record, TopicConfigs topicConfigs, String operationType) { + Object recordObject = ScyllaDbConstants.DELETE_OPERATION.equals(operationType) ? 
+ record.key() : record.value(); T result = this.newValue(); - if (value instanceof Struct) { - this.convertStruct(result, (Struct)value); + Map columnDetailsMap = null; + Preconditions.checkNotNull(recordObject, + (ScyllaDbConstants.DELETE_OPERATION.equals(operationType) ? "key " : "value ") + "cannot be null."); + if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) { + columnDetailsMap = topicConfigs.getTableColumnMap(); + Preconditions.checkNotNull(record.key(), "key cannot be null."); + findRecordTypeAndConvert(result, record.key(), topicConfigs.getTablePartitionKeyMap()); + for (Header header : record.headers()) { + if (topicConfigs.getTableColumnMap().containsKey(header.key())) { + TopicConfigs.KafkaScyllaColumnMapper headerKafkaScyllaColumnMapper = topicConfigs.getTableColumnMap().get(header.key()); + parseStructAndSetInStatement(result, header.schema(), + headerKafkaScyllaColumnMapper.getKafkaRecordField(), header.value(), + headerKafkaScyllaColumnMapper.getScyllaColumnName()); + } + } + } + findRecordTypeAndConvert(result, recordObject, columnDetailsMap); + return result; + } + + void findRecordTypeAndConvert(T result, Object recordObject, + Map columnDetailsMap) { + if (recordObject instanceof Struct) { + this.convertStruct(result, (Struct)recordObject, columnDetailsMap); } else { - if (!(value instanceof Map)) { - throw new DataException(String.format("Only Schema (%s) or Schema less (%s) are supported. %s is not a supported type.", Struct.class.getName(), Map.class.getName(), value.getClass().getName())); + if (!(recordObject instanceof Map)) { + throw new DataException(String.format("Only Schema (%s) or Schema less (%s) are supported. %s is not a supported type.", Struct.class.getName(), Map.class.getName(), recordObject.getClass().getName())); } - this.convertMap(result, (Map)value); + this.convertMap(result, (Map)recordObject, columnDetailsMap); } - - return result; } - void convertMap(T result, Map value) { + void convertMap(T result, Map value, Map columnDetailsMap) { Iterator valueIterator = value.keySet().iterator(); while(valueIterator.hasNext()) { @@ -79,6 +104,13 @@ void convertMap(T result, Map value) { Preconditions.checkState(key instanceof String, "Map key must be a String."); String fieldName = (String)key; Object fieldValue = value.get(key); + if (columnDetailsMap != null) { + if (columnDetailsMap.containsKey(fieldName)) { + fieldName = columnDetailsMap.get(fieldName).getScyllaColumnName(); + } else { + continue; + } + } try { if (null == fieldValue) { @@ -138,7 +170,7 @@ void convertMap(T result, Map value) { } - void convertStruct(T result, Struct struct) { + void convertStruct(T result, Struct struct, Map columnDetailsMap) { Schema schema = struct.schema(); Iterator fieldsIterator = schema.fields().iterator(); @@ -147,92 +179,101 @@ void convertStruct(T result, Struct struct) { String fieldName = field.name(); log.trace("convertStruct() - Processing '{}'", field.name()); Object fieldValue = struct.get(field); - - try { - if (null == fieldValue) { - log.trace("convertStruct() - Setting '{}' to null.", fieldName); - this.setNullField(result, fieldName); + if (columnDetailsMap != null) { + if (columnDetailsMap.containsKey(fieldName)) { + fieldName = columnDetailsMap.get(fieldName).getScyllaColumnName(); } else { - log.trace("convertStruct() - Field '{}'.field().schema().type() = '{}'", fieldName, field.schema().type()); - switch(field.schema().type()) { - case STRING: - log.trace("convertStruct() - Processing '{}' as string.", fieldName); - 
this.setStringField(result, fieldName, (String)fieldValue); - break; - case INT8: - log.trace("convertStruct() - Processing '{}' as int8.", fieldName); - this.setInt8Field(result, fieldName, (Byte)fieldValue); - break; - case INT16: - log.trace("convertStruct() - Processing '{}' as int16.", fieldName); - this.setInt16Field(result, fieldName, (Short)fieldValue); - break; - case INT32: - if ("org.apache.kafka.connect.data.Date".equals(field.schema().name())) { - log.trace("convertStruct() - Processing '{}' as date.", fieldName); - this.setDateField(result, fieldName, (Date)fieldValue); - } else if ("org.apache.kafka.connect.data.Time".equals(field.schema().name())) { - log.trace("convertStruct() - Processing '{}' as time.", fieldName); - this.setTimeField(result, fieldName, (Date)fieldValue); - } else { - Integer int32Value = (Integer)fieldValue; - log.trace("convertStruct() - Processing '{}' as int32.", fieldName); - this.setInt32Field(result, fieldName, int32Value); - } - break; - case INT64: - if ("org.apache.kafka.connect.data.Timestamp".equals(field.schema().name())) { - log.trace("convertStruct() - Processing '{}' as timestamp.", fieldName); - this.setTimestampField(result, fieldName, (Date)fieldValue); - } else { - Long int64Value = (Long)fieldValue; - log.trace("convertStruct() - Processing '{}' as int64.", fieldName); - this.setInt64Field(result, fieldName, int64Value); - } - break; - case BYTES: - if ("org.apache.kafka.connect.data.Decimal".equals(field.schema().name())) { - log.trace("convertStruct() - Processing '{}' as decimal.", fieldName); - this.setDecimalField(result, fieldName, (BigDecimal)fieldValue); - } else { - byte[] bytes = (byte[])((byte[])fieldValue); - log.trace("convertStruct() - Processing '{}' as bytes.", fieldName); - this.setBytesField(result, fieldName, bytes); - } - break; - case FLOAT32: - log.trace("convertStruct() - Processing '{}' as float32.", fieldName); - this.setFloat32Field(result, fieldName, (Float)fieldValue); - break; - case FLOAT64: - log.trace("convertStruct() - Processing '{}' as float64.", fieldName); - this.setFloat64Field(result, fieldName, (Double)fieldValue); - break; - case BOOLEAN: - log.trace("convertStruct() - Processing '{}' as boolean.", fieldName); - this.setBooleanField(result, fieldName, (Boolean)fieldValue); - break; - case STRUCT: - log.trace("convertStruct() - Processing '{}' as struct.", fieldName); - this.setStructField(result, fieldName, (Struct)fieldValue); - break; - case ARRAY: - log.trace("convertStruct() - Processing '{}' as array.", fieldName); - this.setArray(result, fieldName, schema, (List)fieldValue); - break; - case MAP: - log.trace("convertStruct() - Processing '{}' as map.", fieldName); - this.setMap(result, fieldName, schema, (Map)fieldValue); - break; - default: - throw new DataException("Unsupported schema.type(): " + schema.type()); - } + continue; } - } catch (Exception ex) { - throw new DataException(String.format("Exception thrown while processing field '%s'", fieldName), ex); } + parseStructAndSetInStatement(result, schema, field, fieldValue, fieldName); } + } + void parseStructAndSetInStatement(T result, Schema schema, Field field, Object fieldValue, String fieldName) { + try { + if (null == fieldValue) { + log.trace("convertStruct() - Setting '{}' to null.", fieldName); + this.setNullField(result, fieldName); + } else { + log.trace("convertStruct() - Field '{}'.field().schema().type() = '{}'", fieldName, field.schema().type()); + switch(field.schema().type()) { + case STRING: + 
log.trace("convertStruct() - Processing '{}' as string.", fieldName); + this.setStringField(result, fieldName, (String)fieldValue); + break; + case INT8: + log.trace("convertStruct() - Processing '{}' as int8.", fieldName); + this.setInt8Field(result, fieldName, (Byte)fieldValue); + break; + case INT16: + log.trace("convertStruct() - Processing '{}' as int16.", fieldName); + this.setInt16Field(result, fieldName, (Short)fieldValue); + break; + case INT32: + if ("org.apache.kafka.connect.data.Date".equals(field.schema().name())) { + log.trace("convertStruct() - Processing '{}' as date.", fieldName); + this.setDateField(result, fieldName, (Date)fieldValue); + } else if ("org.apache.kafka.connect.data.Time".equals(field.schema().name())) { + log.trace("convertStruct() - Processing '{}' as time.", fieldName); + this.setTimeField(result, fieldName, (Date)fieldValue); + } else { + Integer int32Value = (Integer)fieldValue; + log.trace("convertStruct() - Processing '{}' as int32.", fieldName); + this.setInt32Field(result, fieldName, int32Value); + } + break; + case INT64: + if ("org.apache.kafka.connect.data.Timestamp".equals(field.schema().name())) { + log.trace("convertStruct() - Processing '{}' as timestamp.", fieldName); + this.setTimestampField(result, fieldName, (Date)fieldValue); + } else { + Long int64Value = (Long)fieldValue; + log.trace("convertStruct() - Processing '{}' as int64.", fieldName); + this.setInt64Field(result, fieldName, int64Value); + } + break; + case BYTES: + if ("org.apache.kafka.connect.data.Decimal".equals(field.schema().name())) { + log.trace("convertStruct() - Processing '{}' as decimal.", fieldName); + this.setDecimalField(result, fieldName, (BigDecimal)fieldValue); + } else { + byte[] bytes = (byte[])((byte[])fieldValue); + log.trace("convertStruct() - Processing '{}' as bytes.", fieldName); + this.setBytesField(result, fieldName, bytes); + } + break; + case FLOAT32: + log.trace("convertStruct() - Processing '{}' as float32.", fieldName); + this.setFloat32Field(result, fieldName, (Float)fieldValue); + break; + case FLOAT64: + log.trace("convertStruct() - Processing '{}' as float64.", fieldName); + this.setFloat64Field(result, fieldName, (Double)fieldValue); + break; + case BOOLEAN: + log.trace("convertStruct() - Processing '{}' as boolean.", fieldName); + this.setBooleanField(result, fieldName, (Boolean)fieldValue); + break; + case STRUCT: + log.trace("convertStruct() - Processing '{}' as struct.", fieldName); + this.setStructField(result, fieldName, (Struct)fieldValue); + break; + case ARRAY: + log.trace("convertStruct() - Processing '{}' as array.", fieldName); + this.setArray(result, fieldName, schema, (List)fieldValue); + break; + case MAP: + log.trace("convertStruct() - Processing '{}' as map.", fieldName); + this.setMap(result, fieldName, schema, (Map)fieldValue); + break; + default: + throw new DataException("Unsupported schema.type(): " + schema.type()); + } + } + } catch (Exception ex) { + throw new DataException(String.format("Exception thrown while processing field '%s'", fieldName), ex); + } } } diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java index ee2be6d..e5c47af 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java @@ -14,9 +14,12 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ComparisonChain; +import 
io.connect.scylladb.topictotable.TopicConfigs; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,29 +133,55 @@ DataType dataType(Schema schema) { } void alter( - final ScyllaDbSchemaKey key, - String tableName, - Schema keySchema, - Schema valueSchema, - TableMetadata.Table tableMetadata + final ScyllaDbSchemaKey key, + String tableName, + SinkRecord record, + TableMetadata.Table tableMetadata, + TopicConfigs topicConfigs ) { Preconditions.checkNotNull(tableMetadata, "tableMetadata cannot be null."); - Preconditions.checkNotNull(valueSchema, "valueSchema cannot be null."); + Preconditions.checkNotNull(record.valueSchema(), "valueSchema cannot be null."); log.trace("alter() - tableMetadata = '{}' ", tableMetadata); - Map addedColumns = new LinkedHashMap<>(); - for (final Field field : valueSchema.fields()) { - log.trace("alter() - Checking if table has '{}' column.", field.name()); - final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(field.name()); + if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) { + if (topicConfigs.getTablePartitionKeyMap().keySet().size() != tableMetadata.primaryKey().size()) { + throw new DataException( + String.format( + "Cannot alter primary key of a ScyllaDb Table. Existing primary key: '%s', " + + "Primary key mapped in 'topic.my_topic.my_ks.my_table.mapping' config: '%s", + Joiner.on("', '").join(tableMetadata.primaryKey()), + Joiner.on("', '").join(topicConfigs.getTablePartitionKeyMap().keySet()) + ) + ); + } - if (null == columnMetadata) { - log.debug("alter() - Adding column '{}'", field.name()); - DataType dataType = dataType(field.schema()); - addedColumns.put(field.name(), dataType); - } else { - log.trace("alter() - Table already has '{}' column.", field.name()); + for (Map.Entry entry: topicConfigs.getTableColumnMap().entrySet()) { + String columnName = entry.getValue().getScyllaColumnName(); + log.trace("alter for mapping() - Checking if table has '{}' column.", columnName); + final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(columnName); + + if (null == columnMetadata) { + log.debug("alter for mapping() - Adding column '{}'", columnName); + final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema()); + addedColumns.put(columnName, dataType); + } else { + log.trace("alter for mapping() - Table already has '{}' column.", columnName); + } + } + } else { + for (final Field field : record.valueSchema().fields()) { + log.trace("alter() - Checking if table has '{}' column.", field.name()); + final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(field.name()); + + if (null == columnMetadata) { + log.debug("alter() - Adding column '{}'", field.name()); + DataType dataType = dataType(field.schema()); + addedColumns.put(field.name(), dataType); + } else { + log.trace("alter() - Table already has '{}' column.", field.name()); + } } } @@ -166,15 +195,15 @@ void alter( final Alter alterTable = SchemaBuilder.alterTable(this.config.keyspace, tableName); if (!this.config.tableManageEnabled) { List requiredAlterStatements = addedColumns.entrySet().stream() - .map(e -> alterTable.addColumn(e.getKey()).type(e.getValue()).toString()) - .collect(Collectors.toList()); + .map(e -> 
alterTable.addColumn(e.getKey()).type(e.getValue()).toString()) + .collect(Collectors.toList()); throw new DataException( - String.format( - "Alter statement(s) needed. Missing column(s): '%s'\n%s;", - Joiner.on("', '").join(addedColumns.keySet()), - Joiner.on(';').join(requiredAlterStatements) - ) + String.format( + "Alter statement(s) needed. Missing column(s): '%s'\n%s;", + Joiner.on("', '").join(addedColumns.keySet()), + Joiner.on(';').join(requiredAlterStatements) + ) ); } else { String query = alterTable.withOptions() @@ -193,15 +222,15 @@ void alter( this.schemaLookup.put(key, DEFAULT); } - public void build(String tableName, Schema keySchema, Schema valueSchema) { + public void build(String tableName, SinkRecord record, TopicConfigs topicConfigs) { log.trace("build() - tableName = '{}'", tableName); final ScyllaDbSchemaKey key = ScyllaDbSchemaKey.of(this.config.keyspace, tableName); if (null != this.schemaLookup.getIfPresent(key)) { return; } - if (null == keySchema || null == valueSchema) { + if (null == record.keySchema() || null == record.valueSchema()) { log.warn( - "build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct." + "build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct." ); this.schemaLookup.put(key, DEFAULT); return; @@ -211,52 +240,66 @@ public void build(String tableName, Schema keySchema, Schema valueSchema) { final TableMetadata.Table tableMetadata = this.session.tableMetadata(tableName); if (null != tableMetadata) { - alter(key, tableName, keySchema, valueSchema, tableMetadata); + alter(key, tableName, record, tableMetadata, topicConfigs); } else { - create(key, tableName, keySchema, valueSchema); + create(key, tableName, record, topicConfigs); } } void create( - final ScyllaDbSchemaKey key, - String tableName, - Schema keySchema, - Schema valueSchema + final ScyllaDbSchemaKey key, + String tableName, + SinkRecord record, + TopicConfigs topicConfigs ) { + Schema keySchema = record.keySchema(); + Schema valueSchema = record.valueSchema(); log.trace("create() - tableName = '{}'", tableName); Preconditions.checkState( - Schema.Type.STRUCT == keySchema.type(), - "record.keySchema() must be a struct. Received '%s'", - keySchema.type() + Schema.Type.STRUCT == keySchema.type(), + "record.keySchema() must be a struct. Received '%s'", + keySchema.type() ); Preconditions.checkState( - !keySchema.fields().isEmpty(), - "record.keySchema() must have some fields." + !keySchema.fields().isEmpty(), + "record.keySchema() must have some fields." ); - for (final Field keyField : keySchema.fields()) { - log.trace( - "create() - Checking key schema against value schema. fieldName={}", - keyField.name() + if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) { + Preconditions.checkState( + Schema.Type.STRUCT == valueSchema.type(), + "record.valueSchema() must be a struct. Received '%s'", + valueSchema.type() ); - final Field valueField = valueSchema.field(keyField.name()); - - if (null == valueField) { - throw new DataException( - String.format( - "record.valueSchema() must contain all of the fields in record.keySchema(). " - + "record.keySchema() is used by the connector to determine the key for the " - + "table. record.valueSchema() is missing field '%s'. record.valueSchema() is " - + "used by the connector to persist data to the table in ScyllaDb. 
Here are " - + "the available fields for record.valueSchema(%s) and record.keySchema(%s).", - keyField.name(), - Joiner.on(", ").join( - valueSchema.fields().stream().map(Field::name).collect(Collectors.toList()) - ), - Joiner.on(", ").join( - keySchema.fields().stream().map(Field::name).collect(Collectors.toList()) - ) - ) + Preconditions.checkState( + !valueSchema.fields().isEmpty(), + "record.valueSchema() must have some fields." + ); + } else { + for (final Field keyField : keySchema.fields()) { + log.trace( + "create() - Checking key schema against value schema. fieldName={}", + keyField.name() ); + final Field valueField = valueSchema.field(keyField.name()); + + if (null == valueField) { + throw new DataException( + String.format( + "record.valueSchema() must contain all of the fields in record.keySchema(). " + + "record.keySchema() is used by the connector to determine the key for the " + + "table. record.valueSchema() is missing field '%s'. record.valueSchema() is " + + "used by the connector to persist data to the table in ScyllaDb. Here are " + + "the available fields for record.valueSchema(%s) and record.keySchema(%s).", + keyField.name(), + Joiner.on(", ").join( + valueSchema.fields().stream().map(Field::name).collect(Collectors.toList()) + ), + Joiner.on(", ").join( + keySchema.fields().stream().map(Field::name).collect(Collectors.toList()) + ) + ) + ); + } } } @@ -265,22 +308,32 @@ void create( if (!Strings.isNullOrEmpty(valueSchema.doc())) { tableOptions.comment(valueSchema.doc()); } + if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) { + for (Map.Entry entry: topicConfigs.getTablePartitionKeyMap().entrySet()) { + final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema()); + create.addPartitionKey(entry.getValue().getScyllaColumnName(), dataType); + } + for (Map.Entry entry: topicConfigs.getTableColumnMap().entrySet()) { + final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema()); + create.addColumn(entry.getValue().getScyllaColumnName(), dataType); + } + } else { + Set fields = new HashSet<>(); + for (final Field keyField : keySchema.fields()) { + final DataType dataType = dataType(keyField.schema()); + create.addPartitionKey(keyField.name(), dataType); + fields.add(keyField.name()); + } - Set fields = new HashSet<>(); - for (final Field keyField : keySchema.fields()) { - final DataType dataType = dataType(keyField.schema()); - create.addPartitionKey(keyField.name(), dataType); - fields.add(keyField.name()); - } + for (final Field valueField : valueSchema.fields()) { + if (fields.contains(valueField.name())) { + log.trace("create() - Skipping '{}' because it's already in the key.", valueField.name()); + continue; + } - for (final Field valueField : valueSchema.fields()) { - if (fields.contains(valueField.name())) { - log.trace("create() - Skipping '{}' because it's already in the key.", valueField.name()); - continue; + final DataType dataType = dataType(valueField.schema()); + create.addColumn(valueField.name(), dataType); } - - final DataType dataType = dataType(valueField.schema()); - create.addColumn(valueField.name(), dataType); } if (this.config.tableManageEnabled) { @@ -289,10 +342,9 @@ void create( session.executeStatement(tableOptions); } else { throw new DataException( - String.format("Create statement needed:\n%s", create) + String.format("Create statement needed:\n%s", create) ); } - this.schemaLookup.put(key, DEFAULT); } diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSession.java 
b/src/main/java/io/connect/scylladb/ScyllaDbSession.java index 94aedcf..35d9919 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSession.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSession.java @@ -4,9 +4,11 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; +import io.connect.scylladb.topictotable.TopicConfigs; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; import java.io.Closeable; import java.util.Map; @@ -44,10 +46,11 @@ public interface ScyllaDbSession extends Closeable { /** * Ensure that a table has a specified schema. * @param tableName name of the table - * @param keySchema schema that will be used for the primary key. - * @param valueSchema schema that will be used for the rest of the table. + * @param sinkRecord which will have keySchema that will be used for the primary key and + * valueSchema that will be used for the rest of the table. + * @param topicConfigs class containing mapping details for the record */ - void createOrAlterTable(String tableName, Schema keySchema, Schema valueSchema); + void createOrAlterTable(String tableName, SinkRecord sinkRecord, TopicConfigs topicConfigs); /** * Flag to determine if the session is valid. @@ -69,9 +72,10 @@ public interface ScyllaDbSession extends Closeable { /** * Method will return a RecordToBoundStatementConverter for an insert the supplied table. * @param tableName table to return the RecordToBoundStatementConverter for + * @param topicConfigs class containing mapping details for the record * @return RecordToBoundStatementConverter that can be used for the record. */ - RecordToBoundStatementConverter insert(String tableName); + RecordToBoundStatementConverter insert(String tableName, TopicConfigs topicConfigs); /** * Method is used to add prepared statements for the offsets that are in the current batch. 
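A minimal sketch of how a caller drives the revised ScyllaDbSession API described above (illustrative only, not part of the patch; the helper name bindRecord and its enclosing class are assumptions, while ScyllaDbSession, RecordToBoundStatementConverter, TopicConfigs and ScyllaDbConstants are the types this patch touches, and topicConfigs may be null when no per-topic mapping is configured):

    // Sketch only: condensed from ScyllaDbSinkTaskHelper later in this patch. The real code
    // also checks tableExists() and that the key bound at least one primary-key column.
    static BoundStatement bindRecord(ScyllaDbSession session, SinkRecord record,
                                     String tableName, TopicConfigs topicConfigs) {
      if (record.value() == null) {
        // Delete path: the record key supplies the primary-key columns.
        RecordToBoundStatementConverter.State state =
            session.delete(tableName).convert(record, null, ScyllaDbConstants.DELETE_OPERATION);
        return state.statement;
      }
      // Insert path: both the DDL and the prepared INSERT honour the per-topic mapping and TTL.
      session.createOrAlterTable(tableName, record, topicConfigs);
      RecordToBoundStatementConverter.State state =
          session.insert(tableName, topicConfigs)
              .convert(record, topicConfigs, ScyllaDbConstants.INSERT_OPERATION);
      return state.statement;
    }
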
diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java index 75c134c..edc80ae 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java @@ -14,9 +14,11 @@ import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import io.connect.scylladb.topictotable.TopicConfigs; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,8 +93,8 @@ public boolean tableExists(String tableName) { } @Override - public void createOrAlterTable(String tableName, Schema keySchema, Schema valueSchema) { - this.schemaBuilder.build(tableName, keySchema, valueSchema); + public void createOrAlterTable(String tableName, SinkRecord record, TopicConfigs topicConfigs) { + this.schemaBuilder.build(tableName, record, topicConfigs); } @Override @@ -131,29 +133,39 @@ public RecordToBoundStatementConverter apply(String tableName) { ); } - private PreparedStatement createInsertPreparedStatement(String tableName) { + private PreparedStatement createInsertPreparedStatement(String tableName, TopicConfigs topicConfigs) { Insert statement = QueryBuilder.insertInto(config.keyspace, tableName); TableMetadata.Table tableMetadata = tableMetadata(tableName); for (TableMetadata.Column columnMetadata : tableMetadata.columns()) { statement.value(columnMetadata.getName(), QueryBuilder.bindMarker(columnMetadata.getName())); } log.debug("insert() - Preparing statement. '{}'", statement); - return (config.ttl() == null) ? session.prepare(statement) : - session.prepare(statement.using(QueryBuilder.ttl(Integer.parseInt(config.ttl())))); + if (topicConfigs != null) { + return (topicConfigs.getTtl() == null) ? session.prepare(statement) : + session.prepare(statement.using(QueryBuilder.ttl(topicConfigs.getTtl()))); + } else { + return (config.ttl == null) ? 
session.prepare(statement) : + session.prepare(statement.using(QueryBuilder.ttl(config.ttl))); + } } @Override - public RecordToBoundStatementConverter insert(String tableName) { - return this.insertStatementCache.computeIfAbsent( - tableName, - new Function() { - @Override - public RecordToBoundStatementConverter apply(String tableName) { - PreparedStatement preparedStatement = createInsertPreparedStatement(tableName); - return new RecordToBoundStatementConverter(preparedStatement); - } - } - ); + public RecordToBoundStatementConverter insert(String tableName, TopicConfigs topicConfigs) { + if (topicConfigs != null && topicConfigs.getTtl() != null) { + PreparedStatement preparedStatement = createInsertPreparedStatement(tableName, topicConfigs); + return new RecordToBoundStatementConverter(preparedStatement); + } else { + return this.insertStatementCache.computeIfAbsent( + tableName, + new Function() { + @Override + public RecordToBoundStatementConverter apply(String tableName) { + PreparedStatement preparedStatement = createInsertPreparedStatement(tableName, topicConfigs); + return new RecordToBoundStatementConverter(preparedStatement); + } + } + ); + } } private PreparedStatement offsetPreparedStatement; @@ -171,7 +183,7 @@ public void addOffsetsToBatch( final BoundStatement statement; if (null == this.offsetPreparedStatement) { this.offsetPreparedStatement = - createInsertPreparedStatement(this.config.offsetStorageTable); + createInsertPreparedStatement(this.config.offsetStorageTable, null); } log.debug( "addOffsetsToBatch() - Setting offset to {}:{}:{}", diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnector.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnector.java index f53f4ba..59b3e0d 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnector.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnector.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; +import io.connect.scylladb.utils.VersionUtil; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java index 53a83bb..349568e 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java @@ -1,9 +1,13 @@ package io.connect.scylladb; import java.io.File; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import io.connect.scylladb.topictotable.TopicConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -48,6 +52,12 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig { public final int maxBatchSizeKb; public final String loadBalancingLocalDc; public final long timestampResolutionMs; + public final Map topicWiseConfigs; + public final Integer ttl; + + private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN = + Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$"); + static final Map CLIENT_COMPRESSION = ImmutableMap.of( @@ -78,6 +88,7 @@ public ScyllaDbSinkConnectorConfig(Map originals) { this.deletesEnabled = getBoolean(DELETES_ENABLE_CONFIG); this.keyspace = getString(KEYSPACE_CONFIG); + this.ttl = 
getInt(TTL_CONFIG); final String trustStorePath = this.getString(SSL_TRUSTSTORE_PATH_CONFIG); this.trustStorePath = Strings.isNullOrEmpty(trustStorePath) ? null : new File(trustStorePath); @@ -116,6 +127,20 @@ public ScyllaDbSinkConnectorConfig(Map originals) { this.maxBatchSizeKb = getInt(MAX_BATCH_SIZE_CONFIG); this.loadBalancingLocalDc = getString(LOAD_BALANCING_LOCAL_DC_CONFIG); this.timestampResolutionMs = getLong(TIMESTAMP_RESOLUTION_MS_CONF); + Map> topicWiseConfigsMap = new HashMap<>(); + for (final Map.Entry entry : ((Map) originals).entrySet()) { + final String name2 = entry.getKey(); + if (name2.startsWith("topic.")) { + final String topicName = this.tryMatchTopicName(name2); + final Map topicMap = topicWiseConfigsMap.computeIfAbsent(topicName, t -> new HashMap()); + topicMap.put(name2.split("\\.")[name2.split("\\.").length - 1], entry.getValue()); + } + } + topicWiseConfigs = new HashMap<>(); + for (Map.Entry> topicWiseConfig : topicWiseConfigsMap.entrySet()) { + TopicConfigs topicConfigs = new TopicConfigs(topicWiseConfig.getValue(), this); + topicWiseConfigs.put(topicWiseConfig.getKey(), topicConfigs); + } } public static final String PORT_CONFIG = "scylladb.port"; @@ -502,8 +527,12 @@ public static ConfigDef config() { "Timestamp Threshold in MS"); } - public String ttl() { - return getString(TTL_CONFIG); + private String tryMatchTopicName(final String name) { + final Matcher m = ScyllaDbSinkConnectorConfig.TOPIC_KS_TABLE_SETTING_PATTERN.matcher(name); + if (m.matches()) { + return m.group(1); + } + throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern"); } public boolean isOffsetEnabledInScyllaDb() { diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java index 3a50d52..4d62459 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java @@ -13,11 +13,13 @@ import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; import com.google.common.collect.Iterables; +import io.connect.scylladb.utils.VersionUtil; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.RetriableException; @@ -104,7 +106,8 @@ public void put(Collection records) { final int partition = record.kafkaPartition(); BoundStatement boundStatement = scyllaDbSinkTaskHelper.getBoundStatementForRecord(record); - log.trace("put() - Adding Bound Statement for {}:{}:{}", + log.trace("put() - Adding Bound Statement {} for {}:{}:{}", + boundStatement.preparedStatement().getQueryString(), record.topic(), record.kafkaPartition(), record.kafkaOffset() @@ -141,9 +144,14 @@ public void put(Collection records) { for (List batchStatementList : batchesPerTopicPartition.values()) { for (BatchStatement batchStatement : batchStatementList) { - batchStatement.setConsistencyLevel(config.consistencyLevel); - log.trace("put() - Executing Batch Statement {} of size {}", - batchStatement, batchStatement.size()); + ConsistencyLevel consistencyLevel = config.consistencyLevel; + Statement firstStatement = 
batchStatement.getStatements().iterator().next(); + if (config.topicWiseConfigs.containsKey(firstStatement.getKeyspace())) { + consistencyLevel = firstStatement.getConsistencyLevel(); + } + batchStatement.setConsistencyLevel(consistencyLevel); + log.trace("put() - Executing Batch Statement with Consistency Level {} of size {}", + consistencyLevel, batchStatement.size()); ResultSetFuture resultSetFuture = this.getValidSession().executeStatementAsync(batchStatement); futures.add(resultSetFuture); count++; diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java index 8b60d42..2acef63 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java @@ -2,6 +2,8 @@ import com.datastax.driver.core.BoundStatement; import com.google.common.base.Preconditions; +import io.connect.scylladb.topictotable.TopicConfigs; +import io.connect.scylladb.utils.ScyllaDbConstants; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; @@ -49,11 +51,21 @@ public void validateRecord(SinkRecord record) { public BoundStatement getBoundStatementForRecord(SinkRecord record) { final String tableName = record.topic(); BoundStatement boundStatement = null; + TopicConfigs topicConfigs = null; + if (scyllaDbSinkConnectorConfig.topicWiseConfigs.containsKey(tableName)) { + topicConfigs = scyllaDbSinkConnectorConfig.topicWiseConfigs.get(tableName); + if (topicConfigs.getMappingStringForTopic() != null && !topicConfigs.isScyllaColumnsMapped()) { + topicConfigs.setTablePartitionAndColumnValues(record); + } + topicConfigs.setTtlAndTimeStampIfAvailable(record); + } if (null == record.value()) { - if (scyllaDbSinkConnectorConfig.deletesEnabled) { + boolean deletionEnabled = topicConfigs != null + ? topicConfigs.isDeletesEnabled() : scyllaDbSinkConnectorConfig.deletesEnabled; + if (deletionEnabled) { if (this.session.tableExists(tableName)) { final RecordToBoundStatementConverter boundStatementConverter = this.session.delete(tableName); - final RecordToBoundStatementConverter.State state = boundStatementConverter.convert(record.key()); + final RecordToBoundStatementConverter.State state = boundStatementConverter.convert(record, null, ScyllaDbConstants.DELETE_OPERATION); Preconditions.checkState( state.parameters > 0, "key must contain the columns in the primary key." @@ -64,17 +76,27 @@ public BoundStatement getBoundStatementForRecord(SinkRecord record) { } } else { throw new DataException( - String.format("Record with null value found for the key '%s'. If you are trying to delete the record set " + - "scylladb.deletes.enabled = true in your connector configuration.", + String.format("Record with null value found for the key '%s'. 
If you are trying to delete the record set " + + "scylladb.deletes.enabled = true or topic.my_topic.my_ks.my_table.deletesEnabled = true in " + + "your connector configuration.", record.key())); } } else { - this.session.createOrAlterTable(tableName, record.keySchema(), record.valueSchema()); - final RecordToBoundStatementConverter boundStatementConverter = this.session.insert(tableName); - final RecordToBoundStatementConverter.State state = boundStatementConverter.convert(record.value()); + this.session.createOrAlterTable(tableName, record, topicConfigs); + final RecordToBoundStatementConverter boundStatementConverter = this.session.insert(tableName, topicConfigs); + final RecordToBoundStatementConverter.State state = boundStatementConverter.convert(record, topicConfigs, ScyllaDbConstants.INSERT_OPERATION); boundStatement = state.statement; } - boundStatement.setDefaultTimestamp(record.timestamp()); + + if (topicConfigs != null) { + log.trace("Topic mapped Consistency level : " + topicConfigs.getConsistencyLevel() + + ", Record/Topic mapped timestamp : " + topicConfigs.getTimeStamp()); + boundStatement.setConsistencyLevel(topicConfigs.getConsistencyLevel()); + boundStatement.setDefaultTimestamp(topicConfigs.getTimeStamp()); + } else { + boundStatement.setConsistencyLevel(this.scyllaDbSinkConnectorConfig.consistencyLevel); + boundStatement.setDefaultTimestamp(record.timestamp()); + } return boundStatement; } } diff --git a/src/main/java/io/connect/scylladb/topictotable/TopicConfigs.java b/src/main/java/io/connect/scylladb/topictotable/TopicConfigs.java new file mode 100644 index 0000000..8a00998 --- /dev/null +++ b/src/main/java/io/connect/scylladb/topictotable/TopicConfigs.java @@ -0,0 +1,236 @@ +package io.connect.scylladb.topictotable; + +import com.datastax.driver.core.ConsistencyLevel; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import io.connect.scylladb.ScyllaDbSinkConnectorConfig; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class TopicConfigs { + + private static final Logger log = LoggerFactory.getLogger(TopicConfigs.class); + private String mappingStringForTopic; + private Map tablePartitionKeyMap; + private Map tableColumnMap; + private ConsistencyLevel consistencyLevel = null; + private String ttlMappedField; + private Integer ttl; + private String timeStampMappedField; + private Long timeStamp; + private boolean deletesEnabled; + private boolean isScyllaColumnsMapped; + + public TopicConfigs(Map configsMapForTheTopic, + ScyllaDbSinkConnectorConfig scyllaDbSinkConnectorConfig) { + this.tablePartitionKeyMap = new HashMap<>(); + this.tableColumnMap = new HashMap<>(); + this.consistencyLevel = scyllaDbSinkConnectorConfig.consistencyLevel; + this.ttl = scyllaDbSinkConnectorConfig.ttl; + this.deletesEnabled = scyllaDbSinkConnectorConfig.deletesEnabled; + if (configsMapForTheTopic.containsKey("mapping")) { + this.mappingStringForTopic = configsMapForTheTopic.get("mapping"); + } + if (configsMapForTheTopic.containsKey("deletesEnabled")) { + String deleteEnabledValue = configsMapForTheTopic.get("deletesEnabled"); + if 
("true".equalsIgnoreCase(deleteEnabledValue) || "false".equalsIgnoreCase(deleteEnabledValue)) { + this.deletesEnabled = Boolean.parseBoolean(deleteEnabledValue); + } else { + throw new DataException( + String.format("%s is not a valid value for deletesEnabled. Valid values are : true, false", + deleteEnabledValue + ) + ); + } + } + try { + if (configsMapForTheTopic.containsKey("ttlSeconds")) { + this.ttl = Integer.parseInt(configsMapForTheTopic.get("ttlSeconds")); + } + if (configsMapForTheTopic.containsKey("consistencyLevel")) { + this.consistencyLevel = ConsistencyLevel.valueOf(configsMapForTheTopic.get("consistencyLevel")); + } + } catch (NumberFormatException e) { + throw new DataException( + String.format("The setting ttlSeconds must be of type Integer. %s is not a suppoerted type", + configsMapForTheTopic.get("ttlSeconds").getClass().getName())); + } catch (IllegalArgumentException e) { + throw new DataException( + String.format("%s is not a valid value for consistencyLevel. Valid values are %s", + configsMapForTheTopic.get("consistencyLevel"), Arrays.toString(ConsistencyLevel.values())) + ); + } + } + + public void setTablePartitionAndColumnValues(SinkRecord record) { + for (String mappedEntry : this.mappingStringForTopic.split(",")) { + String[] columnNameMap = mappedEntry.split("="); + String recordField = columnNameMap[1].split("\\.").length > 0 + ? columnNameMap[1].split("\\.")[1] : ""; + String scyllaColumnName = columnNameMap[0].trim(); + KafkaScyllaColumnMapper kafkaScyllaColumnMapper = new KafkaScyllaColumnMapper(scyllaColumnName); + if (columnNameMap[1].startsWith("key.")) { + if (record.keySchema() != null) { + kafkaScyllaColumnMapper.kafkaRecordField = getFiledForNameFromSchema(record.keySchema(), recordField, "record.keySchema()"); + } + this.tablePartitionKeyMap.put(recordField, kafkaScyllaColumnMapper); + } else if (columnNameMap[1].startsWith("value.")) { + Field valueField = null; + if (record.valueSchema() != null) { + valueField = getFiledForNameFromSchema(record.valueSchema(), recordField, "record.valueSchema()"); + } + if (scyllaColumnName.equals("__ttl")) { + ttlMappedField = recordField; + } else if (scyllaColumnName.equals("__timestamp")) { + timeStampMappedField = recordField; + } else { + kafkaScyllaColumnMapper.kafkaRecordField = valueField; + this.tableColumnMap.put(recordField, kafkaScyllaColumnMapper); + } + } else if (columnNameMap[1].startsWith("header.")) { + int index = 0; + for (Header header : record.headers()) { + if (header.key().equals(recordField)) { + if (header.schema().type().isPrimitive()) { + kafkaScyllaColumnMapper.kafkaRecordField = new Field(header.key(), index, header.schema()); + tableColumnMap.put(recordField, kafkaScyllaColumnMapper); + index++; + } else { + throw new IllegalArgumentException(String.format("Header schema type should be of primitive type. " + + "%s schema type is not allowed in header.", header.schema().type().getName())); + } + } + } + } else { + throw new IllegalArgumentException("field name must start with 'key.', 'value.' or 'header.'."); + } + } + this.isScyllaColumnsMapped = true; + } + + private Field getFiledForNameFromSchema(Schema schema, String name, String schemaType) { + Field schemaField = schema.field(name); + if (null == schemaField) { + throw new DataException( + String.format( + schemaType + " must contain all of key fields mentioned in the " + + "'topic.my_topic.my_ks.my_table.mapping' config. " + schemaType + + "is missing field '%s'. 
" + schemaType + " is used by the connector " + + "to persist data to the table in ScyllaDb. Here are " + + "the available fields for " + schemaType + "(%s).", + name, + Joiner.on(", ").join( + schema.fields().stream().map(Field::name).collect(Collectors.toList()) + ) + ) + ); + } + return schemaField; + } + + public void setTtlAndTimeStampIfAvailable(SinkRecord record) { + this.timeStamp = record.timestamp(); + if (timeStampMappedField != null) { + Object timeStampValue = getValueOfField(record.value(), timeStampMappedField); + if (timeStampValue instanceof Long) { + this.timeStamp = (Long) timeStampValue; + } else { + throw new DataException( + String.format("TimeStamp should be of type Long. But record provided for %s is of type %s", + timeStampMappedField, timeStampValue.getClass().getName() + )); + } + } + if (ttlMappedField != null) { + Object ttlValue = getValueOfField(record.value(), ttlMappedField); + if (ttlValue instanceof Integer) { + this.ttl = (Integer) ttlValue; + } else { + throw new DataException( + String.format("TTL should be of type Integer. But record provided for %s is of type %s", + ttlMappedField, ttlValue.getClass().getName() + )); + } + } + } + + public Object getValueOfField(Object value, String field) { + Preconditions.checkNotNull(value, "value cannot be null."); + if (value instanceof Struct) { + return ((Struct)value).get(field); + } else { + if (!(value instanceof Map)) { + throw new DataException(String.format("Only Schema (%s) or Schema less (%s) are supported. %s is not a supported type.", Struct.class.getName(), Map.class.getName(), value.getClass().getName())); + } + return ((Map)value).get(field); + } + } + + public Map getTablePartitionKeyMap() { + return tablePartitionKeyMap; + } + + public Map getTableColumnMap() { + return tableColumnMap; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public String getTtlMappedField() { + return ttlMappedField; + } + + public Integer getTtl() { + return ttl; + } + + public Long getTimeStamp() { + return timeStamp; + } + + public boolean isScyllaColumnsMapped() { + return isScyllaColumnsMapped; + } + + public void setScyllaColumnsMappedFalse() { + this.isScyllaColumnsMapped = false; + } + + public String getMappingStringForTopic() { + return mappingStringForTopic; + } + + public boolean isDeletesEnabled() { + return deletesEnabled; + } + + public class KafkaScyllaColumnMapper { + private String scyllaColumnName; + private Field kafkaRecordField; + + KafkaScyllaColumnMapper(String scyllaColumnName) { + this.scyllaColumnName = scyllaColumnName; + } + + public String getScyllaColumnName() { + return scyllaColumnName; + } + + public Field getKafkaRecordField() { + return kafkaRecordField; + } + } +} diff --git a/src/main/java/io/connect/scylladb/utils/ScyllaDbConstants.java b/src/main/java/io/connect/scylladb/utils/ScyllaDbConstants.java new file mode 100644 index 0000000..efcab51 --- /dev/null +++ b/src/main/java/io/connect/scylladb/utils/ScyllaDbConstants.java @@ -0,0 +1,9 @@ +package io.connect.scylladb.utils; + +public class ScyllaDbConstants { + + public static final String DELETE_OPERATION = "delete"; + + public static final String INSERT_OPERATION = "insert"; + +} diff --git a/src/main/java/io/connect/scylladb/VersionUtil.java b/src/main/java/io/connect/scylladb/utils/VersionUtil.java similarity index 87% rename from src/main/java/io/connect/scylladb/VersionUtil.java rename to src/main/java/io/connect/scylladb/utils/VersionUtil.java index e772866..b8eb8bb 100644 --- 
a/src/main/java/io/connect/scylladb/VersionUtil.java +++ b/src/main/java/io/connect/scylladb/utils/VersionUtil.java @@ -1,4 +1,4 @@ -package io.connect.scylladb; +package io.connect.scylladb.utils; public class VersionUtil { public static String getVersion() { diff --git a/src/test/java/io/connect/scylladb/integration/ScyllaDbSinkConnectorIT.java b/src/test/java/io/connect/scylladb/integration/ScyllaDbSinkConnectorIT.java index 33419ef..fac5119 100644 --- a/src/test/java/io/connect/scylladb/integration/ScyllaDbSinkConnectorIT.java +++ b/src/test/java/io/connect/scylladb/integration/ScyllaDbSinkConnectorIT.java @@ -43,7 +43,7 @@ public class ScyllaDbSinkConnectorIT { private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkConnectorIT.class); - static final String SCYLLA_DB_CONTACT_POINT = "172.18.0.2"; + static final String SCYLLA_DB_CONTACT_POINT = "172.18.0.3"; static final int SCYLLA_DB_PORT = 9042; static final String SCYLLADB_KEYSPACE = "testkeyspace"; private static final String SCYLLADB_OFFSET_TABLE = "kafka_connect_offsets"; @@ -170,6 +170,60 @@ public void insert() { verify(this.sinkTaskContext, times(1)).assignment(); } + @Test + public void insertWithTopicMapping() { + final Map settings = settings(); + //adding mapping related configs + settings.put("topic.insertTestingWithMapping.testkeyspace.my_table.mapping", "userid=key.id, " + + "userfirstname=value.firstname, userlastname=value.lastname, userage=value.age, __ttl=value.time"); + settings.put("topic.insertTestingWithMapping.testkeyspace.my_table.consistencyLevel", "LOCAL_ONE"); + settings.put("topic.insertTestingWithMapping.testkeyspace.my_table.ttlSeconds", "3423"); + settings.put("topic.insertTestingWithMapping.testkeyspace.my_table.deletesEnabled", "false"); + connector = new ScyllaDbSinkConnector(); + connector.start(settings); + final String topic = "insertTestingWithMapping"; + when(this.sinkTaskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 3))); + this.task.start(settings); + List records = ImmutableList.of( + write( + topic, + struct("key", + "id", Schema.Type.INT64, true, 12345L + ), struct("value", + "firstname", Schema.Type.STRING, true, "test", + "lastname", Schema.Type.STRING, true, "user", + "age", Schema.Type.INT64, true, 1234L, + "time", Schema.Type.INT32, true, 1000 + ) + ), + write(topic, + null, + asMap( + struct("key", + "id", Schema.Type.INT64, true, 67890L + ) + ), + null, + asMap( + struct("key", + "firstname", Schema.Type.STRING, true, "another", + "lastname", Schema.Type.STRING, true, "user", + "age", Schema.Type.INT64, true, 10L, + "time", Schema.Type.INT32, true, 10 + ) + ) + ) + ); + this.validations = records.stream() + .map(RowValidator::of) + .collect(Collectors.toList()); + this.task.put(records); + Boolean tableExists = IsOffsetStorageTableExists(SCYLLADB_OFFSET_TABLE); + assertEquals(true, tableExists); + verify(this.sinkTaskContext, times(1)).requestCommit(); + verify(this.sinkTaskContext, times(1)).assignment(); + } + @Test public void decimal() { final Map settings = settings();
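
A minimal sketch of the new per-topic settings as a connector task would receive them, in the same style as the integration test above (illustrative only, not part of the patch; the topic, keyspace, table and field names are made up, and LOCAL_QUORUM is just one valid consistency level):

    // Sketch only: per-topic mapping, consistency level, TTL and delete behaviour for topic "orders".
    Map<String, String> settings = new HashMap<>();
    settings.put("topic.orders.my_ks.my_table.mapping",
        "order_id=key.id, amount=value.amount, __ttl=value.expiry_seconds, "
            + "__timestamp=value.event_ts, origin=header.source");
    settings.put("topic.orders.my_ks.my_table.consistencyLevel", "LOCAL_QUORUM");
    settings.put("topic.orders.my_ks.my_table.ttlSeconds", "86400");
    settings.put("topic.orders.my_ks.my_table.deletesEnabled", "true");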