Skip to content

Commit

Permalink
[v2] Encoding manager (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 4, 2024
1 parent 7af65ea commit c63dd61
Show file tree
Hide file tree
Showing 16 changed files with 918 additions and 40 deletions.
43 changes: 43 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,49 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
return resp.Attributes, err
}

func (c *Client) UpdateItemWithCondition(
ctx context.Context,
tableName string,
key Key,
item Item,
condition expression.ConditionBuilder,
) (Item, error) {
update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
// Ignore primary key updates
if _, ok := key[itemKey]; ok {
continue
}
update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
}

expr, err := expression.NewBuilder().WithUpdate(update).WithCondition(condition).Build()
if err != nil {
return nil, err
}

resp, err := c.dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(tableName),
Key: key,
ConditionExpression: expr.Condition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ReturnValues: types.ReturnValueUpdatedNew,
})

var ccfe *types.ConditionalCheckFailedException
if errors.As(err, &ccfe) {
return nil, ErrConditionFailed
}

if err != nil {
return nil, err
}

return resp.Attributes, err
}

// IncrementBy increments the attribute by the value for item that matches with the key
func (c *Client) IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error) {
// ADD numeric values
Expand Down
18 changes: 18 additions & 0 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -217,6 +218,22 @@ func TestBasicOperations(t *testing.T) {
})
assert.NoError(t, err)

// Attempt to update the item with invalid condition
_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, commondynamodb.Item{
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, expression.Name("Status").In(expression.Value("Dispersing")))
assert.Error(t, err)

// Attempt to update the item with valid condition
_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, commondynamodb.Item{
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, expression.Name("Status").In(expression.Value("Confirmed")))
assert.NoError(t, err)

_, err = dynamoClient.IncrementBy(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, "BlobSize", 1000)
Expand All @@ -231,6 +248,7 @@ func TestBasicOperations(t *testing.T) {
assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "456", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)

err = dynamoClient.DeleteTable(ctx, tableName)
assert.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/disperser"
dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -597,7 +598,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
if errors.Is(err, disperser.ErrMetadataNotFound) {
if errors.Is(err, dispcommon.ErrMetadataNotFound) {
s.metrics.HandleNotFoundRpcRequest("GetBlobStatus")
s.metrics.HandleNotFoundRequest("GetBlobStatus")
return nil, api.NewErrorNotFound("no metadata found for the requestID")
Expand Down Expand Up @@ -740,7 +741,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
if errors.Is(err, disperser.ErrMetadataNotFound) {
if errors.Is(err, dispcommon.ErrMetadataNotFound) {
s.metrics.HandleNotFoundRpcRequest("RetrieveBlob")
s.metrics.HandleNotFoundRequest("RetrieveBlob")
return nil, api.NewErrorNotFound("no metadata found for the given batch header hash and blob index")
Expand Down
5 changes: 3 additions & 2 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey dispers
})

if item == nil {
return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey)
return nil, fmt.Errorf("%w: metadata not found for key %s", common.ErrMetadataNotFound, blobKey)
}

if err != nil {
Expand Down Expand Up @@ -312,7 +313,7 @@ func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHea
}

if len(items) == 0 {
return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", common.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
}

if len(items) > 1 {
Expand Down
2 changes: 1 addition & 1 deletion disperser/errors.go → disperser/common/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package disperser
package common

import "errors"

Expand Down
25 changes: 13 additions & 12 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common"
)

// BlobStore is an in-memory implementation of the BlobStore interface
Expand Down Expand Up @@ -75,7 +76,7 @@ func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobH
if holder, ok := q.Blobs[blobHash]; ok {
return holder.Data, nil
} else {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
}

Expand All @@ -93,7 +94,7 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
}
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.Confirmed
Expand All @@ -106,7 +107,7 @@ func (q *BlobStore) MarkBlobDispersing(ctx context.Context, blobKey disperser.Bl
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}
q.Metadata[blobKey].BlobStatus = disperser.Dispersing
return nil
Expand All @@ -117,7 +118,7 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing
defer q.mu.Unlock()
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.InsufficientSignatures
Expand All @@ -130,7 +131,7 @@ func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.Blo
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Finalized
Expand All @@ -141,7 +142,7 @@ func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.Bl
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Processing
Expand All @@ -152,7 +153,7 @@ func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKe
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Failed
Expand All @@ -163,7 +164,7 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[existingMetadata.GetBlobKey()].NumRetries++
Expand All @@ -174,7 +175,7 @@ func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingM
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil {
Expand All @@ -196,7 +197,7 @@ func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperse
Data: holder.Data,
}
} else {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
}
return blobs, nil
Expand Down Expand Up @@ -266,7 +267,7 @@ func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]
}
}

return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}

func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
Expand Down Expand Up @@ -327,7 +328,7 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK
if meta, ok := q.Metadata[blobKey]; ok {
return meta, nil
}
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}

func (q *BlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
Expand Down
3 changes: 3 additions & 0 deletions disperser/common/v2/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
)

type BlobStatus uint
Expand Down Expand Up @@ -65,4 +66,6 @@ type BlobMetadata struct {
RequestedAt uint64
// UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_
UpdatedAt uint64

*encoding.FragmentInfo
}
Loading

0 comments on commit c63dd61

Please sign in to comment.