From 93fcaa1cceb8b20e7bf7bb4524e2789f93b8b60a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 1 Nov 2024 14:12:25 -0700 Subject: [PATCH] consolidate PrefixUpperbound --- storage/batch.go | 8 +++--- storage/operation/badgerimpl/iterator.go | 4 +-- storage/operation/badgerimpl/reader.go | 6 +---- storage/operation/pebbleimpl/iterator.go | 2 +- storage/operation/pebbleimpl/writer.go | 2 +- storage/operation/reads.go | 33 +++++++----------------- storage/operation/reads_test.go | 27 +++++++++---------- storage/operation/writes_test.go | 6 ++--- storage/operations.go | 33 +++++++++++++++++------- utils/unittest/unittest.go | 28 -------------------- 10 files changed, 57 insertions(+), 92 deletions(-) diff --git a/storage/batch.go b/storage/batch.go index 23b9d39ac63..bc9c4853294 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -4,16 +4,16 @@ import ( "github.com/dgraph-io/badger/v2" ) -// deprecated -// use Writer instead +// Deprecated: Transaction is being deprecated as part of the transition from Badger to Pebble. +// Use Writer instead of Transaction for all new code. type Transaction interface { Set(key, val []byte) error } -// deprecated -// use ReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. +// Deprecated: BatchStorage is being deprecated as part of the transition from Badger to Pebble. +// Use ReaderBatchWriter instead of BatchStorage for all new code. type BatchStorage interface { GetWriter() *badger.WriteBatch diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go index 81ecda2d719..e9f8b5dc6be 100644 --- a/storage/operation/badgerimpl/iterator.go +++ b/storage/operation/badgerimpl/iterator.go @@ -34,7 +34,7 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage } } -func (i *badgerIterator) SeekGE() { +func (i *badgerIterator) First() { i.iter.Seek(i.lowerBound) } @@ -44,7 +44,7 @@ func (i *badgerIterator) Valid() bool { return false } key := i.iter.Item().Key() - // "< 0" means the upperBound is exclusive + // "< 0" means "key < upperBound" valid := bytes.Compare(key, i.upperBound) < 0 return valid } diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go index 06158e634ff..8d7d982d65e 100644 --- a/storage/operation/badgerimpl/reader.go +++ b/storage/operation/badgerimpl/reader.go @@ -32,11 +32,7 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) } - var value []byte - err = item.Value(func(val []byte) error { - value = append([]byte{}, val...) - return nil - }) + value, err := item.ValueCopy(nil) if err != nil { return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) } diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go index d8453b497f5..b1b6630cc51 100644 --- a/storage/operation/pebbleimpl/iterator.go +++ b/storage/operation/pebbleimpl/iterator.go @@ -32,7 +32,7 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops }, nil } -func (i *pebbleIterator) SeekGE() { +func (i *pebbleIterator) First() { i.iter.First() } diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go index ad639223209..c6ccdff06b9 100644 --- a/storage/operation/pebbleimpl/writer.go +++ b/storage/operation/pebbleimpl/writer.go @@ -74,7 +74,7 @@ func (b *ReaderBatchWriter) Delete(key []byte) error { return b.batch.Delete(key, pebble.Sync) } -// DeleteByRange deletes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +// DeleteByRange deletes all keys with a prefix in the range [startPrefix, endPrefix] (both inclusive). func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error { // DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive). // therefore, we need to increment the endPrefix to make it inclusive. diff --git a/storage/operation/reads.go b/storage/operation/reads.go index 2e6be8dd3fe..1be299ab9d8 100644 --- a/storage/operation/reads.go +++ b/storage/operation/reads.go @@ -28,7 +28,7 @@ type CreateFunc func() interface{} type HandleFunc func() error type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc) -// IterateKeysInPrefixRange will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +// IterateKeysInPrefixRange will iterate over all keys with prefixes in the range [startPrefix, endPrefix] (both inclusive) func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error { return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) { return func(key []byte) (bool, error) { @@ -41,7 +41,7 @@ func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(k }, storage.IteratorOption{IterateKeyOnly: true}) } -// Iterate will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +// Iterate will iterate over all keys with prefixes in the given range [startPrefix, endPrefix] (both inclusive) func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { return func(r storage.Reader) error { @@ -64,7 +64,7 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s } defer it.Close() - for it.SeekGE(); it.Valid(); it.Next() { + for it.First(); it.Valid(); it.Next() { item := it.IterItem() key := item.Key() @@ -72,6 +72,9 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s check, create, handle := iterFunc() keyCopy := make([]byte, len(key)) + + // The underlying database may re-use and modify the backing memory of the returned key. + // Tor safety we proactively make a copy before passing the key to the upper layer. copy(keyCopy, key) // check if we should process the item at all @@ -112,25 +115,7 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s // Traverse will iterate over all keys with the given prefix func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { - return Iterate(prefix, PrefixUpperBound(prefix), iterFunc, opt) -} - -// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix -// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. -// This is used to define an upper bound for iteration, when we want to iterate over -// all keys beginning with a given prefix. -// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration -func PrefixUpperBound(prefix []byte) []byte { - end := make([]byte, len(prefix)) - copy(end, prefix) - for i := len(end) - 1; i >= 0; i-- { - // increment the bytes by 1 - end[i] = end[i] + 1 - if end[i] != 0 { - return end[:i+1] - } - } - return nil // no upper-bound + return Iterate(prefix, prefix, iterFunc, opt) } // Exists returns true if a key exists in the database. @@ -155,7 +140,7 @@ func Exists(key []byte, keyExists *bool) func(storage.Reader) error { } } -// retrieve will retrieve the binary data under the given key from the badger DB +// Retrieve will retrieve the binary data under the given key from the database // and decode it into the given entity. The provided entity needs to be a // pointer to an initialized entity of the correct type. // Error returns: @@ -196,7 +181,7 @@ func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func var highestKey []byte // find highest value below the given height - for it.SeekGE(); it.Valid(); it.Next() { + for it.First(); it.Valid(); it.Next() { highestKey = it.IterItem().Key() } diff --git a/storage/operation/reads_test.go b/storage/operation/reads_test.go index e24bc15b5ae..b9addec418d 100644 --- a/storage/operation/reads_test.go +++ b/storage/operation/reads_test.go @@ -35,9 +35,9 @@ func TestIterateKeysInPrefixRange(t *testing.T) { {0x21, 0x00}, } - // Keys expected to be in the prefix range - lastNToExclude := 1 - keysInRange := keys[1 : len(keys)-lastNToExclude] // these keys are between the start and end + // The first and last keys are outside the prefix range, so we omit them + // from keysInRange, which is the set of keys we expect in the iteration + keysInRange := keys[1 : len(keys)-1] // Insert the keys into the storage require.NoError(t, withWriter(func(writer storage.Writer) error { @@ -63,20 +63,21 @@ func TestIterateKeysInPrefixRange(t *testing.T) { func TestTraverse(t *testing.T) { dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { - keys := [][]byte{ - {0x42, 0x00}, - {0xff}, - {0x42, 0x56}, - {0x00}, - {0x42, 0xff}, + keyVals := map[[2]byte]uint64{ + {0x41, 0xff}: 3, + {0x42, 0x00}: 11, + {0xff}: 13, + {0x42, 0x56}: 17, + {0x00}: 19, + {0x42, 0xff}: 23, + {0x43, 0x00}: 33, } - vals := []uint64{11, 13, 17, 19, 23} expected := []uint64{11, 23} // Insert the keys and values into storage require.NoError(t, withWriter(func(writer storage.Writer) error { - for i, key := range keys { - err := operation.Upsert(key, vals[i])(writer) + for key, val := range keyVals { + err := operation.Upsert(key[:], val)(writer) if err != nil { return err } @@ -84,7 +85,7 @@ func TestTraverse(t *testing.T) { return nil })) - actual := make([]uint64, 0, len(keys)) + actual := make([]uint64, 0, len(keyVals)) // Define the iteration logic iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) { diff --git a/storage/operation/writes_test.go b/storage/operation/writes_test.go index aa7b5020b1a..9355b5822db 100644 --- a/storage/operation/writes_test.go +++ b/storage/operation/writes_test.go @@ -12,7 +12,6 @@ import ( "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/operation/dbtest" - "github.com/onflow/flow-go/utils/unittest" ) func TestReadWrite(t *testing.T) { @@ -230,16 +229,15 @@ func TestRemoveRange(t *testing.T) { // Remove the keys in the prefix range require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix))) - lg := unittest.Logger().With().Logger() // Verify that the keys in the prefix range have been removed for i, key := range keys { var exists bool require.NoError(t, operation.Exists(key, &exists)(r)) - lg.Info().Msgf("key %x exists: %t", key, exists) + t.Logf("key %x exists: %t", key, exists) deleted := includeStart <= i && i <= includeEnd - // deleted item should not exist + // An item that was not deleted must exist require.Equal(t, !deleted, exists, "expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"}) } diff --git a/storage/operations.go b/storage/operations.go index c261d4ba28c..e0fe101f636 100644 --- a/storage/operations.go +++ b/storage/operations.go @@ -6,8 +6,8 @@ import ( // Iterator is an interface for iterating over key-value pairs in a storage backend. type Iterator interface { - // SeekGE seeks to the smallest key greater than or equal to the given key. - SeekGE() + // First seeks to the smallest key greater than or equal to the given key. + First() // Valid returns whether the iterator is positioned at a valid key-value pair. Valid() bool @@ -15,10 +15,11 @@ type Iterator interface { // Next advances the iterator to the next key-value pair. Next() - // Key returns the key of the current key-value pair, or nil if done. + // IterItem returns the current key-value pair, or nil if done. IterItem() IterItem // Close closes the iterator. Iterator must be closed, otherwise it causes memory leak. + // No errors expected during normal operation Close() error } @@ -28,6 +29,7 @@ type IterItem interface { // Value returns the value of the current key-value pair // The reason it takes a function is to follow badgerDB's API pattern + // No errors expected during normal operation Value(func(val []byte) error) error } @@ -44,6 +46,7 @@ func DefaultIteratorOptions() IteratorOption { type Reader interface { // Get gets the value for the given key. It returns ErrNotFound if the DB // does not contain the key. + // other errors are exceptions // // The caller should not modify the contents of the returned slice, but it is // safe to modify the contents of the argument after Get returns. The @@ -51,7 +54,11 @@ type Reader interface { // success, the caller MUST call closer.Close() or a memory leak will occur. Get(key []byte) (value []byte, closer io.Closer, err error) - // NewIter returns a new Iterator for the given key range [startPrefix, endPrefix], both inclusive. + // NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive. + // Specifically, all keys that meet ANY of the following conditions are included in the iteration: + // - have a prefix equal to startPrefix OR + // - have a prefix equal to the endPrefix OR + // - have a prefix that is lexicographically between startPrefix and endPrefix NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error) } @@ -61,20 +68,26 @@ type Writer interface { // for that key; a DB is not a multi-map. // // It is safe to modify the contents of the arguments after Set returns. + // No errors expected during normal operation Set(k, v []byte) error // Delete deletes the value for the given key. Deletes are blind all will // succeed even if the given key does not exist. // // It is safe to modify the contents of the arguments after Delete returns. + // No errors expected during normal operation Delete(key []byte) error // DeleteByRange removes all keys with a prefix that falls within the // range [start, end], both inclusive. + // No errors expected during normal operation DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error } // ReaderBatchWriter is an interface for reading and writing to a storage backend. +// It is useful for performing a related sequence of reads and writes, after which you would like +// to modify some non-database state if the sequence completed successfully (via AddCallback). +// If you are not using AddCallback, avoid using ReaderBatchWriter: use Reader and Writer directly. type ReaderBatchWriter interface { // GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). // This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. @@ -104,21 +117,21 @@ func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) { } func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) { - // LowerBound specifies the smallest key to iterate and it's inclusive. - // UpperBound specifies the largest key to iterate and it's exclusive (not inclusive) - // in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1, + // Return value lowerBound specifies the smallest key to iterate and it's inclusive. + // Return value upperBound specifies the largest key to iterate and it's exclusive (not inclusive) + // in order to match all keys prefixed with `endPrefix`, we increment the bytes of `endPrefix` by 1, // for instance, to iterate keys between "hello" and "world", // we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff" // will all be included. - return startPrefix, prefixUpperBound(endPrefix) + return startPrefix, PrefixUpperBound(endPrefix) } -// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix // sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. // This is used to define an upper bound for iteration, when we want to iterate over // all keys beginning with a given prefix. // referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration -func prefixUpperBound(prefix []byte) []byte { +func PrefixUpperBound(prefix []byte) []byte { end := make([]byte, len(prefix)) copy(end, prefix) for i := len(end) - 1; i >= 0; i-- { diff --git a/utils/unittest/unittest.go b/utils/unittest/unittest.go index d15f39cd27c..5edcd3d477e 100644 --- a/utils/unittest/unittest.go +++ b/utils/unittest/unittest.go @@ -408,34 +408,6 @@ func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options return db } -type PebbleWrapper struct { - db *pebble.DB -} - -func (p *PebbleWrapper) View(fn func(pebble.Reader) error) error { - return fn(p.db) -} - -func (p *PebbleWrapper) Update(fn func(pebble.Writer) error) error { - return fn(p.db) -} - -func (p *PebbleWrapper) DB() *pebble.DB { - return p.db -} - -func RunWithWrappedPebbleDB(t testing.TB, f func(p *PebbleWrapper)) { - RunWithTempDir(t, func(dir string) { - db, err := pebble.Open(dir, &pebble.Options{}) - require.NoError(t, err) - defer func() { - assert.NoError(t, db.Close()) - }() - f(&PebbleWrapper{db}) - }) - -} - func RunWithTypedPebbleDB( t testing.TB, create func(string, *pebble.Options) (*pebble.DB, error),