Skip to content

Commit

Permalink
Add Confirming state for blobs (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Apr 18, 2024
1 parent 66a8486 commit a0c28c7
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 111 deletions.
80 changes: 45 additions & 35 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/grpc/node/node.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ enum BlobStatus {
// INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met
// for at least one quorum.
INSUFFICIENT_SIGNATURES = 5;

// CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain
CONFIRMING = 6;
}

// Types below correspond to the types necessary to verify a blob
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func (s *DispersalServer) updateQuorumConfig(ctx context.Context) (QuorumConfig,

func getResponseStatus(status disperser.BlobStatus) pb.BlobStatus {
switch status {
case disperser.Processing:
case disperser.Confirming, disperser.Processing:
return pb.BlobStatus_PROCESSING
case disperser.Confirmed:
return pb.BlobStatus_CONFIRMED
Expand Down
25 changes: 25 additions & 0 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,31 @@ func TestGetBlobStatus(t *testing.T) {
assert.Equal(t, reply.GetInfo().GetBlobVerificationProof().GetQuorumIndexes(), quorumIndexes)
}

func TestGetBlobConfirmingStatus(t *testing.T) {
data := make([]byte, 1024)
_, err := rand.Read(data)
assert.NoError(t, err)

data = codec.ConvertByPaddingEmptyByte(data)

status, _, requestID := disperseBlob(t, dispersalServer, data)
assert.Equal(t, status, pb.BlobStatus_PROCESSING)
assert.NotNil(t, requestID)
blobKey, err := disperser.ParseBlobKey(string(requestID))
assert.NoError(t, err)
err = queue.MarkBlobConfirming(context.Background(), blobKey)
assert.NoError(t, err)
meta, err := queue.GetBlobMetadata(context.Background(), blobKey)
assert.NoError(t, err)
assert.Equal(t, meta.BlobStatus, disperser.Confirming)

reply, err := dispersalServer.GetBlobStatus(context.Background(), &pb.BlobStatusRequest{
RequestId: requestID,
})
assert.NoError(t, err)
assert.Equal(t, reply.GetStatus(), pb.BlobStatus_PROCESSING)
}

func TestRetrieveBlob(t *testing.T) {

for i := 0; i < 3; i++ {
Expand Down
26 changes: 15 additions & 11 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,10 @@ func (b *Batcher) updateConfirmationInfo(
if status == disperser.Confirmed {
if _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
}
} else if status == disperser.InsufficientSignatures {
if _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
}
} else {
updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String())
Expand Down Expand Up @@ -392,6 +388,17 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
return result.ErrorOrNil()
}

func (b *Batcher) transitionBlobToConfirming(ctx context.Context, metadata *disperser.BlobMetadata) error {
err := b.Queue.MarkBlobConfirming(ctx, metadata.GetBlobKey())
if err != nil {
b.logger.Error("error marking blob as confirming", "err", err)
return err
}
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
return nil
}

type confirmationMetadata struct {
batchHeader *core.BatchHeader
blobs []*disperser.BlobMetadata
Expand Down Expand Up @@ -494,13 +501,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err)
} else {
for _, metadata := range batch.BlobMetadata {
err = b.EncodingStreamer.MarkBlobPendingConfirmation(metadata)
if err != nil {
log.Error("HandleSingleBatch: error marking blob as pending confirmation", "err", err)
}
}
}

for _, metadata := range batch.BlobMetadata {
_ = b.transitionBlobToConfirming(ctx, metadata)
}

return nil
Expand Down
19 changes: 11 additions & 8 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestBlobRetry(t *testing.T) {

encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingDispersal)
assert.NotNil(t, encodedResult)

txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil)
Expand All @@ -411,12 +411,13 @@ func TestBlobRetry(t *testing.T) {
err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)

// ConfirmBatch transaction has been sent. Waiting for transaction to be confirmed onchain
meta, err := blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, meta.BlobStatus)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingConfirmation)
assert.ErrorContains(t, err, "no such key")
assert.Nil(t, encodedResult)

err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
Expand Down Expand Up @@ -444,12 +445,10 @@ func TestBlobRetry(t *testing.T) {
batch, err = components.encodingStreamer.CreateBatch()
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)
_, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)

meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, meta.BlobStatus)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)

// Trigger a retry
confirmationErr := errors.New("error")
Expand All @@ -459,6 +458,10 @@ func TestBlobRetry(t *testing.T) {
Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata,
})
assert.ErrorIs(t, err, confirmationErr)
meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, meta.BlobStatus)
assert.Equal(t, uint(1), meta.NumRetries)

components.encodingStreamer.ReferenceBlockNumber = 14
// Should pick up the blob to encode
Expand All @@ -475,7 +478,7 @@ func TestBlobRetry(t *testing.T) {
assert.NoError(t, err)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingDispersal)
assert.NotNil(t, encodedResult)
}

func TestRetryTxnReceipt(t *testing.T) {
Expand Down
28 changes: 3 additions & 25 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ import (
)

type requestID string
type status uint

const (
PendingDispersal status = iota
PendingConfirmation
)

type encodedBlobStore struct {
mu sync.RWMutex
Expand All @@ -37,7 +31,6 @@ type EncodingResult struct {
Commitment *encoding.BlobCommitments
Chunks []*encoding.Frame
Assignments map[core.OperatorID]core.Assignment
Status status
}

// EncodingResultOrStatus is a wrapper for EncodingResult that also contains an error
Expand Down Expand Up @@ -74,7 +67,7 @@ func (e *encodedBlobStore) HasEncodingRequested(blobKey disperser.BlobKey, quoru
}

res, ok := e.encoded[requestID]
if ok && (res.Status == PendingConfirmation || res.ReferenceBlockNumber == referenceBlockNumber) {
if ok && res.ReferenceBlockNumber == referenceBlockNumber {
return true
}
return false
Expand Down Expand Up @@ -148,17 +141,15 @@ func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint)
staleCount := 0
pendingConfirmation := 0
for k, encodedResult := range e.encoded {
if encodedResult.Status == PendingConfirmation {
pendingConfirmation++
} else if encodedResult.ReferenceBlockNumber == blockNumber {
if encodedResult.ReferenceBlockNumber == blockNumber {
fetched = append(fetched, encodedResult)
} else if encodedResult.ReferenceBlockNumber < blockNumber {
// this is safe: https://go.dev/doc/effective_go#for
delete(e.encoded, k)
staleCount++
e.encodedResultSize -= getChunksSize(encodedResult)
} else {
e.logger.Error("GetNewAndDeleteStaleEncodingResults: unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber, "status", encodedResult.Status)
e.logger.Error("unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber)
}
}
e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "pendingConfirmation", pendingConfirmation, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize)
Expand All @@ -174,19 +165,6 @@ func (e *encodedBlobStore) GetEncodedResultSize() (int, uint64) {
return len(e.encoded), e.encodedResultSize
}

func (e *encodedBlobStore) MarkEncodedResultPendingConfirmation(blobKey disperser.BlobKey, quorumID core.QuorumID) error {
e.mu.Lock()
defer e.mu.Unlock()

requestID := getRequestID(blobKey, quorumID)
if _, ok := e.encoded[requestID]; !ok {
return fmt.Errorf("MarkEncodedBlobPendingConfirmation: no such key (%s) in encoded set", requestID)
}

e.encoded[requestID].Status = PendingConfirmation
return nil
}

func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {
return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID))
}
Expand Down
Loading

0 comments on commit a0c28c7

Please sign in to comment.