Tip
|
Use the DumpLogSegments tool to review the content of a log segment (i.e. its underlying files). |
LogSegment is composed of two main file types, i.e. the log file itself (with records) and index files.
Note
|
When Log is created (and requested to loadSegmentFiles), orphaned index files (index files with no corresponding log file) are removed. |
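As a rough illustration of what "orphaned" means in the note above, the following self-contained sketch flags index files that have no log file with the same base name. The object and method names are hypothetical, and the set of index suffixes is an assumption for the sake of the example; this is not Kafka's own code.

```scala
object OrphanedIndexes {
  // Assumption: these are the index file suffixes to consider.
  private val IndexSuffixes = Seq(".index", ".timeindex", ".txnindex")

  // Returns the index file names that have no matching "<base>.log" file
  // among the given file names (all taken from one segment directory).
  def find(fileNames: Set[String]): Set[String] =
    fileNames.filter { name =>
      IndexSuffixes
        .find(suffix => name.endsWith(suffix))
        .exists { suffix =>
          val logName = name.stripSuffix(suffix) + ".log"
          !fileNames.contains(logName)
        }
    }
}
```

A real implementation would delete the flagged files; the sketch only reports them so that it can be tried on a plain set of file names.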
The files are all in the same directory (as specified when opening a segment).
$ tree /tmp/kafka-logs/t1-1/
/tmp/kafka-logs/t1-1/
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint
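The file names in the listing above follow a simple pattern: the segment's base offset, zero-padded to 20 digits, followed by a type suffix. A minimal sketch of that naming scheme, assuming the padding width seen in the listing; the object and method names are hypothetical, and the optional trailing suffix (e.g. ".cleaned") is an assumption that mirrors the fileSuffix parameter of LogSegment.open:

```scala
import java.util.Locale

object SegmentFileNames {
  // Base offset rendered as a 20-digit, zero-padded decimal
  // (the width is taken from the directory listing).
  def prefix(baseOffset: Long): String = {
    require(baseOffset >= 0, "baseOffset must be non-negative")
    "%020d".formatLocal(Locale.ROOT, baseOffset)
  }

  // File names for the log file and the two index files in the listing.
  // `suffix` is an optional trailing marker appended after the extension.
  def logFile(baseOffset: Long, suffix: String = ""): String =
    s"${prefix(baseOffset)}.log$suffix"

  def offsetIndexFile(baseOffset: Long, suffix: String = ""): String =
    s"${prefix(baseOffset)}.index$suffix"

  def timeIndexFile(baseOffset: Long, suffix: String = ""): String =
    s"${prefix(baseOffset)}.timeindex$suffix"
}
```

For base offset 0, SegmentFileNames.logFile(0L) yields the same 00000000000000000000.log name as in the listing.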
LogSegment is created (indirectly, using the LogSegment.open utility) when:
- Log is requested to loadSegmentFiles (for every log file), completeSwapOperations, loadSegments, recoverLog, roll, and truncateFullyAndStartAt
- LogCleaner is requested to createNewCleanedSegment
// One of log.dirs directories
import java.nio.file.Path
val dir = Path.of("/tmp/logsegment")
import java.nio.file.Files
Files.createDirectories(dir)
import kafka.log.{LogConfig, LogSegment}
import org.apache.kafka.common.utils.Time
val config = LogConfig()
val segment = LogSegment.open(dir.toFile, baseOffset = 0, config, time = Time.SYSTEM)
scala> println(segment)
LogSegment(baseOffset=0, size=0, lastModifiedTime=1576677605692, largestTime=0)
LogSegment uses the following configuration properties (while opening a log segment):
LogSegment takes the following to be created:
- Lazy OffsetIndex with deferred loading (LazyIndex[OffsetIndex])
- Lazy TimeIndex with deferred loading (LazyIndex[TimeIndex])
LogSegment initializes the internal properties.
open(
dir: File,
baseOffset: Long,
config: LogConfig,
time: Time,
fileAlreadyExists: Boolean = false,
initFileSize: Int = 0,
preallocate: Boolean = false,
fileSuffix: String = ""): LogSegment
open uses the following configuration properties (of the given LogConfig):
- segment.index.bytes for the maxIndexSize
open creates a new LogSegment with the following files in the given dir directory:
- Creates a log file (with the given baseOffset and fileSuffix) and opens it (using FileRecords.open)
txnIndex: TransactionIndex
When created, LogSegment is given a TransactionIndex.
LogSegment uses the TransactionIndex for the following:
txnIndex is also used when Cleaner is requested to clean a log (and in turn cleanSegments).
TransactionIndex is closed when LogSegment is requested to close and closeHandlers.
TransactionIndex is deleted (if it exists) when LogSegment is requested to deleteIfExists.
offsetIndex: OffsetIndex
When created, LogSegment is given an OffsetIndex with deferred loading (LazyIndex[OffsetIndex]).
offsetIndex simply gets (unwraps) the OffsetIndex.
offsetIndex is used for the following…FIXME
collectAbortedTxns(
fetchOffset: Long,
upperBoundOffset: Long): TxnIndexSearchResult
collectAbortedTxns…FIXME
Note
|
collectAbortedTxns is used when Log is requested to collectAbortedTransactions.
|
close(): Unit
close…FIXME
Note
|
close is used when Log is requested to load segments and close (among others).
|
closeHandlers(): Unit
closeHandlers…FIXME
Note
|
closeHandlers is used when Log is requested to closeHandlers.
|
recover(
producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recover requests the OffsetIndex, the TimeIndex, and the TransactionIndex to reset.
Note
|
recover is used when Log is requested to recover a log segment.
|
updateProducerState(
producerStateManager: ProducerStateManager,
batch: RecordBatch): Unit
updateProducerState…FIXME
Note
|
updateProducerState is used when LogSegment is requested to recover.
|
sanityCheck(
timeIndexFileNewlyCreated: Boolean): Unit
sanityCheck…FIXME
Note
|
sanityCheck is used exclusively when Log is requested to loadSegments (when created).
|
updateDir(
dir: File): Unit
updateDir…FIXME
Note
|
updateDir is used exclusively when Log is requested to renameDir.
|
changeFileSuffixes(
oldSuffix: String,
newSuffix: String): Unit
changeFileSuffixes…FIXME
Note
|
changeFileSuffixes is used when Log is requested to asyncDeleteSegment and replaceSegments.
|
flush(): Unit
flush…FIXME
deleteIfExists(): Unit
deleteIfExists…FIXME
Note
|
deleteIfExists is used when…FIXME
|
deleteIfExists(
dir: File,
baseOffset: Long,
fileSuffix: String = ""): Unit
deleteIfExists…FIXME
Note
|
deleteIfExists is used when…FIXME
|
resizeIndexes(
size: Int): Unit
resizeIndexes…FIXME
Note
|
resizeIndexes is used when…FIXME
|
largestTimestamp: Long
largestTimestamp…FIXME
Note
|
largestTimestamp is used when…FIXME
|
shouldRoll(
rollParams: RollParams): Boolean
shouldRoll…FIXME
Note
|
shouldRoll is used exclusively when Log is requested to maybeRoll (while appending records).
|
timeWaitedForRoll(
now: Long,
messageTimestamp: Long): Long
timeWaitedForRoll…FIXME
Note
|
timeWaitedForRoll is used exclusively when LogSegment is requested to shouldRoll.
|
append(
largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit
append…FIXME
appendFromFile(
records: FileRecords,
start: Int): Int
appendFromFile…FIXME
Note
|
appendFromFile is used exclusively when Log is requested to splitOverflowedSegment.
|
appendChunkFromFile(
records: FileRecords,
position: Int,
bufferSupplier: BufferSupplier): Int
appendChunkFromFile…FIXME
Note
|
appendChunkFromFile is used when LogSegment is requested to appendFromFile.
|
truncateTo(
offset: Long): Int
truncateTo…FIXME
Note
|
truncateTo is used when Log is created (and in turn requested to recoverLog) and when Log is requested to truncateTo.
|
updateTxnIndex(
completedTxn: CompletedTxn,
lastStableOffset: Long): Unit
updateTxnIndex…FIXME
Name | Description |
---|---|
| Time(stamp) when this… Used exclusively when… |
| Used when…FIXME |
| Used when…FIXME |
| Used when…FIXME |
| Used when…FIXME |