diff --git a/storage/pebble/approvals.go b/storage/pebble/approvals.go index a521e6757af..2f4b1575197 100644 --- a/storage/pebble/approvals.go +++ b/storage/pebble/approvals.go @@ -74,10 +74,6 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { return func(tx storage.PebbleReaderBatchWriter) error { - // acquring the lock to prevent dirty reads of check conflicted approvals - r.indexing.Lock() - defer r.indexing.Unlock() - r, w := tx.ReaderWriter() var storedApprovalID flow.Identifier @@ -118,6 +114,14 @@ func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { // just calling the method once; still the method succeeds on each call). // this method is concurrent-safe func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { + // acquring the lock to prevent dirty reads when checking conflicted approvals + // how it works: + // the lock can only be acquired after the index operation is committed to the database, + // since the index operation is the only operation that would affect the reads operation, + // no writes can go through util the lock is released, so locking here could prevent dirty reads. + r.indexing.Lock() + defer r.indexing.Unlock() + err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) diff --git a/storage/pebble/approvals_test.go b/storage/pebble/approvals_test.go index 39f60c5bdaf..f1f6ed2ef45 100644 --- a/storage/pebble/approvals_test.go +++ b/storage/pebble/approvals_test.go @@ -1,12 +1,15 @@ package pebble_test import ( + "errors" + "sync" "testing" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/utils/unittest" ) @@ -72,9 +75,58 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) { err = store.Store(approval2) require.NoError(t, err) - // TODO: fix later once implement insert and upsert - // err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) - // require.Error(t, err) - // require.True(t, errors.Is(err, storage.ErrDataMismatch)) + // index again with a different approval should fail + err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrDataMismatch)) + }) +} + +// verify that storing and indexing two conflicting approvals concurrently should fail +// one of them is succeed, the other one should fail +func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + metrics := metrics.NewNoopCollector() + store := bstorage.NewResultApprovals(metrics, db) + + approval1 := unittest.ResultApprovalFixture() + approval2 := unittest.ResultApprovalFixture() + + var wg sync.WaitGroup + wg.Add(2) + + var firstIndexErr, secondIndexErr error + + // First goroutine stores and indexes the first approval. + go func() { + defer wg.Done() + + err := store.Store(approval1) + require.NoError(t, err) + + firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID()) + }() + + // Second goroutine stores and tries to index the second approval for the same chunk. + go func() { + defer wg.Done() + + err := store.Store(approval2) + require.NoError(t, err) + + secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + }() + + // Wait for both goroutines to finish + wg.Wait() + + // Check that one of the Index operations succeeded and the other failed + if firstIndexErr == nil { + require.Error(t, secondIndexErr) + require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch)) + } else { + require.NoError(t, secondIndexErr) + require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch)) + } }) }