Skip to content

Commit

Permalink
replace confirming state with dispersing state
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Apr 24, 2024
1 parent 1001025 commit 61ee80f
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 45 deletions.
12 changes: 6 additions & 6 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ enum BlobStatus {
// for at least one quorum.
INSUFFICIENT_SIGNATURES = 5;

// CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain
CONFIRMING = 6;
// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
DISPERSING = 6;
}

// Types below correspond to the types necessary to verify a blob
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (s *DispersalServer) updateQuorumConfig(ctx context.Context) (QuorumConfig,

func getResponseStatus(status disperser.BlobStatus) pb.BlobStatus {
switch status {
case disperser.Confirming, disperser.Processing:
case disperser.Dispersing, disperser.Processing:
return pb.BlobStatus_PROCESSING
case disperser.Confirmed:
return pb.BlobStatus_CONFIRMED
Expand Down
6 changes: 3 additions & 3 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestGetBlobStatus(t *testing.T) {
assert.Equal(t, reply.GetInfo().GetBlobVerificationProof().GetQuorumIndexes(), quorumIndexes)
}

func TestGetBlobConfirmingStatus(t *testing.T) {
func TestGetBlobDispersingStatus(t *testing.T) {
data := make([]byte, 1024)
_, err := rand.Read(data)
assert.NoError(t, err)
Expand All @@ -306,11 +306,11 @@ func TestGetBlobConfirmingStatus(t *testing.T) {
assert.NotNil(t, requestID)
blobKey, err := disperser.ParseBlobKey(string(requestID))
assert.NoError(t, err)
err = queue.MarkBlobConfirming(context.Background(), blobKey)
err = queue.MarkBlobDispersing(context.Background(), blobKey)
assert.NoError(t, err)
meta, err := queue.GetBlobMetadata(context.Background(), blobKey)
assert.NoError(t, err)
assert.Equal(t, meta.BlobStatus, disperser.Confirming)
assert.Equal(t, meta.BlobStatus, disperser.Dispersing)

reply, err := dispersalServer.GetBlobStatus(context.Background(), &pb.BlobStatusRequest{
RequestId: requestID,
Expand Down
17 changes: 1 addition & 16 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,6 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
return result.ErrorOrNil()
}

func (b *Batcher) transitionBlobToConfirming(ctx context.Context, metadata *disperser.BlobMetadata) error {
err := b.Queue.MarkBlobConfirming(ctx, metadata.GetBlobKey())
if err != nil {
b.logger.Error("error marking blob as confirming", "err", err)
return err
}
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
return nil
}

type confirmationMetadata struct {
batchHeader *core.BatchHeader
blobs []*disperser.BlobMetadata
Expand All @@ -421,7 +410,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
defer timer.ObserveDuration()

stageTimer := time.Now()
batch, err := b.EncodingStreamer.CreateBatch()
batch, err := b.EncodingStreamer.CreateBatch(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -504,10 +493,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err)
}

for _, metadata := range batch.BlobMetadata {
_ = b.transitionBlobToConfirming(ctx, metadata)
}

return nil
}

Expand Down
8 changes: 4 additions & 4 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestBlobRetry(t *testing.T) {
// ConfirmBatch transaction has been sent. Waiting for transaction to be confirmed onchain
meta, err := blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)
assert.Equal(t, disperser.Dispersing, meta.BlobStatus)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.ErrorContains(t, err, "no such key")
assert.Nil(t, encodedResult)
Expand All @@ -428,7 +428,7 @@ func TestBlobRetry(t *testing.T) {
t.Fatal("shouldn't have picked up any blobs to encode")
case <-timer.C:
}
batch, err := components.encodingStreamer.CreateBatch()
batch, err := components.encodingStreamer.CreateBatch(context.Background())
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)

Expand All @@ -443,13 +443,13 @@ func TestBlobRetry(t *testing.T) {
case <-timer.C:
}

batch, err = components.encodingStreamer.CreateBatch()
batch, err = components.encodingStreamer.CreateBatch(context.Background())
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)

meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)
assert.Equal(t, disperser.Dispersing, meta.BlobStatus)

// Trigger a retry
confirmationErr := errors.New("error")
Expand Down
17 changes: 16 additions & 1 deletion disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod
// If successful, it returns a batch, and updates the reference block number for next batch to use.
// Otherwise, it returns an error and keeps the blobs in the encoded blob store.
// This function is meant to be called periodically in a single goroutine as it resets the state of the encoded blob store.
func (e *EncodingStreamer) CreateBatch() (*batch, error) {
func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) {
// lock to update e.ReferenceBlockNumber
e.mu.Lock()
defer e.mu.Unlock()
Expand Down Expand Up @@ -556,6 +556,10 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
return nil, err
}

for _, metadata := range metadatas {
_ = e.transitionBlobToDispersing(ctx, metadata)
}

e.ReferenceBlockNumber = 0

return &batch{
Expand All @@ -568,6 +572,17 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
}, nil
}

func (e *EncodingStreamer) transitionBlobToDispersing(ctx context.Context, metadata *disperser.BlobMetadata) error {
err := e.blobStore.MarkBlobDispersing(ctx, metadata.GetBlobKey())
if err != nil {
e.logger.Error("error marking blob as dispersing", "err", err)
return err
}
// remove encoded blob from storage so we don't disperse it again
e.RemoveEncodedBlob(metadata)
return nil
}

func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
for _, sp := range metadata.RequestMetadata.SecurityParams {
e.EncodedBlobstore.DeleteEncodingResult(metadata.GetBlobKey(), sp.QuorumID)
Expand Down
19 changes: 17 additions & 2 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func TestPartialBlob(t *testing.T) {

// get batch
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
batch, err := encodingStreamer.CreateBatch()
batch, err := encodingStreamer.CreateBatch(context.Background())
assert.Nil(t, err)
assert.NotNil(t, batch)
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(0))
Expand Down Expand Up @@ -644,10 +644,25 @@ func TestGetBatch(t *testing.T) {

// get batch
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
batch, err := encodingStreamer.CreateBatch()
batch, err := encodingStreamer.CreateBatch(context.Background())
assert.Nil(t, err)
assert.NotNil(t, batch)
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(0))
metadata1, err = c.blobStore.GetBlobMetadata(ctx, metadataKey1)
assert.Nil(t, err)
assert.Equal(t, disperser.Dispersing, metadata1.BlobStatus)
metadata2, err = c.blobStore.GetBlobMetadata(ctx, metadataKey2)
assert.Equal(t, disperser.Dispersing, metadata2.BlobStatus)
assert.Nil(t, err)
res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(0))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(1))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey2, core.QuorumID(0))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")

