diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go
index eb3cf4ae820..3023204290c 100644
--- a/storage/badger/approvals.go
+++ b/storage/badger/approvals.go
@@ -3,6 +3,7 @@ package badger
 import (
 	"errors"
 	"fmt"
+	"sync"
 
 	"github.com/dgraph-io/badger/v2"
 
@@ -11,24 +12,24 @@ import (
 	"github.com/onflow/flow-go/module/metrics"
 	"github.com/onflow/flow-go/storage"
 	"github.com/onflow/flow-go/storage/badger/operation"
-	"github.com/onflow/flow-go/storage/badger/transaction"
 )
 
 // ResultApprovals implements persistent storage for result approvals.
 type ResultApprovals struct {
-	db    *badger.DB
-	cache *Cache[flow.Identifier, *flow.ResultApproval]
+	db       *badger.DB
+	cache    *CacheB[flow.Identifier, *flow.ResultApproval]
+	indexing *sync.Mutex // preventing concurrent indexing of approvals
 }
 
 func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {
-	store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error {
-		return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val)))
+	store := func(key flow.Identifier, val *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
+		return storage.OnlyBadgerWriter(operation.InsertResultApproval(val))
 	}
 
-	retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) {
+	retrieve := func(approvalID flow.Identifier) func(tx storage.Reader) (*flow.ResultApproval, error) {
 		var approval flow.ResultApproval
-		return func(tx *badger.Txn) (*flow.ResultApproval, error) {
+		return func(tx storage.Reader) (*flow.ResultApproval, error) {
 			err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
 			return &approval, err
 		}
@@ -36,21 +37,22 @@ func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApp
 
 	res := &ResultApprovals{
 		db: db,
-		cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
-			withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
-			withStore[flow.Identifier, *flow.ResultApproval](store),
-			withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)),
+		cache: newCacheB[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
+			withLimitB[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
+			withStoreB[flow.Identifier, *flow.ResultApproval](store),
+			withRetrieveB[flow.Identifier, *flow.ResultApproval](retrieve)),
+		indexing: new(sync.Mutex),
 	}
 
 	return res
}
 
-func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error {
+func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
 	return r.cache.PutTx(approval.ID(), approval)
 }
 
-func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) {
-	return func(tx *badger.Txn) (*flow.ResultApproval, error) {
+func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) {
+	return func(tx storage.Reader) (*flow.ResultApproval, error) {
 		val, err := r.cache.Get(approvalID)(tx)
 		if err != nil {
 			return nil, err
@@ -59,8 +61,8 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f
 	}
 }
 
-func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) {
-	return func(tx *badger.Txn) (*flow.ResultApproval, error) {
+func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) {
+	return func(tx storage.Reader) (*flow.ResultApproval, error) {
 		var approvalID flow.Identifier
 		err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx)
 		if err != nil {
@@ -70,29 +72,26 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f
 	}
 }
 
-func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
-	return func(tx *badger.Txn) error {
-		err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
-		if err == nil {
-			return nil
-		}
+// CAUTION: Caller must acquire `indexing` lock.
+func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error {
+	return func(rw storage.BadgerReaderBatchWriter) error {
+		var storedApprovalID flow.Identifier
+		err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader())
+		if err != nil {
+			if !errors.Is(err, storage.ErrNotFound) {
+				return fmt.Errorf("could not lookup result approval ID: %w", err)
+			}
+
+			// no approval found, index the approval
-		if !errors.Is(err, storage.ErrAlreadyExists) {
-			return err
+			return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
 		}
 
-		// When trying to index an approval for a result, and there is already
-		// an approval for the result, double check if the indexed approval is
-		// the same.
+		// an approval is already indexed, double check if it is the same
 		// We don't allow indexing multiple approvals per chunk because the
 		// store is only used within Verification nodes, and it is impossible
 		// for a Verification node to compute different approvals for the same
 		// chunk.
-		var storedApprovalID flow.Identifier
-		err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx)
-		if err != nil {
-			return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
-		}
 
 		if storedApprovalID != approvalID {
 			return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
@@ -105,14 +104,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
 
 // Store stores a ResultApproval
 func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
-	return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval))
+	return operation.WithReaderBatchWriter(r.db, r.store(approval))
 }
 
 // Index indexes a ResultApproval by chunk (ResultID + chunk index).
 // operation is idempotent (repeated calls with the same value are equivalent to
 // just calling the method once; still the method succeeds on each call).
 func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
-	err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID))
+	// Acquiring the lock prevents dirty reads when checking for conflicting approvals.
+	// How it works: the lock can only be acquired once the previous index operation
+	// has been committed to the database. Since indexing is the only operation that
+	// writes this index, no other write can go through until the lock is released,
+	// so holding the lock across the check and the write prevents dirty reads.
+	r.indexing.Lock()
+	defer r.indexing.Unlock()
+
+	err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
 	if err != nil {
 		return fmt.Errorf("could not index result approval: %w", err)
 	}
@@ -121,16 +128,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app
 
 // ByID retrieves a ResultApproval by its ID
 func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
-	tx := r.db.NewTransaction(false)
-	defer tx.Discard()
-	return r.byID(approvalID)(tx)
+	return r.byID(approvalID)(operation.ToReader(r.db))
 }
 
 // ByChunk retrieves a ResultApproval by result ID and chunk index. The
 // ResultApprovals store is only used within a verification node, where it is
 // assumed that there is never more than one approval per chunk.
 func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
-	tx := r.db.NewTransaction(false)
-	defer tx.Discard()
-	return r.byChunk(resultID, chunkIndex)(tx)
+	return r.byChunk(resultID, chunkIndex)(operation.ToReader(r.db))
 }
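
For orientation, a minimal usage sketch of the new API (illustrative, not part of the change set; `saveApproval` is a hypothetical helper, the field accesses mirror the test below):

package example

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	bstorage "github.com/onflow/flow-go/storage/badger"
)

// saveApproval stores an approval and indexes it for its chunk.
func saveApproval(store *bstorage.ResultApprovals, approval *flow.ResultApproval) error {
	// Store is safe under concurrency: the key is approval.ID(), so two writes
	// for the same key necessarily carry an identical value.
	if err := store.Store(approval); err != nil {
		return fmt.Errorf("could not store approval: %w", err)
	}
	// Index serializes its check-then-write through the indexing mutex; a
	// conflicting approval for the same chunk fails with storage.ErrDataMismatch,
	// while re-indexing the same approval succeeds (idempotent).
	return store.Index(approval.Body.ExecutionResultID, approval.Body.ChunkIndex, approval.ID())
}
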
diff --git a/storage/badger/approvals_test.go b/storage/badger/approvals_test.go
index 1b13a49ae59..5e433fa0c66 100644
--- a/storage/badger/approvals_test.go
+++ b/storage/badger/approvals_test.go
@@ -2,6 +2,7 @@ package badger_test
 
 import (
 	"errors"
+	"sync"
 	"testing"
 
 	"github.com/dgraph-io/badger/v2"
@@ -79,3 +80,52 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
 		require.True(t, errors.Is(err, storage.ErrDataMismatch))
 	})
 }
+
+// Verify that storing and indexing two conflicting approvals concurrently fails:
+// one of them succeeds, while the other fails with storage.ErrDataMismatch.
+func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
+	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
+		metrics := metrics.NewNoopCollector()
+		store := bstorage.NewResultApprovals(metrics, db)
+
+		approval1 := unittest.ResultApprovalFixture()
+		approval2 := unittest.ResultApprovalFixture()
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+
+		var firstIndexErr, secondIndexErr error
+
+		// First goroutine stores and indexes the first approval.
+		go func() {
+			defer wg.Done()
+
+			err := store.Store(approval1)
+			require.NoError(t, err)
+
+			firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
+		}()
+
+		// Second goroutine stores and tries to index the second approval for the same chunk.
+		go func() {
+			defer wg.Done()
+
+			err := store.Store(approval2)
+			require.NoError(t, err)
+
+			secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
+		}()
+
+		// Wait for both goroutines to finish
+		wg.Wait()
+
+		// Check that one of the Index operations succeeded and the other failed
+		if firstIndexErr == nil {
+			require.Error(t, secondIndexErr)
+			require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
+		} else {
+			require.NoError(t, secondIndexErr)
+			require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
+		}
+	})
+}
diff --git a/storage/badger/cache_b.go b/storage/badger/cache_b.go
new file mode 100644
index 00000000000..5e786d51146
--- /dev/null
+++ b/storage/badger/cache_b.go
@@ -0,0 +1,150 @@
+package badger
+
+import (
+	"errors"
+	"fmt"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/storage"
+)
+
+func withLimitB[K comparable, V any](limit uint) func(*CacheB[K, V]) {
+	return func(c *CacheB[K, V]) {
+		c.limit = limit
+	}
+}
+
+type storeFuncB[K comparable, V any] func(key K, val V) func(storage.BadgerReaderBatchWriter) error
+
+func withStoreB[K comparable, V any](store storeFuncB[K, V]) func(*CacheB[K, V]) {
+	return func(c *CacheB[K, V]) {
+		c.store = store
+	}
+}
+
+func noStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
+	return func(tx storage.BadgerReaderBatchWriter) error {
+		return fmt.Errorf("no store function for cache put available")
+	}
+}
+
+// nolint: unused
+func noopStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
+	return func(tx storage.BadgerReaderBatchWriter) error {
+		return nil
+	}
+}
+
+type retrieveFuncB[K comparable, V any] func(key K) func(storage.Reader) (V, error)
+
+func withRetrieveB[K comparable, V any](retrieve retrieveFuncB[K, V]) func(*CacheB[K, V]) {
+	return func(c *CacheB[K, V]) {
+		c.retrieve = retrieve
+	}
+}
+
+func noRetrieveB[K comparable, V any](_ K) func(storage.Reader) (V, error) {
+	return func(tx storage.Reader) (V, error) {
+		var nullV V
+		return nullV, fmt.Errorf("no retrieve function for cache get available")
+	}
+}
+
+type CacheB[K comparable, V any] struct {
+	metrics  module.CacheMetrics
+	limit    uint
+	store    storeFuncB[K, V]
+	retrieve retrieveFuncB[K, V]
+	resource string
+	cache    *lru.Cache[K, V]
+}
+
+func newCacheB[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*CacheB[K, V])) *CacheB[K, V] {
+	c := CacheB[K, V]{
+		metrics:  collector,
+		limit:    1000,
+		store:    noStoreB[K, V],
+		retrieve: noRetrieveB[K, V],
+		resource: resourceName,
+	}
+	for _, option := range options {
+		option(&c)
+	}
+	c.cache, _ = lru.New[K, V](int(c.limit))
+	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	return &c
+}
+
+// IsCached returns true if the key exists in the cache.
+// It DOES NOT check whether the key exists in the underlying data store.
+func (c *CacheB[K, V]) IsCached(key K) bool {
+	return c.cache.Contains(key)
+}
+
+// Get will try to retrieve the resource from cache first, and then from the
+// injected retrieve function. During normal operations, the following error
+// returns are expected:
+//   - `storage.ErrNotFound` if key is unknown.
+func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) {
+	return func(r storage.Reader) (V, error) {
+
+		// check if we have it in the cache
+		resource, cached := c.cache.Get(key)
+		if cached {
+			c.metrics.CacheHit(c.resource)
+			return resource, nil
+		}
+
+		// get it from the database
+		resource, err := c.retrieve(key)(r)
+		if err != nil {
+			if errors.Is(err, storage.ErrNotFound) {
+				c.metrics.CacheNotFound(c.resource)
+			}
+			var nullV V
+			return nullV, fmt.Errorf("could not retrieve resource: %w", err)
+		}
+
+		c.metrics.CacheMiss(c.resource)
+
+		// cache the resource and eject least recently used one if we reached limit
+		evicted := c.cache.Add(key, resource)
+		if !evicted {
+			c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+		}
+
+		return resource, nil
+	}
+}
+
+func (c *CacheB[K, V]) Remove(key K) {
+	c.cache.Remove(key)
+}
+
+// Insert will add a resource directly to the cache with the given ID
+func (c *CacheB[K, V]) Insert(key K, resource V) {
+	// cache the resource and eject least recently used one if we reached limit
+	evicted := c.cache.Add(key, resource)
+	if !evicted {
+		c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	}
+}
+
+// PutTx returns an operation which stores the given resource in the database and,
+// once the write batch has been successfully committed, adds it to the cache.
+func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error {
+	storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)
+
+	return func(rw storage.BadgerReaderBatchWriter) error {
+		storage.OnCommitSucceed(rw, func() {
+			c.Insert(key, resource)
+		})
+
+		err := storeOps(rw) // execute operations to store resource
+		if err != nil {
+			return fmt.Errorf("could not store resource: %w", err)
+		}
+
+		return nil
+	}
+}
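
The ordering inside `PutTx` is the crux: the cache insert is registered via `OnCommitSucceed` before the write executes, so the LRU is populated only after the batch commits. A minimal same-package sketch, assuming a `CacheB` keyed by approval ID as wired above (`cacheRoundTrip` is a hypothetical helper):

package badger

import (
	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage/badger/operation"
)

// cacheRoundTrip exercises both paths: PutTx stages the write and only
// populates the LRU after the batch commits; Get then serves the value
// from the cache without hitting badger.
func cacheRoundTrip(db *badger.DB, c *CacheB[flow.Identifier, *flow.ResultApproval], approval *flow.ResultApproval) (*flow.ResultApproval, error) {
	err := operation.WithReaderBatchWriter(db, c.PutTx(approval.ID(), approval))
	if err != nil {
		return nil, err // on failure, the cache was never updated
	}
	return c.Get(approval.ID())(operation.ToReader(db))
}
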
diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go
index 8a994eed2a2..7c78095e090 100644
--- a/storage/badger/operation/approvals.go
+++ b/storage/badger/operation/approvals.go
@@ -1,31 +1,35 @@
 package operation
 
 import (
-	"github.com/dgraph-io/badger/v2"
-
 	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/storage"
 )
 
 // InsertResultApproval inserts a ResultApproval by ID.
-func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error {
-	return insert(makePrefix(codeResultApproval, approval.ID()), approval)
+// The same key (`approval.ID()`) necessitates that the value (full `approval`) is
+// also identical (otherwise, we would have a successful pre-image attack on our
+// cryptographic hash function). Therefore, concurrent calls to this function are safe.
+func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error {
+	return insertW(makePrefix(codeResultApproval, approval.ID()), approval)
 }
 
 // RetrieveResultApproval retrieves an approval by ID.
-func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error {
-	return retrieve(makePrefix(codeResultApproval, approvalID), approval)
+func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error {
+	return retrieveR(makePrefix(codeResultApproval, approvalID), approval)
 }
 
-// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
-// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
-// error is returned. This operation is only used by the ResultApprovals store,
+// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
+// and chunk index. Unlike the former IndexResultApproval, it blindly overwrites
+// any existing value. This operation is only used by the ResultApprovals store,
 // which is only used within a Verification node, where it is assumed that there
 // is only one approval per chunk.
+// CAUTION: In order to prevent overwriting, use of this function must be
+// synchronized with a check (LookupResultApproval) for existence of the key.
-func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
-	return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
+func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
+	return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
 }
 
 // LookupResultApproval finds a ResultApproval by result ID and chunk index.
-func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error {
-	return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
+func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error {
+	return retrieveR(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
 }
diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go
index 1c293348231..09abd97a62f 100644
--- a/storage/badger/operation/common.go
+++ b/storage/badger/operation/common.go
@@ -44,6 +44,29 @@ func batchWrite(key []byte, entity interface{}) func(writeBatch *badger.WriteBat
 	}
 }
 
+// insertW will encode the given entity using msgpack and will insert the resulting
+// binary data in the badger DB under the provided key. Unlike the deprecated
+// insert, it overwrites the value if the key already exists.
+// Error returns:
+//   - generic error in case of unexpected failure from the database layer or
+//     encoding failure.
+func insertW(key []byte, val interface{}) func(storage.Writer) error {
+	return func(w storage.Writer) error {
+		value, err := msgpack.Marshal(val)
+		if err != nil {
+			return irrecoverable.NewExceptionf("failed to encode value: %w", err)
+		}
+
+		err = w.Set(key, value)
+		if err != nil {
+			return irrecoverable.NewExceptionf("failed to store data: %w", err)
+		}
+
+		return nil
+	}
+}
+
+// Deprecated: use insertW instead.
 // insert will encode the given entity using msgpack and will insert the resulting
 // binary data in the badger DB under the provided key. It will error if the
 // key already exists.
@@ -266,6 +289,30 @@ func retrieve(key []byte, entity interface{}) func(*badger.Txn) error {
 	}
 }
 
+// retrieveR will retrieve the binary data under the given key from the badger DB
+// and decode it into the given entity. The provided entity needs to be a
+// pointer to an initialized entity of the correct type.
+// Error returns:
+//   - storage.ErrNotFound if the key does not exist in the database
+//   - generic error in case of unexpected failure from the database layer, or failure
+//     to decode an existing database value
+func retrieveR(key []byte, entity interface{}) func(storage.Reader) error {
+	return func(r storage.Reader) error {
+		val, closer, err := r.Get(key)
+		if err != nil {
+			return err
+		}
+
+		defer closer.Close()
+
+		err = msgpack.Unmarshal(val, entity)
+		if err != nil {
+			return irrecoverable.NewExceptionf("could not decode entity: %w", err)
+		}
+		return nil
+	}
+}
+
 // exists returns true if a key exists in the database.
 // No errors are expected during normal operation.
 func exists(key []byte, keyExists *bool) func(*badger.Txn) error {
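
A sketch of how a new-style operation pair could be built on these helpers inside package `operation` (`InsertEntity` and `RetrieveEntity` are hypothetical names; `makePrefix` is the existing key helper used throughout this package):

package operation

import (
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// InsertEntity persists any msgpack-encodable entity under a prefixed key via
// the new Writer-based helper; like insertW, it overwrites existing values.
func InsertEntity(code byte, id flow.Identifier, entity interface{}) func(storage.Writer) error {
	return insertW(makePrefix(code, id), entity)
}

// RetrieveEntity is the matching read; it works against any storage.Reader,
// e.g. ToReader(db) or rw.GlobalReader().
func RetrieveEntity(code byte, id flow.Identifier, entity interface{}) func(storage.Reader) error {
	return retrieveR(makePrefix(code, id), entity)
}
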
diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go
new file mode 100644
index 00000000000..3c8a26530fd
--- /dev/null
+++ b/storage/badger/operation/reader_batch_writer.go
@@ -0,0 +1,144 @@
+package operation
+
+import (
+	"errors"
+	"io"
+	"sync"
+
+	"github.com/dgraph-io/badger/v2"
+
+	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/storage"
+)
+
+type ReaderBatchWriter struct {
+	db    *badger.DB
+	batch *badger.WriteBatch
+
+	addingCallback sync.Mutex // protects callbacks
+
+	// Callbacks are executed regardless of whether the batch commit succeeds.
+	// If any function adding writes to the batch fails, the callbacks are called
+	// with that error; in this case they run before the batch is submitted, so
+	// that any locks held by those functions can be released.
+	// Callbacks must be non-blocking.
+	callbacks []func(error)
+}
+
+var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil)
+
+// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+// This reader may observe different values for the same key on subsequent reads.
+func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
+	return b
+}
+
+// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
+func (b *ReaderBatchWriter) Writer() storage.Writer {
+	return b
+}
+
+func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
+	return b.batch
+}
+
+func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
+	b.addingCallback.Lock()
+	defer b.addingCallback.Unlock()
+
+	b.callbacks = append(b.callbacks, callback)
+}
+
+func (b *ReaderBatchWriter) Commit() error {
+	err := b.batch.Flush()
+
+	b.notifyCallbacks(err)
+
+	return err
+}
+
+func (b *ReaderBatchWriter) notifyCallbacks(err error) {
+	b.addingCallback.Lock()
+	defer b.addingCallback.Unlock()
+
+	for _, callback := range b.callbacks {
+		callback(err)
+	}
+}
+
+func WithReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWriter) error) error {
+	batch := NewReaderBatchWriter(db)
+
+	err := fn(batch)
+	if err != nil {
+		// fn might hold a lock that is released by one of the callbacks.
+		// To ensure those locks are released even when fn fails, we notify
+		// the callbacks with the error before returning it.
+		batch.notifyCallbacks(err)
+		return err
+	}
+
+	return batch.Commit()
+}
+
+func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
+	return &ReaderBatchWriter{
+		db:    db,
+		batch: db.NewWriteBatch(),
+	}
+}
+
+// ToReader is a helper function to convert a *badger.DB to a Reader
+func ToReader(db *badger.DB) storage.Reader {
+	return NewReaderBatchWriter(db)
+}
+
+var _ storage.Reader = (*ReaderBatchWriter)(nil)
+
+type noopCloser struct{}
+
+var _ io.Closer = (*noopCloser)(nil)
+
+func (noopCloser) Close() error { return nil }
+
+func (b *ReaderBatchWriter) Get(key []byte) ([]byte, io.Closer, error) {
+	tx := b.db.NewTransaction(false)
+	defer tx.Discard()
+
+	item, err := tx.Get(key)
+	if err != nil {
+		if errors.Is(err, badger.ErrKeyNotFound) {
+			return nil, nil, storage.ErrNotFound
+		}
+		return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err)
+	}
+
+	var value []byte
+	err = item.Value(func(val []byte) error {
+		value = append([]byte{}, val...)
+		return nil
+	})
+	if err != nil {
+		return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err)
+	}
+
+	return value, noopCloser{}, nil
+}
+
+var _ storage.Writer = (*ReaderBatchWriter)(nil)
+
+func (b *ReaderBatchWriter) Set(key, value []byte) error {
+	return b.batch.Set(key, value)
+}
+
+func (b *ReaderBatchWriter) Delete(key []byte) error {
+	return b.batch.Delete(key)
+}
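
The comments above describe releasing locks through callbacks on both the success and the error path. A minimal sketch of that pattern, assuming a caller-supplied mutex (`indexUnderLock` is a hypothetical helper):

package example

import (
	"sync"

	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/badger/operation"
)

// indexUnderLock holds a mutex across a batched check-then-write. The callback
// releases the lock after the batch is flushed or, if fn fails, before
// WithReaderBatchWriter returns the error.
func indexUnderLock(db *badger.DB, mu *sync.Mutex, write func(storage.Writer) error) error {
	mu.Lock()
	return operation.WithReaderBatchWriter(db, func(rw storage.BadgerReaderBatchWriter) error {
		rw.AddCallback(func(error) {
			mu.Unlock() // runs exactly once: after Flush, or on the error path
		})
		return write(rw.Writer())
	})
}
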
diff --git a/storage/batch.go b/storage/batch.go
index 3147fc5c0e7..2b3d687d78c 100644
--- a/storage/batch.go
+++ b/storage/batch.go
@@ -1,11 +1,19 @@
 package storage
 
-import "github.com/dgraph-io/badger/v2"
+import (
+	"io"
+
+	"github.com/dgraph-io/badger/v2"
+)
 
+// Deprecated: use Writer instead.
 type Transaction interface {
 	Set(key, val []byte) error
 }
 
+// Deprecated: use BadgerReaderBatchWriter instead.
 // BatchStorage serves as an abstraction over batch storage, adding ability to add extra
 // callbacks which fire after the batch is successfully flushed.
 type BatchStorage interface {
@@ -20,3 +28,70 @@ type BatchStorage interface {
 	// Flush will flush the write batch and update the cache.
 	Flush() error
 }
+
+type Reader interface {
+	// Get gets the value for the given key. It returns ErrNotFound if the DB
+	// does not contain the key.
+	//
+	// The caller should not modify the contents of the returned slice, but it is
+	// safe to modify the contents of the argument after Get returns. The
+	// returned slice will remain valid until the returned Closer is closed. On
+	// success, the caller MUST call closer.Close() or a memory leak will occur.
+	Get(key []byte) (value []byte, closer io.Closer, err error)
+}
+
+// Writer is an interface for batch writing to a storage backend.
+type Writer interface {
+	// Set sets the value for the given key. It overwrites any previous value
+	// for that key; a DB is not a multi-map.
+	//
+	// It is safe to modify the contents of the arguments after Set returns.
+	Set(k, v []byte) error
+
+	// Delete deletes the value for the given key. Deletes are blind; they will
+	// succeed even if the given key does not exist.
+	//
+	// It is safe to modify the contents of the arguments after Delete returns.
+	Delete(key []byte) error
+}
+
+// BadgerReaderBatchWriter is an interface for badger-specific reader and writer.
+type BadgerReaderBatchWriter interface {
+	// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+	// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+	// This reader may observe different values for the same key on subsequent reads.
+	GlobalReader() Reader
+
+	// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+	// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+	// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+	// Note:
+	// - The writer cannot be used concurrently for writing.
+	Writer() Writer
+
+	// BadgerWriteBatch returns the underlying badger write batch object.
+	// Useful for implementing badger-specific operations.
+	BadgerWriteBatch() *badger.WriteBatch
+
+	// AddCallback adds a callback to execute after the batch has been flushed,
+	// regardless of whether the batch update succeeded or failed.
+	// The error parameter is the error returned by the batch update.
+	AddCallback(func(error))
+}
+
+// OnlyBadgerWriter is an adapter to convert a function that takes a Writer
+// to a function that takes a BadgerReaderBatchWriter.
+func OnlyBadgerWriter(fn func(Writer) error) func(BadgerReaderBatchWriter) error {
+	return func(rw BadgerReaderBatchWriter) error {
+		return fn(rw.Writer())
+	}
+}
+
+// OnCommitSucceed adds a callback to execute after the batch has been successfully committed.
+func OnCommitSucceed(b BadgerReaderBatchWriter, onSuccessFn func()) {
+	b.AddCallback(func(err error) {
+		if err == nil {
+			onSuccessFn()
+		}
+	})
+}
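
Putting the pieces together, a minimal sketch of how `OnlyBadgerWriter` and `OnCommitSucceed` compose with `operation.WithReaderBatchWriter` (`storeAndCache` is a hypothetical helper):

package example

import (
	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/badger/operation"
)

// storeAndCache lifts a Writer-only operation into a batch with OnlyBadgerWriter
// and defers a cache update until the commit succeeds, mirroring CacheB.PutTx.
func storeAndCache(db *badger.DB, storeOp func(storage.Writer) error, onCached func()) error {
	return operation.WithReaderBatchWriter(db, func(rw storage.BadgerReaderBatchWriter) error {
		storage.OnCommitSucceed(rw, onCached) // fires only if Flush succeeds
		return storage.OnlyBadgerWriter(storeOp)(rw)
	})
}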