From 962fabdd82123c6edd097cb57237996cd4255f3d Mon Sep 17 00:00:00 2001 From: Huilin Shi Date: Mon, 18 Mar 2024 11:22:52 -0700 Subject: [PATCH] [LI-HOTFIX] Fix dynamic config reconfigure (#507) TICKET = LIKAFKA-57566 LI_DESCRIPTION = LogConfig is obtained from KafkaConfig with special handlings for some configs, e.g., RetentionMs is calculated from LogRetentionTimeMillis if it set, else from LogRetentionTimeMinutes if it is set, else by LogRetentionTimeHours. However, in DynamicLogConfig reconfigure, this special handling was not run, instead, it was using the config directly from KafkaConfig. As a result, if the static config has only LogRetentionTimeHours set without setting LogRetentionTimeMillis, LogRetentionTimeHours was not honored in LogConfig. In this PR, we change reconfigure to run LogConfig.extractLogConfigMap on KafkaConfig first (this is the same method LogManager used to get LogConfig initially) and then apply the changes to newBrokerDefaults. EXIT_CRITERIA = NA Testing Done Tested with newly added test; Also tested in kafka.tango --- .../kafka/server/DynamicBrokerConfig.scala | 19 ++- .../DynamicBrokerReconfigurationTest.scala | 120 +++++++++++++++++- 2 files changed, 129 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a1de59ba6182f..579d4b9c87102 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -677,13 +677,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok val currentLogConfig = logManager.currentDefaultConfig val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals) - newConfig.valuesFromThisConfig.forEach { (k, v) => - if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { - DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => - if (v == null) - newBrokerDefaults.remove(configName) - else - newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) + // Call extractLogConfigMap to get the new LogConfig because there are some special handling for some configs, e.g., + // RetentionMsProp. This is to ensure that the new LogConfig is consistent with LogManager's LogConfig creation behavior. + val newLogConfig = LogConfig.extractLogConfigMap(newConfig) + newLogConfig.forEach { (k, v) => + // Only update the config if it is a reconfigurable config. ReconfigurableConfigs is a subset of TopicConfigSynonyms.values. + // TopicConfigSynonyms is a map from log config name to kafka config name. Here we get the value of k and + // then check whether ReconfigurableConfigs contains it. + if (LogConfig.TopicConfigSynonyms.contains(k) && DynamicLogConfig.ReconfigurableConfigs.contains(LogConfig.TopicConfigSynonyms(k))) { + if (v == null) { + newBrokerDefaults.remove(k) + } else { + newBrokerDefaults.put(k, v.asInstanceOf[AnyRef]) } } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2177b796f6361..bdf1669ab9405 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -102,6 +102,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @BeforeEach override def setUp(): Unit = { + doSetUp(true) + } + + private def doSetUp(setLogRetentionTimeMillis: Boolean): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))) super.setUp() @@ -123,8 +127,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") - props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) - props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) + if (setLogRetentionTimeMillis) { + props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) + } + props.put(KafkaConfig.LogRetentionTimeHoursProp, 137.toString) addExtraProps(props) props ++= sslProperties1 @@ -232,7 +238,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Verify a few log configs with and without synonyms val expectedProps = new Properties expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000") - expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168") + expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "137") expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168") expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1") val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp) @@ -678,6 +684,114 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + @Test + def testLogRetentionConfig(): Unit = { + // Verify that only setting LogRetentionTimeHoursProp without setting LogRetentionTimeMillisProp, the log retention is updated correctly + // after dynamic reconfigure. + + tearDown() + adminClients.clear() + servers.clear() + // Create servers without setting LogRetentionTimeMillisProp + doSetUp(false) + + val (producerThread, consumerThread) = startProduceConsume(retries = 0) + + val props = new Properties + props.put(KafkaConfig.MessageMaxBytesProp, "99800") + + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MessageMaxBytesProp, "99800")) + + // Verify that all broker defaults have been updated + servers.foreach { server => + props.forEach { (k, v) => + assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k") + } + } + + // Verify that configs of existing logs have been updated + var newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + // Verify that retentionMs is set with the value of LogRetentionTimeHoursProp + assertEquals(137 * 3600000, servers.head.logManager.currentDefaultConfig.retentionMs); + + val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == 99800, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.retentionMs == 137 * 3600000, "retenionMs is updated incorrectly"); + + props.asScala.foreach { case (k, v) => + val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k) + val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v + assertEquals(expectedValue, log.config.originals.get(logConfigName).toString, + s"Not reconfigured $logConfigName for existing log") + } + + // Verify that overridden topic configs are not updated when broker default is updated + val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) + .getOrElse(throw new IllegalStateException("Log not found")) + assertFalse(log2.config.delete, "Overridden clean up policy should not be updated") + assertEquals(ProducerCompressionCodec.name, log2.config.compressionType) + + // Verify that even though broker defaults can be defined at default cluster level for consistent + // configuration across brokers, they can also be defined at per-broker level for testing + props.clear() + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000") + props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString) + alterConfigsOnServer(servers.head, props) + assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + servers.tail.foreach { server => + assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(null, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + } + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.logManager.currentDefaultConfig.retentionMs) + + // Verify that produce/consume worked throughout this test without any retries in producer + stopAndVerifyProduceConsume(producerThread, consumerThread) + + // Verify that configuration at both per-broker level and default cluster level could be deleted and + // the default value should be restored + props.clear() + props.put(KafkaConfig.LogRetentionTimeMillisProp, "") + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, null) + } + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + + props.clear() + props.put(KafkaConfig.MessageMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.MessageMaxBytesProp, null) + } + + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + // Verify that configs of existing logs have been updated + newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == Defaults.MessageMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + } + @Test def testUncleanLeaderElectionEnable(): Unit = { val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get