diff --git a/README.md b/README.md index 68df4c2..00f880f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ - # Introduction +[Documentation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-solr) +[Confluent Hub](https://www.confluent.io/hub/jcustenborder/kafka-connect-solr) The SOLR connector is a high speed mechanism for writing data to [Apache Solr](http://lucene.apache.org/solr/). diff --git a/pom.xml b/pom.xml index fa8416e..2a6a1b4 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.github.jcustenborder.kafka.connect kafka-connect-parent - 2.1.1-cp1 + 2.2.1-cp1 kafka-connect-solr 0.1-SNAPSHOT diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java index e93fdc0..a086808 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java @@ -25,17 +25,31 @@ class CloudSolrSinkConnectorConfig extends SolrSinkConnectorConfig { public static final String ZOOKEEPER_HOSTS_CONFIG = "solr.zookeeper.hosts"; public static final String ZOOKEEPER_CHROOT_CONFIG = "solr.zookeeper.chroot"; - public static final String COLLECTION_NAME_CONFIG = "solr.collection.name"; + public static final String ZOOKEEPER_CONNECT_TIMEOUT_CONFIG = "solr.zookeeper.connect.timeout.ms"; + public static final String ZOOKEEPER_CLIENT_TIMEOUT_CONFIG = "solr.zookeeper.client.timeout.ms"; + public static final String ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG = "solr.zookeeper.retry.expiry.time.ms"; + + private static final String ZOOKEEPER_HOSTS_DOC = "Zookeeper hosts that are used to store solr configuration."; private static final String ZOOKEEPER_CHROOT_DOC = "Chroot within solr for the zookeeper configuration."; + private static final String ZOOKEEPER_CONNECT_TIMEOUT_DOC = "Set the connect timeout to the zookeeper ensemble in ms."; + private static final String ZOOKEEPER_CLIENT_TIMEOUT_DOC = "Set the timeout to the zookeeper ensemble in ms."; + private static final String ZOOKEEPER_RETRY_EXPIRY_TIME_DOC = "This is the time to wait to refetch the " + + "state after getting the same state version from ZK in ms."; public final List zookeeperHosts; public final String zookeeperChroot; + public final int zookeeperConnectTimeoutMs; + public final int zookeeperClientTimeoutMs; + public final int zookeeperRetryExpiryTimeMs; protected CloudSolrSinkConnectorConfig(Map props) { super(config(), props); this.zookeeperHosts = this.getList(ZOOKEEPER_HOSTS_CONFIG); this.zookeeperChroot = this.getString(ZOOKEEPER_CHROOT_CONFIG); + this.zookeeperConnectTimeoutMs = getInt(ZOOKEEPER_CONNECT_TIMEOUT_CONFIG); + this.zookeeperClientTimeoutMs = getInt(ZOOKEEPER_CLIENT_TIMEOUT_CONFIG); + this.zookeeperRetryExpiryTimeMs = getInt(ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG); } @@ -54,6 +68,27 @@ public static ConfigDef config() { .group(CONNECTION_GROUP) .defaultValue(null) .build() + ).define( + ConfigKeyBuilder.of(ZOOKEEPER_CONNECT_TIMEOUT_CONFIG, ConfigDef.Type.INT) + .importance(ConfigDef.Importance.LOW) + .documentation(ZOOKEEPER_CONNECT_TIMEOUT_DOC) + .group(CONNECTION_GROUP) + .defaultValue(15000) + .build() + ).define( + ConfigKeyBuilder.of(ZOOKEEPER_CLIENT_TIMEOUT_CONFIG, ConfigDef.Type.INT) + .importance(ConfigDef.Importance.LOW) + .documentation(ZOOKEEPER_CLIENT_TIMEOUT_DOC) + .group(CONNECTION_GROUP) + .defaultValue(45000) + .build() + ).define( + ConfigKeyBuilder.of(ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG, ConfigDef.Type.INT) + .importance(ConfigDef.Importance.LOW) + .documentation(ZOOKEEPER_RETRY_EXPIRY_TIME_DOC) + .group(CONNECTION_GROUP) + .defaultValue(3000) + .build() ); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java index 2595fdd..f3c1b05 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java @@ -16,7 +16,6 @@ package com.github.jcustenborder.kafka.connect.solr; -import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -26,6 +25,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; public class CloudSolrSinkTask extends SolrSinkTask { private static final Logger log = LoggerFactory.getLogger(CloudSolrSinkTask.class); @@ -35,7 +35,7 @@ protected CloudSolrSinkConnectorConfig config(Map settings) { return new CloudSolrSinkConnectorConfig(settings); } - SolrClient client; + CloudSolrClient client; @Override public void start(Map settings) { @@ -44,6 +44,9 @@ public void start(Map settings) { builder.withZkHost(this.config.zookeeperHosts); builder.withZkChroot(this.config.zookeeperChroot); this.client = builder.build(); + this.client.setZkConnectTimeout(this.config.zookeeperConnectTimeoutMs); + this.client.setZkClientTimeout(this.config.zookeeperClientTimeoutMs); + this.client.setRetryExpiryTime((int) TimeUnit.SECONDS.convert(this.config.zookeeperRetryExpiryTimeMs, TimeUnit.MILLISECONDS)); } @Override