Skip to content

Commit

Permalink
Bugfixes (#99)
Browse files Browse the repository at this point in the history
* Tighten constraints on TopicWiseConfigs

Currently if any config option starts with `topic.` prefix,
the connector will attempt to match it against regular expression
for TopicWiseConfigs. If it fails then whole connector fails to start.
This is a problem when launching on Confluent Cloud which adds
for example `topic.creation.default.max.message.bytes` config.

It mistakenly gets through the first check because it starts with `topic.`
and then trips up on regular expression. It seems there could be many more
configs like this that pertain to Kafka topic but are not this connectors
TopicWiseConfigs.

Adding suffix check should be sufficient and it should not break existing
configurations as long as those adhered to the existing documentation.

* Log warning when returning null TableMetadata

If user sets connector up in schemaless mode and forgets
to create the table, the connector tasks will fail with NullPointerException.
Added warning should make it more obvious as to what is the issue.

I am not adding throw here just in case there exists a situation
where table creation could lag behind. Connector should continue
working normally after the table is created.
  • Loading branch information
Bouncheck authored Dec 18, 2024
1 parent a458e42 commit 5e44b45
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/main/java/io/connect/scylladb/ScyllaDbSessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public TableMetadata.Table tableMetadata(String tableName) {
result = new TableMetadataImpl.TableImpl(tableMetadata.get());
this.tableMetadataCache.put(tableName, result);
} else {
log.warn("Could not find metadata for table {} in keyspace {}. Are you sure the table exists?", tableName, keyspaceMetadata.getName().asCql(false));
result = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.confluent.kafka.connect.utils.config.ValidEnum;
import io.confluent.kafka.connect.utils.config.ValidPort;
import io.connect.scylladb.topictotable.TopicConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Configuration class for {@link ScyllaDbSinkConnector}.
Expand Down Expand Up @@ -62,6 +64,10 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig {
private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN =
Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$");

private static final String[] TOPIC_WISE_CONFIGS_VALID_SUFFIXES = {".mapping",".consistencyLevel",".ttlSeconds",".deletesEnabled"};

private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkConnectorConfig.class);

static final Set<String> CLIENT_COMPRESSION = ImmutableSet.of("none", "lz4", "snappy");

static final Set<String> TABLE_COMPRESSORS = ImmutableSet.of("SnappyCompressor", "LZ4Compressor", "DeflateCompressor", "none");
Expand Down Expand Up @@ -111,8 +117,9 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
Map<String, Map<String, String>> topicWiseConfigsMap = new HashMap<>();
for (final Map.Entry<String, String> entry : ((Map<String, String>) originals).entrySet()) {
final String name2 = entry.getKey();
if (name2.startsWith("topic.")) {
if (name2.startsWith("topic.") && hasTopicWiseConfigSuffix(name2)) {
final String topicName = this.tryMatchTopicName(name2);
log.debug("Interpreting " + name2 + " as custom TopicWiseConfig for topic " + topicName);
final Map<String, String> topicMap = topicWiseConfigsMap.computeIfAbsent(topicName, t -> new HashMap());
topicMap.put(name2.split("\\.")[name2.split("\\.").length - 1], entry.getValue());
}
Expand Down Expand Up @@ -567,6 +574,13 @@ private String tryMatchTopicName(final String name) {
throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern");
}

private boolean hasTopicWiseConfigSuffix(final String name) {
for (String suffix : TOPIC_WISE_CONFIGS_VALID_SUFFIXES) {
if (name.endsWith(suffix)) return true;
}
return false;
}

private static String[] toStringArray(Object[] arr){
return Arrays.stream(arr).map(Object::toString).toArray(String[]::new);
}
Expand Down

0 comments on commit 5e44b45

Please sign in to comment.