From 8d98f0fc8cb3339a3b63d4f200080060dd55c71b Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 13 Sep 2024 16:53:53 -0700 Subject: [PATCH 1/4] add universal database operations --- cmd/bootstrap/utils/md5.go | 3 +- storage/batch.go | 8 +- storage/operation/badgerimpl/iterator.go | 65 ++++++ storage/operation/badgerimpl/reader.go | 54 +++++ storage/operation/badgerimpl/writer.go | 93 ++++++++ storage/operation/callbacks.go | 24 ++ storage/operation/codec.go | 34 +++ storage/operation/dbtest/helper.go | 60 +++++ storage/operation/pebbleimpl/iterator.go | 74 ++++++ storage/operation/pebbleimpl/reader.go | 47 ++++ storage/operation/pebbleimpl/writer.go | 83 +++++++ storage/operation/reads.go | 222 ++++++++++++++++++ storage/operation/reads_test.go | 189 +++++++++++++++ storage/operation/writes.go | 58 +++++ storage/operation/writes_test.go | 278 +++++++++++++++++++++++ storage/operations.go | 132 +++++++++++ utils/unittest/unittest.go | 70 ++++++ 17 files changed, 1492 insertions(+), 2 deletions(-) create mode 100644 storage/operation/badgerimpl/iterator.go create mode 100644 storage/operation/badgerimpl/reader.go create mode 100644 storage/operation/badgerimpl/writer.go create mode 100644 storage/operation/callbacks.go create mode 100644 storage/operation/codec.go create mode 100644 storage/operation/dbtest/helper.go create mode 100644 storage/operation/pebbleimpl/iterator.go create mode 100644 storage/operation/pebbleimpl/reader.go create mode 100644 storage/operation/pebbleimpl/writer.go create mode 100644 storage/operation/reads.go create mode 100644 storage/operation/reads_test.go create mode 100644 storage/operation/writes.go create mode 100644 storage/operation/writes_test.go create mode 100644 storage/operations.go diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go index 65823fd6e96..4d4bbe21046 100644 --- a/cmd/bootstrap/utils/md5.go +++ b/cmd/bootstrap/utils/md5.go @@ -2,7 +2,8 @@ package utils // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5 import ( - "crypto/md5" //nolint:gosec + // #nosec + "crypto/md5" "io" "os" ) diff --git a/storage/batch.go b/storage/batch.go index 3147fc5c0e7..23b9d39ac63 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -1,11 +1,17 @@ package storage -import "github.com/dgraph-io/badger/v2" +import ( + "github.com/dgraph-io/badger/v2" +) +// deprecated +// use Writer instead type Transaction interface { Set(key, val []byte) error } +// deprecated +// use ReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. 
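+//
+// A hypothetical migration sketch: code that previously wrote through a
+// BatchStorage can instead run against a ReaderBatchWriter (introduced in this
+// patch), e.g. for the badger-backed implementation:
+//
+//	err := badgerimpl.WithReaderBatchWriter(db, func(rw storage.ReaderBatchWriter) error {
+//		return rw.Writer().Set(key, val)
+//	})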
type BatchStorage interface { diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go new file mode 100644 index 00000000000..81ecda2d719 --- /dev/null +++ b/storage/operation/badgerimpl/iterator.go @@ -0,0 +1,65 @@ +package badgerimpl + +import ( + "bytes" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" +) + +type badgerIterator struct { + iter *badger.Iterator + lowerBound []byte + upperBound []byte +} + +var _ storage.Iterator = (*badgerIterator)(nil) + +func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator { + options := badger.DefaultIteratorOptions + if ops.IterateKeyOnly { + options.PrefetchValues = false + } + + tx := db.NewTransaction(false) + iter := tx.NewIterator(options) + + lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + + return &badgerIterator{ + iter: iter, + lowerBound: lowerBound, + upperBound: upperBound, + } +} + +func (i *badgerIterator) SeekGE() { + i.iter.Seek(i.lowerBound) +} + +func (i *badgerIterator) Valid() bool { + // if it's beyond the upper bound, it's invalid + if !i.iter.Valid() { + return false + } + key := i.iter.Item().Key() + // "< 0" means the upperBound is exclusive + valid := bytes.Compare(key, i.upperBound) < 0 + return valid +} + +func (i *badgerIterator) Next() { + i.iter.Next() +} + +func (i *badgerIterator) IterItem() storage.IterItem { + return i.iter.Item() +} + +var _ storage.IterItem = (*badger.Item)(nil) + +func (i *badgerIterator) Close() error { + i.iter.Close() + return nil +} diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go new file mode 100644 index 00000000000..06158e634ff --- /dev/null +++ b/storage/operation/badgerimpl/reader.go @@ -0,0 +1,54 @@ +package badgerimpl + +import ( + "errors" + "io" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *badger.DB +} + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + tx := b.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil, storage.ErrNotFound + } + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + var value []byte + err = item.Value(func(val []byte) error { + value = append([]byte{}, val...) 
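+		// the copy above is required: val is only valid while this callback
+		// runs, and badger may reuse the backing memory afterwards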
+ return nil + }) + if err != nil { + return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) + } + + return value, noopCloser{}, nil +} + +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil +} + +// ToReader is a helper function to convert a *badger.DB to a Reader +func ToReader(db *badger.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/badgerimpl/writer.go b/storage/operation/badgerimpl/writer.go new file mode 100644 index 00000000000..3837be3917f --- /dev/null +++ b/storage/operation/badgerimpl/writer.go @@ -0,0 +1,93 @@ +package badgerimpl + +import ( + "fmt" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *badger.WriteBatch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b +} + +func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.callbacks.AddCallback(callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Flush() + + b.callbacks.NotifyCallbacks(err) + + return err +} + +func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. 
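+		//
+		// a hypothetical caller illustrating that pattern (lock and writeData
+		// are placeholders):
+		//
+		//	lock.Lock()
+		//	err := WithReaderBatchWriter(db, func(rw storage.ReaderBatchWriter) error {
+		//		rw.AddCallback(func(error) { lock.Unlock() })
+		//		return writeData(rw)
+		//	})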
+ batch.callbacks.NotifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + globalReader: ToReader(db), + batch: db.NewWriteBatch(), + } +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key) +} + +func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error { + err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error { + err := b.batch.Delete(key) + if err != nil { + return fmt.Errorf("could not add key to delete batch (%v): %w", key, err) + } + return nil + })(globalReader) + + if err != nil { + return fmt.Errorf("could not find keys by range to be deleted: %w", err) + } + return nil +} diff --git a/storage/operation/callbacks.go b/storage/operation/callbacks.go new file mode 100644 index 00000000000..40d414ded91 --- /dev/null +++ b/storage/operation/callbacks.go @@ -0,0 +1,24 @@ +package operation + +import "sync" + +type Callbacks struct { + sync.Mutex // protect callbacks + callbacks []func(error) +} + +func (b *Callbacks) AddCallback(callback func(error)) { + b.Lock() + defer b.Unlock() + + b.callbacks = append(b.callbacks, callback) +} + +func (b *Callbacks) NotifyCallbacks(err error) { + b.Lock() + defer b.Unlock() + + for _, callback := range b.callbacks { + callback(err) + } +} diff --git a/storage/operation/codec.go b/storage/operation/codec.go new file mode 100644 index 00000000000..43dc4c37f7a --- /dev/null +++ b/storage/operation/codec.go @@ -0,0 +1,34 @@ +package operation + +import ( + "encoding/binary" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// EncodeKeyPart encodes a value to be used as a part of a key to be stored in storage. 
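+// Numeric values use fixed-width big-endian encodings so that the byte-wise
+// lexicographic ordering of encoded keys matches the numeric ordering of the
+// original values. For example (illustrative):
+//
+//	EncodeKeyPart(uint32(1)) // -> []byte{0x00, 0x00, 0x00, 0x01}
+//	EncodeKeyPart(uint64(1)) // -> the 8-byte big-endian encoding of 1
+//	EncodeKeyPart("chain")   // -> []byte("chain")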
+func EncodeKeyPart(v interface{}) []byte { + switch i := v.(type) { + case uint8: + return []byte{i} + case uint32: + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, i) + return b + case uint64: + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b + case string: + return []byte(i) + case flow.Role: + return []byte{byte(i)} + case flow.Identifier: + return i[:] + case flow.ChainID: + return []byte(i) + default: + panic(fmt.Sprintf("unsupported type to convert (%T)", v)) + } +} diff --git a/storage/operation/dbtest/helper.go b/storage/operation/dbtest/helper.go new file mode 100644 index 00000000000..64a166c2390 --- /dev/null +++ b/storage/operation/dbtest/helper.go @@ -0,0 +1,60 @@ +package dbtest + +import ( + "testing" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/utils/unittest" +) + +// helper types and functions +type WithWriter func(func(storage.Writer) error) error + +func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) { + t.Run("BadgerStorage", func(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := badgerimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := badgerimpl.ToReader(db) + fn(t, reader, withWriter) + }) + }) + + t.Run("PebbleStorage", func(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := pebbleimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := pebbleimpl.ToReader(db) + fn(t, reader, withWriter) + }) + }) +} diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go new file mode 100644 index 00000000000..b6f3910cead --- /dev/null +++ b/storage/operation/pebbleimpl/iterator.go @@ -0,0 +1,74 @@ +package pebbleimpl + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" +) + +type pebbleIterator struct { + iter *pebble.Iterator + lowerBound []byte +} + +var _ storage.Iterator = (*pebbleIterator)(nil) + +func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops storage.IteratorOption) (*pebbleIterator, error) { + lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + + options := pebble.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + } + + iter, err := reader.NewIter(&options) + if err != nil { + return nil, fmt.Errorf("can not create iterator: %w", err) + } + + return &pebbleIterator{ + iter: iter, + lowerBound: lowerBound, + }, nil +} + +func (i *pebbleIterator) SeekGE() { + i.iter.SeekGE(i.lowerBound) +} + +func (i *pebbleIterator) Valid() bool { + return i.iter.Valid() +} + +func (i *pebbleIterator) Next() { + i.iter.Next() +} + +func (i *pebbleIterator) IterItem() storage.IterItem { + return pebbleIterItem{iter: i.iter} +} + +type pebbleIterItem struct { + iter *pebble.Iterator +} + +var _ storage.IterItem = (*pebbleIterItem)(nil) + +func (i pebbleIterItem) Key() []byte { + return i.iter.Key() +} + +func (i 
pebbleIterItem) Value(fn func([]byte) error) error { + val, err := i.iter.ValueAndErr() + if err != nil { + return err + } + + return fn(val) +} + +func (i *pebbleIterator) Close() error { + return i.iter.Close() +} diff --git a/storage/operation/pebbleimpl/reader.go b/storage/operation/pebbleimpl/reader.go new file mode 100644 index 00000000000..6cfdfd93da5 --- /dev/null +++ b/storage/operation/pebbleimpl/reader.go @@ -0,0 +1,47 @@ +package pebbleimpl + +import ( + "errors" + "io" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *pebble.DB +} + +var _ storage.Reader = (*dbReader)(nil) + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + value, closer, err := b.db.Get(key) + + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, nil, storage.ErrNotFound + } + + // exception while checking for the key + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + return value, closer, nil +} + +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newPebbleIterator(b.db, startPrefix, endPrefix, ops) +} + +// ToReader is a helper function to convert a *pebble.DB to a Reader +func ToReader(db *pebble.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go new file mode 100644 index 00000000000..ad639223209 --- /dev/null +++ b/storage/operation/pebbleimpl/writer.go @@ -0,0 +1,83 @@ +package pebbleimpl + +import ( + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *pebble.Batch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b +} + +func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.callbacks.AddCallback(callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Commit(pebble.Sync) + + b.callbacks.NotifyCallbacks(err) + + return err +} + +func WithReaderBatchWriter(db *pebble.DB, fn func(storage.ReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. 
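+		// either this notification or the one inside Commit (below) runs,
+		// so callbacks fire exactly once per batch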
+ batch.callbacks.NotifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + globalReader: ToReader(db), + batch: db.NewBatch(), + } +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value, pebble.Sync) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key, pebble.Sync) +} + +// DeleteByRange deletes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error { + // DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive). + // therefore, we need to increment the endPrefix to make it inclusive. + start, end := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + return b.batch.DeleteRange(start, end, pebble.Sync) +} diff --git a/storage/operation/reads.go b/storage/operation/reads.go new file mode 100644 index 00000000000..2e6be8dd3fe --- /dev/null +++ b/storage/operation/reads.go @@ -0,0 +1,222 @@ +package operation + +import ( + "bytes" + "errors" + "fmt" + + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// CheckFunc is a function that checks if the value should be read and decoded. +// return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding +// return (false, nil) to skip reading the value +// return (false, err) if running into any exception, the iteration should be stopped. +type CheckFunc func(key []byte) (bool, error) + +// createFunc returns a pointer to an initialized entity that we can potentially +// decode the next value into during a badger DB iteration. +type CreateFunc func() interface{} + +// handleFunc is a function that starts the processing of the current key-value +// pair during a badger iteration. It should be called after the key was checked +// and the entity was decoded. +// No errors are expected during normal operation. Any errors will halt the iteration. 
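+//
+// A minimal IterationFunc sketch, mirroring the pattern used by the tests in
+// this patch (Entity and results are placeholder names):
+//
+//	func() (CheckFunc, CreateFunc, HandleFunc) {
+//		check := func(key []byte) (bool, error) { return true, nil } // read every value
+//		var entity Entity
+//		create := func() interface{} { return &entity }
+//		handle := func() error { results = append(results, entity); return nil }
+//		return check, create, handle
+//	}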
+type HandleFunc func() error +type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc) + +// IterateKeysInPrefixRange will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error { + return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) { + return func(key []byte) (bool, error) { + err := check(key) + if err != nil { + return false, err + } + return false, nil + }, nil, nil + }, storage.IteratorOption{IterateKeyOnly: true}) +} + +// Iterate will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return func(r storage.Reader) error { + + if len(startPrefix) == 0 { + return fmt.Errorf("startPrefix prefix is empty") + } + + if len(endPrefix) == 0 { + return fmt.Errorf("endPrefix prefix is empty") + } + + // Reverse iteration is not supported by pebble + if bytes.Compare(startPrefix, endPrefix) > 0 { + return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key") + } + + it, err := r.NewIter(startPrefix, endPrefix, opt) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + for it.SeekGE(); it.Valid(); it.Next() { + item := it.IterItem() + key := item.Key() + + // initialize processing functions for iteration + check, create, handle := iterFunc() + + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + + // check if we should process the item at all + shouldReadValue, err := check(keyCopy) + if err != nil { + return err + } + if !shouldReadValue { // skip reading value + continue + } + + err = item.Value(func(val []byte) error { + + // decode into the entity + entity := create() + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + + // process the entity + err = handle() + if err != nil { + return fmt.Errorf("could not handle entity: %w", err) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("could not process value: %w", err) + } + } + + return nil + } +} + +// Traverse will iterate over all keys with the given prefix +func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return Iterate(prefix, PrefixUpperBound(prefix), iterFunc, opt) +} + +// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. +// This is used to define an upper bound for iteration, when we want to iterate over +// all keys beginning with a given prefix. +// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration +func PrefixUpperBound(prefix []byte) []byte { + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + // increment the bytes by 1 + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound +} + +// Exists returns true if a key exists in the database. +// No errors are expected during normal operation. 
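+//
+// Usage sketch:
+//
+//	var exists bool
+//	err := Exists(key, &exists)(reader)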
+func Exists(key []byte, keyExists *bool) func(storage.Reader) error { + return func(r storage.Reader) error { + _, closer, err := r.Get(key) + if err != nil { + // the key does not exist in the database + if errors.Is(err, storage.ErrNotFound) { + *keyExists = false + return nil + } + // exception while checking for the key + return irrecoverable.NewExceptionf("could not load data: %w", err) + } + defer closer.Close() + + // the key does exist in the database + *keyExists = true + return nil + } +} + +// retrieve will retrieve the binary data under the given key from the badger DB +// and decode it into the given entity. The provided entity needs to be a +// pointer to an initialized entity of the correct type. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer, or failure +// to decode an existing database value +func Retrieve(key []byte, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + return nil + } +} + +// FindHighestAtOrBelow finds the highest key with the given prefix and +// height equal to or below the given height. +func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + if len(prefix) == 0 { + return fmt.Errorf("prefix must not be empty") + } + + key := append(prefix, EncodeKeyPart(height)...) + it, err := r.NewIter(prefix, key, storage.DefaultIteratorOptions()) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + var highestKey []byte + // find highest value below the given height + for it.SeekGE(); it.Valid(); it.Next() { + highestKey = it.IterItem().Key() + } + + if len(highestKey) == 0 { + return storage.ErrNotFound + } + + // read the value of the highest key + val, closer, err := r.Get(highestKey) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + + return nil + } +} diff --git a/storage/operation/reads_test.go b/storage/operation/reads_test.go new file mode 100644 index 00000000000..e24bc15b5ae --- /dev/null +++ b/storage/operation/reads_test.go @@ -0,0 +1,189 @@ +package operation_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func TestIterateKeysInPrefixRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define the prefix range + prefixStart := []byte{0x10} + prefixEnd := []byte{0x20} + + // Create a range of keys around the prefix start/end values + keys := [][]byte{ + // before start -> not included in range + {0x09, 0xff}, + // within the start prefix -> included in range + {0x10, 0x00}, + {0x10, 0xff}, + // between start and end -> included in range + {0x15, 0x00}, + {0x1A, 0xff}, + // within the end prefix -> included in range + {0x20, 0x00}, + {0x20, 0xff}, + // after end -> not included in range + {0x21, 0x00}, + } + + // Keys expected to be in the 
prefix range + lastNToExclude := 1 + keysInRange := keys[1 : len(keys)-lastNToExclude] // these keys are between the start and end + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Forward iteration and check boundaries + var found [][]byte + require.NoError(t, operation.IterateKeysInPrefixRange(prefixStart, prefixEnd, func(key []byte) error { + found = append(found, key) + return nil + })(r), "should iterate forward without error") + require.ElementsMatch(t, keysInRange, found, "forward iteration should return the correct keys in range") + }) +} + +func TestTraverse(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + keys := [][]byte{ + {0x42, 0x00}, + {0xff}, + {0x42, 0x56}, + {0x00}, + {0x42, 0xff}, + } + vals := []uint64{11, 13, 17, 19, 23} + expected := []uint64{11, 23} + + // Insert the keys and values into storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for i, key := range keys { + err := operation.Upsert(key, vals[i])(writer) + if err != nil { + return err + } + } + return nil + })) + + actual := make([]uint64, 0, len(keys)) + + // Define the iteration logic + iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) { + check := func(key []byte) (bool, error) { + // Skip the key {0x42, 0x56} + return !bytes.Equal(key, []byte{0x42, 0x56}), nil + } + var val uint64 + create := func() interface{} { + return &val + } + handle := func() error { + actual = append(actual, val) + return nil + } + return check, create, handle + } + + // Traverse the keys starting with prefix {0x42} + err := operation.Traverse([]byte{0x42}, iterationFunc, storage.DefaultIteratorOptions())(r) + require.NoError(t, err, "traverse should not return an error") + + // Assert that the actual values match the expected values + require.Equal(t, expected, actual, "traversed values should match expected values") + }) +} + +func TestFindHighestAtOrBelow(t *testing.T) { + // Helper function to insert an entity into the storage + insertEntity := func(writer storage.Writer, prefix []byte, height uint64, entity Entity) error { + key := append(prefix, operation.EncodeKeyPart(height)...) 
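+		// the big-endian height encoding makes keys sort in increasing height
+		// order, which is what FindHighestAtOrBelow relies on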
+ return operation.Upsert(key, entity)(writer) + } + + // Entities to be inserted + entities := []struct { + height uint64 + entity Entity + }{ + {5, Entity{ID: 41}}, + {10, Entity{ID: 42}}, + {15, Entity{ID: 43}}, + } + + // Run test with multiple storage backends + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + prefix := []byte("test_prefix") + + // Insert entities into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := insertEntity(writer, prefix, e.height, e.entity); err != nil { + return err + } + } + return nil + })) + + // Declare entity to store the results of FindHighestAtOrBelow + var entity Entity + + // Test cases + tests := []struct { + name string + height uint64 + expectedValue uint64 + expectError bool + expectedErrMsg string + }{ + {"target first height exists", 5, 41, false, ""}, + {"target height exists", 10, 42, false, ""}, + {"target height above", 11, 42, false, ""}, + {"target height above highest", 20, 43, false, ""}, + {"target height below lowest", 4, 0, true, storage.ErrNotFound.Error()}, + {"empty prefix", 5, 0, true, "prefix must not be empty"}, + } + + // Execute test cases + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + prefixToUse := prefix + + if tt.name == "empty prefix" { + prefixToUse = []byte{} + } + + err := operation.FindHighestAtOrBelow( + prefixToUse, + tt.height, + &entity)(r) + + if tt.expectError { + require.Error(t, err, fmt.Sprintf("expected error but got nil, entity: %v", entity)) + require.Contains(t, err.Error(), tt.expectedErrMsg) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedValue, entity.ID) + } + }) + } + }) +} diff --git a/storage/operation/writes.go b/storage/operation/writes.go new file mode 100644 index 00000000000..3bbe08d12d2 --- /dev/null +++ b/storage/operation/writes.go @@ -0,0 +1,58 @@ +package operation + +import ( + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// Upsert will encode the given entity using msgpack and will insert the resulting +// binary data under the provided key. +// If the key already exists, the value will be overwritten. +// Error returns: +// - generic error in case of unexpected failure from the database layer or +// encoding failure. +func Upsert(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +// Remove removes the entity with the given key, if it exists. If it doesn't +// exist, this is a no-op. +// Error returns: +// * generic error in case of unexpected database error +func Remove(key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.Delete(key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} + +// RemoveByPrefix removes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +// If no keys exist with the given prefix, this is a no-op. 
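+// For example, RemoveByPrefix(reader, []byte{0x10}) removes keys such as
+// {0x10}, {0x10, 0x00} and {0x10, 0xff}, but leaves {0x11, 0x00} untouched.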
+// Error returns: +// * generic error in case of unexpected database error +func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.DeleteByRange(reader, key, key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} diff --git a/storage/operation/writes_test.go b/storage/operation/writes_test.go new file mode 100644 index 00000000000..aa7b5020b1a --- /dev/null +++ b/storage/operation/writes_test.go @@ -0,0 +1,278 @@ +package operation_test + +import ( + "encoding/binary" + "errors" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestReadWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + // Test read nothing should return not found + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error") + + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value") + + // Test write again should overwrite + newEntity := Entity{ID: 42} + require.NoError(t, withWriter(operation.Upsert(e.Key(), newEntity))) + + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, newEntity, readBack, "expected overwritten value to be retrieved") + + // Test write should not overwrite a different key + anotherEntity := Entity{ID: 84} + require.NoError(t, withWriter(operation.Upsert(anotherEntity.Key(), anotherEntity))) + + var anotherReadBack Entity + require.NoError(t, operation.Retrieve(anotherEntity.Key(), &anotherReadBack)(r)) + require.Equal(t, anotherEntity, anotherReadBack, "expected different key to return different value") + }) +} + +func TestReadWriteMalformed(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + ue := UnencodeableEntity(e) + + // Test write should return encoding error + require.NoError(t, withWriter(func(writer storage.Writer) error { + err := operation.Upsert(e.Key(), ue)(writer) + require.Contains(t, err.Error(), errCantEncode.Error(), "expected encoding error") + return nil + })) + + // Test read should return decoding error + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + }) +} + +// Verify multiple entities can be removed in one batch update +func TestBatchWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define multiple entities for batch insertion + entities := []Entity{ + {ID: 1337}, + {ID: 42}, + {ID: 84}, + } + + // Batch write: insert multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Upsert(e.Key(), e)(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity can be read back + for _, e := range entities { + var readBack Entity + require.NoError(t, 
operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value for entity ID %d", e.ID) + } + + // Batch update: remove multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Remove(e.Key())(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity has been removed + for _, e := range entities { + var readBack Entity + err := operation.Retrieve(e.Key(), &readBack)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error for entity ID %d after removal", e.ID) + } + }) +} + +func TestRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + + // Test delete nothing should return OK + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Test write, delete, then read should return not found + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.True(t, exists, "expected key to exist") + + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete") + }) +} + +func TestConcurrentWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numWrites := 10 // number of concurrent writes + + for i := 0; i < numWrites; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent write to a different key + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestConcurrentRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numDeletes := 10 // number of concurrent deletions + + // First, insert entities to be deleted concurrently + for i := 0; i < numDeletes; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + + // Now, perform concurrent deletes + for i := 0; i < numDeletes; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent delete + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Check that the item is no longer retrievable + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestRemoveRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + + // Define the prefix + prefix := []byte{0x10} + + // Create a range of keys around the boundaries of the prefix + keys := [][]byte{ + // before prefix -> not included in range + {0x09, 
0xff}, + // within the prefix -> included in range + {0x10, 0x00}, + {0x10, 0x50}, + {0x10, 0xff}, + // after end -> not included in range + {0x11, 0x00}, + {0x1A, 0xff}, + } + + // Keys expected to be in the prefix range + includeStart, includeEnd := 1, 3 + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Remove the keys in the prefix range + require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix))) + + lg := unittest.Logger().With().Logger() + // Verify that the keys in the prefix range have been removed + for i, key := range keys { + var exists bool + require.NoError(t, operation.Exists(key, &exists)(r)) + lg.Info().Msgf("key %x exists: %t", key, exists) + + deleted := includeStart <= i && i <= includeEnd + + // deleted item should not exist + require.Equal(t, !deleted, exists, + "expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"}) + } + }) +} + +type Entity struct { + ID uint64 +} + +func (e Entity) Key() []byte { + byteSlice := make([]byte, 8) // uint64 is 8 bytes + binary.BigEndian.PutUint64(byteSlice, e.ID) + return byteSlice +} + +type UnencodeableEntity Entity + +var errCantEncode = fmt.Errorf("encoding not supported") +var errCantDecode = fmt.Errorf("decoding not supported") + +func (a UnencodeableEntity) MarshalJSON() ([]byte, error) { + return nil, errCantEncode +} + +func (a *UnencodeableEntity) UnmarshalJSON(b []byte) error { + return errCantDecode +} + +func (a UnencodeableEntity) MarshalMsgpack() ([]byte, error) { + return nil, errCantEncode +} + +func (a UnencodeableEntity) UnmarshalMsgpack(b []byte) error { + return errCantDecode +} diff --git a/storage/operations.go b/storage/operations.go new file mode 100644 index 00000000000..c261d4ba28c --- /dev/null +++ b/storage/operations.go @@ -0,0 +1,132 @@ +package storage + +import ( + "io" +) + +// Iterator is an interface for iterating over key-value pairs in a storage backend. +type Iterator interface { + // SeekGE seeks to the smallest key greater than or equal to the given key. + SeekGE() + + // Valid returns whether the iterator is positioned at a valid key-value pair. + Valid() bool + + // Next advances the iterator to the next key-value pair. + Next() + + // Key returns the key of the current key-value pair, or nil if done. + IterItem() IterItem + + // Close closes the iterator. Iterator must be closed, otherwise it causes memory leak. + Close() error +} + +// IterItem is an interface for iterating over key-value pairs in a storage backend. +type IterItem interface { + Key() []byte + + // Value returns the value of the current key-value pair + // The reason it takes a function is to follow badgerDB's API pattern + Value(func(val []byte) error) error +} + +type IteratorOption struct { + IterateKeyOnly bool // default false +} + +func DefaultIteratorOptions() IteratorOption { + return IteratorOption{ + IterateKeyOnly: false, // only needed for badger. ignored by pebble + } +} + +type Reader interface { + // Get gets the value for the given key. It returns ErrNotFound if the DB + // does not contain the key. + // + // The caller should not modify the contents of the returned slice, but it is + // safe to modify the contents of the argument after Get returns. 
The + // returned slice will remain valid until the returned Closer is closed. On + // success, the caller MUST call closer.Close() or a memory leak will occur. + Get(key []byte) (value []byte, closer io.Closer, err error) + + // NewIter returns a new Iterator for the given key range [startPrefix, endPrefix], both inclusive. + NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error) +} + +// Writer is an interface for batch writing to a storage backend. +type Writer interface { + // Set sets the value for the given key. It overwrites any previous value + // for that key; a DB is not a multi-map. + // + // It is safe to modify the contents of the arguments after Set returns. + Set(k, v []byte) error + + // Delete deletes the value for the given key. Deletes are blind all will + // succeed even if the given key does not exist. + // + // It is safe to modify the contents of the arguments after Delete returns. + Delete(key []byte) error + + // DeleteByRange removes all keys with a prefix that falls within the + // range [start, end], both inclusive. + DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error +} + +// ReaderBatchWriter is an interface for reading and writing to a storage backend. +type ReaderBatchWriter interface { + // GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). + // This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. + // This reader may observe different values for the same key on subsequent reads. + GlobalReader() Reader + + // Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. + // When we `Write` into the batch, that write operation is added to the pending batch, but not committed. + // The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. + // Note: + // - The writer cannot be used concurrently for writing. + Writer() Writer + + // AddCallback adds a callback to execute after the batch has been flush + // regardless the batch update is succeeded or failed. + // The error parameter is the error returned by the batch update. + AddCallback(func(error)) +} + +// OnCommitSucceed adds a callback to execute after the batch has been successfully committed. +func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) { + b.AddCallback(func(err error) { + if err == nil { + onSuccessFn() + } + }) +} + +func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) { + // LowerBound specifies the smallest key to iterate and it's inclusive. + // UpperBound specifies the largest key to iterate and it's exclusive (not inclusive) + // in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1, + // for instance, to iterate keys between "hello" and "world", + // we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff" + // will all be included. + return startPrefix, prefixUpperBound(endPrefix) +} + +// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. +// This is used to define an upper bound for iteration, when we want to iterate over +// all keys beginning with a given prefix. 
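+// For example (illustrative): prefixUpperBound([]byte("ab")) returns
+// []byte("ac"), and prefixUpperBound([]byte{0xff, 0xff}) returns nil, since
+// no finite upper bound exists for a prefix consisting only of 0xff bytes.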
+// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration +func prefixUpperBound(prefix []byte) []byte { + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + // increment the bytes by 1 + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound +} diff --git a/utils/unittest/unittest.go b/utils/unittest/unittest.go index 4d13b279087..d15f39cd27c 100644 --- a/utils/unittest/unittest.go +++ b/utils/unittest/unittest.go @@ -368,6 +368,11 @@ func TempBadgerDB(t testing.TB) (*badger.DB, string) { return db, dir } +func TempPebbleDB(t testing.TB) (*pebble.DB, string) { + dir := TempDir(t) + return PebbleDB(t, dir), dir +} + func TempPebblePath(t *testing.T) string { return path.Join(TempDir(t), "pebble"+strconv.Itoa(rand.Int())+".db") } @@ -380,6 +385,71 @@ func TempPebbleDBWithOpts(t testing.TB, opts *pebble.Options) (*pebble.DB, strin return db, dbpath } +func RunWithPebbleDB(t testing.TB, f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + +func PebbleDB(t testing.TB, dir string) *pebble.DB { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options) (*pebble.DB, error)) *pebble.DB { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +type PebbleWrapper struct { + db *pebble.DB +} + +func (p *PebbleWrapper) View(fn func(pebble.Reader) error) error { + return fn(p.db) +} + +func (p *PebbleWrapper) Update(fn func(pebble.Writer) error) error { + return fn(p.db) +} + +func (p *PebbleWrapper) DB() *pebble.DB { + return p.db +} + +func RunWithWrappedPebbleDB(t testing.TB, f func(p *PebbleWrapper)) { + RunWithTempDir(t, func(dir string) { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(&PebbleWrapper{db}) + }) + +} + +func RunWithTypedPebbleDB( + t testing.TB, + create func(string, *pebble.Options) (*pebble.DB, error), + f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + func Concurrently(n int, f func(int)) { var wg sync.WaitGroup for i := 0; i < n; i++ { From 90a24264c6d0c652ddc02b5befe6bfe53d4d5a4d Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 1 Nov 2024 14:10:48 -0700 Subject: [PATCH 2/4] address review comments --- storage/operation/pebbleimpl/iterator.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go index b6f3910cead..d8453b497f5 100644 --- a/storage/operation/pebbleimpl/iterator.go +++ b/storage/operation/pebbleimpl/iterator.go @@ -9,8 +9,7 @@ import ( ) type pebbleIterator struct { - iter *pebble.Iterator - lowerBound []byte + iter *pebble.Iterator } var _ storage.Iterator = (*pebbleIterator)(nil) @@ -29,13 +28,12 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops } return &pebbleIterator{ - iter: iter, - lowerBound: lowerBound, + iter: iter, }, nil } func (i *pebbleIterator) SeekGE() { - i.iter.SeekGE(i.lowerBound) + i.iter.First() } func (i *pebbleIterator) Valid() 
bool { From 326772e598b9d754b34d9e90670e7e71589eef7a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 1 Nov 2024 14:12:25 -0700 Subject: [PATCH 3/4] consolidate PrefixUpperbound --- storage/batch.go | 8 +++--- storage/operation/badgerimpl/iterator.go | 4 +-- storage/operation/badgerimpl/reader.go | 6 +---- storage/operation/pebbleimpl/iterator.go | 2 +- storage/operation/pebbleimpl/writer.go | 2 +- storage/operation/reads.go | 33 +++++++----------------- storage/operation/reads_test.go | 27 +++++++++---------- storage/operation/writes_test.go | 6 ++--- storage/operations.go | 33 +++++++++++++++++------- utils/unittest/unittest.go | 28 -------------------- 10 files changed, 57 insertions(+), 92 deletions(-) diff --git a/storage/batch.go b/storage/batch.go index 23b9d39ac63..bc9c4853294 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -4,16 +4,16 @@ import ( "github.com/dgraph-io/badger/v2" ) -// deprecated -// use Writer instead +// Deprecated: Transaction is being deprecated as part of the transition from Badger to Pebble. +// Use Writer instead of Transaction for all new code. type Transaction interface { Set(key, val []byte) error } -// deprecated -// use ReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. +// Deprecated: BatchStorage is being deprecated as part of the transition from Badger to Pebble. +// Use ReaderBatchWriter instead of BatchStorage for all new code. type BatchStorage interface { GetWriter() *badger.WriteBatch diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go index 81ecda2d719..e9f8b5dc6be 100644 --- a/storage/operation/badgerimpl/iterator.go +++ b/storage/operation/badgerimpl/iterator.go @@ -34,7 +34,7 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage } } -func (i *badgerIterator) SeekGE() { +func (i *badgerIterator) First() { i.iter.Seek(i.lowerBound) } @@ -44,7 +44,7 @@ func (i *badgerIterator) Valid() bool { return false } key := i.iter.Item().Key() - // "< 0" means the upperBound is exclusive + // "< 0" means "key < upperBound" valid := bytes.Compare(key, i.upperBound) < 0 return valid } diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go index 06158e634ff..8d7d982d65e 100644 --- a/storage/operation/badgerimpl/reader.go +++ b/storage/operation/badgerimpl/reader.go @@ -32,11 +32,7 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) } - var value []byte - err = item.Value(func(val []byte) error { - value = append([]byte{}, val...) 
- return nil - }) + value, err := item.ValueCopy(nil) if err != nil { return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) } diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go index d8453b497f5..b1b6630cc51 100644 --- a/storage/operation/pebbleimpl/iterator.go +++ b/storage/operation/pebbleimpl/iterator.go @@ -32,7 +32,7 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops }, nil } -func (i *pebbleIterator) SeekGE() { +func (i *pebbleIterator) First() { i.iter.First() } diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go index ad639223209..c6ccdff06b9 100644 --- a/storage/operation/pebbleimpl/writer.go +++ b/storage/operation/pebbleimpl/writer.go @@ -74,7 +74,7 @@ func (b *ReaderBatchWriter) Delete(key []byte) error { return b.batch.Delete(key, pebble.Sync) } -// DeleteByRange deletes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +// DeleteByRange deletes all keys with a prefix in the range [startPrefix, endPrefix] (both inclusive). func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error { // DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive). // therefore, we need to increment the endPrefix to make it inclusive. diff --git a/storage/operation/reads.go b/storage/operation/reads.go index 2e6be8dd3fe..1be299ab9d8 100644 --- a/storage/operation/reads.go +++ b/storage/operation/reads.go @@ -28,7 +28,7 @@ type CreateFunc func() interface{} type HandleFunc func() error type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc) -// IterateKeysInPrefixRange will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +// IterateKeysInPrefixRange will iterate over all keys with prefixes in the range [startPrefix, endPrefix] (both inclusive) func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error { return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) { return func(key []byte) (bool, error) { @@ -41,7 +41,7 @@ func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(k }, storage.IteratorOption{IterateKeyOnly: true}) } -// Iterate will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +// Iterate will iterate over all keys with prefixes in the given range [startPrefix, endPrefix] (both inclusive) func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { return func(r storage.Reader) error { @@ -64,7 +64,7 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s } defer it.Close() - for it.SeekGE(); it.Valid(); it.Next() { + for it.First(); it.Valid(); it.Next() { item := it.IterItem() key := item.Key() @@ -72,6 +72,9 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s check, create, handle := iterFunc() keyCopy := make([]byte, len(key)) + + // The underlying database may re-use and modify the backing memory of the returned key. + // Tor safety we proactively make a copy before passing the key to the upper layer. 
copy(keyCopy, key) // check if we should process the item at all @@ -112,25 +115,7 @@ func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt s // Traverse will iterate over all keys with the given prefix func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { - return Iterate(prefix, PrefixUpperBound(prefix), iterFunc, opt) -} - -// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix -// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. -// This is used to define an upper bound for iteration, when we want to iterate over -// all keys beginning with a given prefix. -// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration -func PrefixUpperBound(prefix []byte) []byte { - end := make([]byte, len(prefix)) - copy(end, prefix) - for i := len(end) - 1; i >= 0; i-- { - // increment the bytes by 1 - end[i] = end[i] + 1 - if end[i] != 0 { - return end[:i+1] - } - } - return nil // no upper-bound + return Iterate(prefix, prefix, iterFunc, opt) } // Exists returns true if a key exists in the database. @@ -155,7 +140,7 @@ func Exists(key []byte, keyExists *bool) func(storage.Reader) error { } } -// retrieve will retrieve the binary data under the given key from the badger DB +// Retrieve will retrieve the binary data under the given key from the database // and decode it into the given entity. The provided entity needs to be a // pointer to an initialized entity of the correct type. // Error returns: @@ -196,7 +181,7 @@ func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func var highestKey []byte // find highest value below the given height - for it.SeekGE(); it.Valid(); it.Next() { + for it.First(); it.Valid(); it.Next() { highestKey = it.IterItem().Key() } diff --git a/storage/operation/reads_test.go b/storage/operation/reads_test.go index e24bc15b5ae..b9addec418d 100644 --- a/storage/operation/reads_test.go +++ b/storage/operation/reads_test.go @@ -35,9 +35,9 @@ func TestIterateKeysInPrefixRange(t *testing.T) { {0x21, 0x00}, } - // Keys expected to be in the prefix range - lastNToExclude := 1 - keysInRange := keys[1 : len(keys)-lastNToExclude] // these keys are between the start and end + // The first and last keys are outside the prefix range, so we omit them + // from keysInRange, which is the set of keys we expect in the iteration + keysInRange := keys[1 : len(keys)-1] // Insert the keys into the storage require.NoError(t, withWriter(func(writer storage.Writer) error { @@ -63,20 +63,21 @@ func TestIterateKeysInPrefixRange(t *testing.T) { func TestTraverse(t *testing.T) { dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { - keys := [][]byte{ - {0x42, 0x00}, - {0xff}, - {0x42, 0x56}, - {0x00}, - {0x42, 0xff}, + keyVals := map[[2]byte]uint64{ + {0x41, 0xff}: 3, + {0x42, 0x00}: 11, + {0xff}: 13, + {0x42, 0x56}: 17, + {0x00}: 19, + {0x42, 0xff}: 23, + {0x43, 0x00}: 33, } - vals := []uint64{11, 13, 17, 19, 23} expected := []uint64{11, 23} // Insert the keys and values into storage require.NoError(t, withWriter(func(writer storage.Writer) error { - for i, key := range keys { - err := operation.Upsert(key, vals[i])(writer) + for key, val := range keyVals { + err := operation.Upsert(key[:], val)(writer) if err != nil { return err } @@ -84,7 +85,7 @@ func TestTraverse(t *testing.T) { return nil })) - actual := make([]uint64, 0, len(keys)) + 
 
 		// Define the iteration logic
 		iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) {
diff --git a/storage/operation/writes_test.go b/storage/operation/writes_test.go
index aa7b5020b1a..9355b5822db 100644
--- a/storage/operation/writes_test.go
+++ b/storage/operation/writes_test.go
@@ -12,7 +12,6 @@ import (
 	"github.com/onflow/flow-go/storage"
 	"github.com/onflow/flow-go/storage/operation"
 	"github.com/onflow/flow-go/storage/operation/dbtest"
-	"github.com/onflow/flow-go/utils/unittest"
 )
 
 func TestReadWrite(t *testing.T) {
@@ -230,16 +229,15 @@ func TestRemoveRange(t *testing.T) {
 		// Remove the keys in the prefix range
 		require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix)))
 
-		lg := unittest.Logger().With().Logger()
 		// Verify that the keys in the prefix range have been removed
 		for i, key := range keys {
 			var exists bool
 			require.NoError(t, operation.Exists(key, &exists)(r))
-			lg.Info().Msgf("key %x exists: %t", key, exists)
+			t.Logf("key %x exists: %t", key, exists)
 
 			deleted := includeStart <= i && i <= includeEnd
 
-			// deleted item should not exist
+			// A deleted item must not exist; an item that was not deleted must still exist
 			require.Equal(t, !deleted, exists,
 				"expected key %x to be %s", key,
 				map[bool]string{true: "deleted", false: "not deleted"})
 		}
diff --git a/storage/operations.go b/storage/operations.go
index c261d4ba28c..e0fe101f636 100644
--- a/storage/operations.go
+++ b/storage/operations.go
@@ -6,8 +6,8 @@ import (
 
 // Iterator is an interface for iterating over key-value pairs in a storage backend.
 type Iterator interface {
-	// SeekGE seeks to the smallest key greater than or equal to the given key.
-	SeekGE()
+	// First seeks to the smallest key in the iteration range.
+	First()
 
 	// Valid returns whether the iterator is positioned at a valid key-value pair.
 	Valid() bool
@@ -15,10 +15,11 @@ type Iterator interface {
 	// Next advances the iterator to the next key-value pair.
 	Next()
 
-	// Key returns the key of the current key-value pair, or nil if done.
+	// IterItem returns the current key-value pair, or nil if done.
 	IterItem() IterItem
 
 	// Close closes the iterator. Iterator must be closed, otherwise it causes memory leak.
+	// No errors expected during normal operation
 	Close() error
 }
 
@@ -28,6 +29,7 @@ type IterItem interface {
 
 	// Value returns the value of the current key-value pair
 	// The reason it takes a function is to follow badgerDB's API pattern
+	// No errors expected during normal operation
 	Value(func(val []byte) error) error
}
 
@@ -44,6 +46,7 @@ func DefaultIteratorOptions() IteratorOption {
 type Reader interface {
 	// Get gets the value for the given key. It returns ErrNotFound if the DB
 	// does not contain the key.
+	// Other errors are exceptions.
 	//
 	// The caller should not modify the contents of the returned slice, but it is
 	// safe to modify the contents of the argument after Get returns. The
 	// returned slice will remain valid until the returned Closer is closed. On
 	// success, the caller MUST call closer.Close() or a memory leak will occur.
 	Get(key []byte) (value []byte, closer io.Closer, err error)
 
-	// NewIter returns a new Iterator for the given key range [startPrefix, endPrefix], both inclusive.
+	// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
+	// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
+	// - have a prefix equal to startPrefix OR
+	// - have a prefix equal to the endPrefix OR
+	// - have a prefix that is lexicographically between startPrefix and endPrefix
 	NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error)
 }
 
@@ -61,20 +68,26 @@ type Writer interface {
 	// Set sets the value for the given key. It overwrites any previous value
 	// for that key; a DB is not a multi-map.
 	//
 	// It is safe to modify the contents of the arguments after Set returns.
+	// No errors expected during normal operation
 	Set(k, v []byte) error
 
 	// Delete deletes the value for the given key. Deletes are blind all will
 	// succeed even if the given key does not exist.
 	//
 	// It is safe to modify the contents of the arguments after Delete returns.
+	// No errors expected during normal operation
 	Delete(key []byte) error
 
 	// DeleteByRange removes all keys with a prefix that falls within the
 	// range [start, end], both inclusive.
+	// No errors expected during normal operation
 	DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error
 }
 
 // ReaderBatchWriter is an interface for reading and writing to a storage backend.
+// It is useful for performing a related sequence of reads and writes, after which you would like
+// to modify some non-database state if the sequence completed successfully (via AddCallback).
+// If you are not using AddCallback, avoid using ReaderBatchWriter: use Reader and Writer directly.
 type ReaderBatchWriter interface {
 	// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
 	// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
@@ -104,21 +117,21 @@ func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) {
 }
 
 func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) {
-	// LowerBound specifies the smallest key to iterate and it's inclusive.
-	// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
-	// in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1,
+	// Return value lowerBound specifies the smallest key to iterate and it's inclusive.
+	// Return value upperBound specifies the largest key to iterate and it's exclusive (not inclusive).
+	// In order to match all keys prefixed with `endPrefix`, we increment the bytes of `endPrefix` by 1,
 	// for instance, to iterate keys between "hello" and "world",
 	// we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff"
 	// will all be included.
-	return startPrefix, prefixUpperBound(endPrefix)
+	return startPrefix, PrefixUpperBound(endPrefix)
 }
 
-// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix
+// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix
 // sort lower than K according to the byte-wise lexicographic key ordering used by Pebble.
 // This is used to define an upper bound for iteration, when we want to iterate over
 // all keys beginning with a given prefix.
 // referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration
-func prefixUpperBound(prefix []byte) []byte {
+func PrefixUpperBound(prefix []byte) []byte {
 	end := make([]byte, len(prefix))
 	copy(end, prefix)
 	for i := len(end) - 1; i >= 0; i-- {
diff --git a/utils/unittest/unittest.go b/utils/unittest/unittest.go
index d15f39cd27c..5edcd3d477e 100644
--- a/utils/unittest/unittest.go
+++ b/utils/unittest/unittest.go
@@ -408,34 +408,6 @@ func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options
 	return db
 }
 
-type PebbleWrapper struct {
-	db *pebble.DB
-}
-
-func (p *PebbleWrapper) View(fn func(pebble.Reader) error) error {
-	return fn(p.db)
-}
-
-func (p *PebbleWrapper) Update(fn func(pebble.Writer) error) error {
-	return fn(p.db)
-}
-
-func (p *PebbleWrapper) DB() *pebble.DB {
-	return p.db
-}
-
-func RunWithWrappedPebbleDB(t testing.TB, f func(p *PebbleWrapper)) {
-	RunWithTempDir(t, func(dir string) {
-		db, err := pebble.Open(dir, &pebble.Options{})
-		require.NoError(t, err)
-		defer func() {
-			assert.NoError(t, db.Close())
-		}()
-		f(&PebbleWrapper{db})
-	})
-
-}
-
 func RunWithTypedPebbleDB(
 	t testing.TB,
 	create func(string, *pebble.Options) (*pebble.DB, error),

From 17972e0e6a779473d71f9cc491b09b20ce0aff59 Mon Sep 17 00:00:00 2001
From: "Leo Zhang (zhangchiqing)"
Date: Fri, 1 Nov 2024 17:05:58 -0700
Subject: [PATCH 4/4] update comments

---
 storage/operation/badgerimpl/iterator.go |  6 ++++++
 storage/operation/badgerimpl/reader.go   | 13 ++++++++++++
 storage/operation/badgerimpl/writer.go   | 27 ++++++++++++++++++++++++
 storage/operation/pebbleimpl/iterator.go |  6 ++++++
 storage/operation/pebbleimpl/reader.go   | 13 ++++++++++++
 storage/operation/pebbleimpl/writer.go   | 27 +++++++++++++++++++++++-
 storage/operations.go                    |  1 +
 7 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go
index e9f8b5dc6be..5cc5fc50340 100644
--- a/storage/operation/badgerimpl/iterator.go
+++ b/storage/operation/badgerimpl/iterator.go
@@ -34,10 +34,12 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage
 	}
 }
 
+// First seeks to the smallest key in the iteration range.
 func (i *badgerIterator) First() {
 	i.iter.Seek(i.lowerBound)
 }
 
+// Valid returns whether the iterator is positioned at a valid key-value pair.
 func (i *badgerIterator) Valid() bool {
 	// if it's beyond the upper bound, it's invalid
 	if !i.iter.Valid() {
@@ -49,16 +51,20 @@ func (i *badgerIterator) Valid() bool {
 	return valid
 }
 
+// Next advances the iterator to the next key-value pair.
 func (i *badgerIterator) Next() {
 	i.iter.Next()
 }
 
+// IterItem returns the current key-value pair, or nil if done.
 func (i *badgerIterator) IterItem() storage.IterItem {
 	return i.iter.Item()
 }
 
 var _ storage.IterItem = (*badger.Item)(nil)
 
+// Close closes the iterator. Iterator must be closed, otherwise it causes memory leak.
+// No errors expected during normal operation
 func (i *badgerIterator) Close() error {
 	i.iter.Close()
 	return nil
diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go
index 8d7d982d65e..a410067a6b7 100644
--- a/storage/operation/badgerimpl/reader.go
+++ b/storage/operation/badgerimpl/reader.go
@@ -20,6 +20,14 @@ var _ io.Closer = (*noopCloser)(nil)
 
 func (noopCloser) Close() error { return nil }
 
+// Get gets the value for the given key. It returns ErrNotFound if the DB
+// does not contain the key.
+// Other errors are exceptions.
+//
+// The caller should not modify the contents of the returned slice, but it is
+// safe to modify the contents of the argument after Get returns. The
+// returned slice will remain valid until the returned Closer is closed. On
+// success, the caller MUST call closer.Close() or a memory leak will occur.
 func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	tx := b.db.NewTransaction(false)
 	defer tx.Discard()
@@ -40,6 +48,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	return value, noopCloser{}, nil
 }
 
+// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
+// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
+// - have a prefix equal to startPrefix OR
+// - have a prefix equal to the endPrefix OR
+// - have a prefix that is lexicographically between startPrefix and endPrefix
 func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
 	return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
 }
diff --git a/storage/operation/badgerimpl/writer.go b/storage/operation/badgerimpl/writer.go
index 3837be3917f..769187166ba 100644
--- a/storage/operation/badgerimpl/writer.go
+++ b/storage/operation/badgerimpl/writer.go
@@ -19,22 +19,36 @@ type ReaderBatchWriter struct {
 
 var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
 
+// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+// This reader may observe different values for the same key on subsequent reads.
 func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
 	return b.globalReader
 }
 
+// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
 func (b *ReaderBatchWriter) Writer() storage.Writer {
 	return b
 }
 
+// BadgerWriteBatch returns the badger write batch
 func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
 	return b.batch
 }
 
+// AddCallback adds a callback to execute after the batch has been flushed,
+// regardless of whether the batch update succeeded or failed.
+// The error parameter is the error returned by the batch update.
 func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
 	b.callbacks.AddCallback(callback)
 }
 
+// Commit flushes the batch to the database.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Commit() error {
 	err := b.batch.Flush()
 
@@ -69,14 +83,27 @@ func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
 
 var _ storage.Writer = (*ReaderBatchWriter)(nil)
 
+// Set sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map.
+//
+// It is safe to modify the contents of the arguments after Set returns.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Set(key, value []byte) error {
 	return b.batch.Set(key, value)
 }
 
+// Delete deletes the value for the given key. Deletes are blind and will
+// succeed even if the given key does not exist.
+//
+// It is safe to modify the contents of the arguments after Delete returns.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Delete(key []byte) error {
 	return b.batch.Delete(key)
 }
 
+// DeleteByRange removes all keys with a prefix that falls within the
+// range [startPrefix, endPrefix], both inclusive.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
 	err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
 		err := b.batch.Delete(key)
diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go
index b1b6630cc51..bc0cd2bae69 100644
--- a/storage/operation/pebbleimpl/iterator.go
+++ b/storage/operation/pebbleimpl/iterator.go
@@ -32,18 +32,22 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops
 	}, nil
 }
 
+// First seeks to the smallest key in the iteration range.
 func (i *pebbleIterator) First() {
 	i.iter.First()
 }
 
+// Valid returns whether the iterator is positioned at a valid key-value pair.
 func (i *pebbleIterator) Valid() bool {
 	return i.iter.Valid()
 }
 
+// Next advances the iterator to the next key-value pair.
 func (i *pebbleIterator) Next() {
 	i.iter.Next()
 }
 
+// IterItem returns the current key-value pair, or nil if done.
 func (i *pebbleIterator) IterItem() storage.IterItem {
 	return pebbleIterItem{iter: i.iter}
 }
 
@@ -67,6 +71,8 @@ func (i pebbleIterItem) Value(fn func([]byte) error) error {
 	return fn(val)
 }
 
+// Close closes the iterator. Iterator must be closed, otherwise it causes memory leak.
+// No errors expected during normal operation
 func (i *pebbleIterator) Close() error {
 	return i.iter.Close()
 }
diff --git a/storage/operation/pebbleimpl/reader.go b/storage/operation/pebbleimpl/reader.go
index 6cfdfd93da5..cff5a916048 100644
--- a/storage/operation/pebbleimpl/reader.go
+++ b/storage/operation/pebbleimpl/reader.go
@@ -22,6 +22,14 @@ var _ io.Closer = (*noopCloser)(nil)
 
 func (noopCloser) Close() error { return nil }
 
+// Get gets the value for the given key. It returns ErrNotFound if the DB
+// does not contain the key.
+// Other errors are exceptions.
+//
+// The caller should not modify the contents of the returned slice, but it is
+// safe to modify the contents of the argument after Get returns. The
+// returned slice will remain valid until the returned Closer is closed. On
+// success, the caller MUST call closer.Close() or a memory leak will occur.
 func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	value, closer, err := b.db.Get(key)
 
@@ -37,6 +45,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	return value, closer, nil
 }
 
+// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
+// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
+// - have a prefix equal to startPrefix OR
+// - have a prefix equal to the endPrefix OR
+// - have a prefix that is lexicographically between startPrefix and endPrefix
 func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
 	return newPebbleIterator(b.db, startPrefix, endPrefix, ops)
 }
diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go
index c6ccdff06b9..3525bb59f2c 100644
--- a/storage/operation/pebbleimpl/writer.go
+++ b/storage/operation/pebbleimpl/writer.go
@@ -16,10 +16,18 @@ type ReaderBatchWriter struct {
 
 var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
 
+// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+// This reader may observe different values for the same key on subsequent reads.
 func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
 	return b.globalReader
 }
 
+// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
 func (b *ReaderBatchWriter) Writer() storage.Writer {
 	return b
 }
 
@@ -28,10 +36,15 @@ func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch {
 	return b.batch
 }
 
+// AddCallback adds a callback to execute after the batch has been flushed,
+// regardless of whether the batch update succeeded or failed.
+// The error parameter is the error returned by the batch update.
 func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
 	b.callbacks.AddCallback(callback)
 }
 
+// Commit flushes the batch to the database.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Commit() error {
 	err := b.batch.Commit(pebble.Sync)
 
@@ -66,15 +79,27 @@ func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter {
 
 var _ storage.Writer = (*ReaderBatchWriter)(nil)
 
+// Set sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map.
+//
+// It is safe to modify the contents of the arguments after Set returns.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Set(key, value []byte) error {
 	return b.batch.Set(key, value, pebble.Sync)
 }
 
+// Delete deletes the value for the given key. Deletes are blind and will
+// succeed even if the given key does not exist.
+//
+// It is safe to modify the contents of the arguments after Delete returns.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) Delete(key []byte) error {
 	return b.batch.Delete(key, pebble.Sync)
 }
 
-// DeleteByRange deletes all keys with a prefix in the range [startPrefix, endPrefix] (both inclusive).
+// DeleteByRange removes all keys with a prefix that falls within the
+// range [startPrefix, endPrefix], both inclusive.
+// No errors expected during normal operation
 func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error {
 	// DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive).
 	// therefore, we need to increment the endPrefix to make it inclusive.
diff --git a/storage/operations.go b/storage/operations.go
index e0fe101f636..d407da299e7 100644
--- a/storage/operations.go
+++ b/storage/operations.go
@@ -63,6 +63,7 @@ type Reader interface {
 }
 
 // Writer is an interface for batch writing to a storage backend.
+// It cannot be used concurrently for writing.
 type Writer interface {
 	// Set sets the value for the given key. It overwrites any previous value
 	// for that key; a DB is not a multi-map.
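
Editor's note (illustration only, not part of the patch series): the prefix-to-bound conversion used throughout this series is easiest to verify with concrete inputs. The sketch below assumes only the storage package APIs introduced above (StartEndPrefixToLowerUpperBound, PrefixUpperBound); expected output is given in the comments.

package main

import (
	"fmt"

	"github.com/onflow/flow-go/storage"
)

func main() {
	// Keys with prefixes between "hello" and "world" (both inclusive):
	// the lower bound stays "hello" (inclusive) and the upper bound becomes
	// "worle" (exclusive), so "world", "world1", "worldff...ff" all fall inside.
	lower, upper := storage.StartEndPrefixToLowerUpperBound([]byte("hello"), []byte("world"))
	fmt.Printf("%s %s\n", lower, upper) // prints: hello worle

	// The increment carries over 0xff bytes: prefix {0x41, 0xff} yields {0x42},
	// because 0xff wraps to 0x00 and the carry lands on the preceding byte.
	fmt.Printf("%x\n", storage.PrefixUpperBound([]byte{0x41, 0xff})) // prints: 42

	// A prefix of all 0xff bytes has no finite upper bound, so nil is returned,
	// meaning "iterate to the end of the keyspace".
	fmt.Println(storage.PrefixUpperBound([]byte{0xff, 0xff}) == nil) // prints: true
}

Returning nil for an all-0xff prefix matches Pebble's convention that a nil UpperBound means the iteration is unbounded above.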
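A second sketch shows how the batch/callback pattern described in the ReaderBatchWriter comments is meant to be used. It assumes only the APIs added in this series (pebbleimpl.NewReaderBatchWriter, operation.Upsert, storage.OnCommitSucceed); the key layout and the print-as-cache-update stand-in are hypothetical placeholders.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
	"github.com/onflow/flow-go/storage/operation/pebbleimpl"
)

// storeHeight stages one write in a pending batch and registers a callback
// that runs only if the commit succeeds.
func storeHeight(db *pebble.DB, key []byte, height uint64) error {
	batch := pebbleimpl.NewReaderBatchWriter(db)

	// The write goes into the pending batch; it is not visible to
	// batch.GlobalReader() until Commit succeeds.
	if err := operation.Upsert(key, height)(batch.Writer()); err != nil {
		return err
	}

	// Update non-database state (an in-memory cache, a metric, ...) only
	// after the batch has been durably committed.
	storage.OnCommitSucceed(batch, func() {
		fmt.Printf("height %d committed under key %x\n", height, key)
	})

	return batch.Commit()
}

As its name suggests, OnCommitSucceed appears to be a thin wrapper over AddCallback that fires the function only when the commit error is nil; either way, callbacks run after the flush, which is exactly the ordering the AddCallback comments above promise.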