Skip to content

Commit

Permalink
Merge branch '3.0-li' into user/rjobse/testBranch
Browse files Browse the repository at this point in the history
  • Loading branch information
JobseRyan authored Mar 18, 2024
2 parents 1181561 + 962fabd commit 4683901
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 10 deletions.
19 changes: 12 additions & 7 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
val currentLogConfig = logManager.currentDefaultConfig
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
newConfig.valuesFromThisConfig.forEach { (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
if (v == null)
newBrokerDefaults.remove(configName)
else
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
// Call extractLogConfigMap to get the new LogConfig because there are some special handling for some configs, e.g.,
// RetentionMsProp. This is to ensure that the new LogConfig is consistent with LogManager's LogConfig creation behavior.
val newLogConfig = LogConfig.extractLogConfigMap(newConfig)
newLogConfig.forEach { (k, v) =>
// Only update the config if it is a reconfigurable config. ReconfigurableConfigs is a subset of TopicConfigSynonyms.values.
// TopicConfigSynonyms is a map from log config name to kafka config name. Here we get the value of k and
// then check whether ReconfigurableConfigs contains it.
if (LogConfig.TopicConfigSynonyms.contains(k) && DynamicLogConfig.ReconfigurableConfigs.contains(LogConfig.TopicConfigSynonyms(k))) {
if (v == null) {
newBrokerDefaults.remove(k)
} else {
newBrokerDefaults.put(k, v.asInstanceOf[AnyRef])
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet

@BeforeEach
override def setUp(): Unit = {
  // Standard per-test setup path: build the cluster with the
  // log.retention.ms broker default applied (the common case).
  // Tests that need a cluster without it call doSetUp(false) themselves.
  doSetUp(setLogRetentionTimeMillis = true)
}

private def doSetUp(setLogRetentionTimeMillis: Boolean): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
super.setUp()

Expand All @@ -123,8 +127,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString)
if (setLogRetentionTimeMillis) {
props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
}
props.put(KafkaConfig.LogRetentionTimeHoursProp, 137.toString)
addExtraProps(props)

props ++= sslProperties1
Expand Down Expand Up @@ -232,7 +238,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
// Verify a few log configs with and without synonyms
val expectedProps = new Properties
expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000")
expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168")
expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "137")
expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168")
expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1")
val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp)
Expand Down Expand Up @@ -678,6 +684,114 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}

@Test
def testLogRetentionConfig(): Unit = {
  // Verify that when only LogRetentionTimeHoursProp is set (and LogRetentionTimeMillisProp
  // is absent), log retention is derived from the hours property and stays correct across
  // dynamic reconfiguration, per-broker overrides, and deletion of those overrides.

  // Rebuild the cluster from scratch without LogRetentionTimeMillisProp.
  tearDown()
  adminClients.clear()
  servers.clear()
  doSetUp(false)

  // doSetUp configures LogRetentionTimeHoursProp = 137; this is the retention the
  // brokers should fall back to whenever no millis-level override is in effect.
  val expectedRetentionMs = 137L * 3600000

  val (producerThread, consumerThread) = startProduceConsume(retries = 0)

  val props = new Properties
  props.put(KafkaConfig.MessageMaxBytesProp, "99800")

  reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MessageMaxBytesProp, "99800"))

  // Verify that all broker defaults have been updated.
  servers.foreach { server =>
    props.forEach { (k, v) =>
      // assertEquals takes (expected, actual): v is the expected value here.
      assertEquals(v, server.config.originals.get(k).toString, s"Not reconfigured $k")
    }
  }

  // Verify that configs of existing logs have been propagated to LogManager's defaults.
  val updatedLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config))
  updatedLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v,
    k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k)))

  // retentionMs must be derived from LogRetentionTimeHoursProp since millis was never set.
  assertEquals(expectedRetentionMs, servers.head.logManager.currentDefaultConfig.retentionMs)

  val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
  TestUtils.waitUntilTrue(() => log.config.maxMessageSize == 99800, "Existing topic config using defaults not updated")
  TestUtils.waitUntilTrue(() => log.config.retentionMs == expectedRetentionMs, "retentionMs is updated incorrectly")

  props.asScala.foreach { case (k, v) =>
    val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
    // Cleanup-policy values are rendered as a list by the log config, hence the brackets.
    val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v
    assertEquals(expectedValue, log.config.originals.get(logConfigName).toString,
      s"Not reconfigured $logConfigName for existing log")
  }

  // Verify that overridden topic configs are not updated when broker default is updated.
  val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
    .getOrElse(throw new IllegalStateException("Log not found"))
  assertFalse(log2.config.delete, "Overridden clean up policy should not be updated")
  assertEquals(ProducerCompressionCodec.name, log2.config.compressionType)

  // Verify that even though broker defaults can be defined at default cluster level for
  // consistent configuration across brokers, they can also be defined at per-broker level.
  props.clear()
  props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
  props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(2).toString)
  alterConfigsOnServer(servers.head, props)
  assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
  assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
  servers.tail.foreach { server =>
    // Other brokers keep the static defaults; they were not targeted by the per-broker update.
    assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
    assertEquals(null, server.config.values.get(KafkaConfig.LogRetentionTimeMillisProp))
  }
  // The per-broker millis override takes precedence over the hours-derived default.
  assertEquals(TimeUnit.DAYS.toMillis(2), servers.head.logManager.currentDefaultConfig.retentionMs)

  // Verify that produce/consume worked throughout this test without any retries in producer.
  stopAndVerifyProduceConsume(producerThread, consumerThread)

  // Verify that configuration at both per-broker level and default cluster level can be
  // deleted, after which the hours-derived retention default should be restored.
  props.clear()
  props.put(KafkaConfig.LogRetentionTimeMillisProp, "")
  props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "")
  TestUtils.incrementalAlterConfigs(servers.take(1), adminClients.head, props, perBrokerConfig = true, opType = OpType.DELETE).all.get
  TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get
  servers.foreach { server =>
    waitForConfigOnServer(server, KafkaConfig.LogRetentionTimeMillisProp, null)
  }
  servers.foreach { server => assertEquals(expectedRetentionMs, server.logManager.currentDefaultConfig.retentionMs) }

  servers.foreach { server =>
    val serverLog = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
    // Verify default values for these two configurations are restored on all brokers.
    TestUtils.waitUntilTrue(() => serverLog.config.maxIndexSize == Defaults.LogIndexSizeMaxBytes && serverLog.config.retentionMs == expectedRetentionMs,
      "Existing topic config using defaults not updated")
  }

  // Also delete the cluster-level message.max.bytes override set earlier.
  props.clear()
  props.put(KafkaConfig.MessageMaxBytesProp, "")
  TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = false, opType = OpType.DELETE).all.get
  servers.foreach { server =>
    waitForConfigOnServer(server, KafkaConfig.MessageMaxBytesProp, null)
  }

  // Retention remains hours-derived after the unrelated deletion.
  servers.foreach { server => assertEquals(expectedRetentionMs, server.logManager.currentDefaultConfig.retentionMs) }

  // Verify that configs of existing logs reflect the fully restored defaults.
  val restoredLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config))
  restoredLogConfig.values().forEach((k, v) => TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig.values().get(k) == v,
    k + " not updated in LogManager. expected: value=" + v + " actual: value=" + servers.head.logManager.currentDefaultConfig.values().get(k)))

  servers.foreach { server =>
    val serverLog = server.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
    // Verify default values for these two configurations are restored on all brokers.
    TestUtils.waitUntilTrue(() => serverLog.config.maxMessageSize == Defaults.MessageMaxBytes && serverLog.config.retentionMs == expectedRetentionMs,
      "Existing topic config using defaults not updated")
  }
}

@Test
def testUncleanLeaderElectionEnable(): Unit = {
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
Expand Down

0 comments on commit 4683901

Please sign in to comment.