Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127519: storage: bump Pebble and update interval collector implementation r=RaduBerinde a=RaduBerinde

This change updates the interval collector implementation to match the
simplified interface in Pebble. We now have a stateless object that
just maps keys and ranges to timestamp intervals.

Pebble change:

 * [`654324f9`](cockroachdb/pebble@654324f9) sstable: improve the property collector interface

Epic: none
Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Jul 20, 2024
2 parents b341a88 + 04c6343 commit 05052df
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 169 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1708,10 +1708,10 @@ def go_deps():
patches = [
"@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch",
],
sha256 = "3adab2d556f2f81e07280a0c2fef46a5fa94b3219c1850e3509e2e349b5c0022",
strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240718155413-9e150e60e12c",
sha256 = "708d354be196883b6cfd1b013ec3569a39755419daa90307becb2f0a7543de68",
strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240718162859-654324f90ba7",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240718155413-9e150e60e12c.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240718162859-654324f90ba7.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240718155413-9e150e60e12c.zip": "3adab2d556f2f81e07280a0c2fef46a5fa94b3219c1850e3509e2e349b5c0022",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240718162859-654324f90ba7.zip": "708d354be196883b6cfd1b013ec3569a39755419daa90307becb2f0a7543de68",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.19.0
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b
github.com/cockroachdb/pebble v0.0.0-20240718155413-9e150e60e12c
github.com/cockroachdb/pebble v0.0.0-20240718162859-654324f90ba7
github.com/cockroachdb/redact v1.1.5
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
github.com/cockroachdb/pebble v0.0.0-20240718155413-9e150e60e12c h1:PvaKmnJc8C9/ARi+oMtOTahwsYdbnPMXKDdewJnGCz0=
github.com/cockroachdb/pebble v0.0.0-20240718155413-9e150e60e12c/go.mod h1:KEqs35rpPXdUg4kfWzjBLWXaxt9mjm9P3UR64K3KsJo=
github.com/cockroachdb/pebble v0.0.0-20240718162859-654324f90ba7 h1:RoAJkOmI4TPJUSm2AjEJflbXs6dIjww5Ca5Bp/NoYHo=
github.com/cockroachdb/pebble v0.0.0-20240718162859-654324f90ba7/go.mod h1:KEqs35rpPXdUg4kfWzjBLWXaxt9mjm9P3UR64K3KsJo=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
Expand Down
158 changes: 34 additions & 124 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,100 +655,67 @@ var MVCCMerger = &pebble.Merger{
},
}

var _ sstable.BlockIntervalSyntheticReplacer = MVCCBlockIntervalSyntheticReplacer{}
var _ sstable.BlockIntervalSuffixReplacer = MVCCBlockIntervalSuffixReplacer{}

type MVCCBlockIntervalSyntheticReplacer struct{}
type MVCCBlockIntervalSuffixReplacer struct{}

