Skip to content

Commit

Permalink
Address Greg's comment and fix/add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Geng committed Sep 4, 2024
1 parent 1515b46 commit e5f46af
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ object KafkaConfig {
val LogRollTimeJitterMillisDoc = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LogRollTimeJitterHoursProp + " is used"
val LogRollTimeJitterHoursDoc = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LogRollTimeJitterMillisProp + " property"

val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) between new segment rolling out."
val LiMinLogRollTimeMillisDoc = "The minimum interval (in milliseconds) before a log segment can be forced to roll over due to time-based rolling. Segments may still roll faster due to size limits."

val LogRetentionTimeMillisDoc = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LogRetentionTimeMinutesProp + " is used. If set to -1, no time limit is applied."
val LogRetentionTimeMinsDoc = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LogRetentionTimeMillisProp + " property. If not set, the value in " + LogRetentionTimeHoursProp + " is used"
Expand Down
36 changes: 34 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ class LogTest {
@Test
def testTimeBasedLogRoll(): Unit = {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, liMinSegmentRollMs = 0L)

// create a log
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
Expand Down Expand Up @@ -1665,7 +1665,7 @@ class LogTest {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter, liMinSegmentRollMs = 0L)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)
Expand All @@ -1682,6 +1682,38 @@ class LogTest {
"Log should roll after segmentMs adjusted by random jitter")
}

/**
* Test for jitter s for time based log roll with liMinSegmentRollMs not 0
*/
@Test
def testTimeBasedLogRollJitterWithMinSegmentRollMs(): Unit = {
var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val maxJitter = 20 * 60L
// create a log
val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter,
liMinSegmentRollMs = 10 * 60 * 60L)
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments, "Log begins with a single empty segment.")
log.appendAsLeader(set, leaderEpoch = 0)

mockTime.sleep(log.config.segmentMs - maxJitter)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(1, log.numberOfSegments,
"Log does not roll on this append because it occurs earlier than max jitter")
mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(1, log.numberOfSegments,
"Log should not roll before liMinSegmentRollMs")

mockTime.sleep(log.config.liMinSegmentRollMs)
set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
log.appendAsLeader(set, leaderEpoch = 0)
assertEquals(2, log.numberOfSegments,
"Log should roll as it passes liMinSegmentRollMs")
}

/**
* Test that appending more than the maximum segment size rolls the log
*/
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd, KafkaConfig, LogDirFailureChannel}
import kafka.utils.{Scheduler, TestUtils}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
Expand Down Expand Up @@ -48,6 +48,7 @@ object LogTestUtils {

def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
segmentBytes: Int = Defaults.SegmentSize,
liMinSegmentRollMs: Long = Defaults.LiMinSegmentRollMs,
retentionMs: Long = Defaults.RetentionMs,
retentionBytes: Long = Defaults.RetentionSize,
localRetentionMs: Long = Defaults.LocalRetentionMs,
Expand Down Expand Up @@ -75,6 +76,7 @@ object LogTestUtils {
logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean)
logProps.put(LogConfig.LocalLogRetentionMsProp, localRetentionMs: java.lang.Long)
logProps.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes: java.lang.Long)
logProps.put(KafkaConfig.LiMinLogRollTimeMillisProp, liMinSegmentRollMs: java.lang.Long)
LogConfig(logProps)
}

Expand Down

0 comments on commit e5f46af

Please sign in to comment.