Skip to content

Commit

Permalink
Add GetBlobMetadataCountByCountId
Browse files Browse the repository at this point in the history
  • Loading branch information
Siddharth More authored and Siddharth More committed May 5, 2024
1 parent 0562b5a commit 8e6c7d0
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 5 deletions.
45 changes: 43 additions & 2 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -16,8 +17,9 @@ import (
)

const (
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
accountIdIndexName = "AccountIdIndex"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand Down Expand Up @@ -218,6 +220,21 @@ func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHea
return metadata, nil
}

// GetBlobMetadataByAccount Count returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, accountIdIndexName, "AccountID = :accountID", commondynamodb.ExpresseionValues{
":accountID": &types.AttributeValueMemberS{
Value: accountID,
}})
if err != nil {
return 0, err
}

return count, nil
}

func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *disperser.BlobMetadata) error {
_, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -297,6 +314,10 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
AttributeName: aws.String("BlobIndex"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("AccountID"),
AttributeType: types.ScalarAttributeTypeS,
},
},
KeySchema: []types.KeySchemaElement{
{
Expand Down Expand Up @@ -350,6 +371,26 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
{
IndexName: aws.String(accountIdIndexName),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("AccountID"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("RequestedAt"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
Expand Down
102 changes: 102 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,105 @@ func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey) *disperse
},
}
}

func TestBlobMetadataStoreWithAccountId(t *testing.T) {
ctx := context.Background()
blobKey1 := disperser.BlobKey{
BlobHash: blobHash,
MetadataHash: "hash",
}
metadata1 := &disperser.BlobMetadata{
MetadataHash: blobKey1.MetadataHash,
BlobHash: blobHash,
BlobStatus: disperser.Processing,
AccountID: "test",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
}
blobKey2 := disperser.BlobKey{
BlobHash: "blob2",
MetadataHash: "hash2",
}
metadata2 := &disperser.BlobMetadata{
MetadataHash: blobKey2.MetadataHash,
BlobHash: blobKey2.BlobHash,
BlobStatus: disperser.Finalized,
AccountID: "test",
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 124,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}
err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1)
assert.NoError(t, err)
err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2)
assert.NoError(t, err)

fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
assert.NoError(t, err)
assert.Equal(t, metadata1, fetchedMetadata)
fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2)
assert.NoError(t, err)
assert.Equal(t, metadata2, fetchedMetadata)

processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processing, 1)
assert.Equal(t, metadata1, processing[0])

processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Equal(t, int32(1), processingCount)

blobCountByAccountId, err := blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, "test")
assert.NoError(t, err)
assert.Equal(t, int32(2), blobCountByAccountId)

err = blobMetadataStore.IncrementNumRetries(ctx, metadata1)
assert.NoError(t, err)
fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
assert.NoError(t, err)
metadata1.NumRetries = 1
assert.Equal(t, metadata1, fetchedMetadata)

finalized, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Finalized)
assert.NoError(t, err)
assert.Len(t, finalized, 1)
assert.Equal(t, metadata2, finalized[0])

finalizedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Finalized)
assert.NoError(t, err)
assert.Equal(t, int32(1), finalizedCount)

confirmedMetadata := getConfirmedMetadata(t, blobKey1)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata)
assert.NoError(t, err)

metadata, err := blobMetadataStore.GetBlobMetadataInBatch(ctx, confirmedMetadata.ConfirmationInfo.BatchHeaderHash, confirmedMetadata.ConfirmationInfo.BlobIndex)
assert.NoError(t, err)
assert.Equal(t, metadata, confirmedMetadata)

confirmedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Confirmed)
assert.NoError(t, err)
assert.Equal(t, int32(1), confirmedCount)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash},
},
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash},
},
})
}
8 changes: 8 additions & 0 deletions disperser/common/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ var (
blob = &core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
BlobAuthHeader: core.BlobAuthHeader{
AccountID: "test",
},
},

Data: []byte("test"),
}
s3Client = cmock.NewS3Client()
Expand Down Expand Up @@ -84,6 +88,10 @@ func setup(m *testing.M) {
EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort),
}

fmt.Printf("Creating dynamodb table %s\n", metadataTableName)
fmt.Printf("Table Schema: %v\n", blobstore.GenerateTableSchema(metadataTableName, 10, 10).AttributeDefinitions)
fmt.Printf("Table Schema: %v\n", blobstore.GenerateTableSchema(metadataTableName, 10, 10).KeySchema)
fmt.Printf("Table Schema: %v\n", blobstore.GenerateTableSchema(metadataTableName, 10, 10).GlobalSecondaryIndexes)
_, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstore.GenerateTableSchema(metadataTableName, 10, 10))
if err != nil {
teardown()
Expand Down
5 changes: 5 additions & 0 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *SharedBlobStore) StoreBlob(ctx context.Context, blob *core.Blob, reques
metadata := disperser.BlobMetadata{
BlobHash: blobHash,
MetadataHash: metadataHash,
AccountID: blob.RequestHeader.AccountID,
NumRetries: 0,
BlobStatus: disperser.Processing,
Expiry: expiry,
Expand Down Expand Up @@ -237,6 +238,10 @@ func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey dispe
return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey)
}

func (s *SharedBlobStore) GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error) {
return s.blobMetadataStore.GetBlobMetadataCountByAccountID(ctx, accountID)
}

func (s *SharedBlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) {
if metadata.NumRetries < maxRetry {
if err := s.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (q *BlobStore) StoreBlob(ctx context.Context, blob *core.Blob, requestedAt
BlobHash: blobHash,
MetadataHash: blobKey.MetadataHash,
BlobStatus: disperser.Processing,
AccountID: blob.RequestHeader.AccountID,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
Expand Down Expand Up @@ -270,6 +271,18 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK
return nil, disperser.ErrBlobNotFound
}

func (q *BlobStore) GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error) {
q.mu.RLock()
defer q.mu.RUnlock()
count := int32(0)
for _, meta := range q.Metadata {
if meta.AccountID == accountID {
count++
}
}
return count, nil
}

func (q *BlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) {
if metadata.NumRetries < maxRetry {
if err := q.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func ParseBlobKey(key string) (BlobKey, error) {
}

type BlobMetadata struct {
BlobHash BlobHash `json:"blob_hash"`
MetadataHash MetadataHash `json:"metadata_hash"`
BlobStatus BlobStatus `json:"blob_status"`
BlobHash BlobHash `json:"blob_hash"`
MetadataHash MetadataHash `json:"metadata_hash"`
BlobStatus BlobStatus `json:"blob_status"`
AccountID core.AccountID `json:"account_id"`
// Expiry is unix epoch time in seconds at which the blob will expire
Expiry uint64 `json:"expiry"`
// NumRetries is the number of times the blob has been retried
Expand Down Expand Up @@ -168,6 +169,8 @@ type BlobStore interface {
GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*BlobMetadata, error)
// GetBlobMetadata returns a blob metadata given a metadata key
GetBlobMetadata(ctx context.Context, blobKey BlobKey) (*BlobMetadata, error)
// GetBlobMetadataCountByAccountID returns a list of blob metadata for blobs with the given accountId
GetBlobMetadataCountByAccountID(ctx context.Context, accountID core.AccountID) (int32, error)
// HandleBlobFailure handles a blob failure by either incrementing the retry count or marking the blob as failed
// Returns a boolean indicating whether the blob should be retried and an error
HandleBlobFailure(ctx context.Context, metadata *BlobMetadata, maxRetry uint) (bool, error)
Expand Down

0 comments on commit 8e6c7d0

Please sign in to comment.