func (mbsr MVCCBlockIntervalSyntheticReplacer) AdjustIntervalWithSyntheticSuffix(
lower uint64, upper uint64, suffix []byte,
) (adjustedLower uint64, adjustedUpper uint64, err error) {
synthDecoded, err := DecodeMVCCTimestampSuffix(suffix)
func (MVCCBlockIntervalSuffixReplacer) ApplySuffixReplacement(
interval sstable.BlockInterval, newSuffix []byte,
) (sstable.BlockInterval, error) {
synthDecoded, err := DecodeMVCCTimestampSuffix(newSuffix)
if err != nil {
return 0, 0, errors.AssertionFailedf("could not decode synthetic suffix")
return sstable.BlockInterval{}, errors.AssertionFailedf("could not decode synthetic suffix")
}
synthDecodedWalltime := uint64(synthDecoded.WallTime)
if upper >= synthDecodedWalltime {
return 0, 0, errors.AssertionFailedf("the synthetic suffix %d is less than or equal to the original upper bound %d", synthDecoded, upper)
}
// The returned bound includes the synthetic suffix, regardless of its logical
// component.
return synthDecodedWalltime, synthDecodedWalltime + 1, nil
return sstable.BlockInterval{Lower: synthDecodedWalltime, Upper: synthDecodedWalltime + 1}, nil
}

// pebbleDataBlockMVCCTimeIntervalPointCollector implements
// pebble.DataBlockIntervalCollector for point keys.
type pebbleDataBlockMVCCTimeIntervalPointCollector struct {
pebbleDataBlockMVCCTimeIntervalCollector
}
type pebbleIntervalMapper struct{}

var _ sstable.DataBlockIntervalCollector = (*pebbleDataBlockMVCCTimeIntervalPointCollector)(nil)
var _ sstable.IntervalMapper = pebbleIntervalMapper{}

func (tc *pebbleDataBlockMVCCTimeIntervalPointCollector) Add(
key pebble.InternalKey, _ []byte,
) error {
return tc.add(key.UserKey)
}

// pebbleDataBlockMVCCTimeIntervalRangeCollector implements
// pebble.DataBlockIntervalCollector for range keys.
type pebbleDataBlockMVCCTimeIntervalRangeCollector struct {
pebbleDataBlockMVCCTimeIntervalCollector
// MapPointKey is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapPointKey(
key pebble.InternalKey, value []byte,
) (sstable.BlockInterval, error) {
return mapSuffixToInterval(key.UserKey)
}

var _ sstable.DataBlockIntervalCollector = (*pebbleDataBlockMVCCTimeIntervalRangeCollector)(nil)

func (tc *pebbleDataBlockMVCCTimeIntervalRangeCollector) Add(
key pebble.InternalKey, value []byte,
) error {
// TODO(erikgrinaker): should reuse a buffer for keysDst, but keyspan.Key is
// not exported by Pebble.
span, err := rangekey.Decode(key, value, nil)
if err != nil {
return errors.Wrapf(err, "decoding range key at %s", key)
}
// MapRangeKeys is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapRangeKeys(span sstable.Span) (sstable.BlockInterval, error) {
var res sstable.BlockInterval
for _, k := range span.Keys {
if err := tc.add(k.Suffix); err != nil {
return errors.Wrapf(err, "recording suffix %x for range key at %s", k.Suffix, key)
i, err := mapSuffixToInterval(k.Suffix)
if err != nil {
return sstable.BlockInterval{}, err
}
res.UnionWith(i)
}
return nil
}

// pebbleDataBlockMVCCTimeIntervalCollector is a helper for a
// pebble.DataBlockIntervalCollector that is used to construct a
// pebble.BlockPropertyCollector. This provides per-block filtering, which
// also gets aggregated to the sstable-level and filters out sstables. It must
// only be used for MVCCKeyIterKind iterators, since it will ignore
// blocks/sstables that contain intents (and any other key that is not a real
// MVCC key).
//
// This is wrapped by structs for point or range key collection, which actually
// implement pebble.DataBlockIntervalCollector.
type pebbleDataBlockMVCCTimeIntervalCollector struct {
// min, max are the encoded timestamps.
min, max []byte
return res, nil
}

// add collects the given slice in the collector. The slice may be an entire
// encoded MVCC key, or the bare suffix of an encoded key.
func (tc *pebbleDataBlockMVCCTimeIntervalCollector) add(b []byte) error {
// mapSuffixToInterval maps the suffix of a key to a timestamp interval.
// The buffer can be an entire key or just the suffix.
func mapSuffixToInterval(b []byte) (sstable.BlockInterval, error) {
if len(b) == 0 {
return nil
return sstable.BlockInterval{}, nil
}
// Last byte is the version length + 1 when there is a version,
// else it is 0.
versionLen := int(b[len(b)-1])
if versionLen == 0 {
// This is not an MVCC key that we can collect.
return nil
return sstable.BlockInterval{}, nil
}
// prefixPartEnd points to the sentinel byte, unless this is a bare suffix, in
// which case the index is -1.
prefixPartEnd := len(b) - 1 - versionLen
// Sanity check: the index should be >= -1. Additionally, if the index is >=
// 0, it should point to the sentinel byte, as this is a full EngineKey.
if prefixPartEnd < -1 || (prefixPartEnd >= 0 && b[prefixPartEnd] != sentinel) {
return errors.Errorf("invalid key %s", roachpb.Key(b).String())
return sstable.BlockInterval{}, errors.Errorf("invalid key %s", roachpb.Key(b).String())
}
// We don't need the last byte (the version length).
versionLen--
Expand All @@ -758,67 +725,10 @@ func (tc *pebbleDataBlockMVCCTimeIntervalCollector) add(b []byte) error {
versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen {
// INVARIANT: -1 <= prefixPartEnd < len(b) - 1.
// Version consists of the bytes after the sentinel and before the length.
b = b[prefixPartEnd+1 : len(b)-1]
// Lexicographic comparison on the encoded timestamps is equivalent to the
// comparison on decoded timestamps, so delay decoding.
if len(tc.min) == 0 || bytes.Compare(b, tc.min) < 0 {
tc.min = append(tc.min[:0], b...)
}
if len(tc.max) == 0 || bytes.Compare(b, tc.max) > 0 {
tc.max = append(tc.max[:0], b...)
}
ts := binary.BigEndian.Uint64(b[prefixPartEnd+1:])
return sstable.BlockInterval{Lower: ts, Upper: ts + 1}, nil
}
return nil
}

func decodeWallTime(ts []byte) uint64 {
return binary.BigEndian.Uint64(ts[0:engineKeyVersionWallTimeLen])
}

// FinishDataBlock is part of the sstable.DataBlockIntervalCollector interface.
func (tc *pebbleDataBlockMVCCTimeIntervalCollector) FinishDataBlock() (
lower uint64,
upper uint64,
err error,
) {
if len(tc.min) == 0 {
// No calls to Add that contained a timestamped key.
return 0, 0, nil
}
// Construct a [lower, upper) walltime that will contain all the
// hlc.Timestamps in this block.
lower = decodeWallTime(tc.min)
// Remember that we have to reset tc.min and tc.max to get ready for the
// next data block, as specified in the DataBlockIntervalCollector
// interface documentation.
tc.min = tc.min[:0]
// The actual value encoded into walltime is an int64, so +1 will not
// overflow.
//
// Note that the timestamp with a wall time tc.max+1 and no logical component is
// a valid exclusive upper bound for timestamps with wall time tc.max and any
// logical component.
upper = decodeWallTime(tc.max) + 1
tc.max = tc.max[:0]
if lower >= upper {
return 0, 0,
errors.Errorf("corrupt timestamps lower %d >= upper %d", lower, upper)
}
return lower, upper, nil
}

// AddCollectedWithSuffixReplacement is part of the
// sstable.DataBlockIntervalCollector interface.
func (tc *pebbleDataBlockMVCCTimeIntervalCollector) AddCollectedWithSuffixReplacement(
oldLower, oldUpper uint64, oldSuffix, newSuffix []byte,
) error {
return tc.add(newSuffix)
}

// SupportsSuffixReplacement is part of the sstable.DataBlockIntervalCollector
// interface.
func (tc *pebbleDataBlockMVCCTimeIntervalCollector) SupportsSuffixReplacement() bool {
return true
return sstable.BlockInterval{}, nil
}

const mvccWallTimeIntervalCollector = "MVCCTimeInterval"
Expand Down Expand Up @@ -850,8 +760,8 @@ var PebbleBlockPropertyCollectors = []func() pebble.BlockPropertyCollector{
func() pebble.BlockPropertyCollector {
return sstable.NewBlockIntervalCollector(
mvccWallTimeIntervalCollector,
&pebbleDataBlockMVCCTimeIntervalPointCollector{},
&pebbleDataBlockMVCCTimeIntervalRangeCollector{},
pebbleIntervalMapper{},
MVCCBlockIntervalSuffixReplacer{},
)
},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (p *pebbleIterator) setOptions(
p.rangeKeyMaskingBuf = encodeMVCCTimestampSuffixToBuf(
p.rangeKeyMaskingBuf, opts.RangeKeyMaskingBelow)
p.options.RangeKeyMasking.Suffix = p.rangeKeyMaskingBuf
p.maskFilter.BlockIntervalFilter.Init(mvccWallTimeIntervalCollector, 0, math.MaxUint64, MVCCBlockIntervalSyntheticReplacer{})
p.maskFilter.BlockIntervalFilter.Init(mvccWallTimeIntervalCollector, 0, math.MaxUint64, MVCCBlockIntervalSuffixReplacer{})
p.options.RangeKeyMasking.Filter = p.getBlockPropertyFilterMask
}

Expand Down Expand Up @@ -318,7 +318,7 @@ func (p *pebbleIterator) setOptions(
sstable.NewBlockIntervalFilter(mvccWallTimeIntervalCollector,
uint64(opts.MinTimestamp.WallTime),
uint64(opts.MaxTimestamp.WallTime)+1,
MVCCBlockIntervalSyntheticReplacer{},
MVCCBlockIntervalSuffixReplacer{},
),
}
p.options.PointKeyFilters = pkf[:1:2]
Expand Down
86 changes: 50 additions & 36 deletions pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -583,51 +584,64 @@ func (fs *errorFS) Create(name string, category vfs.DiskWriteCategory) (vfs.File
return fs.FS.Create(name, category)
}

func TestPebbleMVCCTimeIntervalCollector(t *testing.T) {
func TestPebbleMVCCIntervalMapper(t *testing.T) {
defer leaktest.AfterTest(t)()

m := pebbleIntervalMapper{}
aKey := roachpb.Key("a")
collector := &pebbleDataBlockMVCCTimeIntervalPointCollector{}
finishAndCheck := func(lower, upper uint64) {
l, u, err := collector.FinishDataBlock()
uuid := uuid.Must(uuid.FromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))

for _, tc := range []struct {
userKey []byte
expected sstable.BlockInterval
}{
{
userKey: func() []byte {
ek, _ := LockTableKey{aKey, lock.Intent, uuid}.ToEngineKey(nil)
return ek.Encode()
}(),
// Lock keys are not MVCC keys.
expected: sstable.BlockInterval{},
},
{
userKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 2, Logical: 1}}),
expected: sstable.BlockInterval{Lower: 2, Upper: 3},
},
{
userKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 22, Logical: 1}}),
expected: sstable.BlockInterval{Lower: 22, Upper: 23},
},
{
userKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 25}}),
expected: sstable.BlockInterval{Lower: 25, Upper: 26},
},
} {
i, err := m.MapPointKey(sstable.InternalKey{UserKey: tc.userKey}, nil)
require.NoError(t, err)
require.Equal(t, lower, l)
require.Equal(t, upper, u)
require.Equal(t, tc.expected, i)
}
// Nothing added.
finishAndCheck(0, 0)
uuid := uuid.Must(uuid.FromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))
ek, _ := LockTableKey{aKey, lock.Intent, uuid}.ToEngineKey(nil)
require.NoError(t, collector.Add(pebble.InternalKey{UserKey: ek.Encode()}, []byte("foo")))
// The added key was not an MVCCKey.
finishAndCheck(0, 0)
require.NoError(t, collector.Add(pebble.InternalKey{
UserKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 2, Logical: 1}})},
[]byte("foo")))
// Added 1 MVCCKey which sets both the upper and lower bound.
finishAndCheck(2, 3)
require.NoError(t, collector.Add(pebble.InternalKey{
UserKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 22, Logical: 1}})},
[]byte("foo")))
require.NoError(t, collector.Add(pebble.InternalKey{
UserKey: EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 25, Logical: 1}})},
[]byte("foo")))
// Added 2 MVCCKeys.
finishAndCheck(22, 26)
// Using the same suffix for all keys in a block results in an interval of
// width one (inclusive lower bound to exclusive upper bound).
suffix := EncodeMVCCTimestampSuffix(hlc.Timestamp{WallTime: 42, Logical: 1})
require.NoError(t, collector.AddCollectedWithSuffixReplacement(0, 0, nil, suffix))
finishAndCheck(42, 43)
// An invalid key results in an error.
// Case 1: malformed sentinel.
// An invalid key (malformed sentinel) results in an error.
key := EncodeMVCCKey(MVCCKey{aKey, hlc.Timestamp{WallTime: 2, Logical: 1}})
sentinelPos := len(key) - 1 - int(key[len(key)-1])
key[sentinelPos] = '\xff'
require.Error(t, collector.AddCollectedWithSuffixReplacement(0, 0, nil, key))
// Case 2: malformed bare suffix (too short).
_, err := m.MapPointKey(sstable.InternalKey{UserKey: key}, nil)
require.Error(t, err)
}

func TestPebbleMVCCBlockIntervalSuffixReplacer(t *testing.T) {
defer leaktest.AfterTest(t)()

r := MVCCBlockIntervalSuffixReplacer{}
suffix := EncodeMVCCTimestampSuffix(hlc.Timestamp{WallTime: 42, Logical: 1})
before := sstable.BlockInterval{Lower: 10, Upper: 15}
after, err := r.ApplySuffixReplacement(before, suffix)
require.NoError(t, err)
require.Equal(t, sstable.BlockInterval{Lower: 42, Upper: 43}, after)

// An invalid suffix (too short) results in an error.
suffix = EncodeMVCCTimestampSuffix(hlc.Timestamp{WallTime: 42, Logical: 1})[1:]
require.Error(t, collector.AddCollectedWithSuffixReplacement(0, 0, nil, suffix))
_, err = r.ApplySuffixReplacement(sstable.BlockInterval{Lower: 1, Upper: 2}, suffix)
require.Error(t, err)
}

// TestPebbleMVCCTimeIntervalCollectorAndFilter tests that point and range key
Expand Down

0 comments on commit 05052df

Please sign in to comment.