Add LiMinSegmentRollMs to prevent log rolls from happening too fast #521

Merged: 3 commits, Sep 4, 2024
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/log/Log.scala
@@ -187,15 +187,18 @@ case class RollParams(maxSegmentMs: Long,
maxTimestampInMessages: Long,
maxOffsetInMessages: Long,
messagesSize: Int,
now: Long)
now: Long,
liMinLogRollMs: Long = 0)

object RollParams {
def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
new RollParams(config.maxSegmentMs,
config.segmentSize,
appendInfo.maxTimestamp,
appendInfo.lastOffset,
messagesSize, now)
messagesSize,
now,
config.liMinSegmentRollMs)
}
}

@@ -2052,7 +2055,6 @@ class Log(@volatile private var _dir: File,
)
}
// FIXME: this code path involves not only data plane segments but also KRaft metadata logs. Should find a way to distinguish after moving to KRaft.

// XXX: An internal dashboard depends on parsing this warn log line. Get SRE reviews before changing the format.
warn(s"Attempted truncating to offset $targetOffset. Resulted in truncated to $offsetTruncatedTo from the original log end offset $originalLogEndOffset, " +
s"with $messagesTruncated messages and $bytesTruncated bytes truncated")
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/LogConfig.scala
@@ -44,6 +44,7 @@ object Defaults {
val SegmentSize = kafka.server.Defaults.LogSegmentBytes
val SegmentMs = kafka.server.Defaults.LogRollHours * 60 * 60 * 1000L
val SegmentJitterMs = kafka.server.Defaults.LogRollJitterHours * 60 * 60 * 1000L
val LiMinSegmentRollMs = kafka.server.Defaults.LiMinSegmentRollMs
val FlushInterval = kafka.server.Defaults.LogFlushIntervalMessages
val FlushMs = kafka.server.Defaults.LogFlushSchedulerIntervalMs
val RetentionSize = kafka.server.Defaults.LogRetentionBytes
@@ -87,6 +88,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)
val liMinSegmentRollMs = getLong(KafkaConfig.LiMinLogRollTimeMillisProp)
val maxIndexSize = getInt(LogConfig.SegmentIndexBytesProp)
val flushInterval = getLong(LogConfig.FlushMessagesProp)
val flushMs = getLong(LogConfig.FlushMsProp)
@@ -312,6 +314,8 @@ object LogConfig {
KafkaConfig.LogRollTimeMillisProp)
.define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc,
KafkaConfig.LogRollTimeJitterMillisProp)
.define(KafkaConfig.LiMinLogRollTimeMillisProp, LONG, Defaults.LiMinSegmentRollMs, atLeast(0), MEDIUM, KafkaConfig.LiMinLogRollTimeMillisDoc,
KafkaConfig.LiMinLogRollTimeMillisProp)
.define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc,
KafkaConfig.LogIndexSizeMaxBytesProp)
.define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc,
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
@@ -66,7 +66,8 @@ class LogSegment private[log] (val log: FileRecords,
def timeIndex: TimeIndex = lazyTimeIndex.get

def shouldRoll(rollParams: RollParams): Boolean = {
val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) >
math.max(rollParams.maxSegmentMs - rollJitterMs, rollParams.liMinLogRollMs)
size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
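In effect, the change above makes the time-based roll threshold the larger of the jitter-adjusted segment age and the new per-broker floor, so a topic with an aggressively low segment.ms can no longer roll a new segment every few milliseconds. A minimal standalone sketch of that calculation (RollThresholdSketch and the sample values are illustrative, not part of the patch; the formula mirrors the shouldRoll diff):

    object RollThresholdSketch {
      // Effective wait before a time-based roll, per the shouldRoll change above.
      def effectiveRollMs(maxSegmentMs: Long, rollJitterMs: Long, liMinLogRollMs: Long): Long =
        math.max(maxSegmentMs - rollJitterMs, liMinLogRollMs)

      def main(args: Array[String]): Unit = {
        // A misconfigured topic with segment.ms = 1 second still waits for the 15-minute floor.
        println(effectiveRollMs(maxSegmentMs = 1000L, rollJitterMs = 0L, liMinLogRollMs = 15 * 60 * 1000L))
        // A topic with a 7-day segment.ms is unaffected by the floor.
        println(effectiveRollMs(maxSegmentMs = 7 * 24 * 60 * 60 * 1000L, rollJitterMs = 0L, liMinLogRollMs = 15 * 60 * 1000L))
      }
    }

Note that size-based rolls (maxSegmentBytes), full offset/time indexes, and relative-offset overflow still bypass this floor; only the reachedRollMs condition is affected.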
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -138,6 +138,7 @@ object Defaults {
val LogRollHours = 24 * 7
val LogRollJitterHours = 0
val LogRetentionHours = 24 * 7
val LiMinSegmentRollMs = 15 * 60 * 1000L

val LogRetentionBytes = -1L
val LogCleanupIntervalMs = 5 * 60 * 1000L
@@ -515,6 +516,8 @@ object KafkaConfig {
val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"

val LiMinLogRollTimeMillisProp = "li.min.log.roll.ms"

val LogRetentionTimeMillisProp = "log.retention.ms"
val LogRetentionTimeMinutesProp = "log.retention.minutes"
val LogRetentionTimeHoursProp = "log.retention.hours"
@@ -919,6 +922,8 @@ object KafkaConfig {
val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used"
val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property"

val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits."

val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied."
val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used"
val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property"
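The floor defaults to 15 minutes and is exposed as the broker-level property li.min.log.roll.ms, which LogConfig picks up for each log. A hedged sketch of overriding it when building a LogConfig, in the style of LogTestUtils below (MinRollConfigSketch and the one-minute value are illustrative, not defaults):

    import java.util.Properties
    import kafka.log.LogConfig
    import kafka.server.KafkaConfig

    object MinRollConfigSketch {
      def main(args: Array[String]): Unit = {
        val logProps = new Properties()
        logProps.put(LogConfig.SegmentMsProp, 10L: java.lang.Long)                   // aggressive time-based rolling
        logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, 60000L: java.lang.Long) // but never time-roll faster than once a minute
        val config = LogConfig(logProps)
        println(config.liMinSegmentRollMs)  // prints 60000
      }
    }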
36 changes: 34 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -541,7 +541,7 @@ class LogTest {
@Test
def testTimeBasedLogRoll(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L)

// create a log
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
@@ -1665,7 +1665,7 @@ class LogTest {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)
@@ -1682,6 +1682,38 @@
"Log should roll after segmentMs adjusted by random jitter")
}

/**
* Test for time-based log roll with jitter when liMinSegmentRollMs is non-zero
*/
@Test
def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter,
liMinSegmentRollMs = 10 * 60 * 60L)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)

mockTime.sleep(log.config.segmentMs - maxJitter)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(1, log.numberOfSegments,
"Log does not roll on this append because it occurs earlier than max jitter")
mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(1, log.numberOfSegments,
"Log should not roll before liMinSegmentRollMs")

mockTime.sleep(log.config.liMinSegmentRollMs)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(2, log.numberOfSegments,
"Log should roll as it passes liMinSegmentRollMs")
}

/**
* Test that appending more than the maximum segment size rolls the log
*/
4 changes: 3 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -20,7 +20,7 @@ package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel}
import kafka.utils.{Scheduler, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -48,6 +48,7 @@ object LogTestUtils {

def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
segmentBytes: Int = Defaults.SegmentSize,
liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs,
retentionMs: Long = Defaults.RetentionMs,
retentionBytes: Long = Defaults.RetentionSize,
localRetentionMs: Long = Defaults.LocalRetentionMs,
@@ -75,6 +76,7 @@ object LogTestUtils {
logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean)
logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long)
logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long)
logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long)
LogConfig(logProps)
}
