From 5e44b45627f1c6bf4d55b0202f26f629f1e06c87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= <36934780+Bouncheck@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:10:58 +0100 Subject: [PATCH] Bugfixes (#99) * Tighten constraints on TopicWiseConfigs Currently if any config option starts with `topic.` prefix, the connector will attempt to match it against regular expression for TopicWiseConfigs. If it fails then whole connector fails to start. This is a problem when launching on Confluent Cloud which adds for example `topic.creation.default.max.message.bytes` config. It mistakenly gets through the first check because it starts with `topic.` and then trips up on regular expression. It seems there could be many more configs like this that pertain to Kafka topic but are not this connectors TopicWiseConfigs. Adding suffix check should be sufficient and it should not break existing configurations as long as those adhered to the existing documentation. * Log warning when returning null TableMetadata If user sets connector up in schemaless mode and forgets to create the table, the connector tasks will fail with NullPointerException. Added warning should make it more obvious as to what is the issue. I am not adding throw here just in case there exists a situation where table creation could lag behind. Connector should continue working normally after the table is created. --- .../io/connect/scylladb/ScyllaDbSessionImpl.java | 1 + .../scylladb/ScyllaDbSinkConnectorConfig.java | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java index 01fb369..ea976ae 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java @@ -91,6 +91,7 @@ public TableMetadata.Table tableMetadata(String tableName) { result = new TableMetadataImpl.TableImpl(tableMetadata.get()); this.tableMetadataCache.put(tableName, result); } else { + log.warn("Could not find metadata for table {} in keyspace {}. Are you sure the table exists?", tableName, keyspaceMetadata.getName().asCql(false)); result = null; } } diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java index 3af8d66..b7e4d78 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java @@ -25,6 +25,8 @@ import io.confluent.kafka.connect.utils.config.ValidEnum; import io.confluent.kafka.connect.utils.config.ValidPort; import io.connect.scylladb.topictotable.TopicConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Configuration class for {@link ScyllaDbSinkConnector}. @@ -62,6 +64,10 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig { private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN = Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$"); + private static final String[] TOPIC_WISE_CONFIGS_VALID_SUFFIXES = {".mapping",".consistencyLevel",".ttlSeconds",".deletesEnabled"}; + + private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkConnectorConfig.class); + static final Set CLIENT_COMPRESSION = ImmutableSet.of("none", "lz4", "snappy"); static final Set TABLE_COMPRESSORS = ImmutableSet.of("SnappyCompressor", "LZ4Compressor", "DeflateCompressor", "none"); @@ -111,8 +117,9 @@ public ScyllaDbSinkConnectorConfig(Map originals) { Map> topicWiseConfigsMap = new HashMap<>(); for (final Map.Entry entry : ((Map) originals).entrySet()) { final String name2 = entry.getKey(); - if (name2.startsWith("topic.")) { + if (name2.startsWith("topic.") && hasTopicWiseConfigSuffix(name2)) { final String topicName = this.tryMatchTopicName(name2); + log.debug("Interpreting " + name2 + " as custom TopicWiseConfig for topic " + topicName); final Map topicMap = topicWiseConfigsMap.computeIfAbsent(topicName, t -> new HashMap()); topicMap.put(name2.split("\\.")[name2.split("\\.").length - 1], entry.getValue()); } @@ -567,6 +574,13 @@ private String tryMatchTopicName(final String name) { throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern"); } + private boolean hasTopicWiseConfigSuffix(final String name) { + for (String suffix : TOPIC_WISE_CONFIGS_VALID_SUFFIXES) { + if (name.endsWith(suffix)) return true; + } + return false; + } + private static String[] toStringArray(Object[] arr){ return Arrays.stream(arr).map(Object::toString).toArray(String[]::new); }