diff --git a/disperser/batcher/finalizer.go b/disperser/batcher/finalizer.go index 0072335457..6fd21f06a6 100644 --- a/disperser/batcher/finalizer.go +++ b/disperser/batcher/finalizer.go @@ -193,12 +193,22 @@ func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.Blob continue } + if confirmationBlockNumber != uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) { + // Confirmation block number has changed due to reorg. Update the confirmation block number in the metadata + err := f.blobStore.UpdateConfirmationBlockNumber(ctx, m, uint32(confirmationBlockNumber)) + if err != nil { + f.logger.Error("error updating confirmation block number", "blobKey", blobKey.String(), "err", err) + f.metrics.IncrementNumBlobs("failed") + continue + } + } + // Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized) if uint64(confirmationBlockNumber) > lastFinalBlock { continue } - _, err = f.blobStore.MarkBlobFinalized(ctx, confirmationMetadata, confirmationBlockNumber) + err = f.blobStore.MarkBlobFinalized(ctx, blobKey) if err != nil { f.logger.Error("error marking blob as finalized", "blobKey", blobKey.String(), "err", err) f.metrics.IncrementNumBlobs("failed") diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index a731d418bd..9c12520c17 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -235,6 +235,26 @@ func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMet return err } +func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error { + updated := *existingMetadata + updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber + item, err := MarshalBlobMetadata(&updated) + if err != nil { + return err + } + + _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: existingMetadata.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: existingMetadata.MetadataHash, + }, + }, item) + + return err +} + func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *disperser.BlobMetadata) error { item, err := MarshalBlobMetadata(updated) if err != nil { diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 5112fe5f60..ab2d0acd21 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -172,17 +172,8 @@ func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, ex return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata) } -func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint64) (*disperser.BlobMetadata, error) { - if existingMetadata == nil { - return nil, errors.New("metadata is nil") - } - newMetadata := *existingMetadata - newMetadata.BlobStatus = disperser.Finalized - if confirmationBlockNumber > 0 { - newMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber) - } - - return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata) +func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error { + return s.blobMetadataStore.SetBlobStatus(ctx, blobKey, disperser.Finalized) } func (s *SharedBlobStore) MarkBlobProcessing(ctx context.Context, metadataKey disperser.BlobKey) error { @@ -199,6 +190,10 @@ func (s *SharedBlobStore) IncrementBlobRetryCount(ctx context.Context, existingM return s.blobMetadataStore.IncrementNumRetries(ctx, existingMetadata) } +func (s *SharedBlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error { + return s.blobMetadataStore.UpdateConfirmationBlockNumber(ctx, existingMetadata, confirmationBlockNumber) +} + func (s *SharedBlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) { pool := workerpool.New(maxS3BlobFetchWorkers) resultChan := make(chan blobResultOrError, len(metadata)) diff --git a/disperser/common/blobstore/shared_storage_test.go b/disperser/common/blobstore/shared_storage_test.go index e2ab767ad5..146b5d8bc0 100644 --- a/disperser/common/blobstore/shared_storage_test.go +++ b/disperser/common/blobstore/shared_storage_test.go @@ -102,10 +102,15 @@ func TestSharedBlobStore(t *testing.T) { assert.Nil(t, err) assertMetadata(t, blobKey, blobSize, requestedAt, disperser.Confirmed, metadata1) - updatedMetadata, err = sharedStorage.MarkBlobFinalized(ctx, metadata1, 151) + err = sharedStorage.UpdateConfirmationBlockNumber(ctx, metadata1, 151) + assert.Nil(t, err) + metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey) + assert.Nil(t, err) + assert.Equal(t, uint32(151), metadata1.ConfirmationInfo.ConfirmationBlockNumber) + + err = sharedStorage.MarkBlobFinalized(ctx, blobKey) assert.Nil(t, err) assert.Equal(t, disperser.Finalized, updatedMetadata.BlobStatus) - assert.Equal(t, uint32(151), updatedMetadata.ConfirmationInfo.ConfirmationBlockNumber) metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey) assert.Nil(t, err) diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 33b11a40cf..8fbab3607b 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "fmt" "sort" "strconv" "sync" @@ -123,19 +124,15 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing return &newMetadata, nil } -func (q *BlobStore) MarkBlobFinalized(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint64) (*disperser.BlobMetadata, error) { +func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error { q.mu.Lock() defer q.mu.Unlock() - blobKey := existingMetadata.GetBlobKey() if _, ok := q.Metadata[blobKey]; !ok { - return nil, disperser.ErrBlobNotFound + return disperser.ErrBlobNotFound } - newMetadata := *existingMetadata - newMetadata.BlobStatus = disperser.Finalized - newMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber) - q.Metadata[blobKey] = &newMetadata - return &newMetadata, nil + q.Metadata[blobKey].BlobStatus = disperser.Finalized + return nil } func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.BlobKey) error { @@ -171,6 +168,21 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat return nil } +func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error { + q.mu.Lock() + defer q.mu.Unlock() + if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok { + return disperser.ErrBlobNotFound + } + + if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil { + return fmt.Errorf("cannot update confirmation block number for blob without confirmation info: %s", existingMetadata.GetBlobKey().String()) + } + + q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber + return nil +} + func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) { q.mu.RLock() defer q.mu.RUnlock() diff --git a/disperser/common/inmem/store_test.go b/disperser/common/inmem/store_test.go index 0f8a702d73..2d6b57bc33 100644 --- a/disperser/common/inmem/store_test.go +++ b/disperser/common/inmem/store_test.go @@ -96,9 +96,14 @@ func TestBlobStore(t *testing.T) { assert.Nil(t, err) assert.Equal(t, disperser.Confirmed, updated.BlobStatus) + err = bs.UpdateConfirmationBlockNumber(ctx, updated, 151) + assert.Nil(t, err) + meta2, err = bs.GetBlobMetadata(ctx, blobKey2) assert.Nil(t, err) assert.Equal(t, meta2.BlobStatus, disperser.Confirmed) + assert.Equal(t, uint32(151), meta2.ConfirmationInfo.ConfirmationBlockNumber) + meta1, err = bs.GetBlobMetadata(ctx, blobKey1) assert.Nil(t, err) assert.Equal(t, meta1.BlobStatus, disperser.Processing) diff --git a/disperser/disperser.go b/disperser/disperser.go index b693a1ac27..78d078e594 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -148,13 +148,15 @@ type BlobStore interface { // Returns the updated metadata and error MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error) // MarkBlobFinalized marks a blob as finalized - MarkBlobFinalized(ctx context.Context, existingMetadata *BlobMetadata, confirmationBlockNumber uint64) (*BlobMetadata, error) + MarkBlobFinalized(ctx context.Context, blobKey BlobKey) error // MarkBlobProcessing marks a blob as processing MarkBlobProcessing(ctx context.Context, blobKey BlobKey) error // MarkBlobFailed marks a blob as failed MarkBlobFailed(ctx context.Context, blobKey BlobKey) error // IncrementBlobRetryCount increments the retry count of a blob IncrementBlobRetryCount(ctx context.Context, existingMetadata *BlobMetadata) error + // UpdateConfirmationBlockNumber updates the confirmation block number of a blob + UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *BlobMetadata, confirmationBlockNumber uint32) error // GetBlobsByMetadata retrieves a list of blobs given a list of metadata GetBlobsByMetadata(ctx context.Context, metadata []*BlobMetadata) (map[BlobKey]*core.Blob, error) // GetBlobMetadataByStatus returns a list of blob metadata for blobs with the given status