diff --git a/core/v2/types.go b/core/v2/types.go index d9254f94ba..e26f2669a9 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -13,6 +13,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding" "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/ethereum/go-ethereum/accounts/abi" + gethcommon "github.com/ethereum/go-ethereum/common" "golang.org/x/crypto/sha3" ) @@ -343,6 +344,52 @@ type BatchHeader struct { ReferenceBlockNumber uint64 } +// GetBatchHeaderHash returns the hash of the batch header +func (h BatchHeader) Hash() ([32]byte, error) { + var headerHash [32]byte + + // The order here has to match the field ordering of ReducedBatchHeader defined in IEigenDAServiceManager.sol + // ref: https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 + batchHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "blobHeadersRoot", + Type: "bytes32", + }, + { + Name: "referenceBlockNumber", + Type: "uint32", + }, + }) + if err != nil { + return headerHash, err + } + + arguments := abi.Arguments{ + { + Type: batchHeaderType, + }, + } + + s := struct { + BlobHeadersRoot [32]byte + ReferenceBlockNumber uint32 + }{ + BlobHeadersRoot: h.BatchRoot, + ReferenceBlockNumber: uint32(h.ReferenceBlockNumber), + } + + bytes, err := arguments.Pack(s) + if err != nil { + return headerHash, err + } + + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(headerHash[:], hasher.Sum(nil)[:32]) + + return headerHash, nil +} + type Batch struct { BatchHeader *BatchHeader BlobCertificates []*BlobCertificate @@ -364,6 +411,27 @@ func (p BlobVersionParameters) MaxNumOperators() uint32 { return uint32(math.Floor(float64(p.NumChunks) * (1 - 1/(p.ReconstructionThreshold*float64(p.CodingRate))))) } +// DispersalRequest is a request to disperse a batch to a specific operator +type DispersalRequest struct { + core.OperatorID `dynamodbav:"-"` + OperatorAddress gethcommon.Address + Socket string + DispersedAt uint64 + + BatchHeader +} + +// DispersalResponse is a response to a dispersal request +type DispersalResponse struct { + *DispersalRequest + + RespondedAt uint64 + // Signature is the signature of the response by the operator + Signature [32]byte + // Error is the error message if the dispersal failed + Error string +} + const ( // We use uint8 to count the number of quorums, so we can have at most 255 quorums, // which means the max ID can not be larger than 254 (from 0 to 254, there are 255 diff --git a/core/v2/types_test.go b/core/v2/types_test.go index c2c9b908c8..74aab46b37 100644 --- a/core/v2/types_test.go +++ b/core/v2/types_test.go @@ -55,3 +55,17 @@ func TestBlobKeyFromHeader(t *testing.T) { // 0xb19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc verified in solidity assert.Equal(t, "b19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc", blobKey.Hex()) } + +func TestBatchHeaderHAsh(t *testing.T) { + batchRoot := [32]byte{} + copy(batchRoot[:], []byte("1")) + batchHeader := &v2.BatchHeader{ + ReferenceBlockNumber: 1, + BatchRoot: batchRoot, + } + + hash, err := batchHeader.Hash() + assert.NoError(t, err) + // 0x891d0936da4627f445ef193aad63afb173409af9e775e292e4e35aff790a45e2 verified in solidity + assert.Equal(t, "891d0936da4627f445ef193aad63afb173409af9e775e292e4e35aff790a45e2", hex.EncodeToString(hash[:])) +} diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index f039be6044..96b471f0e8 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -2,6 +2,7 @@ package blobstore import ( "context" + "encoding/hex" "errors" "fmt" "strconv" @@ -9,7 +10,8 @@ import ( "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - core "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/encoding" @@ -26,9 +28,12 @@ const ( OperatorDispersalIndexName = "OperatorDispersalIndex" OperatorResponseIndexName = "OperatorResponseIndex" - blobKeyPrefix = "BlobKey#" - blobMetadataSK = "BlobMetadata" - blobCertSK = "BlobCertificate" + blobKeyPrefix = "BlobKey#" + dispersalKeyPrefix = "Dispersal#" + blobMetadataSK = "BlobMetadata" + blobCertSK = "BlobCertificate" + dispersalRequestSKPrefix = "DispersalRequest#" + dispersalResponseSKPrefix = "DispersalResponse#" ) var ( @@ -71,7 +76,7 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v return err } -func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error { +func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2.BlobKey, status v2.BlobStatus) error { validStatuses := statusUpdatePrecondition[status] if len(validStatuses) == 0 { return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String()) @@ -114,7 +119,7 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey core.B return err } -func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) (*v2.BlobMetadata, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "PK": &types.AttributeValueMemberS{ Value: blobKeyPrefix + blobKey.Hex(), @@ -180,7 +185,7 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st return count, nil } -func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) error { +func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *corev2.BlobCertificate, fragmentInfo *encoding.FragmentInfo) error { item, err := MarshalBlobCertificate(blobCert, fragmentInfo) if err != nil { return err @@ -194,7 +199,7 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co return err } -func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, *encoding.FragmentInfo, error) { +func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) (*corev2.BlobCertificate, *encoding.FragmentInfo, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "PK": &types.AttributeValueMemberS{ Value: blobKeyPrefix + blobKey.Hex(), @@ -220,6 +225,120 @@ func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core return cert, fragmentInfo, nil } +// GetBlobCertificates returns the certificates for the given blob keys +// Note: the returned certificates are NOT necessarily ordered by the order of the input blob keys +func (s *BlobMetadataStore) GetBlobCertificates(ctx context.Context, blobKeys []corev2.BlobKey) ([]*corev2.BlobCertificate, []*encoding.FragmentInfo, error) { + keys := make([]map[string]types.AttributeValue, len(blobKeys)) + for i, blobKey := range blobKeys { + keys[i] = map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: blobCertSK, + }, + } + } + + items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) + if err != nil { + return nil, nil, err + } + + certs := make([]*corev2.BlobCertificate, len(items)) + fragmentInfos := make([]*encoding.FragmentInfo, len(items)) + for i, item := range items { + cert, fragmentInfo, err := UnmarshalBlobCertificate(item) + if err != nil { + return nil, nil, err + } + certs[i] = cert + fragmentInfos[i] = fragmentInfo + } + + return certs, fragmentInfos, nil +} + +func (s *BlobMetadataStore) PutDispersalRequest(ctx context.Context, req *corev2.DispersalRequest) error { + item, err := MarshalDispersalRequest(req) + if err != nil { + return err + } + + err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil) + if errors.Is(err, commondynamodb.ErrConditionFailed) { + return common.ErrAlreadyExists + } + + return err +} + +func (s *BlobMetadataStore) GetDispersalRequest(ctx context.Context, batchHeaderHash [32]byte, operatorID core.OperatorID) (*corev2.DispersalRequest, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: dispersalKeyPrefix + hex.EncodeToString(batchHeaderHash[:]), + }, + "SK": &types.AttributeValueMemberS{ + Value: fmt.Sprintf("%s%s", dispersalRequestSKPrefix, operatorID.Hex()), + }, + }) + + if err != nil { + return nil, err + } + + if item == nil { + return nil, fmt.Errorf("%w: dispersal request not found for batch header hash %x and operator %s", common.ErrMetadataNotFound, batchHeaderHash, operatorID.Hex()) + } + + req, err := UnmarshalDispersalRequest(item) + if err != nil { + return nil, err + } + + return req, nil +} + +func (s *BlobMetadataStore) PutDispersalResponse(ctx context.Context, res *corev2.DispersalResponse) error { + item, err := MarshalDispersalResponse(res) + if err != nil { + return err + } + + err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil) + if errors.Is(err, commondynamodb.ErrConditionFailed) { + return common.ErrAlreadyExists + } + + return err +} + +func (s *BlobMetadataStore) GetDispersalResponse(ctx context.Context, batchHeaderHash [32]byte, operatorID core.OperatorID) (*corev2.DispersalResponse, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: dispersalKeyPrefix + hex.EncodeToString(batchHeaderHash[:]), + }, + "SK": &types.AttributeValueMemberS{ + Value: fmt.Sprintf("%s%s", dispersalResponseSKPrefix, operatorID.Hex()), + }, + }) + + if err != nil { + return nil, err + } + + if item == nil { + return nil, fmt.Errorf("%w: dispersal response not found for batch header hash %x and operator %s", common.ErrMetadataNotFound, batchHeaderHash, operatorID.Hex()) + } + + res, err := UnmarshalDispersalResponse(item) + if err != nil { + return nil, err + } + + return res, nil +} + func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ @@ -351,7 +470,7 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) return fields, nil } -func UnmarshalBlobKey(item commondynamodb.Item) (core.BlobKey, error) { +func UnmarshalBlobKey(item commondynamodb.Item) (corev2.BlobKey, error) { type Blob struct { PK string } @@ -359,11 +478,11 @@ func UnmarshalBlobKey(item commondynamodb.Item) (core.BlobKey, error) { blob := Blob{} err := attributevalue.UnmarshalMap(item, &blob) if err != nil { - return core.BlobKey{}, err + return corev2.BlobKey{}, err } bk := strings.TrimPrefix(blob.PK, blobKeyPrefix) - return core.HexToBlobKey(bk) + return corev2.HexToBlobKey(bk) } func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { @@ -375,7 +494,7 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { return &metadata, nil } -func MarshalBlobCertificate(blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) (commondynamodb.Item, error) { +func MarshalBlobCertificate(blobCert *corev2.BlobCertificate, fragmentInfo *encoding.FragmentInfo) (commondynamodb.Item, error) { fields, err := attributevalue.MarshalMap(blobCert) if err != nil { return nil, fmt.Errorf("failed to marshal blob certificate: %w", err) @@ -401,8 +520,8 @@ func MarshalBlobCertificate(blobCert *core.BlobCertificate, fragmentInfo *encodi return fields, nil } -func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, *encoding.FragmentInfo, error) { - cert := core.BlobCertificate{} +func UnmarshalBlobCertificate(item commondynamodb.Item) (*corev2.BlobCertificate, *encoding.FragmentInfo, error) { + cert := corev2.BlobCertificate{} err := attributevalue.UnmarshalMap(item, &cert) if err != nil { return nil, nil, fmt.Errorf("failed to unmarshal blob certificate: %w", err) @@ -414,3 +533,117 @@ func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, } return &cert, &fragmentInfo, nil } + +func UnmarshalBatchHeaderHash(item commondynamodb.Item) ([32]byte, error) { + type Object struct { + PK string + } + + obj := Object{} + err := attributevalue.UnmarshalMap(item, &obj) + if err != nil { + return [32]byte{}, err + } + + root := strings.TrimPrefix(obj.PK, dispersalKeyPrefix) + return hexToHash(root) +} + +func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { + type Object struct { + OperatorID string + } + + obj := Object{} + err := attributevalue.UnmarshalMap(item, &obj) + if err != nil { + return nil, err + } + + operatorID, err := core.OperatorIDFromHex(obj.OperatorID) + if err != nil { + return nil, err + } + + return &operatorID, nil +} + +func MarshalDispersalRequest(req *corev2.DispersalRequest) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal dispersal request: %w", err) + } + + batchHeaderHash, err := req.BatchHeader.Hash() + if err != nil { + return nil, fmt.Errorf("failed to hash batch header: %w", err) + } + hashstr := hex.EncodeToString(batchHeaderHash[:]) + + fields["PK"] = &types.AttributeValueMemberS{Value: dispersalKeyPrefix + hashstr} + fields["SK"] = &types.AttributeValueMemberS{Value: fmt.Sprintf("%s%s", dispersalRequestSKPrefix, req.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: req.OperatorID.Hex()} + + return fields, nil +} + +func UnmarshalDispersalRequest(item commondynamodb.Item) (*corev2.DispersalRequest, error) { + req := corev2.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &req) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal dispersal request: %w", err) + } + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + req.OperatorID = *operatorID + + return &req, nil +} + +func MarshalDispersalResponse(res *corev2.DispersalResponse) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(res) + if err != nil { + return nil, fmt.Errorf("failed to marshal dispersal response: %w", err) + } + + batchHeaderHash, err := res.BatchHeader.Hash() + if err != nil { + return nil, fmt.Errorf("failed to hash batch header: %w", err) + } + hashstr := hex.EncodeToString(batchHeaderHash[:]) + + fields["PK"] = &types.AttributeValueMemberS{Value: dispersalKeyPrefix + hashstr} + fields["SK"] = &types.AttributeValueMemberS{Value: fmt.Sprintf("%s%s", dispersalResponseSKPrefix, res.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: res.OperatorID.Hex()} + + return fields, nil +} + +func UnmarshalDispersalResponse(item commondynamodb.Item) (*corev2.DispersalResponse, error) { + res := corev2.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &res) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal dispersal response: %w", err) + } + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + res.OperatorID = *operatorID + + return &res, nil +} + +func hexToHash(h string) ([32]byte, error) { + s := strings.TrimPrefix(h, "0x") + s = strings.TrimPrefix(s, "0X") + b, err := hex.DecodeString(s) + if err != nil { + return [32]byte{}, err + } + return [32]byte(b), nil +} diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index 9440050402..71c14f1be7 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -2,6 +2,7 @@ package blobstore_test import ( "context" + "encoding/hex" "math/big" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" ) @@ -215,6 +217,65 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) { }) } +func TestBlobMetadataStoreDispersals(t *testing.T) { + ctx := context.Background() + opID := core.OperatorID{0, 1} + dispersalRequest := &corev2.DispersalRequest{ + OperatorID: opID, + OperatorAddress: gethcommon.HexToAddress("0x1234567"), + Socket: "socket", + DispersedAt: uint64(time.Now().UnixNano()), + + BatchHeader: corev2.BatchHeader{ + BatchRoot: [32]byte{1, 2, 3}, + ReferenceBlockNumber: 100, + }, + } + + err := blobMetadataStore.PutDispersalRequest(ctx, dispersalRequest) + assert.NoError(t, err) + + bhh, err := dispersalRequest.BatchHeader.Hash() + assert.NoError(t, err) + + fetchedRequest, err := blobMetadataStore.GetDispersalRequest(ctx, bhh, dispersalRequest.OperatorID) + assert.NoError(t, err) + assert.Equal(t, dispersalRequest, fetchedRequest) + + // attempt to put dispersal request with the same key should fail + err = blobMetadataStore.PutDispersalRequest(ctx, dispersalRequest) + assert.ErrorIs(t, err, common.ErrAlreadyExists) + + dispersalResponse := &corev2.DispersalResponse{ + DispersalRequest: dispersalRequest, + RespondedAt: uint64(time.Now().UnixNano()), + Signature: [32]byte{1, 1, 1}, + Error: "error", + } + + err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse) + assert.NoError(t, err) + + fetchedResponse, err := blobMetadataStore.GetDispersalResponse(ctx, bhh, dispersalRequest.OperatorID) + assert.NoError(t, err) + assert.Equal(t, dispersalResponse, fetchedResponse) + + // attempt to put dispersal response with the same key should fail + err = blobMetadataStore.PutDispersalResponse(ctx, dispersalResponse) + assert.ErrorIs(t, err, common.ErrAlreadyExists) + + deleteItems(t, []commondynamodb.Key{ + { + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "DispersalRequest#" + opID.Hex()}, + }, + { + "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])}, + "SK": &types.AttributeValueMemberS{Value: "DispersalResponse#" + opID.Hex()}, + }, + }) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err)