Log represents a log of a partition of a topic.
Log is a collection of LogSegments that are stored on disk in a given partition log directory with a name of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
While being created, Log creates the log directory unless it already exists.
Log is created when LogManager is requested to:

- Load a partition log directory (while LogManager is being created, which is when KafkaServer is requested to start up)
- Look up or create a partition log (when Partition is requested to getOrCreateReplica)
Log uses the .kafka_cleanshutdown file to indicate…FIXME
Log is isFuture when…FIXME
Log uses the following file suffixes to differentiate between the files of log segments:

Suffix | Description |
---|---|
.log | FileRecords log file |
.index | Offset index |
.timeindex | Time index |
.txnindex | Transaction index |
.snapshot | Producer state snapshot |
.deleted | Log segment files scheduled for deletion |
.cleaned | Temporary files created while a log segment is cleaned (compacted) |
.swap | Temporary files used while swapping in cleaned or split segments |
The files of offset indices, time indices, and transaction indices are collectively called index files.
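The suffixes are also available as constants of the Log utility (a quick check in a Scala REPL with the Kafka libraries on the classpath):
import kafka.log.Log
// File suffix constants defined on the Log companion object
assert(Log.LogFileSuffix == ".log")
assert(Log.IndexFileSuffix == ".index")
assert(Log.TimeIndexFileSuffix == ".timeindex")
assert(Log.TxnIndexFileSuffix == ".txnindex")
assert(Log.ProducerSnapshotFileSuffix == ".snapshot")
assert(Log.DeletedFileSuffix == ".deleted")
assert(Log.CleanedFileSuffix == ".cleaned")
assert(Log.SwapFileSuffix == ".swap")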
Log uses a Scheduler to schedule the following background tasks:

Name | Period | Delay | Description |
---|---|---|---|
PeriodicProducerExpirationCheck | producerIdExpirationCheckIntervalMs | producerIdExpirationCheckIntervalMs | Requests the ProducerStateManager to removeExpiredProducers. Scheduled immediately when Log is created |
flush-log | (once) | 0 | Flushes the log (up to the base offset of the new segment). Scheduled when Log is requested to roll |
delete-file | (once) | file.delete.delay.ms | deleteSeg. Scheduled when Log is requested to asyncDeleteSegment |
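A sketch of how such background tasks are scheduled with a KafkaScheduler (the task name and body are made up for illustration):
import java.util.concurrent.TimeUnit
import kafka.utils.KafkaScheduler
val scheduler = new KafkaScheduler(threads = 1)
scheduler.startup()
// A periodic task, in the style of PeriodicProducerExpirationCheck (delay and period in ms)
scheduler.schedule(
  name = "sample-task",
  fun = () => println("tick"),
  delay = 0,
  period = 1000,
  unit = TimeUnit.MILLISECONDS)
scheduler.shutdown()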
Log uses [Log partition=[topicPartition], dir=[parent]] as the logging prefix (aka logIdent).
Tip
|
Enable ALL logging level for kafka.log.Log logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.Log=ALL

Refer to Logging. |
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
segments is Java's ConcurrentSkipListMap of LogSegments by their baseOffset.
A new LogSegment is added when Log is requested to addSegment.
- Cleared in loadSegments just before loadSegmentFiles
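A sketch of why a ConcurrentSkipListMap fits here: finding the segment that contains an offset is a floorEntry lookup and the active segment is the last entry (segment names and offsets below are made up):
import java.util.concurrent.ConcurrentSkipListMap
// Stand-ins for LogSegments keyed by their base offset
val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
segments.put(0L, "segment-0")
segments.put(100L, "segment-100")
segments.put(200L, "segment-200")
// The segment containing offset 150 has the greatest base offset <= 150
assert(segments.floorEntry(150L).getValue == "segment-100")
// The active segment has the greatest base offset
assert(segments.lastEntry.getValue == "segment-200")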
Log is a KafkaMetricsGroup with the following performance metrics:

Metric Name | Description |
---|---|
LogStartOffset | logStartOffset of the Log |
LogEndOffset | logEndOffset of the Log |
NumLogSegments | Number of log segments |
Size | Size of the Log (in bytes) |

The performance metrics are registered in the kafka.log:type=Log group.
Log takes the following to be created:

- Log directory (dir)
- LogConfig
- logStartOffset
- recoveryPoint
- Scheduler
- BrokerTopicStats
- Time
- maxProducerIdExpirationMs
- producerIdExpirationCheckIntervalMs
- TopicPartition
- ProducerStateManager
- LogDirFailureChannel

Log initializes the internal properties.
While being created, Log creates the log directory unless it already exists.
Log loads segments.
Log updateLogEndOffset.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with start offsets greater than or equal to the nextOffset (LeaderEpochFileCache.truncateFromEnd).
Log computes the new start offset to be the maximum of the logStartOffset and the baseOffset of the first LogSegment in the segments internal registry.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with offsets less than or equal to the new start offset (LeaderEpochFileCache.truncateFromStart).
Log throws an IllegalStateException when the ProducerStateManager is not empty:
Producer state must be empty during log initialization
Log loadProducerState (with the logEndOffset and the reloadFromCleanShutdown based on hasCleanShutdownFile).
In the end, Log prints out the following INFO message to the logs:
Completed load of log with [size] segments, log start offset [logStartOffset] and log end offset [logEndOffset] in [time] ms
apply(
dir: File,
config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
apply parseTopicPartitionName from the log directory.
apply creates a new ProducerStateManager.
In the end, apply creates a Log.
Note
|
apply is used when LogManager is requested to loadLog and look up or create a new partition log.
|
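A minimal sketch of using apply to create a Log in a temporary directory (the directory name and the expiration settings are made up; note that the directory name has to parse as topic-partition):
import java.nio.file.Files
import kafka.log.{Log, LogConfig}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.KafkaScheduler
import org.apache.kafka.common.utils.Time
val dir = Files.createTempDirectory("kafka-logs").resolve("topic-0").toFile
dir.mkdirs()
val scheduler = new KafkaScheduler(threads = 1)
scheduler.startup()
val log = Log(
  dir = dir,
  config = LogConfig(),
  logStartOffset = 0L,
  recoveryPoint = 0L,
  scheduler = scheduler,
  brokerTopicStats = new BrokerTopicStats,
  time = Time.SYSTEM,
  maxProducerIdExpirationMs = 60 * 60 * 1000,
  producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
  logDirFailureChannel = new LogDirFailureChannel(1))
assert(log.logEndOffset == 0)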
roll(
expectedNextOffset: Option[Long] = None): LogSegment
roll
…FIXME
Note
|
roll is used when Log is requested to deleteSegments and maybeRoll.
|
closeHandlers(): Unit
closeHandlers
…FIXME
Note
|
closeHandlers is used when…FIXME
|
updateHighWatermark(
hw: Long): Long
updateHighWatermark
…FIXME
Note
|
updateHighWatermark is used when…FIXME
|
addAbortedTransactions(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
addAbortedTransactions
…FIXME
Note
|
addAbortedTransactions is used when Log is requested to read records.
|
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
collectAbortedTransactions
…FIXME
Note
|
collectAbortedTransactions is used when Cleaner is requested to cleanSegments and buildOffsetMap.
|
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
maybeRoll
…FIXME
Note
|
maybeRoll is used exclusively when Log is requested to append records.
|
asyncDeleteSegment(segment: LogSegment): Unit
asyncDeleteSegment
…FIXME
Note
|
asyncDeleteSegment is used when Log is requested to deleteSegment and replaceSegments.
|
flush(): Unit // (1)
flush(offset: Long): Unit
- Uses logEndOffset for the offset (and so flushes all log segments)
flush
prints out the following DEBUG message to the logs:
Flushing log up to offset [offset], last flushed: [lastFlushTime], current time: [time], unflushed: [unflushedMessages]
flush
…FIXME
deleteSeg(): Unit
deleteSeg
…FIXME
Note
|
deleteSeg is used exclusively for the delete-file Background Task.
|
appendAsLeader(
records: MemoryRecords,
leaderEpoch: Int,
isFromClient: Boolean = true): LogAppendInfo
appendAsLeader
simply appends the records with the assignOffsets
flag on.
Note
|
appendAsLeader is used exclusively when Partition is requested to appendRecordsToLeader.
|
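A usage sketch (assuming a log: Log instance, e.g. one created with Log.apply as sketched above):
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
// appendAsLeader assigns offsets to the records (the assignOffsets flag is on)
val records = MemoryRecords.withRecords(
  CompressionType.NONE,
  new SimpleRecord("key".getBytes, "value".getBytes))
val appendInfo = log.appendAsLeader(records, leaderEpoch = 0)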
appendAsFollower(records: MemoryRecords): LogAppendInfo
appendAsFollower simply appends the given records (with the isFromClient and assignOffsets flags off, and the leaderEpoch being -1).
Note
|
appendAsFollower is used exclusively when Partition is requested to doAppendRecordsToFollowerOrFutureReplica.
|
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
append
…FIXME
Note
|
append is used when Log is requested to appendAsLeader (with assignOffsets enabled) and appendAsFollower (with assignOffsets and isFromClient disabled).
|
analyzeAndValidateRecords(
records: MemoryRecords,
isFromClient: Boolean): LogAppendInfo
analyzeAndValidateRecords
…FIXME
Note
|
analyzeAndValidateRecords is used exclusively when Log is requested to append.
|
deleteSegment(segment: LogSegment): Unit
deleteSegment
…FIXME
Note
|
deleteSegment is used when Log is requested to recoverLog, deleteSegments, roll, truncateTo, and truncateFullyAndStartAt.
|
replaceSegments(
newSegments: Seq[LogSegment],
oldSegments: Seq[LogSegment],
isRecoveredSwapFile: Boolean = false): Unit
replaceSegments
…FIXME
Checking Whether .kafka_cleanshutdown Is In Parent Directory of Log Directory — hasCleanShutdownFile Internal Method
hasCleanShutdownFile: Boolean
hasCleanShutdownFile is true when the .kafka_cleanshutdown file is in the parent directory of the log directory. Otherwise, hasCleanShutdownFile is false.
Note
|
hasCleanShutdownFile is used exclusively when Log is created (to loadProducerState) and requested to recoverLog.
|
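A re-implementation sketch of the check (the helper name is hypothetical):
import java.io.File
// true when .kafka_cleanshutdown exists in the parent directory of the log directory
def hasCleanShutdown(dir: File): Boolean =
  new File(dir.getParentFile, ".kafka_cleanshutdown").exists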
maybeIncrementLogStartOffset(
newLogStartOffset: Long): Unit
maybeIncrementLogStartOffset
…FIXME
Note
|
maybeIncrementLogStartOffset is used when…FIXME
|
truncateTo(targetOffset: Long): Boolean
truncateTo
…FIXME
Note
|
truncateTo is used when LogManager is requested to truncateTo.
|
truncateFullyAndStartAt(newOffset: Long): Unit
truncateFullyAndStartAt
…FIXME
deleteOldSegments(): Long
deleteOldSegments
uses the delete flag (of the given LogConfig) to determine the scope of log deletion and returns the number of segments deleted.
Note
|
delete flag is enabled (true ) when delete cleanup policy is part of the cleanup.policy configuration property.
|
With the delete flag enabled (true), deleteOldSegments deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments and deleteLogStartOffsetBreachedSegments.
With the delete flag disabled (false), deleteOldSegments merely deleteLogStartOffsetBreachedSegments.
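A sketch showing how the delete flag follows from the cleanup.policy configuration (the property value is made up):
import java.util.Properties
import kafka.log.LogConfig
val props = new Properties()
props.put(LogConfig.CleanupPolicyProp, "compact,delete")
val config = LogConfig(props)
// delete is among the cleanup policies, so the delete flag is enabled
assert(config.delete)
assert(config.compact)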
deleteOldSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean,
reason: String): Int
deleteOldSegments
finds deletable segments for the given predicate
and schedules their deletion.
If any are found, deleteOldSegments prints out the following INFO message to the logs:
Found deletable segments with base offsets [[baseOffsets]] due to [reason]
Note
|
deleteOldSegments is used when Log is requested to deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments, and deleteLogStartOffsetBreachedSegments.
|
deletableSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment]
deletableSegments
…FIXME
Note
|
deletableSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
|
deleteSegments(
deletable: Iterable[LogSegment]): Int
deleteSegments rolls the active segment first when the deletable segments are all the segments in the log (a log must always have at least one segment).
For every deletable log segment, deleteSegments simply schedules it for deletion and maybeIncrementLogStartOffset (based on…FIXME).
Note
|
deleteSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
|
deleteRetentionMsBreachedSegments(): Int
deleteRetentionMsBreachedSegments
uses the retentionMs threshold (of the given LogConfig) to determine the scope of log retention.
deleteRetentionMsBreachedSegments
schedules deletion of segments with their largestTimestamp below the retentionMs threshold.
deleteRetentionMsBreachedSegments
uses the following reason:
retention time [retentionMs]ms breach
deleteRetentionMsBreachedSegments simply returns 0 for a negative retention.ms threshold.
Note
|
deleteRetentionMsBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention).
|
deleteRetentionSizeBreachedSegments(): Int
deleteRetentionSizeBreachedSegments
uses the retentionSize threshold (of the given LogConfig) to determine the scope of log retention.
Note
|
retentionSize threshold is the value of retention.bytes configuration for a topic or log.retention.bytes configuration for the cluster. |
When the retentionSize threshold is negative (the default is -1) or the size of the log is below it, deleteRetentionSizeBreachedSegments simply returns 0.
deleteRetentionSizeBreachedSegments
schedules deletion of records in log segments (per their size) so the log size drops below the threshold.
deleteRetentionSizeBreachedSegments
uses the following reason:
retention size in bytes [retentionSize] breach
Note
|
deleteRetentionSizeBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention) (for delete cleanup policy).
|
deleteLogStartOffsetBreachedSegments(): Int
deleteLogStartOffsetBreachedSegments
…FIXME
Note
|
deleteLogStartOffsetBreachedSegments is used when…FIXME
|
splitOverflowedSegment(
segment: LogSegment): List[LogSegment]
splitOverflowedSegment
…FIXME
onHighWatermarkIncremented(highWatermark: Long): Unit
onHighWatermarkIncremented
…FIXME
Note
|
onHighWatermarkIncremented is used when Replica is created and requested to set the highWatermark (highWatermark_=).
|
parseTopicPartitionName(dir: File): TopicPartition
parseTopicPartitionName parses the name of the given directory and creates a TopicPartition.
parseTopicPartitionName assumes that the name is of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
parseTopicPartitionName uses all characters up to the last - for the topic name and the rest as the partition ID.
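A simplified re-implementation of the rule for the plain topic-partition form (the real parseTopicPartitionName also handles the deletion-marked form):
import java.io.File
import org.apache.kafka.common.TopicPartition
def parseTopicPartition(dir: File): TopicPartition = {
  val name = dir.getName
  // topic is everything up to the last '-', the rest is the partition ID
  val index = name.lastIndexOf('-')
  new TopicPartition(name.substring(0, index), name.substring(index + 1).toInt)
}
assert(parseTopicPartition(new File("/var/kafka-logs/events-1")) == new TopicPartition("events", 1))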
offsetFromFileName(filename: String): Long
offsetFromFileName
…FIXME
Note
|
offsetFromFileName is used when Log is requested to removeTempFilesAndCollectSwapFiles (right when created) and offsetFromFile.
|
offsetFromFile(file: File): Long
offsetFromFile
…FIXME
Note
|
offsetFromFile is used when…FIXME
|
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
read
…FIXME
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
convertToOffsetMetadata
…FIXME
Note
|
convertToOffsetMetadata is used exclusively when Replica is requested to convertHWToLocalOffsetMetadata
|
logEndOffset: Long
logEndOffset
is the offset of the next message that will be appended to the log (based on the nextOffsetMetadata internal registry).
Note
|
logEndOffset is used when…FIXME
|
addSegment(
segment: LogSegment): LogSegment
addSegment
simply associates the given LogSegment with the baseOffset in the segments internal registry.
Note
|
addSegment is used when Log is requested to replaceSegments, loadSegmentFiles, loadSegments, recoverLog, roll, and truncateFullyAndStartAt.
|
updateLogEndOffset(messageOffset: Long): Unit
updateLogEndOffset simply creates a new LogOffsetMetadata (with the given messageOffset and the active segment) that becomes the nextOffsetMetadata internal registry.
Note
|
updateLogEndOffset is used when Log is requested to append records, roll log segment, truncateTo, and truncateFullyAndStartAt.
|
activeSegment: LogSegment
activeSegment
gives the active LogSegment that is currently taking appends (that is the greatest key in the segments internal registry).
Note
|
activeSegment is used exclusively when Log is created (to create a LogOffsetMetadata).
|
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
If message.format.version
is among the updated keys, updateConfig
…FIXME
renameDir(
name: String): Unit
renameDir
…FIXME
Note
|
renameDir is used when LogManager is requested to replaceCurrentWithFutureLog and asyncDelete.
|
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
logFile creates the file of a log segment (for the given offset, with the .log suffix and the optional suffix) in the given dir directory.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.logFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.log.suffix")
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
offsetIndexFile creates the file of an offset index (for the given offset, with the .index suffix and the optional suffix) in the given dir directory.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.offsetIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.index.suffix")
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile creates the file of a time index (for the given offset, with the .timeindex suffix and the optional suffix) in the given dir directory.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.timeIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.timeindex.suffix")
transactionIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile creates the file of a transaction index (for the given offset, with the .txnindex suffix and the optional suffix) in the given dir directory.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.transactionIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.txnindex.suffix")
producerSnapshotFile(
dir: File,
offset: Long): File
producerSnapshotFile creates the file of a producer state snapshot (for the given offset, with the .snapshot suffix) in the given dir directory.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.producerSnapshotFile(dir, offset = 10)
assert(log_file.getName == "00000000000000000010.snapshot")
Note
|
producerSnapshotFile is used exclusively when ProducerStateManager is requested to takeSnapshot.
|
filenamePrefixFromOffset(
offset: Long): String
filenamePrefixFromOffset uses java.text.NumberFormat to format the given offset:

- Minimum number of digits: 20
- Maximum number of digits in the fraction portion of a number: 0
- No grouping used
import kafka.log.Log
val filenamePrefix = Log.filenamePrefixFromOffset(offset = 10)
assert(filenamePrefix == "00000000000000000010")
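The formatting can be reproduced with java.text.NumberFormat directly (a sketch of the settings above):
import java.text.NumberFormat
val nf = NumberFormat.getInstance
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
assert(nf.format(10) == "00000000000000000010")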
Note
|
filenamePrefixFromOffset is used when Log utility is used to create file names for logFile, offsetIndexFile, timeIndexFile, producerSnapshotFile, and transactionIndexFile.
|
maybeAssignEpochStartOffset(
leaderEpoch: Int,
startOffset: Long): Unit
maybeAssignEpochStartOffset
…FIXME
Note
|
maybeAssignEpochStartOffset is used when…FIXME
|
size: Long
size is the size of the log in bytes (the sizeInBytes of all the log segments).
Note
|
size is used when…FIXME
|
sizeInBytes(
segments: Iterable[LogSegment]): Long
sizeInBytes
sums up the sizes of the given log segments.
Note
|
sizeInBytes is used when Log is requested for the size.
|
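A sketch of the summation (LogSegment.size is an Int, so the sizes are summed as Longs to avoid overflow):
import kafka.log.LogSegment
// Equivalent of sizeInBytes for a collection of log segments
def sizeInBytes(segments: Iterable[LogSegment]): Long =
  segments.map(_.size.toLong).sum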
loadSegments(): Long
loadSegments
removeTempFilesAndCollectSwapFiles.
loadSegments
loadSegmentFiles (with retries when there are log segments with offset overflow).
loadSegments
completeSwapOperations.
loadSegments
branches off per whether the log directory is scheduled to be deleted or not.
Note
|
loadSegments is used when Log is created (to create a LogOffsetMetadata).
|
For the log directory that is not scheduled to be deleted, loadSegments
recoverLog.
loadSegments
requests the active segment to resizeIndexes (to the value of segment.index.bytes configuration property).
In the end, loadSegments
returns the next offset after recovery.
For the log directory that is scheduled to be deleted, loadSegments
adds a new log segment (with base offset 0
and initFileSize).
In the end, loadSegments
returns 0
.
removeTempFilesAndCollectSwapFiles(): Set[File]
removeTempFilesAndCollectSwapFiles
…FIXME
Note
|
removeTempFilesAndCollectSwapFiles is used exclusively when Log is requested to loadSegments (right when created).
|
loadSegmentFiles(): Unit
loadSegmentFiles
processes index and log files in the log directory (by name in ascending order).
Internally, loadSegmentFiles
finds all the files (sorted by name) in the log directory and branches off per whether a file is an index or a log file.
Note
|
loadSegmentFiles is used when Log is created (and in turn loadSegments).
|
For a log file, loadSegmentFiles
opens it and requests sanityCheck.
In case of NoSuchFileException
, loadSegmentFiles
prints out the following ERROR to the logs and recovers the segment.
Could not find offset index file corresponding to log file [path], recovering segment and rebuilding index files...
In case of CorruptIndexException
, loadSegmentFiles
prints out the following ERROR to the logs and recovers the segment.
Found a corrupted index file corresponding to log file [path] due to [message], recovering segment and rebuilding index files...
In the end, loadSegmentFiles
addSegment.
For index files, loadSegmentFiles
simply makes sure that they have corresponding .log files (in the same log directory).
If an orphaned index file is found, loadSegmentFiles
simply prints out the following WARN message and deletes the file:
Found an orphaned index file [path], with no corresponding log file.
isIndexFile(file: File): Boolean
isIndexFile is positive (true) when the given file has one of the following file suffixes: .index, .timeindex, or .txnindex.
Otherwise, isIndexFile is false.
Note
|
isIndexFile is used when Log is requested to removeTempFilesAndCollectSwapFiles and loadSegmentFiles.
|
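A re-implementation sketch of the check (the helper name is hypothetical):
import java.io.File
// true for offset index, time index, and transaction index files
def looksLikeIndexFile(file: File): Boolean = {
  val name = file.getName
  name.endsWith(".index") || name.endsWith(".timeindex") || name.endsWith(".txnindex")
}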
isLogFile(file: File): Boolean
isLogFile returns true when the given file has the .log file suffix. Otherwise, isLogFile is false.
Note
|
isLogFile is used when Log is requested to removeTempFilesAndCollectSwapFiles, loadSegmentFiles, and splitOverflowedSegment.
|
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recoverSegment
creates a new ProducerStateManager (for the TopicPartition, log directory and maxProducerIdExpirationMs).
Note
|
Why does recoverSegment create a new ProducerStateManager rather than using the ProducerStateManager?
|
recoverSegment
then rebuildProducerState (with the baseOffset of the LogSegment, the reloadFromCleanShutdown
flag off, and the new ProducerStateManager
).
recoverSegment
requests the given LogSegment
to recover (with the new ProducerStateManager
and the optional LeaderEpochFileCache
).
recoverSegment
requests the ProducerStateManager
to takeSnapshot.
recoverSegment
returns the number of bytes truncated from the log (while doing segment recovery).
Note
|
recoverSegment is used when Log is requested to loadSegmentFiles, completeSwapOperations, and recoverLog.
|
loadProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean): Unit
loadProducerState
rebuildProducerState (with the lastOffset
, reloadFromCleanShutdown
and the ProducerStateManager).
In the end, loadProducerState
updateFirstUnstableOffset.
Note
|
loadProducerState is used when Log is created and requested to truncateTo.
|
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
rebuildProducerState
…FIXME
Note
|
rebuildProducerState is used when Log is requested to recoverSegment and loadProducerState.
|
updateFirstUnstableOffset(): Unit
updateFirstUnstableOffset
…FIXME
Note
|
updateFirstUnstableOffset is used when Log is requested to loadProducerState, append, onHighWatermarkIncremented, maybeIncrementLogStartOffset, and truncateFullyAndStartAt.
|
completeSwapOperations(
swapFiles: Set[File]): Unit
completeSwapOperations
…FIXME
Note
|
completeSwapOperations is used when Log is created (and in turn loadSegments).
|
retryOnOffsetOverflow[T](fn: => T): T
retryOnOffsetOverflow
executes the fn
block and returns the result.
In case of LogSegmentOffsetOverflowException
, retryOnOffsetOverflow
prints out the following INFO message to the logs, splitOverflowedSegment and retries execution of the fn
block.
Caught segment overflow error: [message]. Split segment and retry.
Note
|
retryOnOffsetOverflow is used exclusively when Log is requested to loadSegments.
|
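A sketch of the retry pattern described above (the splitting step is elided; the real retryOnOffsetOverflow calls splitOverflowedSegment on the segment carried by the exception):
import kafka.common.LogSegmentOffsetOverflowException
def retryOnOverflow[T](fn: => T): T =
  try fn
  catch {
    case e: LogSegmentOffsetOverflowException =>
      println(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
      // split the overflowed segment here, then retry the block
      retryOnOverflow(fn)
  }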
initializeLeaderEpochCache(): Unit
initializeLeaderEpochCache
…FIXME
Note
|
initializeLeaderEpochCache is used when Log is created and later requested to updateConfig and renameDir.
|
Name | Description |
---|---|
nextOffsetMetadata | LogOffsetMetadata of the next offset, i.e. the logEndOffset. Used when: Log is created (updateLogEndOffset) and requested to append records, roll, truncateTo, and truncateFullyAndStartAt |