From c1adfdb0516eb224534883a90d56645626bd4a8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Tue, 17 Oct 2023 15:28:24 +0200 Subject: [PATCH] Fix table compression Currently setting table compression fails with `Missing sub-option 'sstable_compression' for the 'compression' option.`. This is due to Cassandra using now 'class' sub-option and Scylla using 'sstable_compression'. Resolved by calling with customized 'withOption' instead of directly 'withCompression'. Upgrades snappy to 1.1.10.5 . This technically also requires upping the driver version to the one using snappy 1.1.10.5 . --- pom.xml | 2 +- .../java/io/connect/scylladb/ScyllaDbSchemaBuilder.java | 6 +++--- .../java/io/connect/scylladb/ScyllaDbSessionFactory.java | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 0c57cbe..a866112 100644 --- a/pom.xml +++ b/pom.xml @@ -297,7 +297,7 @@ org.xerial.snappy snappy-java - 1.1.10.1 + 1.1.10.5 diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java index 6d29bb0..bb68f2e 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java @@ -4,7 +4,6 @@ import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListenerBase; import com.datastax.oss.driver.api.core.type.DataType; import com.datastax.oss.driver.api.core.type.DataTypes; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.*; import com.datastax.oss.driver.api.querybuilder.schema.AlterTableAddColumnEnd; @@ -21,6 +20,7 @@ import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder; import com.datastax.oss.driver.shaded.guava.common.collect.ComparisonChain; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import io.connect.scylladb.topictotable.TopicConfigs; import org.apache.kafka.connect.data.Date; @@ -220,7 +220,7 @@ void alter( alterTableWithOptionsEnd = alterTable.withNoCompression(); } else { - alterTableWithOptionsEnd = alterTable.withCompression(config.tableCompressionAlgorithm); + alterTableWithOptionsEnd = alterTable.withOption("compression", ImmutableMap.of("sstable_compression", config.tableCompressionAlgorithm)); } String query = alterTableWithOptionsEnd.asCql(); this.session.executeQuery(query); @@ -376,7 +376,7 @@ void create( createWithOptions = createWithOptions.withNoCompression(); } else { - createWithOptions = createWithOptions.withCompression(config.tableCompressionAlgorithm); + createWithOptions = createWithOptions.withOption("compression", ImmutableMap.of("sstable_compression", config.tableCompressionAlgorithm)); } log.info("create() - Adding table {}.{}\n{}", this.config.keyspace, tableName, Arrays.toString(createWithOptions.getOptions().entrySet().toArray())); session.executeStatement(createWithOptions.build()); diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java b/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java index 8bc50df..0c872a1 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java @@ -30,7 +30,6 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; -import java.util.Arrays; public class ScyllaDbSessionFactory {