diff --git a/build.gradle b/build.gradle index 6ba1c10..1cfcdb1 100644 --- a/build.gradle +++ b/build.gradle @@ -42,7 +42,7 @@ project.ext { } } kafkaVersion = "2.0.1" - marioVersion = "0.0.6" + marioVersion = "0.0.8" } subprojects { diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java index 5aab5a8..f9a5a2e 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java @@ -13,7 +13,7 @@ import com.linkedin.kafka.clients.metadataservice.MetadataServiceClient; import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import java.time.Duration; import java.util.ArrayList; @@ -96,6 +96,9 @@ public LiKafkaConsumer getConsumer() { // Consumer configs common to all clusters private LiKafkaConsumerConfig _commonConsumerConfigs; + // Consumer configs received from Conductor at boot up time + private Map _bootupConfigsFromConductor; + // max.poll.records for the federated consumer private int _maxPollRecordsForFederatedConsumer; @@ -113,6 +116,9 @@ public LiKafkaConsumer getConsumer() { private int _nextClusterIndexToPoll; + // Number of consumers that successfully applied configs at boot up time, used for testing only + private Set> _numConsumersWithBootupConfigs = new HashSet<>(); + // The prefix of the client.id property to be used for individual consumers. Since the client id is part of the // consumer metric keys, we need to use cluster-specific client ids to differentiate metrics from different clusters. // Per-cluster client id is generated by appending the cluster name to this prefix. @@ -203,29 +209,17 @@ private LiKafkaFederatedConsumerImpl(LiKafkaConsumerConfig configs, MetadataServ _closed = false; _numConfigReloads = 0; - try { - // Instantiate metadata service client if necessary. - _mdsClient = mdsClient != null ? mdsClient : - configs.getConfiguredInstance(LiKafkaConsumerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class); - - // Register this federated client with the metadata service. The metadata service will assign a UUID to this - // client, which will be used for later interaction between the metadata service and the client. - // - // Registration may also return further information such as the metadata server version and any protocol settings. - // We assume that such information will be kept and used by the metadata service client itself. - // - // TODO: make sure this is not blocking indefinitely and also works when Mario is not available. - _mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs); - } catch (Exception e) { - try { - if (_mdsClient != null) { - _mdsClient.close(_mdsRequestTimeoutMs); - } - } catch (Exception e2) { - e.addSuppressed(e2); - } - throw e; - } + // Instantiate metadata service client if necessary. + _mdsClient = mdsClient != null ? mdsClient : + configs.getConfiguredInstance(LiKafkaConsumerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class); + + // Register this federated client with the metadata service. The metadata service will assign a UUID to this + // client, which will be used for later interaction between the metadata service and the client. + // + // Registration may also return further information such as the metadata server version and any protocol settings. + // We assume that such information will be kept and used by the metadata service client itself. + // + _mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs); // Create a watchdog thread that polls the creation of nonexistent topics in the current assignment/subscription // and re-assign/subscribe if any of them have been created since the last poll. @@ -665,6 +659,14 @@ public FederatedSubscriptionState getCurrentSubscription() { return _currentSubscription; } + int getNumConsumersWithBootupConfigs() { + return _numConsumersWithBootupConfigs.size(); + } + + int getNumConfigReloads() { + return _numConfigReloads; + } + private void closeNoLock(Duration timeout) { // Get an immutable copy of the current set of consumers. List> consumers = _consumers; @@ -731,6 +733,17 @@ public LiKafkaConsumerConfig getCommonConsumerConfigs() { return _commonConsumerConfigs; } + synchronized public void applyBootupConfigFromConductor(Map configs) { + _bootupConfigsFromConductor = configs; + + // Only try to recreate per-cluster consumers when _consumers are initialized, i.e. after subscribe/assign has + // been called, otherwise it's impossible to create per-cluster consumers without the topic-cluster information, + // just save the boot up configs + if (_consumers != null && !_consumers.isEmpty()) { + recreateConsumers(configs, null); + } + } + public void reloadConfig(Map newConfigs, UUID commandId) { // Go over each consumer, close them, and update existing configs with newConfigs. Since each per-cluster consumer will be // instantiated when the client began consuming from that cluster, we just need to clear the mappings and update the configs. @@ -772,14 +785,18 @@ synchronized void recreateConsumers(Map newConfigs, UUID command // update existing configs with newConfigs // originals() would return a copy of the internal consumer configs, put in new configs and update existing configs Map configMap = _commonConsumerConfigs.originals(); - configMap.putAll(newConfigs); - _commonConsumerConfigs = new LiKafkaConsumerConfig(configMap); + + if (newConfigs != null && !newConfigs.isEmpty()) { + configMap.putAll(newConfigs); + _commonConsumerConfigs = new LiKafkaConsumerConfig(configMap); + } // re-create per-cluster consumers with new set of configs // if any error occurs, recreate all per-cluster consumers with previous last-known-good configs and // TODO : send an error back to Mario // _consumers should be filled when reload config happens List> newConsumers = new ArrayList<>(); + boolean recreateConsumerSuccessful = false; try { for (ClusterConsumerPair entry : _consumers) { @@ -790,6 +807,7 @@ synchronized void recreateConsumers(Map newConfigs, UUID command // replace _consumers with newly created consumers _consumers.clear(); _consumers = newConsumers; + recreateConsumerSuccessful = true; } catch (Exception e) { // if any exception occurs, re-create per-cluster consumers with last-known-good configs LOG.error("Failed to recreate per-cluster consumers with new config with exception, restore to previous consumers ", e); @@ -819,8 +837,7 @@ synchronized void recreateConsumers(Map newConfigs, UUID command for (Map.Entry entry : _commonConsumerConfigs.originals().entrySet()) { convertedConfig.put(entry.getKey(), String.valueOf(entry.getValue())); } - // TODO: Add a flag in RELOAD_CONFIG_RESPONSE message to indicate whether re-creating per-cluster consumers is successful - _mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MsgType.RELOAD_CONFIG_RESPONSE); + _mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MessageType.RELOAD_CONFIG_RESPONSE, recreateConsumerSuccessful); // re-register federated client with updated configs _mdsClient.reRegisterFederatedClient(newConfigs); @@ -879,9 +896,23 @@ private LiKafkaConsumer createPerClusterConsumer(ClusterDescriptor cluster configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapUrl()); configMap.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientIdPrefix + "-" + cluster.getName()); + Map configMapWithBootupConfig = new HashMap<>(configMap); + // Apply the configs received from Conductor at boot up/registration time + if (_bootupConfigsFromConductor != null && !_bootupConfigsFromConductor.isEmpty()) { + configMapWithBootupConfig.putAll(_bootupConfigsFromConductor); + } + int maxPollRecordsPerCluster = Math.max(_maxPollRecordsForFederatedConsumer / _numClustersToConnectTo, 1); configMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsPerCluster); - LiKafkaConsumer newConsumer = _consumerBuilder.setConsumerConfig(configMap).build(); + LiKafkaConsumer newConsumer; + + try { + newConsumer = _consumerBuilder.setConsumerConfig(configMapWithBootupConfig).build(); + _numConsumersWithBootupConfigs.add(newConsumer); + } catch (Exception e) { + LOG.error("Failed to create per-cluster consumer with config {}, try creating with config {}", configMapWithBootupConfig, configMap); + newConsumer = _consumerBuilder.setConsumerConfig(configMap).build(); + } return newConsumer; } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioCommandCallbackImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioCommandCallbackImpl.java index 9170cde..c35d6f0 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioCommandCallbackImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioCommandCallbackImpl.java @@ -11,6 +11,7 @@ import com.linkedin.mario.common.websockets.MarioCommandCallback; import com.linkedin.mario.common.websockets.Messages; +import com.linkedin.mario.common.websockets.RegisterResponseMessages; import com.linkedin.mario.common.websockets.ReloadConfigRequestMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,12 +32,13 @@ class MarioCommandCallbackImpl implements MarioCommandCallback { public void onReceivingCommand(Messages marioCommandMessage) { // Find a federate client callback that matches the given Mario command message type and execute it with arguments // included in the message. + LiKafkaFederatedClientType clientType = _federatedClient.getClientType(); switch (marioCommandMessage.getMsgType()) { case RELOAD_CONFIG_REQUEST: ReloadConfigRequestMessages reloadConfigMsg = (ReloadConfigRequestMessages) marioCommandMessage; - if (_federatedClient.getClientType() == LiKafkaFederatedClientType.FEDERATED_PRODUCER) { + if (clientType == LiKafkaFederatedClientType.FEDERATED_PRODUCER) { // Call producer reload config method ((LiKafkaFederatedProducerImpl) _federatedClient).reloadConfig(reloadConfigMsg.getConfigs(), reloadConfigMsg.getCommandId()); } else { @@ -44,6 +46,16 @@ public void onReceivingCommand(Messages marioCommandMessage) { ((LiKafkaFederatedConsumerImpl) _federatedClient).reloadConfig(reloadConfigMsg.getConfigs(), reloadConfigMsg.getCommandId()); } break; + case REGISTER_RESPONSE: + // Upon receiving register response from conductor, federated client would save the configs from the message and apply the + // configs when actually creating the per-cluster clients + RegisterResponseMessages registerResponseMessage = (RegisterResponseMessages) marioCommandMessage; + if (clientType == LiKafkaFederatedClientType.FEDERATED_PRODUCER) { + ((LiKafkaFederatedProducerImpl) _federatedClient).applyBootupConfigFromConductor(registerResponseMessage.getConfigs()); + } else { + ((LiKafkaFederatedConsumerImpl) _federatedClient).applyBootupConfigFromConductor(registerResponseMessage.getConfigs()); + } + break; default: // No current support at the moment throw new UnsupportedOperationException("command " + marioCommandMessage.getMsgType() + " is not supported"); diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClient.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClient.java index 952ca1c..625a955 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClient.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClient.java @@ -17,8 +17,9 @@ import com.linkedin.mario.common.models.v1.TopicQueryResults; import com.linkedin.mario.common.websockets.MarioCommandCallback; +import com.linkedin.mario.common.websockets.MarioException; import com.linkedin.mario.common.websockets.Messages; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import com.linkedin.mario.common.websockets.ReloadConfigResponseMessages; import java.util.Arrays; import java.util.HashMap; @@ -58,7 +59,19 @@ public void registerFederatedClient(LiKafkaFederatedClient federatedClient, Clus MarioClusterGroupDescriptor marioClusterGroup = new MarioClusterGroupDescriptor(clusterGroup.getName(), clusterGroup.getEnvironment()); MarioCommandCallback marioCommandCallback = new MarioCommandCallbackImpl(federatedClient); - _marioClient.registerFederatedClient(marioClusterGroup, (Map) configs, timeoutMs, marioCommandCallback); + + try { + _marioClient.registerFederatedClient(marioClusterGroup, (Map) configs, timeoutMs, + marioCommandCallback); + } catch (MarioException e) { + // Based on the exception thrown, different actions might be taken, e.g. if mario returns a client version + // too low/not supported exception, we can stop the client immediately; otherwise if it's mario temperarily + // we can continue to create clients with original config and let mario client retry connecting in the background, + // once mario is available, + // + // For now, just print error and continue with original config + e.printStackTrace(); + } } @Override @@ -191,11 +204,11 @@ private Map> getClusterMapForTopics(Set t } @Override - public void reportCommandExecutionComplete(UUID commandId, Map configs, MsgType messageType) { + public void reportCommandExecutionComplete(UUID commandId, Map configs, MessageType messageType, boolean commandExecutionResult) { Messages messageToSent; switch (messageType) { case RELOAD_CONFIG_RESPONSE: - messageToSent = new ReloadConfigResponseMessages(configs, commandId); + messageToSent = new ReloadConfigResponseMessages(configs, commandId, commandExecutionResult); break; default: throw new UnsupportedOperationException("Message type " + messageType + " is not supported right now"); diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java index 7d77088..0e18ce8 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java @@ -10,7 +10,7 @@ import com.linkedin.kafka.clients.common.PartitionLookupResult; import com.linkedin.kafka.clients.common.TopicLookupResult; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import java.util.Map; import java.util.Set; @@ -67,11 +67,12 @@ TopicLookupResult getClustersForTopics(Set topics, ClusterGroupDescripto /** * Report to mario server that command execution for commandId is completed - * @param commandId UUID identifying the completed command - * @param configs config diff before and after the command - * @param messageType response message type to mario server + * @param commandId UUID identifying the completed command + * @param configs config diff before and after the command + * @param messageType response message type to mario server + * @param commandExecutionResult execution result of the given command */ - void reportCommandExecutionComplete(UUID commandId, Map configs, MsgType messageType); + void reportCommandExecutionComplete(UUID commandId, Map configs, MessageType messageType, boolean commandExecutionResult); /** * Re-register federated client with new set of configs diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java index 8b805ff..08303be 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java @@ -12,7 +12,7 @@ import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import java.time.Duration; import java.util.HashMap; @@ -70,6 +70,12 @@ public class LiKafkaFederatedProducerImpl implements LiKafkaProducer // Producer configs common to all clusters private LiKafkaProducerConfig _commonProducerConfigs; + // Consumer configs received from Conductor at boot up time + private Map _bootupConfigsFromConductor; + + // Number of producers that successfully applied configs at boot up time, used for testing only + private Set> _numProducersWithBootupConfigs = new HashSet<>(); + // The prefix of the client.id property to be used for individual producers. Since the client id is part of the // producer metric keys, we need to use cluster-specific client ids to differentiate metrics from different clusters. // Per-cluster client id is generated by appending the cluster name to this prefix. @@ -124,29 +130,17 @@ private LiKafkaFederatedProducerImpl(LiKafkaProducerConfig configs, MetadataServ _closed = false; _numConfigReloads = 0; - try { - // Instantiate metadata service client if necessary. - _mdsClient = mdsClient != null ? mdsClient : - configs.getConfiguredInstance(LiKafkaProducerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class); - - // Register this federated client with the metadata service. The metadata service will assign a UUID to this - // client, which will be used for later interaction between the metadata service and the client. - // - // Registration may also return further information such as the metadata server version and any protocol settings. - // We assume that such information will be kept and used by the metadata service client itself. - // - // TODO: make sure this is not blocking indefinitely and also works when Mario is not available. - _mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs); - } catch (Exception e) { - try { - if (_mdsClient != null) { - _mdsClient.close(_mdsRequestTimeoutMs); - } - } catch (Exception e2) { - e.addSuppressed(e2); - } - throw e; - } + // Instantiate metadata service client if necessary. + _mdsClient = mdsClient != null ? mdsClient : + configs.getConfiguredInstance(LiKafkaProducerConfig.METADATA_SERVICE_CLIENT_CLASS_CONFIG, MetadataServiceClient.class); + + // Register this federated client with the metadata service. The metadata service will assign a UUID to this + // client, which will be used for later interaction between the metadata service and the client. + // + // Registration may also return further information such as the metadata server version and any protocol settings. + // We assume that such information will be kept and used by the metadata service client itself. + // + _mdsClient.registerFederatedClient(this, _clusterGroup, configs.originals(), _mdsRequestTimeoutMs); } @Override @@ -333,6 +327,25 @@ public LiKafkaProducerConfig getCommonProducerConfigs() { return _commonProducerConfigs; } + synchronized public void applyBootupConfigFromConductor(Map configs) { + _bootupConfigsFromConductor = configs; + + // Only try to recreate per-cluster consumers when _consumers are initialized, i.e. after subscribe/assign has + // been called, otherwise it's impossible to create per-cluster consumers without the topic-cluster information, + // just save the boot up configs + if (_producers != null && !_producers.isEmpty()) { + recreateProducers(configs, null); + } + } + + int getNumProducersWithBootupConfigs() { + return _numProducersWithBootupConfigs.size(); + } + + int getNumConfigReloads() { + return _numConfigReloads; + } + public void reloadConfig(Map newConfigs, UUID commandId) { // Go over each producer, flush and close. Since each per-cluster producer will be instantiated when the client began // producing to that cluster, we just need to clear the mappings and update the configs @@ -376,6 +389,7 @@ synchronized void recreateProducers(Map newConfigs, UUID command // TODO : send an error back to Mario // _producers should be filled when reload config happens Map> newProducers = new ConcurrentHashMap<>(); + boolean recreateProducerSuccessful = false; try { for (Map.Entry> entry : _producers.entrySet()) { @@ -386,6 +400,7 @@ synchronized void recreateProducers(Map newConfigs, UUID command // replace _producers with newly created producers _producers.clear(); _producers = newProducers; + recreateProducerSuccessful = true; } catch (Exception e) { LOG.error("Failed to recreate per-cluster producers with new configs with exception, restore to previous producers ", e); @@ -407,8 +422,7 @@ synchronized void recreateProducers(Map newConfigs, UUID command for (Map.Entry entry : _commonProducerConfigs.originals().entrySet()) { convertedConfig.put(entry.getKey(), String.valueOf(entry.getValue())); } - // TODO: add a flag in RELOAD_CONFIG_RESPONSE message to indicate result of config reload - _mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MsgType.RELOAD_CONFIG_RESPONSE); + _mdsClient.reportCommandExecutionComplete(commandId, convertedConfig, MessageType.RELOAD_CONFIG_RESPONSE, recreateProducerSuccessful); // re-register federated client with updated configs _mdsClient.reRegisterFederatedClient(newConfigs); @@ -481,8 +495,20 @@ private LiKafkaProducer createPerClusterProducer(ClusterDescriptor cluster configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapUrl()); configMap.put(ProducerConfig.CLIENT_ID_CONFIG, _clientIdPrefix + "-" + cluster.getName()); - _producerBuilder.setProducerConfig(configMap); - LiKafkaProducer newProducer = _producerBuilder.build(); + Map configMapWithBootupConfig = new HashMap<>(configMap); + // Apply the configs received from Conductor at boot up/registration time + if (_bootupConfigsFromConductor != null && !_bootupConfigsFromConductor.isEmpty()) { + configMapWithBootupConfig.putAll(_bootupConfigsFromConductor); + } + + LiKafkaProducer newProducer; + try { + newProducer = _producerBuilder.setProducerConfig(configMapWithBootupConfig).build(); + _numProducersWithBootupConfigs.add(newProducer); + } catch (Exception e) { + LOG.error("Failed to create per-cluster producer with config {}, try creating with config {}", configMapWithBootupConfig, configMap); + newProducer = _producerBuilder.setProducerConfig(configMap).build(); + } return newProducer; } diff --git a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java index be925a1..4b03159 100644 --- a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java +++ b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java @@ -11,7 +11,7 @@ import com.linkedin.kafka.clients.metadataservice.MetadataServiceClient; import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -272,9 +273,19 @@ public void testConsumerReloadConfigCommand() throws MetadataServiceClientExcept _federatedConsumer = new LiKafkaFederatedConsumerImpl<>(_consumerConfig, _mdsClient, new MockConsumerBuilder()); + // Simulate sending bootup configs from Conductor + Map bootupConfigs = new HashMap<>(); + bootupConfigs.put("K3", "V3"); + bootupConfigs.put("K4", "V4"); + + _federatedConsumer.applyBootupConfigFromConductor(bootupConfigs); + // Assign topic partitions from all three topics _federatedConsumer.assign(Arrays.asList(TOPIC_PARTITION1, TOPIC_PARTITION2, TOPIC_PARTITION3)); + // Verify that after assign, boot up configs have been successfully applied to the two consumers + Assert.assertEquals(_federatedConsumer.getNumConsumersWithBootupConfigs(), 2); + Set curAssignment = _federatedConsumer.assignment(); // Verify consumers for both clusters have been created. @@ -295,7 +306,7 @@ public void testConsumerReloadConfigCommand() throws MetadataServiceClientExcept _federatedConsumer.waitForReloadConfigFinish(); // verify corresponding marioClient method is only called once - verify(_mdsClient, times(1)).reportCommandExecutionComplete(eq(commandId), any(), eq(MsgType.RELOAD_CONFIG_RESPONSE)); + verify(_mdsClient, times(1)).reportCommandExecutionComplete(eq(commandId), any(), eq(MessageType.RELOAD_CONFIG_RESPONSE), eq(true)); verify(_mdsClient, times(1)).reRegisterFederatedClient(any()); // Verify consumers for both clusters have been created. @@ -314,6 +325,11 @@ public void testConsumerReloadConfigCommand() throws MetadataServiceClientExcept // Verify the topic partition assignment remains the same after reload config assertEquals(curAssignment, _federatedConsumer.assignment()); + + // Simulate boot up configs response came after consumers have already been created, this is essentially another config reload, + // so we expect the number of config reloads to be 2 + _federatedConsumer.applyBootupConfigFromConductor(bootupConfigs); + Assert.assertEquals(_federatedConsumer.getNumConfigReloads(), 2); } @Test diff --git a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClientTest.java b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClientTest.java index 435db72..0b1ebc1 100644 --- a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClientTest.java +++ b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/metadataservice/MarioMetadataServiceClientTest.java @@ -76,7 +76,7 @@ public void setup() { } @Test - public void testRegisterFederatedClient() throws MetadataServiceClientException { + public void testRegisterFederatedClient() throws MetadataServiceClientException, Exception { Map configs = new HashMap<>(); configs.put("K1", "V1"); configs.put("K2", "V2"); @@ -87,7 +87,7 @@ public void testRegisterFederatedClient() throws MetadataServiceClientException // For now, simply verify the corresponding MarioClient method is called once with expected arguments. MarioClusterGroupDescriptor expectedMarioClusterGroup = new MarioClusterGroupDescriptor(CLUSTER_GROUP.getName(), CLUSTER_GROUP.getEnvironment()); - verify(_marioClient, times(1)).registerFederatedClient(eq(expectedMarioClusterGroup), eq(configs), eq(100), + verify(_marioClient, times(1)).registerFederatedClient(eq(expectedMarioClusterGroup), eq(configs), eq(100L), any(MarioCommandCallback.class)); } diff --git a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImplTest.java b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImplTest.java index 1b1b020..ac741ca 100644 --- a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImplTest.java +++ b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImplTest.java @@ -9,7 +9,7 @@ import com.linkedin.kafka.clients.metadataservice.MetadataServiceClient; import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException; -import com.linkedin.mario.common.websockets.MsgType; +import com.linkedin.mario.common.websockets.MessageType; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -159,10 +160,20 @@ public void testReloadConfigCommand() throws MetadataServiceClientException, Int ProducerRecord record2 = new ProducerRecord<>(TOPIC2, 0, 0L, "key2".getBytes(), "value2".getBytes()); ProducerRecord record3 = new ProducerRecord<>(TOPIC3, 0, 0L, "key3".getBytes(), "value3".getBytes()); + // Simulate sending bootup configs from Conductor + Map bootupConfigs = new HashMap<>(); + bootupConfigs.put("K3", "V3"); + bootupConfigs.put("K4", "V4"); + + _federatedProducer.applyBootupConfigFromConductor(bootupConfigs); + _federatedProducer.send(record1); _federatedProducer.send(record2); _federatedProducer.send(record3); + // Verify that after send, boot up configs have been successfully applied to the two producers + Assert.assertEquals(_federatedProducer.getNumProducersWithBootupConfigs(), 2); + _federatedProducer.flush(); Map newConfigs = new HashMap<>(); @@ -176,7 +187,7 @@ public void testReloadConfigCommand() throws MetadataServiceClientException, Int _federatedProducer.waitForReloadConfigFinish(); // verify corresponding marioClient method is only called once - verify(_mdsClient, times(1)).reportCommandExecutionComplete(eq(commandId), any(), eq(MsgType.RELOAD_CONFIG_RESPONSE)); + verify(_mdsClient, times(1)).reportCommandExecutionComplete(eq(commandId), any(), eq(MessageType.RELOAD_CONFIG_RESPONSE), eq(true)); verify(_mdsClient, times(1)).reRegisterFederatedClient(any()); // verify per-cluster producers have been recreated after reloadConfig @@ -189,6 +200,11 @@ public void testReloadConfigCommand() throws MetadataServiceClientException, Int assertTrue(_federatedProducer.getCommonProducerConfigs().originals().containsKey("K1")); assertTrue(_federatedProducer.getCommonProducerConfigs().originals().containsKey("K2")); + // Simulate the boot up config response came after the producers have been created, this is essentially a config reload, so we expect the + // config reload count to be 2 + _federatedProducer.applyBootupConfigFromConductor(bootupConfigs); + Assert.assertEquals(_federatedProducer.getNumConfigReloads(), 2); + // verify send is still successful after reload config Future metadata1 = _federatedProducer.send(record1); Future metadata2 = _federatedProducer.send(record2);