Skip to content

Commit

Permalink
update test cases for Index
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Aug 19, 2024
1 parent 9737a92 commit a01d317
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 8 deletions.
12 changes: 8 additions & 4 deletions storage/pebble/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f

func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error {
return func(tx storage.PebbleReaderBatchWriter) error {
// acquring the lock to prevent dirty reads of check conflicted approvals
r.indexing.Lock()
defer r.indexing.Unlock()

r, w := tx.ReaderWriter()

var storedApprovalID flow.Identifier
Expand Down Expand Up @@ -118,6 +114,14 @@ func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
// just calling the method once; still the method succeeds on each call).
// this method is concurrent-safe
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
// acquring the lock to prevent dirty reads when checking conflicted approvals
// how it works:
// the lock can only be acquired after the index operation is committed to the database,
// since the index operation is the only operation that would affect the reads operation,
// no writes can go through util the lock is released, so locking here could prevent dirty reads.
r.indexing.Lock()
defer r.indexing.Unlock()

err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
Expand Down
60 changes: 56 additions & 4 deletions storage/pebble/approvals_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package pebble_test

import (
"errors"
"sync"
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -72,9 +75,58 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
err = store.Store(approval2)
require.NoError(t, err)

// TODO: fix later once implement insert and upsert
// err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
// require.Error(t, err)
// require.True(t, errors.Is(err, storage.ErrDataMismatch))
// index again with a different approval should fail
err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
require.Error(t, err)
require.True(t, errors.Is(err, storage.ErrDataMismatch))
})
}

// verify that storing and indexing two conflicting approvals concurrently should fail
// one of them is succeed, the other one should fail
func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
metrics := metrics.NewNoopCollector()
store := bstorage.NewResultApprovals(metrics, db)

approval1 := unittest.ResultApprovalFixture()
approval2 := unittest.ResultApprovalFixture()

var wg sync.WaitGroup
wg.Add(2)

var firstIndexErr, secondIndexErr error

// First goroutine stores and indexes the first approval.
go func() {
defer wg.Done()

err := store.Store(approval1)
require.NoError(t, err)

firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
}()

// Second goroutine stores and tries to index the second approval for the same chunk.
go func() {
defer wg.Done()

err := store.Store(approval2)
require.NoError(t, err)

secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
}()

// Wait for both goroutines to finish
wg.Wait()

// Check that one of the Index operations succeeded and the other failed
if firstIndexErr == nil {
require.Error(t, secondIndexErr)
require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
} else {
require.NoError(t, secondIndexErr)
require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
}
})
}

0 comments on commit a01d317

Please sign in to comment.