From e6b20e651b125b4b9ab994d64249d0eb2ca74ccd Mon Sep 17 00:00:00 2001 From: Hao Geng Date: Wed, 4 Sep 2024 13:16:24 -0700 Subject: [PATCH] Add LiMinSegmentRollMs to prevent log roll to be so fast (#521) Today, the log roll time can be very short if maxCompactionLagMs and logRollTimeJitterMillis are not set properly. E.g. if logRollTimeJitterMillis is larger than maxCompactionLagMs, the log roll time can be as short as 0 milliseconds, so a new segment is rolled on every new message. This causes "too many open file handles" errors and crashes the process. This fix adds a minimum interval between time-based segment rolls, configurable on the server side via the config li.min.log.roll.ms (LiMinSegmentRollMs). In this PR, we set the default value to 15 minutes, so that time-based rolling cannot create a new segment within 15 minutes; segments may still roll sooner due to size limits. --- core/src/main/scala/kafka/log/Log.scala | 8 +++-- core/src/main/scala/kafka/log/LogConfig.scala | 4 +++ .../src/main/scala/kafka/log/LogSegment.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 5 +++ .../test/scala/unit/kafka/log/LogTest.scala | 36 +++++++++++++++++-- .../scala/unit/kafka/log/LogTestUtils.scala | 4 ++- 6 files changed, 53 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0e2fab85d49ed..1255ba225ceb7 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -187,7 +187,8 @@ case class RollParams(maxSegmentMs: Long, maxTimestampInMessages: Long, maxOffsetInMessages: Long, messagesSize: Int, - now: Long) + now: Long, + liMinLogRollMs: Long = 0) object RollParams { def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { @@ -195,7 +196,9 @@ object RollParams { config.segmentSize, appendInfo.maxTimestamp, appendInfo.lastOffset, - messagesSize, now) + messagesSize, + now, + config.liMinSegmentRollMs) } } @@ -2052,7 +2055,6 @@ class Log(@volatile private var _dir: 
File, ) } // FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft. - // XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format. warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " + s"with $messagesTruncated messages and $bytesTruncated bytes truncated") diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 815c7fe309110..8a76e49de9a79 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -44,6 +44,7 @@ object Defaults { val SegmentSize = kafka.server.Defaults.LogSegmentBytes val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L + val LiMinSegmentRollMs = kafka.server.Defaults.LiMinSegmentRollMs val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs val RetentionSize = kafka.server.Defaults.LogRetentionBytes @@ -87,6 +88,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp) + val liMinSegmentRollMs = getLong(KafkaConfig.LiMinLogRollTimeMillisProp) val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp) val flushInterval = getLong(LogConfig.FlushMessagesProp) val flushMs = getLong(LogConfig.FlushMsProp) @@ -312,6 +314,8 @@ object LogConfig { KafkaConfig.LogRollTimeMillisProp) .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc, KafkaConfig.LogRollTimeJitterMillisProp) + 
.define(KafkaConfig.LiMinLogRollTimeMillisProp, LONG, Defaults.LiMinSegmentRollMs, atLeast(0), MEDIUM, KafkaConfig.LiMinLogRollTimeMillisDoc, + KafkaConfig.LiMinLogRollTimeMillisProp) .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc, KafkaConfig.LogIndexSizeMaxBytesProp) .define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index a07e23d7bc637..7e5ae1d1689f6 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -66,7 +66,8 @@ class LogSegment private[log] (val log: FileRecords, def timeIndex: TimeIndex = lazyTimeIndex.get def shouldRoll(rollParams: RollParams): Boolean = { - val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs + val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > + math.max(rollParams.maxSegmentMs - rollJitterMs, rollParams.liMinLogRollMs) size > rollParams.maxSegmentBytes - rollParams.messagesSize || (size > 0 && reachedRollMs) || offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 62d744ab9ff8a..928eb2d4e4731 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -138,6 +138,7 @@ object Defaults { val LogRollHours = 24 * 7 val LogRollJitterHours = 0 val LogRetentionHours = 24 * 7 + val LiMinSegmentRollMs = 15 * 60 * 1000L val LogRetentionBytes = -1L val LogCleanupIntervalMs = 5 * 60 * 1000L @@ -515,6 +516,8 @@ object KafkaConfig { val LogRollTimeJitterMillisProp = "log.roll.jitter.ms" val LogRollTimeJitterHoursProp = "log.roll.jitter.hours" + 
val LiMinLogRollTimeMillisProp = "li.min.log.roll.ms" + val LogRetentionTimeMillisProp = "log.retention.ms" val LogRetentionTimeMinutesProp = "log.retention.minutes" val LogRetentionTimeHoursProp = "log.retention.hours" @@ -919,6 +922,8 @@ object KafkaConfig { val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used" val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property" + val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits." + val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied." val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. 
If not set, the value in " + LogRetentionTimeHoursProp + " is used" val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property" diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 014b3d31723df..23ecbd45ed3a9 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -541,7 +541,7 @@ class LogTest { @Test def testTimeBasedLogRoll(): Unit = { def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L) // create a log val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) @@ -1665,7 +1665,7 @@ class LogTest { var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val maxJitter = 20 * 60L // create a log - val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L) val log = createLog(logDir, logConfig) assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") log.appendAsLeader(set, leaderEpoch = 0) @@ -1682,6 +1682,38 @@ class LogTest { "Log should roll after segmentMs adjusted by random jitter") } + /** + * Test jitter for time-based log roll when liMinSegmentRollMs is non-zero + */ + @Test + def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = { + var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val maxJitter = 20 * 60L + // create a log + val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, + liMinSegmentRollMs = 10 * 
60 * 60L) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.") + log.appendAsLeader(set, leaderEpoch = 0) + + mockTime.sleep(log.config.segmentMs - maxJitter) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log does not roll on this append because it occurs earlier than max jitter") + mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(1, log.numberOfSegments, + "Log should not roll before liMinSegmentRollMs") + + mockTime.sleep(log.config.liMinSegmentRollMs) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + log.appendAsLeader(set, leaderEpoch = 0) + assertEquals(2, log.numberOfSegments, + "Log should roll as it passes liMinSegmentRollMs") + } + /** * Test that appending more than the maximum segment size rolls the log */ diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 5b4edbcdd3b69..f851f1e054b52 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import java.util.Properties import kafka.server.checkpoints.LeaderEpochCheckpointFile -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel} import kafka.utils.{Scheduler, TestUtils} import org.apache.kafka.common.Uuid import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, 
MemoryRecords, RecordBatch, SimpleRecord} @@ -48,6 +48,7 @@ object LogTestUtils { def createLogConfig(segmentMs: Long = Defaults.SegmentMs, segmentBytes: Int = Defaults.SegmentSize, + liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs, retentionMs: Long = Defaults.RetentionMs, retentionBytes: Long = Defaults.RetentionSize, localRetentionMs: Long = Defaults.LocalRetentionMs, @@ -75,6 +76,7 @@ object LogTestUtils { logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean) logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long) logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long) + logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long) LogConfig(logProps) }