consolidate PrefixUpperbound
zhangchiqing committed Nov 1, 2024
1 parent 7921fbd commit 93fcaa1
Showing 10 changed files with 57 additions and 92 deletions.
8 changes: 4 additions & 4 deletions storage/batch.go
@@ -4,16 +4,16 @@ import (
"github.com/dgraph-io/badger/v2"
)

// deprecated
// use Writer instead
// Deprecated: Transaction is being deprecated as part of the transition from Badger to Pebble.
// Use Writer instead of Transaction for all new code.
type Transaction interface {
Set(key, val []byte) error
}

// deprecated
// use ReaderBatchWriter instead
// BatchStorage serves as an abstraction over batch storage, adding the ability to add extra
// callbacks which fire after the batch is successfully flushed.
// Deprecated: BatchStorage is being deprecated as part of the transition from Badger to Pebble.
// Use ReaderBatchWriter instead of BatchStorage for all new code.
type BatchStorage interface {
GetWriter() *badger.WriteBatch

4 changes: 2 additions & 2 deletions storage/operation/badgerimpl/iterator.go
@@ -34,7 +34,7 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage
}
}

func (i *badgerIterator) SeekGE() {
func (i *badgerIterator) First() {
i.iter.Seek(i.lowerBound)
}

@@ -44,7 +44,7 @@ func (i *badgerIterator) Valid() bool {
return false
}
key := i.iter.Item().Key()
// "< 0" means the upperBound is exclusive
// "< 0" means "key < upperBound"
valid := bytes.Compare(key, i.upperBound) < 0
return valid
}
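The renamed First method simply seeks to the iterator's inclusive lower bound, and Valid treats the upper bound as exclusive. As a point of reference, here is a minimal sketch (not part of this commit; it assumes a *badger.DB and precomputed bounds) of the same pattern written directly against Badger's iterator:

```go
package sketch

import (
	"bytes"
	"fmt"

	"github.com/dgraph-io/badger/v2"
)

// iterateBounded visits every key k with lowerBound <= k < upperBound,
// mirroring the First/Valid pattern of badgerIterator above.
func iterateBounded(db *badger.DB, lowerBound, upperBound []byte) error {
	return db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		// "First": seek to the smallest key >= lowerBound (inclusive).
		for it.Seek(lowerBound); it.Valid(); it.Next() {
			key := it.Item().Key()
			// "Valid": bytes.Compare(key, upperBound) < 0 means key < upperBound,
			// i.e. the upper bound is exclusive.
			if bytes.Compare(key, upperBound) >= 0 {
				break
			}
			fmt.Printf("key: %x\n", key)
		}
		return nil
	})
}
```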
6 changes: 1 addition & 5 deletions storage/operation/badgerimpl/reader.go
@@ -32,11 +32,7 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err)
}

var value []byte
err = item.Value(func(val []byte) error {
value = append([]byte{}, val...)
return nil
})
value, err := item.ValueCopy(nil)
if err != nil {
return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err)
}
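The change above replaces the manual copy inside item.Value with Badger's ValueCopy(nil), which allocates a fresh slice and copies the value into it so the result stays valid after the transaction ends. A small comparison sketch (hypothetical helper, not the repository's code):

```go
package sketch

import "github.com/dgraph-io/badger/v2"

// readCopied returns copies of the item's value that outlive the transaction,
// showing the before/after forms side by side. Hypothetical helper.
func readCopied(item *badger.Item) (manual, copied []byte, err error) {
	// Before this commit: copy manually inside the Value callback.
	err = item.Value(func(val []byte) error {
		manual = append([]byte{}, val...)
		return nil
	})
	if err != nil {
		return nil, nil, err
	}

	// After this commit: ValueCopy(nil) allocates a fresh slice and copies the
	// value into it, which is equivalent but shorter.
	copied, err = item.ValueCopy(nil)
	return manual, copied, err
}
```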
2 changes: 1 addition & 1 deletion storage/operation/pebbleimpl/iterator.go
@@ -32,7 +32,7 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops
}, nil
}

func (i *pebbleIterator) SeekGE() {
func (i *pebbleIterator) First() {
i.iter.First()
}

2 changes: 1 addition & 1 deletion storage/operation/pebbleimpl/writer.go
@@ -74,7 +74,7 @@ func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key, pebble.Sync)
}

// DeleteByRange deletes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive).
// DeleteByRange deletes all keys with a prefix in the range [startPrefix, endPrefix] (both inclusive).
func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error {
// DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive).
// therefore, we need to increment the endPrefix to make it inclusive.
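Because Pebble's DeleteRange treats its end key as exclusive, the end prefix has to be bumped to the smallest key that sorts after every key carrying that prefix. The commit's actual implementation is collapsed above; the following is only a sketch of the idea, with prefixUpperBound reproduced locally for illustration:

```go
package sketch

import "github.com/cockroachdb/pebble"

// prefixUpperBound returns the smallest key that sorts after every key carrying
// the given prefix (nil means "no finite upper bound"); reproduced here only
// for the sketch, since the repository exports an equivalent helper.
func prefixUpperBound(prefix []byte) []byte {
	end := make([]byte, len(prefix))
	copy(end, prefix)
	for i := len(end) - 1; i >= 0; i-- {
		end[i]++
		if end[i] != 0 {
			return end[:i+1]
		}
	}
	return nil
}

// deletePrefixRange removes every key whose prefix lies in [startPrefix, endPrefix]
// (both inclusive). Pebble's DeleteRange end key is exclusive, so endPrefix is
// bumped to its upper bound first. Illustrative sketch, not the commit's code.
func deletePrefixRange(batch *pebble.Batch, startPrefix, endPrefix []byte) error {
	return batch.DeleteRange(startPrefix, prefixUpperBound(endPrefix), pebble.Sync)
}
```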
33 changes: 9 additions & 24 deletions storage/operation/reads.go
@@ -28,7 +28,7 @@ type CreateFunc func() interface{}
type HandleFunc func() error
type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc)

// IterateKeysInPrefixRange will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive)
// IterateKeysInPrefixRange will iterate over all keys with prefixes in the range [startPrefix, endPrefix] (both inclusive)
func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error {
return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) {
return func(key []byte) (bool, error) {
@@ -41,7 +41,7 @@ func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(k
}, storage.IteratorOption{IterateKeyOnly: true})
}

// Iterate will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive)
// Iterate will iterate over all keys with prefixes in the given range [startPrefix, endPrefix] (both inclusive)
func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error {
return func(r storage.Reader) error {

@@ -64,14 +64,17 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s
}
defer it.Close()

for it.SeekGE(); it.Valid(); it.Next() {
for it.First(); it.Valid(); it.Next() {
item := it.IterItem()
key := item.Key()

// initialize processing functions for iteration
check, create, handle := iterFunc()

keyCopy := make([]byte, len(key))

// The underlying database may re-use and modify the backing memory of the returned key.
// For safety we proactively make a copy before passing the key to the upper layer.
copy(keyCopy, key)

// check if we should process the item at all
@@ -112,25 +115,7 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s

// Traverse will iterate over all keys with the given prefix
func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error {
return Iterate(prefix, PrefixUpperBound(prefix), iterFunc, opt)
}

// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix
// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble.
// This is used to define an upper bound for iteration, when we want to iterate over
// all keys beginning with a given prefix.
// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration
func PrefixUpperBound(prefix []byte) []byte {
end := make([]byte, len(prefix))
copy(end, prefix)
for i := len(end) - 1; i >= 0; i-- {
// increment the bytes by 1
end[i] = end[i] + 1
if end[i] != 0 {
return end[:i+1]
}
}
return nil // no upper-bound
return Iterate(prefix, prefix, iterFunc, opt)
}

// Exists returns true if a key exists in the database.
@@ -155,7 +140,7 @@ func Exists(key []byte, keyExists *bool) func(storage.Reader) error {
}
}

// retrieve will retrieve the binary data under the given key from the badger DB
// Retrieve will retrieve the binary data under the given key from the database
// and decode it into the given entity. The provided entity needs to be a
// pointer to an initialized entity of the correct type.
// Error returns:
@@ -196,7 +181,7 @@ func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func

var highestKey []byte
// find highest value below the given height
for it.SeekGE(); it.Valid(); it.Next() {
for it.First(); it.Valid(); it.Next() {
highestKey = it.IterItem().Key()
}

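With the prefix-range semantics above, traversing a single prefix is just the degenerate range whose start and end are the same prefix, which is exactly what Traverse now does. A hypothetical caller (function name and usage are assumptions, not from this commit) could collect all keys under a prefix like this:

```go
package sketch

import (
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
)

// collectKeysWithPrefix gathers every key carrying the given prefix.
// Hypothetical usage, not part of this commit.
func collectKeysWithPrefix(r storage.Reader, prefix []byte) ([][]byte, error) {
	var keys [][]byte
	// Both bounds are the same prefix: this is exactly what Traverse now does,
	// i.e. Iterate(prefix, prefix, ...) visits every key prefixed with `prefix`.
	err := operation.IterateKeysInPrefixRange(prefix, prefix, func(key []byte) error {
		// The iterator hands the callback a defensive copy of the key (see the
		// keyCopy logic above), so it is safe to retain.
		keys = append(keys, key)
		return nil
	})(r)
	return keys, err
}
```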
27 changes: 14 additions & 13 deletions storage/operation/reads_test.go
@@ -35,9 +35,9 @@ func TestIterateKeysInPrefixRange(t *testing.T) {
{0x21, 0x00},
}

// Keys expected to be in the prefix range
lastNToExclude := 1
keysInRange := keys[1 : len(keys)-lastNToExclude] // these keys are between the start and end
// The first and last keys are outside the prefix range, so we omit them
// from keysInRange, which is the set of keys we expect in the iteration
keysInRange := keys[1 : len(keys)-1]

// Insert the keys into the storage
require.NoError(t, withWriter(func(writer storage.Writer) error {
@@ -63,28 +63,29 @@

func TestTraverse(t *testing.T) {
dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
keys := [][]byte{
{0x42, 0x00},
{0xff},
{0x42, 0x56},
{0x00},
{0x42, 0xff},
keyVals := map[[2]byte]uint64{
{0x41, 0xff}: 3,
{0x42, 0x00}: 11,
{0xff}: 13,
{0x42, 0x56}: 17,
{0x00}: 19,
{0x42, 0xff}: 23,
{0x43, 0x00}: 33,
}
vals := []uint64{11, 13, 17, 19, 23}
expected := []uint64{11, 23}

// Insert the keys and values into storage
require.NoError(t, withWriter(func(writer storage.Writer) error {
for i, key := range keys {
err := operation.Upsert(key, vals[i])(writer)
for key, val := range keyVals {
err := operation.Upsert(key[:], val)(writer)
if err != nil {
return err
}
}
return nil
}))

actual := make([]uint64, 0, len(keys))
actual := make([]uint64, 0, len(keyVals))

// Define the iteration logic
iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) {
6 changes: 2 additions & 4 deletions storage/operation/writes_test.go
@@ -12,7 +12,6 @@ import (
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
"github.com/onflow/flow-go/storage/operation/dbtest"
"github.com/onflow/flow-go/utils/unittest"
)

func TestReadWrite(t *testing.T) {
@@ -230,16 +229,15 @@ func TestRemoveRange(t *testing.T) {
// Remove the keys in the prefix range
require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix)))

lg := unittest.Logger().With().Logger()
// Verify that the keys in the prefix range have been removed
for i, key := range keys {
var exists bool
require.NoError(t, operation.Exists(key, &exists)(r))
lg.Info().Msgf("key %x exists: %t", key, exists)
t.Logf("key %x exists: %t", key, exists)

deleted := includeStart <= i && i <= includeEnd

// deleted item should not exist
// An item that was not deleted must exist
require.Equal(t, !deleted, exists,
"expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"})
}
33 changes: 23 additions & 10 deletions storage/operations.go
@@ -6,19 +6,20 @@ import (

// Iterator is an interface for iterating over key-value pairs in a storage backend.
type Iterator interface {
// SeekGE seeks to the smallest key greater than or equal to the given key.
SeekGE()
// First seeks to the first key in the iteration range, i.e. the smallest key greater than or equal to the lower bound.
First()

// Valid returns whether the iterator is positioned at a valid key-value pair.
Valid() bool

// Next advances the iterator to the next key-value pair.
Next()

// Key returns the key of the current key-value pair, or nil if done.
// IterItem returns the current key-value pair, or nil if done.
IterItem() IterItem

// Close closes the iterator. The iterator must be closed; otherwise it causes a memory leak.
// No errors expected during normal operation
Close() error
}

@@ -28,6 +29,7 @@ type IterItem interface {

// Value returns the value of the current key-value pair
// The reason it takes a function is to follow badgerDB's API pattern
// No errors expected during normal operation
Value(func(val []byte) error) error
}

@@ -44,14 +46,19 @@
type Reader interface {
// Get gets the value for the given key. It returns ErrNotFound if the DB
// does not contain the key.
// other errors are exceptions
//
// The caller should not modify the contents of the returned slice, but it is
// safe to modify the contents of the argument after Get returns. The
// returned slice will remain valid until the returned Closer is closed. On
// success, the caller MUST call closer.Close() or a memory leak will occur.
Get(key []byte) (value []byte, closer io.Closer, err error)

// NewIter returns a new Iterator for the given key range [startPrefix, endPrefix], both inclusive.
// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
// - have a prefix equal to startPrefix OR
// - have a prefix equal to endPrefix OR
// - have a prefix that is lexicographically between startPrefix and endPrefix
NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error)
}
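The Get contract requires the caller to close the returned Closer, and the value slice is only valid until then. A minimal sketch of a read helper that honors that contract (hypothetical name and error wrapping, not from this commit):

```go
package sketch

import (
	"fmt"

	"github.com/onflow/flow-go/storage"
)

// getCopy reads a value through storage.Reader and returns a copy that remains
// valid after the closer is released. Hypothetical helper.
func getCopy(r storage.Reader, key []byte) ([]byte, error) {
	value, closer, err := r.Get(key)
	if err != nil {
		// err may be storage.ErrNotFound, per the Get contract above.
		return nil, fmt.Errorf("could not get value: %w", err)
	}
	defer closer.Close()

	// The returned slice is only valid until closer.Close() runs, so copy it out.
	out := make([]byte, len(value))
	copy(out, value)
	return out, nil
}
```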

@@ -61,20 +68,26 @@ type Writer interface {
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Set returns.
// No errors expected during normal operation
Set(k, v []byte) error

// Delete deletes the value for the given key. Deletes are blind: they will
// succeed even if the given key does not exist.
//
// It is safe to modify the contents of the arguments after Delete returns.
// No errors expected during normal operation
Delete(key []byte) error

// DeleteByRange removes all keys with a prefix that falls within the
// range [startPrefix, endPrefix], both inclusive.
// No errors expected during normal operation
DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error
}
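Set, Delete, and DeleteByRange together cover point writes, point deletes, and inclusive prefix-range deletes. A small hypothetical helper illustrating the contract (the function name and key layout are assumptions, not from this commit):

```go
package sketch

import "github.com/onflow/flow-go/storage"

// replacePrefix deletes every key carrying oldPrefix and then stores a single
// new entry. Hypothetical helper illustrating the Writer contract above.
func replacePrefix(r storage.Reader, w storage.Writer, oldPrefix, newKey, newValue []byte) error {
	// DeleteByRange with identical start and end prefixes removes exactly the
	// keys prefixed with oldPrefix (both bounds are inclusive).
	if err := w.DeleteByRange(r, oldPrefix, oldPrefix); err != nil {
		return err
	}
	// Set overwrites any existing value for newKey; a DB is not a multi-map.
	return w.Set(newKey, newValue)
}
```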

// ReaderBatchWriter is an interface for reading and writing to a storage backend.
// It is useful for performing a related sequence of reads and writes, after which you would like
// to modify some non-database state if the sequence completed successfully (via AddCallback).
// If you are not using AddCallback, avoid using ReaderBatchWriter: use Reader and Writer directly.
type ReaderBatchWriter interface {
// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
@@ -104,21 +117,21 @@ func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) {
}

func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) {
// LowerBound specifies the smallest key to iterate and it's inclusive.
// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
// in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1,
// Return value lowerBound specifies the smallest key to iterate and it's inclusive.
// Return value upperBound specifies the largest key to iterate and it's exclusive (not inclusive).
// In order to match all keys prefixed with `endPrefix`, we increment the bytes of `endPrefix` by 1;
// for instance, to iterate keys between "hello" and "world",
// we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff"
// will all be included.
return startPrefix, prefixUpperBound(endPrefix)
return startPrefix, PrefixUpperBound(endPrefix)
}

// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix
// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix
// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble.
// This is used to define an upper bound for iteration, when we want to iterate over
// all keys beginning with a given prefix.
// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration
func prefixUpperBound(prefix []byte) []byte {
func PrefixUpperBound(prefix []byte) []byte {
end := make([]byte, len(prefix))
copy(end, prefix)
for i := len(end) - 1; i >= 0; i-- {
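To make the "hello"/"worle" example concrete, here is a tiny demonstration of the exported helpers (assuming they behave as documented above; the all-0xff case follows the documented nil return for "no upper bound"):

```go
package main

import (
	"fmt"

	"github.com/onflow/flow-go/storage"
)

func main() {
	lower, upper := storage.StartEndPrefixToLowerUpperBound([]byte("hello"), []byte("world"))
	// The lower bound is inclusive and the upper bound exclusive, so the range
	// still covers "world", "world1", "worldffff...ffff".
	fmt.Printf("lower=%q upper=%q\n", lower, upper) // lower="hello" upper="worle"

	// A prefix consisting only of 0xff bytes has no finite upper bound (nil).
	fmt.Println(storage.PrefixUpperBound([]byte{0xff, 0xff}) == nil) // true
}
```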
28 changes: 0 additions & 28 deletions utils/unittest/unittest.go
@@ -408,34 +408,6 @@ func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options
return db
}

type PebbleWrapper struct {
db *pebble.DB
}

func (p *PebbleWrapper) View(fn func(pebble.Reader) error) error {
return fn(p.db)
}

func (p *PebbleWrapper) Update(fn func(pebble.Writer) error) error {
return fn(p.db)
}

func (p *PebbleWrapper) DB() *pebble.DB {
return p.db
}

func RunWithWrappedPebbleDB(t testing.TB, f func(p *PebbleWrapper)) {
RunWithTempDir(t, func(dir string) {
db, err := pebble.Open(dir, &pebble.Options{})
require.NoError(t, err)
defer func() {
assert.NoError(t, db.Close())
}()
f(&PebbleWrapper{db})
})

}

func RunWithTypedPebbleDB(
t testing.TB,
create func(string, *pebble.Options) (*pebble.DB, error),
