diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 15eb5d9b4663a..928eb2d4e4731 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -922,7 +922,7 @@ object KafkaConfig { val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used" val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property" - val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) between new segment rolling out." + val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits." val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied." val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used" diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 014b3d31723df..23ecbd45ed3a9 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -541,7 +541,7 @@ class LogTest { @Test def testTimeBasedLogRoll(): Unit = { def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L) // create a log val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) @@ -1665,7 +1665,7 @@ class LogTest { var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val maxJitter = 20 * 60L // create a log - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L) val log = createLog(logDir, logConfig) assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") log.appendAsLeader(set, leaderEpoch = 0) @@ -1682,6 +1682,38 @@ class LogTest { "Log should roll after segmentMs adjusted by random jitter") } + /** + * Test for jitter s for time based log roll with liMinSegmentRollMs not 0 + */ + @Test + def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = { + var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val maxJitter = 20 * 60L + // create a log + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, + liMinSegmentRollMs = 10 * 60 * 60L) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") + log.appendAsLeader(set, leaderEpoch = 0) + + mockTime.sleep(log.config.segmentMs - maxJitter) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log does not roll on this append because it occurs earlier than max jitter") + mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log should not roll before liMinSegmentRollMs") + + mockTime.sleep(log.config.liMinSegmentRollMs) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(2, log.numberOfSegments, + "Log should roll as it passes liMinSegmentRollMs") + } + /** * Test that appending more than the maximum segment size rolls the log */ diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 5b4edbcdd3b69..f851f1e054b52 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import java.util.Properties import kafka.server.checkpoints.LeaderEpochCheckpointFile -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel} import kafka.utils.{Scheduler, TestUtils} import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} @@ -48,6 +48,7 @@ object LogTestUtils { def createLogConfig(segmentMs: Long = Defaults.SegmentMs, segmentBytes: Int = Defaults.SegmentSize, + liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs, retentionMs: Long = Defaults.RetentionMs, retentionBytes: Long = Defaults.RetentionSize, localRetentionMs: Long = Defaults.LocalRetentionMs, @@ -75,6 +76,7 @@ object LogTestUtils { logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean) logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long) logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long) + logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long) LogConfig(logProps) }