diff --git a/build.gradle b/build.gradle index 57ae7fe..f9c778c 100644 --- a/build.gradle +++ b/build.gradle @@ -42,7 +42,7 @@ project.ext { } } kafkaVersion = "2.0.1" - marioVersion = "0.0.12" + marioVersion = "0.0.14" } subprojects { diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java index c17f50d..2d2807d 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java @@ -5,6 +5,7 @@ package com.linkedin.kafka.clients.consumer; import com.google.common.collect.ImmutableMap; +import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; import com.linkedin.mario.common.models.v1.ClientConfigRule; import com.linkedin.mario.common.models.v1.ClientConfigRules; @@ -81,9 +82,9 @@ public void testConsumerLiveConfigReload() throws Exception { Properties baseConsumerConfig = getConsumerProperties(extra); LiKafkaInstrumentedConsumerImpl consumer = new LiKafkaInstrumentedConsumerImpl<>( baseConsumerConfig, - LiKafkaConsumerImpl::new, - mario.getUrl() - ); + (baseConfig, overrideConfig) -> + new LiKafkaConsumerImpl(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)), + () -> mario.getUrl()); consumer.subscribe(Collections.singletonList(topic)); AtomicReference> recordsRef = new AtomicReference<>(null); diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java index 078beb4..a1f647f 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java @@ -6,6 +6,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; import com.linkedin.mario.common.models.v1.ClientConfigRule; import com.linkedin.mario.common.models.v1.ClientConfigRules; @@ -55,9 +56,9 @@ public void testProducerLiveConfigReload() throws Exception { Properties baseProducerConfig = getConsumerProperties(extra); LiKafkaInstrumentedProducerImpl producer = new LiKafkaInstrumentedProducerImpl<>( baseProducerConfig, - LiKafkaProducerImpl::new, - mario.getUrl() - ); + (baseConfig, overrideConfig) -> + new LiKafkaProducerImpl(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)), + () -> mario.getUrl()); byte[] key = new byte[500]; byte[] value = new byte[500]; diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/ConsumerFactory.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/ConsumerFactory.java new file mode 100644 index 0000000..6daee33 --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/ConsumerFactory.java @@ -0,0 +1,14 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.consumer; + +import java.util.Properties; +import org.apache.kafka.clients.consumer.Consumer; + + +@FunctionalInterface +public interface ConsumerFactory { + Consumer create(Properties base, Properties overrides); +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java index 5bedd4d..e353e9f 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -53,8 +53,7 @@ public class LiKafkaInstrumentedConsumerImpl implements DelegatingConsumer private final long initialConnectionTimeoutMs = TimeUnit.SECONDS.toMillis(30); private final ReadWriteLock delegateLock = new ReentrantReadWriteLock(); private final Properties baseConfig; - private final Function> consumerFactory; - private final String mdsUrl; + private final ConsumerFactory consumerFactory; private final CountDownLatch initialConnectionLatch = new CountDownLatch(1); private final MetricsProxy metricsProxy = new MetricsProxy() { @Override @@ -75,14 +74,13 @@ public class LiKafkaInstrumentedConsumerImpl implements DelegatingConsumer public LiKafkaInstrumentedConsumerImpl( Properties baseConfig, - Function> consumerFactory, - String mdsUrl + ConsumerFactory consumerFactory, + Supplier mdsUrlSupplier ) { List conversionIssues = new ArrayList<>(1); this.baseConfig = baseConfig; Map translatedBaseConfig = LiKafkaClientsUtils.propertiesToStringMap(baseConfig, conversionIssues); this.consumerFactory = consumerFactory; - this.mdsUrl = mdsUrl; if (!conversionIssues.isEmpty()) { StringJoiner csv = new StringJoiner(", "); @@ -90,8 +88,7 @@ public LiKafkaInstrumentedConsumerImpl( LOG.error("issues translating consumer config to strings: {}", csv); } - mdsClient = new SimpleClient( - mdsUrl, + mdsClient = new SimpleClient(mdsUrlSupplier, TimeUnit.MINUTES.toMillis(1), TimeUnit.HOURS.toMillis(1), translatedBaseConfig, this @@ -110,9 +107,11 @@ public LiKafkaInstrumentedConsumerImpl( boolean delegateChanged = recreateDelegate(true); if (delegateChanged) { if (issue != null) { - LOG.error("exception waiting to contact {}, using user-provided configs as fallback", mdsUrl, issue); + LOG.error("exception waiting to contact {}, using user-provided configs as fallback", + mdsClient.getLastAttemptedMarioUrl(), issue); } else { - LOG.error("unable to contact {} within timeout ({}), using user-provided configs as fallback", mdsUrl, initialConnectionTimeoutMs); + LOG.error("unable to contact {} within timeout ({}), using user-provided configs as fallback", + mdsClient.getLastAttemptedMarioUrl(), initialConnectionTimeoutMs); } } else if (issue != null) { //we got interrupted waiting, but apparently connection to mds was successful? @@ -130,9 +129,9 @@ public void stateChanged(SimpleClient client, SimpleClientState oldState, Simple boolean configOverrideChanged = !Objects.equals(currentOverrides, newOverrides); if (delegate == null || configOverrideChanged) { if (configOverrideChanged) { - LOG.info("got new config overrides from {}: {}", mdsUrl, newOverrides); + LOG.info("got new config overrides from {}: {}", mdsClient.getLastConnectedMarioUrl(), newOverrides); } else { - LOG.info("successfully connected to {}, no config overrides", mdsUrl); + LOG.info("successfully connected to {}, no config overrides", mdsClient.getLastConnectedMarioUrl()); } this.configOverrides = newOverrides; recreateDelegate(false); @@ -147,7 +146,7 @@ public void stateChanged(SimpleClient client, SimpleClientState oldState, Simple public void configChangeRequested(UUID commandId, Map configDiff, String message) { Map currentOverrides = this.configOverrides; if (!Objects.equals(currentOverrides, configDiff)) { - LOG.info("got new config overrides from {}: {} ({})", mdsUrl, configDiff, message); + LOG.info("got new config overrides from {}: {} ({})", mdsClient.getLastConnectedMarioUrl(), configDiff, message); this.configOverrides = configDiff; recreateDelegate(false); } @@ -186,12 +185,9 @@ private boolean recreateDelegate(boolean abortIfExists) { LOG.error("error closing old delegate consumer", e); } } - Properties delegateConfig = new Properties(); - delegateConfig.putAll(baseConfig); - if (configOverrides != null) { - delegateConfig.putAll(configOverrides); - } - delegate = consumerFactory.apply(delegateConfig); + + delegate = consumerFactory.create(baseConfig, LiKafkaClientsUtils.convertConfigMapToProperties(configOverrides)); + if (subscriptionPattern != null) { if (rebalanceListener != null) { delegate.subscribe(subscriptionPattern, rebalanceListener); diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java index da8aad0..fc34e2a 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.Supplier; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -56,8 +56,7 @@ public class LiKafkaInstrumentedProducerImpl implements DelegatingProducer private final long initialConnectionTimeoutMs = TimeUnit.SECONDS.toMillis(30); private final ReadWriteLock delegateLock = new ReentrantReadWriteLock(); private final Properties baseConfig; - private final Function> producerFactory; - private final String mdsUrl; + private final ProducerFactory producerFactory; private final CountDownLatch initialConnectionLatch = new CountDownLatch(1); private final MetricsProxy metricsProxy = new MetricsProxy() { @Override @@ -77,14 +76,13 @@ public class LiKafkaInstrumentedProducerImpl implements DelegatingProducer public LiKafkaInstrumentedProducerImpl( Properties baseConfig, - Function> producerFactory, - String mdsUrl + ProducerFactory producerFactory, + Supplier mdsUrlSupplier ) { List conversionIssues = new ArrayList<>(1); this.baseConfig = baseConfig; Map translatedBaseConfig = LiKafkaClientsUtils.propertiesToStringMap(baseConfig, conversionIssues); this.producerFactory = producerFactory; - this.mdsUrl = mdsUrl; if (!conversionIssues.isEmpty()) { StringJoiner csv = new StringJoiner(", "); @@ -92,8 +90,7 @@ public LiKafkaInstrumentedProducerImpl( LOG.error("issues translating producer config to strings: {}", csv); } - mdsClient = new SimpleClient( - mdsUrl, + mdsClient = new SimpleClient(mdsUrlSupplier, TimeUnit.MINUTES.toMillis(1), TimeUnit.HOURS.toMillis(1), translatedBaseConfig, this @@ -112,9 +109,11 @@ public LiKafkaInstrumentedProducerImpl( boolean delegateChanged = recreateDelegate(true); if (delegateChanged) { if (issue != null) { - LOG.error("exception waiting to contact {}, using user-provided configs as fallback", mdsUrl, issue); + LOG.error("exception waiting to contact {}, using user-provided configs as fallback", + mdsClient.getLastAttemptedMarioUrl(), issue); } else { - LOG.error("unable to contact {} within timeout ({}), using user-provided configs as fallback", mdsUrl, initialConnectionTimeoutMs); + LOG.error("unable to contact {} within timeout ({}), using user-provided configs as fallback", + mdsClient.getLastAttemptedMarioUrl(), initialConnectionTimeoutMs); } } else if (issue != null) { //we got interrupted waiting, but apparently connection to mds was successful? @@ -132,9 +131,9 @@ public void stateChanged(SimpleClient client, SimpleClientState oldState, Simple boolean configOverrideChanged = !Objects.equals(currentOverrides, newOverrides); if (delegate == null || configOverrideChanged) { if (configOverrideChanged) { - LOG.info("got new config overrides from {}: {}", mdsUrl, newOverrides); + LOG.info("got new config overrides from {}: {}", mdsClient.getLastConnectedMarioUrl(), newOverrides); } else { - LOG.info("successfully connected to {}, no config overrides", mdsUrl); + LOG.info("successfully connected to {}, no config overrides", mdsClient.getLastConnectedMarioUrl()); } this.configOverrides = newOverrides; recreateDelegate(false); @@ -149,7 +148,7 @@ public void stateChanged(SimpleClient client, SimpleClientState oldState, Simple public void configChangeRequested(UUID commandId, Map configDiff, String message) { Map currentOverrides = this.configOverrides; if (!Objects.equals(currentOverrides, configDiff)) { - LOG.info("got new config overrides from {}: {} ({})", mdsUrl, configDiff, message); + LOG.info("got new config overrides from {}: {} ({})", mdsClient.getLastConnectedMarioUrl(), configDiff, message); this.configOverrides = configDiff; recreateDelegate(false); } @@ -189,12 +188,8 @@ private boolean recreateDelegate(boolean abortIfExists) { //TODO - keep track of ongoing transactions, and cancel them? //(not sure, need to see transaction spec) } - Properties delegateConfig = new Properties(); - delegateConfig.putAll(baseConfig); - if (configOverrides != null) { - delegateConfig.putAll(configOverrides); - } - delegate = producerFactory.apply(delegateConfig); + + delegate = producerFactory.create(baseConfig, LiKafkaClientsUtils.convertConfigMapToProperties(configOverrides)); // TODO: Remove this hack when bounded flush is added to upstream Method producerSupportsBoundedFlush; diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/ProducerFactory.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/ProducerFactory.java new file mode 100644 index 0000000..57de155 --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/ProducerFactory.java @@ -0,0 +1,14 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.producer; + +import java.util.Properties; +import org.apache.kafka.clients.producer.Producer; + + +@FunctionalInterface +public interface ProducerFactory { + Producer create(Properties base, Properties overrides); +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/LiKafkaClientsUtils.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/LiKafkaClientsUtils.java index 3e38308..d7f651c 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/LiKafkaClientsUtils.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/LiKafkaClientsUtils.java @@ -126,4 +126,23 @@ public static Map propertiesToStringMap(Properties props, List configMap) { + Properties props = new Properties(); + if (configMap != null) { + props.putAll(configMap); + } + return props; + } }