Skip to content

Commit

Permalink
colblk: separate prefixes and suffixes in index blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
jbowens committed Oct 18, 2024
1 parent 6a4e025 commit 0fda49a
Show file tree
Hide file tree
Showing 22 changed files with 843 additions and 688 deletions.
191 changes: 124 additions & 67 deletions sstable/colblk/index_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package colblk

import (
"slices"
"bytes"
"encoding/binary"

"github.com/cockroachdb/crlib/crbytes"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
Expand All @@ -15,7 +17,7 @@ import (
"github.com/cockroachdb/pebble/sstable/block"
)

const indexBlockCustomHeaderSize = 0
const indexBlockCustomHeaderSize = 4

// IndexBlockWriter writes columnar index blocks. The writer is used for both
// first-level and second-level index blocks. The index block schema consists of
Expand All @@ -37,39 +39,50 @@ const indexBlockCustomHeaderSize = 0
// materialize the separator key when we need to use it outside the context of
// seeking within the block.
type IndexBlockWriter struct {
separators RawBytesBuilder
offsets UintBuilder
lengths UintBuilder
blockProperties RawBytesBuilder
rows int
enc blockEncoder
separatorPrefixes PrefixBytesBuilder
separatorSuffixes RawBytesBuilder
offsets UintBuilder
lengths UintBuilder
blockProperties RawBytesBuilder
rows int
enc blockEncoder
separatorBuf []byte
split base.Split
maxSeparatorLen int
}

const (
indexBlockColumnSeparator = iota
indexBlockColumnSeparatorPrefix = iota
indexBlockColumnSeparatorSuffix
indexBlockColumnOffsets
indexBlockColumnLengths
indexBlockColumnBlockProperties
indexBlockColumnCount
)

// Init initializes the index block writer.
func (w *IndexBlockWriter) Init() {
w.separators.Init()
func (w *IndexBlockWriter) Init(split base.Split, bundleSize int) {
w.separatorPrefixes.Init(bundleSize)
w.separatorSuffixes.Init()
w.offsets.Init()
w.lengths.Init()
w.blockProperties.Init()
w.rows = 0
w.split = split
w.maxSeparatorLen = 0
}

// Reset resets the index block writer to its initial state, retaining buffers.
func (w *IndexBlockWriter) Reset() {
w.separators.Reset()
w.separatorPrefixes.Reset()
w.separatorSuffixes.Reset()
w.offsets.Reset()
w.lengths.Reset()
w.blockProperties.Reset()
w.rows = 0
w.enc.reset()
w.separatorBuf = w.separatorBuf[:0]
w.maxSeparatorLen = 0
}

// Rows returns the number of entries in the index block so far.
Expand All @@ -85,17 +98,32 @@ func (w *IndexBlockWriter) AddBlockHandle(
separator []byte, handle block.Handle, blockProperties []byte,
) int {
idx := w.rows
w.separators.Put(separator)

// Decompose the separator into prefix and suffix.
s := w.split(separator)
commonPrefixLen := s
if idx > 0 {
commonPrefixLen = crbytes.CommonPrefix(w.separatorPrefixes.UnsafeGet(idx-1), separator[:s])
}
w.maxSeparatorLen = max(w.maxSeparatorLen, len(separator))
w.separatorPrefixes.Put(separator[:s], commonPrefixLen)
w.separatorSuffixes.Put(separator[s:])
w.offsets.Set(w.rows, handle.Offset)
w.lengths.Set(w.rows, handle.Length)
w.blockProperties.Put(blockProperties)
w.rows++
return idx
}

// UnsafeSeparator returns the separator of the i'th entry.
// UnsafeSeparator returns the separator of the i'th entry. If r is the number
// of rows, it is required that r-2 <= i < r.
func (w *IndexBlockWriter) UnsafeSeparator(i int) []byte {
return w.separators.UnsafeGet(i)
if i < w.rows-2 || i >= w.rows {
panic(errors.AssertionFailedf("UnsafeSeparator(%d); writer has %d rows", i, w.rows))
}
w.separatorBuf = append(w.separatorBuf[:0], w.separatorPrefixes.UnsafeGet(i)...)
w.separatorBuf = append(w.separatorBuf, w.separatorSuffixes.UnsafeGet(i)...)
return w.separatorBuf
}

// Size returns the size of the pending index block.
Expand All @@ -105,7 +133,8 @@ func (w *IndexBlockWriter) Size() int {

func (w *IndexBlockWriter) size(rows int) int {
off := blockHeaderSize(indexBlockColumnCount, indexBlockCustomHeaderSize)
off = w.separators.Size(rows, off)
off = w.separatorPrefixes.Size(rows, off)
off = w.separatorSuffixes.Size(rows, off)
off = w.offsets.Size(rows, off)
off = w.lengths.Size(rows, off)
off = w.blockProperties.Size(rows, off)
Expand All @@ -119,13 +148,19 @@ func (w *IndexBlockWriter) Finish(rows int) []byte {
if invariants.Enabled && rows != w.rows && rows != w.rows-1 {
panic(errors.AssertionFailedf("index block has %d rows; asked to finish %d", w.rows, rows))
}

w.enc.init(w.size(rows), Header{
Version: Version1,
Columns: indexBlockColumnCount,
Rows: uint32(rows),
}, indexBlockCustomHeaderSize)
w.enc.encode(rows, &w.separators)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:indexBlockCustomHeaderSize], uint32(w.maxSeparatorLen))
if w.rows == 0 {
return w.enc.finish()
}
w.enc.encode(rows, &w.separatorPrefixes)
w.enc.encode(rows, &w.separatorSuffixes)
w.enc.encode(rows, &w.offsets)
w.enc.encode(rows, &w.lengths)
w.enc.encode(rows, &w.blockProperties)
Expand All @@ -134,21 +169,27 @@ func (w *IndexBlockWriter) Finish(rows int) []byte {

// An IndexBlockDecoder reads columnar index blocks.
type IndexBlockDecoder struct {
separators RawBytes
offsets UnsafeUints
lengths UnsafeUints // only used for second-level index blocks
blockProps RawBytes
bd BlockDecoder
separatorPrefixes PrefixBytes
separatorSuffixes RawBytes
offsets UnsafeUints
lengths UnsafeUints // only used for second-level index blocks
blockProps RawBytes
bd BlockDecoder
maxSeparatorLen uint32
}

// Init initializes the index block decoder with the given serialized index
// block.
func (r *IndexBlockDecoder) Init(data []byte) {
r.bd.Init(data, indexBlockCustomHeaderSize)
r.separators = r.bd.RawBytes(indexBlockColumnSeparator)
r.offsets = r.bd.Uints(indexBlockColumnOffsets)
r.lengths = r.bd.Uints(indexBlockColumnLengths)
r.blockProps = r.bd.RawBytes(indexBlockColumnBlockProperties)
r.maxSeparatorLen = binary.LittleEndian.Uint32(data[:indexBlockCustomHeaderSize])
if r.bd.header.Rows > 0 {
r.separatorPrefixes = r.bd.PrefixBytes(indexBlockColumnSeparatorPrefix)
r.separatorSuffixes = r.bd.RawBytes(indexBlockColumnSeparatorSuffix)
r.offsets = r.bd.Uints(indexBlockColumnOffsets)
r.lengths = r.bd.Uints(indexBlockColumnLengths)
r.blockProps = r.bd.RawBytes(indexBlockColumnBlockProperties)
}
}

// DebugString prints a human-readable explanation of the keyspan block's binary
Expand All @@ -172,6 +213,7 @@ func (r *IndexBlockDecoder) Describe(f *binfmt.Formatter, tp treeprinter.Node) {
f.SetAnchorOffset()

n := tp.Child("index block header")
f.HexBytesln(4, "maximum sep length: %d", r.maxSeparatorLen)
r.bd.headerToBinFormatter(f, n)
for i := 0; i < indexBlockColumnCount; i++ {
r.bd.columnToBinFormatter(f, n, i, int(r.bd.header.Rows))
Expand All @@ -194,7 +236,7 @@ type IndexIter struct {

h block.BufferHandle
allocDecoder IndexBlockDecoder
keyBuf []byte
sepBuf PrefixBytesIter
}

// Assert that IndexIter satisfies the block.IndexBlockIterator interface.
Expand All @@ -211,11 +253,12 @@ func (i *IndexIter) InitWithDecoder(
n: int(d.bd.header.Rows),
h: i.h,
allocDecoder: i.allocDecoder,
keyBuf: i.keyBuf,
syntheticPrefix: transforms.SyntheticPrefix,
syntheticSuffix: transforms.SyntheticSuffix,
noTransforms: !transforms.SyntheticPrefix.IsSet() && !transforms.SyntheticSuffix.IsSet(),
sepBuf: PrefixBytesIter{Buf: i.sepBuf.Buf},
}
i.sepBuf.Init(int(i.d.maxSeparatorLen)+len(transforms.SyntheticPrefix)+len(transforms.SyntheticSuffix), transforms.SyntheticPrefix)
}

// Init initializes an iterator from the provided block data slice.
Expand Down Expand Up @@ -288,11 +331,11 @@ func (i *IndexIter) Handle() block.BufferHandle {
// Separator returns the separator at the iterator's current position. The
// iterator must be positioned at a valid row.
func (i *IndexIter) Separator() []byte {
key := i.d.separators.At(i.row)
if i.noTransforms {
return key
i.d.separatorPrefixes.SetAt(&i.sepBuf, i.row)
if i.syntheticSuffix.IsSet() {
return append(i.sepBuf.Buf, i.syntheticSuffix...)
}
return i.applyTransforms(key)
return append(i.sepBuf.Buf, i.d.separatorSuffixes.At(i.row)...)
}

// SeparatorLT returns true if the separator at the iterator's current
Expand All @@ -308,17 +351,6 @@ func (i *IndexIter) SeparatorGT(key []byte, inclusively bool) bool {
return cmp > 0 || (cmp == 0 && inclusively)
}

func (i *IndexIter) applyTransforms(key []byte) []byte {
if i.syntheticSuffix.IsSet() {
key = key[:i.split(key)]
}
i.keyBuf = slices.Grow(i.keyBuf[:0], len(i.syntheticPrefix)+len(key)+len(i.syntheticSuffix))
i.keyBuf = append(i.keyBuf, i.syntheticPrefix...)
i.keyBuf = append(i.keyBuf, key...)
i.keyBuf = append(i.keyBuf, i.syntheticSuffix...)
return i.keyBuf
}

// BlockHandleWithProperties decodes the block handle with any encoded
// properties at the iterator's current position.
func (i *IndexIter) BlockHandleWithProperties() (block.HandleWithProperties, error) {
Expand All @@ -335,32 +367,57 @@ func (i *IndexIter) BlockHandleWithProperties() (block.HandleWithProperties, err
// greater or equal to the given key. It returns false if the seek key is
// greater than all index block separators.
func (i *IndexIter) SeekGE(key []byte) bool {
// Define f(-1) == false and f(upper) == true.
// Invariant: f(index-1) == false, f(upper) == true.
index, upper := 0, i.n
for index < upper {
h := int(uint(index+upper) >> 1) // avoid overflow when computing h
// index ≤ h < upper

// TODO(jackson): Is Bytes.At or Bytes.Slice(Bytes.Offset(h),
// Bytes.Offset(h+1)) faster in this code?
separator := i.d.separators.At(h)
if !i.noTransforms {
// TODO(radu): compare without materializing the transformed key.
separator = i.applyTransforms(separator)
if i.n == 0 {
i.row = 0
return i.row < i.n
}
if i.syntheticPrefix.IsSet() {
var keyPrefix []byte
keyPrefix, key = splitKey(key, len(i.syntheticPrefix))
if cmp := bytes.Compare(keyPrefix, i.syntheticPrefix); cmp != 0 {
if cmp < 0 {
i.row = 0
return i.row < i.n
}
i.row = i.d.bd.Rows()
return i.row < i.n
}
}
s := i.split(key)
// Search for the separator prefix.
var eq bool
i.row, eq = i.d.separatorPrefixes.Search(key[:s])
if !eq {
return i.row < i.n
}

// The separator prefix is equal, so we need to compare against the suffix
// as well.
keySuffix := key[s:]
if i.syntheticSuffix.IsSet() {
if i.compare(i.syntheticSuffix, keySuffix) < 0 {
i.row++
}
// TODO(radu): experiment with splitting the separator prefix and suffix in
// separate columns and using bytes.Compare() on the prefix in the hot path.
c := i.compare(key, separator)
if c > 0 {
index = h + 1 // preserves f(index-1) == false
} else {
upper = h // preserves f(upper) == true
return i.row < i.n
}
if i.compare(i.d.separatorSuffixes.At(i.row), keySuffix) >= 0 {
// Fast path; we landed on a separator that is greater than or equal to
// the "transformed" seek key.
return i.row < i.n
}

// Fall back to a slow scan forward.
//
// TODO(jackson): This can be improved by adding a PrefixBytes.NextUniqueKey
// method and then binary searching among the suffixes. The logic for
// implementing NextUniqueKey is finicky requiring lots of delicate fiddling
// with offset indexes, so it's deferred for now.
for i.row++; i.row < i.d.bd.Rows(); i.row++ {
if i.SeparatorGT(key, true /* orEqual */) {
return i.row < i.n
}
}
// index == upper, f(index-1) == false, and f(upper) (= f(index)) == true => answer is index.
i.row = index
return index < i.n
return false
}

// First seeks index iterator to the first block entry. It returns false if the
Expand Down
4 changes: 2 additions & 2 deletions sstable/colblk/index_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestIndexBlock(t *testing.T) {
switch d.Cmd {
case "build":
var w IndexBlockWriter
w.Init()
w.Init(testkeys.Comparer.Split, 16)
for _, line := range strings.Split(d.Input, "\n") {
fields := strings.Fields(line)
var err error
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexBlock(t *testing.T) {
// InitHandle.
func TestIndexIterInitHandle(t *testing.T) {
var w IndexBlockWriter
w.Init()
w.Init(testkeys.Comparer.Split, 16)
bh1 := block.Handle{Offset: 0, Length: 2000}
bh2 := block.Handle{Offset: 2008, Length: 1000}
w.AddBlockHandle([]byte("a"), bh1, nil)
Expand Down
Loading

0 comments on commit 0fda49a

Please sign in to comment.