Skip to content

Commit

Permalink
Remove encoded blob if confirmed (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 17, 2023
1 parent fa45b32 commit 663bd68
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 13 deletions.
16 changes: 11 additions & 5 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
batchID, err := b.getBatchID(ctx, txnReceipt)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata)
return fmt.Errorf("HandleSingleBatch: error confirming batch: %w", err)
return fmt.Errorf("HandleSingleBatch: error fetching batch ID: %w", err)
}

// Mark the blobs as complete
Expand Down Expand Up @@ -316,11 +316,17 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
}

if status == disperser.Confirmed {
_, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
if _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
}
} else if status == disperser.InsufficientSignatures {
_, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo)
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
if _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
}
} else {
updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String())
}
Expand Down
23 changes: 15 additions & 8 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,24 @@ func TestBatcherIterations(t *testing.T) {
err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
// Check that the blob was processed
meta, err := blobStore.GetBlobMetadata(ctx, blobKey1)
meta1, err := blobStore.GetBlobMetadata(ctx, blobKey1)
assert.NoError(t, err)
assert.Equal(t, blobKey1, meta.GetBlobKey())
assert.Equal(t, requestedAt1, meta.RequestMetadata.RequestedAt)
assert.Equal(t, disperser.Confirmed, meta.BlobStatus)
assert.Equal(t, meta.ConfirmationInfo.BatchID, uint32(3))
assert.Equal(t, blobKey1, meta1.GetBlobKey())
assert.Equal(t, requestedAt1, meta1.RequestMetadata.RequestedAt)
assert.Equal(t, disperser.Confirmed, meta1.BlobStatus)
assert.Equal(t, meta1.ConfirmationInfo.BatchID, uint32(3))

meta, err = blobStore.GetBlobMetadata(ctx, blobKey2)
meta2, err := blobStore.GetBlobMetadata(ctx, blobKey2)
assert.NoError(t, err)
assert.Equal(t, blobKey2, meta.GetBlobKey())
assert.Equal(t, disperser.Confirmed, meta.BlobStatus)
assert.Equal(t, blobKey2, meta2.GetBlobKey())
assert.Equal(t, disperser.Confirmed, meta2.BlobStatus)

res, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(meta1.GetBlobKey(), 0)
assert.ErrorContains(t, err, "no such key")
assert.Nil(t, res)
res, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(meta2.GetBlobKey(), 1)
assert.ErrorContains(t, err, "no such key")
assert.Nil(t, res)
}

func TestBlobFailures(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ func (e *encodedBlobStore) GetEncodingResult(blobKey disperser.BlobKey, quorumID
return e.encoded[requestID], nil
}

func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quorumID core.QuorumID) {
e.mu.Lock()
defer e.mu.Unlock()

requestID := getRequestID(blobKey, quorumID)
if _, ok := e.encoded[requestID]; !ok {
return
}

delete(e.encoded, requestID)
}

// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results and deletes all the stale results
func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
e.mu.Lock()
Expand Down
6 changes: 6 additions & 0 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,12 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
}, nil
}

func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
for _, sp := range metadata.RequestMetadata.SecurityParams {
e.EncodedBlobstore.DeleteEncodingResult(metadata.GetBlobKey(), sp.QuorumID)
}
}

func (e *EncodingStreamer) getBatchMetadata(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*batchMetadata, error) {
quorums := make(map[core.QuorumID]QuorumInfo, 0)
for _, metadata := range metadatas {
Expand Down

0 comments on commit 663bd68

Please sign in to comment.