From ea2ccfe4fc33c0919759688db07e84999ee7c295 Mon Sep 17 00:00:00 2001 From: Hao Geng Date: Wed, 4 Sep 2024 11:09:05 -0700 Subject: [PATCH 1/3] Add LiMinSegmentRollMs to prevent log roll to be so fast --- core/src/main/scala/kafka/log/Log.scala | 9 ++++++--- core/src/main/scala/kafka/log/LogConfig.scala | 4 ++++ core/src/main/scala/kafka/log/LogSegment.scala | 3 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +++++ 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0e2fab85d49e..2e472848785f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -187,7 +187,8 @@ case class RollParams(maxSegmentMs: Long, maxTimestampInMessages: Long, maxOffsetInMessages: Long, messagesSize: Int, - now: Long) + now: Long, + liMinLogRollMs: Long = 0) object RollParams { def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { @@ -195,7 +196,9 @@ object RollParams { config.segmentSize, appendInfo.maxTimestamp, appendInfo.lastOffset, - messagesSize, now) + messagesSize, + now, + config.liMinSegmentRollMs) } } @@ -2052,7 +2055,7 @@ class Log(@volatile private var _dir: File, ) } // FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft. - + // XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format. warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " + s"with $messagesTruncated messages and $bytesTruncated bytes truncated") diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 815c7fe30911..8a76e49de9a7 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -44,6 +44,7 @@ object Defaults { val SegmentSize = kafka.server.Defaults.LogSegmentBytes val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L + val LiMinSegmentRollMs = kafka.server.Defaults.LiMinSegmentRollMs val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs val RetentionSize = kafka.server.Defaults.LogRetentionBytes @@ -87,6 +88,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp) + val liMinSegmentRollMs = getLong(KafkaConfig.LiMinLogRollTimeMillisProp) val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp) val flushInterval = getLong(LogConfig.FlushMessagesProp) val flushMs = getLong(LogConfig.FlushMsProp) @@ -312,6 +314,8 @@ object LogConfig { KafkaConfig.LogRollTimeMillisProp) .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc, KafkaConfig.LogRollTimeJitterMillisProp) + .define(KafkaConfig.LiMinLogRollTimeMillisProp, LONG, Defaults.LiMinSegmentRollMs, atLeast(0), MEDIUM, KafkaConfig.LiMinLogRollTimeMillisDoc, + KafkaConfig.LiMinLogRollTimeMillisProp) .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc, KafkaConfig.LogIndexSizeMaxBytesProp) .define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index a07e23d7bc63..7e5ae1d1689f 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -66,7 +66,8 @@ class LogSegment private[log] (val log: FileRecords, def timeIndex: TimeIndex = lazyTimeIndex.get def shouldRoll(rollParams: RollParams): Boolean = { - val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs + val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > + math.max(rollParams.maxSegmentMs - rollJitterMs, rollParams.liMinLogRollMs) size > rollParams.maxSegmentBytes - rollParams.messagesSize || (size > 0 && reachedRollMs) || offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 62d744ab9ff8..15eb5d9b4663 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -138,6 +138,7 @@ object Defaults { val LogRollHours = 24 * 7 val LogRollJitterHours = 0 val LogRetentionHours = 24 * 7 + val LiMinSegmentRollMs = 15 * 60 * 1000L val LogRetentionBytes = -1L val LogCleanupIntervalMs = 5 * 60 * 1000L @@ -515,6 +516,8 @@ object KafkaConfig { val LogRollTimeJitterMillisProp = "log.roll.jitter.ms" val LogRollTimeJitterHoursProp = "log.roll.jitter.hours" + val LiMinLogRollTimeMillisProp = "li.min.log.roll.ms" + val LogRetentionTimeMillisProp = "log.retention.ms" val LogRetentionTimeMinutesProp = "log.retention.minutes" val LogRetentionTimeHoursProp = "log.retention.hours" @@ -919,6 +922,8 @@ object KafkaConfig { val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used" val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property" + val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) between new segment rolling out." + val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied." val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used" val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property" From 1515b4612c2cdf6f2dd717a81d04ca135e129825 Mon Sep 17 00:00:00 2001 From: Hao Geng Date: Wed, 4 Sep 2024 11:11:07 -0700 Subject: [PATCH 2/3] Remove blank line --- core/src/main/scala/kafka/log/Log.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2e472848785f..1255ba225ceb 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2055,7 +2055,6 @@ class Log(@volatile private var _dir: File, ) } // FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft. - // XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format. warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " + s"with $messagesTruncated messages and $bytesTruncated bytes truncated") From e5f46af0f523f4a72d4591708b370e51c92d4963 Mon Sep 17 00:00:00 2001 From: Hao Geng Date: Wed, 4 Sep 2024 12:57:40 -0700 Subject: [PATCH 3/3] Address Greg's comment and fix/add unit test --- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../test/scala/unit/kafka/log/LogTest.scala | 36 +++++++++++++++++-- .../scala/unit/kafka/log/LogTestUtils.scala | 4 ++- 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 15eb5d9b4663..928eb2d4e473 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -922,7 +922,7 @@ object KafkaConfig { val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used" val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property" - val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) between new segment rolling out." + val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits." val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied." val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used" diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 014b3d31723d..23ecbd45ed3a 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -541,7 +541,7 @@ class LogTest { @Test def testTimeBasedLogRoll(): Unit = { def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L) // create a log val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) @@ -1665,7 +1665,7 @@ class LogTest { var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val maxJitter = 20 * 60L // create a log - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L) val log = createLog(logDir, logConfig) assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") log.appendAsLeader(set, leaderEpoch = 0) @@ -1682,6 +1682,38 @@ class LogTest { "Log should roll after segmentMs adjusted by random jitter") } + /** + * Test for jitter s for time based log roll with liMinSegmentRollMs not 0 + */ + @Test + def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = { + var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val maxJitter = 20 * 60L + // create a log + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, + liMinSegmentRollMs = 10 * 60 * 60L) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") + log.appendAsLeader(set, leaderEpoch = 0) + + mockTime.sleep(log.config.segmentMs - maxJitter) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log does not roll on this append because it occurs earlier than max jitter") + mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log should not roll before liMinSegmentRollMs") + + mockTime.sleep(log.config.liMinSegmentRollMs) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(2, log.numberOfSegments, + "Log should roll as it passes liMinSegmentRollMs") + } + /** * Test that appending more than the maximum segment size rolls the log */ diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 5b4edbcdd3b6..f851f1e054b5 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import java.util.Properties import kafka.server.checkpoints.LeaderEpochCheckpointFile -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel} import kafka.utils.{Scheduler, TestUtils} import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} @@ -48,6 +48,7 @@ object LogTestUtils { def createLogConfig(segmentMs: Long = Defaults.SegmentMs, segmentBytes: Int = Defaults.SegmentSize, + liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs, retentionMs: Long = Defaults.RetentionMs, retentionBytes: Long = Defaults.RetentionSize, localRetentionMs: Long = Defaults.LocalRetentionMs, @@ -75,6 +76,7 @@ object LogTestUtils { logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean) logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long) logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long) + logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long) LogConfig(logProps) }