support for record to table mapping config (#6)
Added support for the following configs (usage sketch below):

"topic.my_topic.my_ks.my_table.mapping": "col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f1"
"topic.my_topic.my_ks.my_table.consistencyLevel": "LOCAL_ONE"
"topic.my_topic.my_ks.my_table.ttlSeconds": 1
"topic.my_topic.my_ks.my_table.deletesEnabled": "true"
patitapaban19 authored Mar 2, 2020
1 parent 0a9cd32 commit e301ee1
Showing 13 changed files with 673 additions and 205 deletions.
4 changes: 2 additions & 2 deletions documentation/QUICKSTART.md
@@ -122,7 +122,7 @@ Example:
```
kafka-avro-console-producer
--broker-list localhost:9092
--topic example
--topic topic1
--property parse.key=true
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}'
--property "key.separator=$"
@@ -132,7 +132,7 @@ kafka-avro-console-producer
```

Output upon running the select query in ScyllaDB:
select * from demo.example;
select * from test.topic1;

```
id | firstname | lastname
225 changes: 133 additions & 92 deletions src/main/java/io/connect/scylladb/RecordConverter.java

Large diffs are not rendered by default.

200 changes: 126 additions & 74 deletions src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java
@@ -14,9 +14,12 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ComparisonChain;
import io.connect.scylladb.topictotable.TopicConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -130,29 +133,55 @@ DataType dataType(Schema schema) {
}

void alter(
final ScyllaDbSchemaKey key,
String tableName,
Schema keySchema,
Schema valueSchema,
TableMetadata.Table tableMetadata
final ScyllaDbSchemaKey key,
String tableName,
SinkRecord record,
TableMetadata.Table tableMetadata,
TopicConfigs topicConfigs
) {
Preconditions.checkNotNull(tableMetadata, "tableMetadata cannot be null.");
Preconditions.checkNotNull(valueSchema, "valueSchema cannot be null.");
Preconditions.checkNotNull(record.valueSchema(), "valueSchema cannot be null.");
log.trace("alter() - tableMetadata = '{}' ", tableMetadata);


Map<String, DataType> addedColumns = new LinkedHashMap<>();

for (final Field field : valueSchema.fields()) {
log.trace("alter() - Checking if table has '{}' column.", field.name());
final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(field.name());
if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) {
if (topicConfigs.getTablePartitionKeyMap().keySet().size() != tableMetadata.primaryKey().size()) {
throw new DataException(
String.format(
"Cannot alter primary key of a ScyllaDb Table. Existing primary key: '%s', "
+ "Primary key mapped in 'topic.my_topic.my_ks.my_table.mapping' config: '%s",
Joiner.on("', '").join(tableMetadata.primaryKey()),
Joiner.on("', '").join(topicConfigs.getTablePartitionKeyMap().keySet())
)
);
}

if (null == columnMetadata) {
log.debug("alter() - Adding column '{}'", field.name());
DataType dataType = dataType(field.schema());
addedColumns.put(field.name(), dataType);
} else {
log.trace("alter() - Table already has '{}' column.", field.name());
for (Map.Entry<String, TopicConfigs.KafkaScyllaColumnMapper> entry: topicConfigs.getTableColumnMap().entrySet()) {
String columnName = entry.getValue().getScyllaColumnName();
log.trace("alter for mapping() - Checking if table has '{}' column.", columnName);
final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(columnName);

if (null == columnMetadata) {
log.debug("alter for mapping() - Adding column '{}'", columnName);
final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema());
addedColumns.put(columnName, dataType);
} else {
log.trace("alter for mapping() - Table already has '{}' column.", columnName);
}
}
} else {
for (final Field field : record.valueSchema().fields()) {
log.trace("alter() - Checking if table has '{}' column.", field.name());
final TableMetadata.Column columnMetadata = tableMetadata.columnMetadata(field.name());

if (null == columnMetadata) {
log.debug("alter() - Adding column '{}'", field.name());
DataType dataType = dataType(field.schema());
addedColumns.put(field.name(), dataType);
} else {
log.trace("alter() - Table already has '{}' column.", field.name());
}
}
}

@@ -166,15 +195,15 @@ void alter(
final Alter alterTable = SchemaBuilder.alterTable(this.config.keyspace, tableName);
if (!this.config.tableManageEnabled) {
List<String> requiredAlterStatements = addedColumns.entrySet().stream()
.map(e -> alterTable.addColumn(e.getKey()).type(e.getValue()).toString())
.collect(Collectors.toList());
.map(e -> alterTable.addColumn(e.getKey()).type(e.getValue()).toString())
.collect(Collectors.toList());

throw new DataException(
String.format(
"Alter statement(s) needed. Missing column(s): '%s'\n%s;",
Joiner.on("', '").join(addedColumns.keySet()),
Joiner.on(';').join(requiredAlterStatements)
)
String.format(
"Alter statement(s) needed. Missing column(s): '%s'\n%s;",
Joiner.on("', '").join(addedColumns.keySet()),
Joiner.on(';').join(requiredAlterStatements)
)
);
} else {
String query = alterTable.withOptions()
Expand All @@ -193,15 +222,15 @@ void alter(
this.schemaLookup.put(key, DEFAULT);
}

public void build(String tableName, Schema keySchema, Schema valueSchema) {
public void build(String tableName, SinkRecord record, TopicConfigs topicConfigs) {
log.trace("build() - tableName = '{}'", tableName);
final ScyllaDbSchemaKey key = ScyllaDbSchemaKey.of(this.config.keyspace, tableName);
if (null != this.schemaLookup.getIfPresent(key)) {
return;
}
if (null == keySchema || null == valueSchema) {
if (null == record.keySchema() || null == record.valueSchema()) {
log.warn(
"build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct."
"build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct."
);
this.schemaLookup.put(key, DEFAULT);
return;
@@ -211,52 +240,66 @@ public void build(String tableName, Schema keySchema, Schema valueSchema) {
final TableMetadata.Table tableMetadata = this.session.tableMetadata(tableName);

if (null != tableMetadata) {
alter(key, tableName, keySchema, valueSchema, tableMetadata);
alter(key, tableName, record, tableMetadata, topicConfigs);
} else {
create(key, tableName, keySchema, valueSchema);
create(key, tableName, record, topicConfigs);
}
}

void create(
final ScyllaDbSchemaKey key,
String tableName,
Schema keySchema,
Schema valueSchema
final ScyllaDbSchemaKey key,
String tableName,
SinkRecord record,
TopicConfigs topicConfigs
) {
Schema keySchema = record.keySchema();
Schema valueSchema = record.valueSchema();
log.trace("create() - tableName = '{}'", tableName);
Preconditions.checkState(
Schema.Type.STRUCT == keySchema.type(),
"record.keySchema() must be a struct. Received '%s'",
keySchema.type()
Schema.Type.STRUCT == keySchema.type(),
"record.keySchema() must be a struct. Received '%s'",
keySchema.type()
);
Preconditions.checkState(
!keySchema.fields().isEmpty(),
"record.keySchema() must have some fields."
!keySchema.fields().isEmpty(),
"record.keySchema() must have some fields."
);
for (final Field keyField : keySchema.fields()) {
log.trace(
"create() - Checking key schema against value schema. fieldName={}",
keyField.name()
if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) {
Preconditions.checkState(
Schema.Type.STRUCT == valueSchema.type(),
"record.valueSchema() must be a struct. Received '%s'",
valueSchema.type()
);
final Field valueField = valueSchema.field(keyField.name());

if (null == valueField) {
throw new DataException(
String.format(
"record.valueSchema() must contain all of the fields in record.keySchema(). "
+ "record.keySchema() is used by the connector to determine the key for the "
+ "table. record.valueSchema() is missing field '%s'. record.valueSchema() is "
+ "used by the connector to persist data to the table in ScyllaDb. Here are "
+ "the available fields for record.valueSchema(%s) and record.keySchema(%s).",
keyField.name(),
Joiner.on(", ").join(
valueSchema.fields().stream().map(Field::name).collect(Collectors.toList())
),
Joiner.on(", ").join(
keySchema.fields().stream().map(Field::name).collect(Collectors.toList())
)
)
Preconditions.checkState(
!valueSchema.fields().isEmpty(),
"record.valueSchema() must have some fields."
);
} else {
for (final Field keyField : keySchema.fields()) {
log.trace(
"create() - Checking key schema against value schema. fieldName={}",
keyField.name()
);
final Field valueField = valueSchema.field(keyField.name());

if (null == valueField) {
throw new DataException(
String.format(
"record.valueSchema() must contain all of the fields in record.keySchema(). "
+ "record.keySchema() is used by the connector to determine the key for the "
+ "table. record.valueSchema() is missing field '%s'. record.valueSchema() is "
+ "used by the connector to persist data to the table in ScyllaDb. Here are "
+ "the available fields for record.valueSchema(%s) and record.keySchema(%s).",
keyField.name(),
Joiner.on(", ").join(
valueSchema.fields().stream().map(Field::name).collect(Collectors.toList())
),
Joiner.on(", ").join(
keySchema.fields().stream().map(Field::name).collect(Collectors.toList())
)
)
);
}
}
}

@@ -265,22 +308,32 @@ void create(
if (!Strings.isNullOrEmpty(valueSchema.doc())) {
tableOptions.comment(valueSchema.doc());
}
if (topicConfigs != null && topicConfigs.isScyllaColumnsMapped()) {
for (Map.Entry<String, TopicConfigs.KafkaScyllaColumnMapper> entry: topicConfigs.getTablePartitionKeyMap().entrySet()) {
final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema());
create.addPartitionKey(entry.getValue().getScyllaColumnName(), dataType);
}
for (Map.Entry<String, TopicConfigs.KafkaScyllaColumnMapper> entry: topicConfigs.getTableColumnMap().entrySet()) {
final DataType dataType = dataType(entry.getValue().getKafkaRecordField().schema());
create.addColumn(entry.getValue().getScyllaColumnName(), dataType);
}
} else {
Set<String> fields = new HashSet<>();
for (final Field keyField : keySchema.fields()) {
final DataType dataType = dataType(keyField.schema());
create.addPartitionKey(keyField.name(), dataType);
fields.add(keyField.name());
}

Set<String> fields = new HashSet<>();
for (final Field keyField : keySchema.fields()) {
final DataType dataType = dataType(keyField.schema());
create.addPartitionKey(keyField.name(), dataType);
fields.add(keyField.name());
}
for (final Field valueField : valueSchema.fields()) {
if (fields.contains(valueField.name())) {
log.trace("create() - Skipping '{}' because it's already in the key.", valueField.name());
continue;
}

for (final Field valueField : valueSchema.fields()) {
if (fields.contains(valueField.name())) {
log.trace("create() - Skipping '{}' because it's already in the key.", valueField.name());
continue;
final DataType dataType = dataType(valueField.schema());
create.addColumn(valueField.name(), dataType);
}

final DataType dataType = dataType(valueField.schema());
create.addColumn(valueField.name(), dataType);
}

if (this.config.tableManageEnabled) {
Expand All @@ -289,10 +342,9 @@ void create(
session.executeStatement(tableOptions);
} else {
throw new DataException(
String.format("Create statement needed:\n%s", create)
String.format("Create statement needed:\n%s", create)
);
}

this.schemaLookup.put(key, DEFAULT);
}

12 changes: 8 additions & 4 deletions src/main/java/io/connect/scylladb/ScyllaDbSession.java
@@ -4,9 +4,11 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import io.connect.scylladb.topictotable.TopicConfigs;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import java.io.Closeable;
import java.util.Map;
@@ -44,10 +46,11 @@ public interface ScyllaDbSession extends Closeable {
/**
* Ensure that a table has a specified schema.
* @param tableName name of the table
* @param keySchema schema that will be used for the primary key.
* @param valueSchema schema that will be used for the rest of the table.
* @param sinkRecord record whose keySchema will be used for the primary key and
* whose valueSchema will be used for the rest of the table.
* @param topicConfigs class containing mapping details for the record
*/
void createOrAlterTable(String tableName, Schema keySchema, Schema valueSchema);
void createOrAlterTable(String tableName, SinkRecord sinkRecord, TopicConfigs topicConfigs);

/**
* Flag to determine if the session is valid.
@@ -69,9 +72,10 @@ public interface ScyllaDbSession extends Closeable {
/**
* Method will return a RecordToBoundStatementConverter for an insert on the supplied table.
* @param tableName table to return the RecordToBoundStatementConverter for
* @param topicConfigs class containing mapping details for the record
* @return RecordToBoundStatementConverter that can be used for the record.
*/
RecordToBoundStatementConverter insert(String tableName);
RecordToBoundStatementConverter insert(String tableName, TopicConfigs topicConfigs);

/**
* Method is used to add prepared statements for the offsets that are in the current batch.
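
The interface changes above replace the two-schema signature with one that takes the whole SinkRecord plus the optional per-topic mapping config. A rough caller-side sketch follows (not part of this commit; the config lookup and table-name choice are assumptions for illustration):

```java
import java.util.Map;

import io.connect.scylladb.ScyllaDbSession;
import io.connect.scylladb.topictotable.TopicConfigs;
import org.apache.kafka.connect.sink.SinkRecord;

class SessionUsageSketch {
  // Hypothetical per-record write path; names other than the ScyllaDbSession
  // methods shown in this diff are illustrative assumptions.
  void write(ScyllaDbSession session,
             SinkRecord record,
             Map<String, TopicConfigs> topicConfigsByTopic) {
    // Per-topic mapping settings, if a topic.<topic>.<ks>.<table>.* config was declared.
    TopicConfigs topicConfigs = topicConfigsByTopic.get(record.topic());

    // Assumed table-name resolution: fall back to the topic name.
    String tableName = record.topic();

    // The session now receives the whole record (key + value schemas) and the
    // optional mapping config instead of the two schemas separately.
    session.createOrAlterTable(tableName, record, topicConfigs);

    // The insert converter is likewise built with the mapping config; binding the
    // record and executing the statement are omitted from this sketch.
    session.insert(tableName, topicConfigs);
  }
}
```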
