diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a1de59ba6182f..579d4b9c87102 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -677,13 +677,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok val currentLogConfig = logManager.currentDefaultConfig val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals) - newConfig.valuesFromThisConfig.forEach { (k, v) => - if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { - DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => - if (v == null) - newBrokerDefaults.remove(configName) - else - newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) + // Call extractLogConfigMap to get the new LogConfig because there are some special handling for some configs, e.g., + // RetentionMsProp. This is to ensure that the new LogConfig is consistent with LogManager's LogConfig creation behavior. + val newLogConfig = LogConfig.extractLogConfigMap(newConfig) + newLogConfig.forEach { (k, v) => + // Only update the config if it is a reconfigurable config. ReconfigurableConfigs is a subset of TopicConfigSynonyms.values. + // TopicConfigSynonyms is a map from log config name to kafka config name. Here we get the value of k and + // then check whether ReconfigurableConfigs contains it. + if (LogConfig.TopicConfigSynonyms.contains(k) && DynamicLogConfig.ReconfigurableConfigs.contains(LogConfig.TopicConfigSynonyms(k))) { + if (v == null) { + newBrokerDefaults.remove(k) + } else { + newBrokerDefaults.put(k, v.asInstanceOf[AnyRef]) } } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2177b796f6361..bdf1669ab9405 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -102,6 +102,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @BeforeEach override def setUp(): Unit = { + doSetUp(true) + } + + private def doSetUp(setLogRetentionTimeMillis: Boolean): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))) super.setUp() @@ -123,8 +127,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") - props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) - props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) + if (setLogRetentionTimeMillis) { + props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) + } + props.put(KafkaConfig.LogRetentionTimeHoursProp, 137.toString) addExtraProps(props) props ++= sslProperties1 @@ -232,7 +238,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Verify a few log configs with and without synonyms val expectedProps = new Properties expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000") - expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168") + expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "137") expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168") expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1") val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp) @@ -678,6 +684,114 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + @Test + def testLogRetentionConfig(): Unit = { + // Verify that only setting LogRetentionTimeHoursProp without setting LogRetentionTimeMillisProp, the log retention is updated correctly + // after dynamic reconfigure. + + tearDown() + adminClients.clear() + servers.clear() + // Create servers without setting LogRetentionTimeMillisProp + doSetUp(false) + + val (producerThread, consumerThread) = startProduceConsume(retries = 0) + + val props = new Properties + props.put(KafkaConfig.MessageMaxBytesProp, "99800") + + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MessageMaxBytesProp, "99800")) + + // Verify that all broker defaults have been updated + servers.foreach { server => + props.forEach { (k, v) => + assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k") + } + } + + // Verify that configs of existing logs have been updated + var newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + // Verify that retentionMs is set with the value of LogRetentionTimeHoursProp + assertEquals(137 * 3600000, servers.head.logManager.currentDefaultConfig.retentionMs); + + val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == 99800, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.retentionMs == 137 * 3600000, "retenionMs is updated incorrectly"); + + props.asScala.foreach { case (k, v) => + val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k) + val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v + assertEquals(expectedValue, log.config.originals.get(logConfigName).toString, + s"Not reconfigured $logConfigName for existing log") + } + + // Verify that overridden topic configs are not updated when broker default is updated + val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) + .getOrElse(throw new IllegalStateException("Log not found")) + assertFalse(log2.config.delete, "Overridden clean up policy should not be updated") + assertEquals(ProducerCompressionCodec.name, log2.config.compressionType) + + // Verify that even though broker defaults can be defined at default cluster level for consistent + // configuration across brokers, they can also be defined at per-broker level for testing + props.clear() + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000") + props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString) + alterConfigsOnServer(servers.head, props) + assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + servers.tail.foreach { server => + assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) + assertEquals(null, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp)) + } + assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.logManager.currentDefaultConfig.retentionMs) + + // Verify that produce/consume worked throughout this test without any retries in producer + stopAndVerifyProduceConsume(producerThread, consumerThread) + + // Verify that configuration at both per-broker level and default cluster level could be deleted and + // the default value should be restored + props.clear() + props.put(KafkaConfig.LogRetentionTimeMillisProp, "") + props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, null) + } + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + + props.clear() + props.put(KafkaConfig.MessageMaxBytesProp, "") + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get + servers.foreach { server => + waitForConfigOnServer(server, KafkaConfig.MessageMaxBytesProp, null) + } + + servers.foreach { server => assertEquals(137 * 3600000, server.logManager.currentDefaultConfig.retentionMs)} + + // Verify that configs of existing logs have been updated + newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config)) + newLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v, + k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k))) + + servers.foreach { server => + val log = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) + // Verify default values for these two configurations are restored on all brokers + TestUtils.waitUntilTrue(() => log.config.maxMessageSize == Defaults.MessageMaxBytes && log.config.retentionMs == 137 * 3600000, + "Existing topic config using defaults not updated") + } + } + @Test def testUncleanLeaderElectionEnable(): Unit = { val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get