// Check BatchHeader
assert.NotNil(t, batch.BatchHeader)
Expand Down
4 changes: 2 additions & 2 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (s *SharedBlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadat
return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata)
}

func (s *SharedBlobStore) MarkBlobConfirming(ctx context.Context, metadataKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Confirming)
func (s *SharedBlobStore) MarkBlobDispersing(ctx context.Context, metadataKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Dispersing)
}

func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) {
Expand Down
4 changes: 2 additions & 2 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
return &newMetadata, nil
}

func (q *BlobStore) MarkBlobConfirming(ctx context.Context, blobKey disperser.BlobKey) error {
func (q *BlobStore) MarkBlobDispersing(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
}
q.Metadata[blobKey].BlobStatus = disperser.Confirming
q.Metadata[blobKey].BlobStatus = disperser.Dispersing
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
Failed
Finalized
InsufficientSignatures
Confirming
Dispersing
)

var enumStrings = map[BlobStatus]string{
Expand All @@ -35,7 +35,7 @@ var enumStrings = map[BlobStatus]string{
Failed: "Failed",
Finalized: "Finalized",
InsufficientSignatures: "InsufficientSignatures",
Confirming: "Confirming",
Dispersing: "Dispersing",
}

func (bs BlobStatus) String() string {
Expand Down Expand Up @@ -142,8 +142,8 @@ type BlobStore interface {
// MarkBlobConfirmed updates blob metadata to Confirmed status with confirmation info
// Returns the updated metadata and error
MarkBlobConfirmed(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error)
// MarkBlobConfirming updates blob metadata to Confirming status
MarkBlobConfirming(ctx context.Context, blobKey BlobKey) error
// MarkBlobDispersing updates blob metadata to Dispersing status
MarkBlobDispersing(ctx context.Context, blobKey BlobKey) error
// MarkBlobInsufficientSignatures updates blob metadata to InsufficientSignatures status with confirmation info
// Returns the updated metadata and error
MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error)
Expand Down Expand Up @@ -209,8 +209,8 @@ func FromBlobStatusProto(status disperser_rpc.BlobStatus) (*BlobStatus, error) {
case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES:
res = InsufficientSignatures
return &res, nil
case disperser_rpc.BlobStatus_CONFIRMING:
res = Confirming
case disperser_rpc.BlobStatus_DISPERSING:
res = Dispersing
return &res, nil
}

Expand Down

0 comments on commit 61ee80f

Please sign in to comment.