diff --git a/core/mock/v2/validator.go b/core/mock/v2/validator.go new file mode 100644 index 0000000000..591257a97c --- /dev/null +++ b/core/mock/v2/validator.go @@ -0,0 +1,31 @@ +package v2 + +import ( + "context" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/stretchr/testify/mock" +) + +// MockShardValidator is a mock implementation of ShardValidator +type MockShardValidator struct { + mock.Mock +} + +var _ corev2.ShardValidator = (*MockShardValidator)(nil) + +func NewMockShardValidator() *MockShardValidator { + return &MockShardValidator{} +} + +func (v *MockShardValidator) ValidateBatchHeader(ctx context.Context, header *corev2.BatchHeader, blobCerts []*corev2.BlobCertificate) error { + args := v.Called() + return args.Error(0) +} + +func (v *MockShardValidator) ValidateBlobs(ctx context.Context, blobs []*corev2.BlobShard, pool common.WorkerPool, state *core.OperatorState) error { + args := v.Called() + return args.Error(0) +} diff --git a/core/v2/core_test.go b/core/v2/core_test.go index 6184313771..7404ba92b4 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -216,7 +216,7 @@ func checkBatchByUniversalVerifier( for id := range state.IndexedOperators { - val := corev2.NewShardValidator(v, cst, id) + val := corev2.NewShardValidator(v, id) blobs := packagedBlobs[id] diff --git a/core/v2/serialization.go b/core/v2/serialization.go index 1a67f2ab6d..2864f3bd61 100644 --- a/core/v2/serialization.go +++ b/core/v2/serialization.go @@ -7,6 +7,8 @@ import ( "math/big" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" "golang.org/x/crypto/sha3" ) @@ -292,6 +294,24 @@ func DeserializeBatchHeader(data []byte) (*BatchHeader, error) { return &h, nil } +func BuildMerkleTree(certs []*BlobCertificate) (*merkletree.MerkleTree, error) { + leafs := make([][]byte, len(certs)) + for i, cert := range certs { + leaf, err := cert.Hash() + if err != nil { + return nil, fmt.Errorf("failed to compute blob header hash: %w", err) + } + leafs[i] = leaf[:] + } + + tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New())) + if err != nil { + return nil, err + } + + return tree, nil +} + func encode(obj any) ([]byte, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) diff --git a/core/v2/validator.go b/core/v2/validator.go index bb47ae9c6f..cf16d9f41c 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -1,6 +1,7 @@ package v2 import ( + "bytes" "context" "errors" "fmt" @@ -15,27 +16,32 @@ var ( ErrBlobQuorumSkip = errors.New("blob skipped for a quorum before verification") ) +type ShardValidator interface { + ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error + ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error +} + type BlobShard struct { *BlobCertificate Bundles core.Bundles } // shardValidator implements the validation logic that a DA node should apply to its received data -type ShardValidator struct { +type shardValidator struct { verifier encoding.Verifier - chainState core.ChainState operatorID core.OperatorID } -func NewShardValidator(v encoding.Verifier, cst core.ChainState, operatorID core.OperatorID) *ShardValidator { - return &ShardValidator{ +var _ ShardValidator = (*shardValidator)(nil) + +func NewShardValidator(v encoding.Verifier, operatorID core.OperatorID) *shardValidator { + return &shardValidator{ verifier: v, - chainState: cst, operatorID: operatorID, } } -func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) { +func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) { // Check if the operator is a member of the quorum if _, ok := operatorState.Operators[quorum]; !ok { @@ -72,7 +78,28 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar return chunks, &assignment, nil } -func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error { +func (v *shardValidator) ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error { + if header == nil { + return fmt.Errorf("batch header is nil") + } + + if len(blobCerts) == 0 { + return fmt.Errorf("no blob certificates") + } + + tree, err := BuildMerkleTree(blobCerts) + if err != nil { + return fmt.Errorf("failed to build merkle tree: %v", err) + } + + if !bytes.Equal(tree.Root(), header.BatchRoot[:]) { + return fmt.Errorf("batch root does not match") + } + + return nil +} + +func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error { var err error subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch) blobCommitmentList := make([]encoding.BlobCommitments, len(blobs)) @@ -152,7 +179,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, for _, blobCommitments := range blobCommitmentList { blobCommitments := blobCommitments pool.Submit(func() { - v.VerifyBlobLengthWorker(blobCommitments, out) + v.verifyBlobLengthWorker(blobCommitments, out) }) } // check if commitments are equivalent @@ -171,7 +198,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, return nil } -func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) { +func (v *shardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) { err := v.verifier.UniversalVerifySubBatch(params, subBatch.Samples, subBatch.NumBlobs) if err != nil { @@ -182,7 +209,7 @@ func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, s out <- nil } -func (v *ShardValidator) VerifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) { +func (v *shardValidator) verifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) { err := v.verifier.VerifyBlobLength(blobCommitments) if err != nil { out <- err diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index c8e7486c3f..584e4a4390 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -15,8 +15,6 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/wealdtech/go-merkletree/v2" - "github.com/wealdtech/go-merkletree/v2/keccak256" ) var errNoBlobsToDispatch = errors.New("no blobs to dispatch") @@ -292,7 +290,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) ReferenceBlockNumber: referenceBlockNumber, } - tree, err := BuildMerkleTree(certs) + tree, err := corev2.BuildMerkleTree(certs) if err != nil { return nil, fmt.Errorf("failed to build merkle tree: %w", err) } @@ -397,21 +395,3 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKe } return nil } - -func BuildMerkleTree(certs []*corev2.BlobCertificate) (*merkletree.MerkleTree, error) { - leafs := make([][]byte, len(certs)) - for i, cert := range certs { - leaf, err := cert.Hash() - if err != nil { - return nil, fmt.Errorf("failed to compute blob header hash: %w", err) - } - leafs[i] = leaf[:] - } - - tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New())) - if err != nil { - return nil, err - } - - return tree, nil -} diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go index c390f897a6..718384fc6a 100644 --- a/disperser/controller/dispatcher_test.go +++ b/disperser/controller/dispatcher_test.go @@ -57,7 +57,7 @@ func TestDispatcherHandleBatch(t *testing.T) { ctx := context.Background() // Get batch header hash to mock signatures - merkleTree, err := controller.BuildMerkleTree(objs.blobCerts) + merkleTree, err := corev2.BuildMerkleTree(objs.blobCerts) require.NoError(t, err) require.NotNil(t, merkleTree) require.NotNil(t, merkleTree.Root()) @@ -200,7 +200,7 @@ func TestDispatcherBuildMerkleTree(t *testing.T) { RelayKeys: []corev2.RelayKey{0, 1, 2}, }, } - merkleTree, err := controller.BuildMerkleTree(certs) + merkleTree, err := corev2.BuildMerkleTree(certs) require.NoError(t, err) require.NotNil(t, merkleTree) require.NotNil(t, merkleTree.Root()) diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go index a24e04a0ee..9fc53178f3 100644 --- a/node/grpc/server_v2.go +++ b/node/grpc/server_v2.go @@ -2,11 +2,16 @@ package grpc import ( "context" + "encoding/hex" + "fmt" "runtime" "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/shirou/gopsutil/mem" @@ -53,5 +58,99 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No } func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { - return &pb.StoreChunksReply{}, api.NewErrorUnimplemented() + batch, err := s.validateStoreChunksRequest(in) + if err != nil { + return nil, err + } + + batchHeaderHash, err := batch.BatchHeader.Hash() + if err != nil { + return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err)) + } + + operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID) + if err != nil { + return nil, err + } + + blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState) + if err != nil { + return nil, api.NewErrorInternal(fmt.Sprintf("failed to download batch: %v", err)) + } + + type storeResult struct { + keys []kvstore.Key + err error + } + storeChan := make(chan storeResult) + go func() { + keys, err := s.node.StoreV2.StoreBatch(batch, rawBundles) + if err != nil { + storeChan <- storeResult{ + keys: nil, + err: fmt.Errorf("failed to store batch: %v", err), + } + return + } + + storeChan <- storeResult{ + keys: keys, + err: nil, + } + }() + + err = s.node.ValidateBatchV2(ctx, batch, blobShards, operatorState) + if err != nil { + res := <-storeChan + if len(res.keys) > 0 { + if deleteErr := s.node.StoreV2.DeleteKeys(res.keys); deleteErr != nil { + s.logger.Error("failed to delete keys", "err", deleteErr, "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:])) + } + } + return nil, api.NewErrorInternal(fmt.Sprintf("failed to validate batch: %v", err)) + } + + res := <-storeChan + if res.err != nil { + return nil, api.NewErrorInternal(fmt.Sprintf("failed to store batch: %v", res.err)) + } + + sig := s.node.KeyPair.SignMessage(batchHeaderHash).Bytes() + return &pb.StoreChunksReply{ + Signature: sig[:], + }, nil +} + +// validateStoreChunksRequest validates the StoreChunksRequest and returns deserialized batch in the request +func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*corev2.Batch, error) { + if req.GetBatch() == nil { + return nil, api.NewErrorInvalidArg("missing batch in request") + } + + batch, err := corev2.BatchFromProtobuf(req.GetBatch()) + if err != nil { + return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to deserialize batch: %v", err)) + } + + return batch, nil +} + +func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) { + blobKey, err := corev2.BytesToBlobKey(in.GetBlobKey()) + if err != nil { + return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid blob key: %v", err)) + } + + if corev2.MaxQuorumID < in.GetQuorumId() { + return nil, api.NewErrorInvalidArg("invalid quorum ID") + } + quorumID := core.QuorumID(in.GetQuorumId()) + chunks, err := s.node.StoreV2.GetChunks(blobKey, quorumID) + if err != nil { + return nil, api.NewErrorInternal(fmt.Sprintf("failed to get chunks: %v", err)) + } + + return &pb.GetChunksReply{ + Chunks: chunks, + }, nil } diff --git a/node/grpc/server_v2_test.go b/node/grpc/server_v2_test.go index e3d125fb88..b83d36e72d 100644 --- a/node/grpc/server_v2_test.go +++ b/node/grpc/server_v2_test.go @@ -2,118 +2,345 @@ package grpc_test import ( "context" - "fmt" + "errors" "os" "testing" - commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + "github.com/Layr-Labs/eigenda/api/clients" + clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" + pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common/v2" pbv2 "github.com/Layr-Labs/eigenda/api/grpc/node/v2" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/kvstore" commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" + coremockv2 "github.com/Layr-Labs/eigenda/core/mock/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigenda/node/grpc" + nodemock "github.com/Layr-Labs/eigenda/node/mock" "github.com/Layr-Labs/eigensdk-go/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -func newTestServerV2(t *testing.T, mockValidator bool) *grpc.ServerV2 { - return newTestServerV2WithConfig(t, mockValidator, makeConfig(t)) +type testComponents struct { + server *grpc.ServerV2 + node *node.Node + store *nodemock.MockStoreV2 + validator *coremockv2.MockShardValidator + relayClient *clientsmock.MockRelayClient } -func newTestServerV2WithConfig(t *testing.T, mockValidator bool, config *node.Config) *grpc.ServerV2 { - var err error - keyPair, err = core.GenRandomBlsKeys() - if err != nil { - panic("failed to create a BLS Key") - } - opID = [32]byte{} - copy(opID[:], []byte(fmt.Sprintf("%d", 3))) +func newTestComponents(t *testing.T, config *node.Config) *testComponents { + keyPair, err := core.GenRandomBlsKeys() + require.NoError(t, err) + opID = [32]byte{0} loggerConfig := common.DefaultLoggerConfig() logger, err := common.NewLogger(loggerConfig) - if err != nil { - panic("failed to create a logger") - } - + require.NoError(t, err) err = os.MkdirAll(config.DbPath, os.ModePerm) - if err != nil { - panic("failed to create a directory for db") - } + require.NoError(t, err) noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() tx := &coremock.MockWriter{} ratelimiter := &commonmock.NoopRatelimiter{} - var val core.ShardValidator - - if mockValidator { - mockVal := coremock.NewMockShardValidator() - mockVal.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil) - mockVal.On("ValidateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - val = mockVal - } else { + val := coremockv2.NewMockShardValidator() + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState) - _, v, err := makeTestComponents() - if err != nil { - panic("failed to create test encoder") - } + s := nodemock.NewMockStoreV2() + relay := clientsmock.NewRelayClient() + node := &node.Node{ + Config: config, + Logger: logger, + KeyPair: keyPair, + Metrics: metrics, + StoreV2: s, + ChainState: chainState, + ValidatorV2: val, + RelayClient: relay, + } + server := grpc.NewServerV2(config, node, logger, ratelimiter) + return &testComponents{ + server: server, + node: node, + store: s, + validator: val, + relayClient: relay, + } +} - asn := &core.StdAssignmentCoordinator{} +func TestV2NodeInfoRequest(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + resp, err := c.server.NodeInfo(context.Background(), &pbv2.NodeInfoRequest{}) + assert.True(t, resp.Semver == "0.0.0") + assert.True(t, err == nil) +} - cst, err := coremock.MakeChainDataMock(map[uint8]int{ - 0: 10, - 1: 10, - 2: 10, - }) - if err != nil { - panic("failed to create test encoder") - } +func TestV2StoreChunksInputValidation(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + _, batch, _ := nodemock.MockBatch(t) + batchProto, err := batch.ToProtobuf() + require.NoError(t, err) - val = core.NewShardValidator(v, asn, cst, opID) + req := &pbv2.StoreChunksRequest{ + Batch: &pbcommon.Batch{}, } + _, err = c.server.StoreChunks(context.Background(), req) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.InvalidArgument) - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState) - store, err := node.NewLevelDBStore(config.DbPath, logger, metrics, 1e9, 1e9) - if err != nil { - panic("failed to create a new levelDB store") + req = &pbv2.StoreChunksRequest{ + Batch: &pbcommon.Batch{ + Header: &pbcommon.BatchHeader{}, + BlobCertificates: batchProto.BlobCertificates, + }, } + _, err = c.server.StoreChunks(context.Background(), req) + require.Error(t, err) + s, ok = status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.InvalidArgument) - node := &node.Node{ - Config: config, - Logger: logger, - KeyPair: keyPair, - Metrics: metrics, - Store: store, - ChainState: chainState, - Validator: val, + req = &pbv2.StoreChunksRequest{ + Batch: &pbcommon.Batch{ + Header: batchProto.Header, + BlobCertificates: []*pbcommon.BlobCertificate{}, + }, } - return grpc.NewServerV2(config, node, logger, ratelimiter) + _, err = c.server.StoreChunks(context.Background(), req) + require.Error(t, err) + s, ok = status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.InvalidArgument) } -func TestV2NodeInfoRequest(t *testing.T) { - server := newTestServerV2(t, true) - resp, err := server.NodeInfo(context.Background(), &pbv2.NodeInfoRequest{}) - assert.True(t, resp.Semver == "0.0.0") - assert.True(t, err == nil) +func TestV2StoreChunksSuccess(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + + blobKeys, batch, bundles := nodemock.MockBatch(t) + batchProto, err := batch.ToProtobuf() + require.NoError(t, err) + + c.validator.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil) + c.validator.On("ValidateBatchHeader", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + bundles00Bytes, err := bundles[0][0].Serialize() + require.NoError(t, err) + bundles01Bytes, err := bundles[0][1].Serialize() + require.NoError(t, err) + bundles10Bytes, err := bundles[1][0].Serialize() + require.NoError(t, err) + bundles11Bytes, err := bundles[1][1].Serialize() + require.NoError(t, err) + bundles21Bytes, err := bundles[2][1].Serialize() + require.NoError(t, err) + bundles22Bytes, err := bundles[2][2].Serialize() + require.NoError(t, err) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 4) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) + }) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{bundles10Bytes, bundles11Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 2) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) + }) + c.store.On("StoreBatch", batch, mock.Anything).Return(nil, nil) + reply, err := c.server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ + Batch: batchProto, + }) + require.NoError(t, err) + require.NotNil(t, reply.GetSignature()) + sigBytes := reply.GetSignature() + point, err := new(core.Signature).Deserialize(sigBytes) + require.NoError(t, err) + sig := &core.Signature{G1Point: point} + bhh, err := batch.BatchHeader.Hash() + require.NoError(t, err) + require.True(t, sig.Verify(c.node.KeyPair.GetPubKeyG2(), bhh)) } -func TestV2StoreChunks(t *testing.T) { - server := newTestServerV2(t, true) - _, err := server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ - Batch: &commonpb.Batch{}, +func TestV2StoreChunksDownloadFailure(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + + _, batch, _ := nodemock.MockBatch(t) + batchProto, err := batch.ToProtobuf() + require.NoError(t, err) + + c.validator.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil) + c.validator.On("ValidateBatchHeader", mock.Anything, mock.Anything, mock.Anything).Return(nil) + relayErr := errors.New("error") + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{}, relayErr) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{}, relayErr) + reply, err := c.server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ + Batch: batchProto, }) - assert.ErrorContains(t, err, "not implemented") + require.Nil(t, reply.GetSignature()) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.Internal) } -func TestV2GetChunks(t *testing.T) { - server := newTestServerV2(t, true) +func TestV2StoreChunksStorageFailure(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) - _, err := server.GetChunks(context.Background(), &pbv2.GetChunksRequest{ - BlobKey: []byte{0}, + blobKeys, batch, bundles := nodemock.MockBatch(t) + batchProto, err := batch.ToProtobuf() + require.NoError(t, err) + + c.validator.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil) + c.validator.On("ValidateBatchHeader", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + bundles00Bytes, err := bundles[0][0].Serialize() + require.NoError(t, err) + bundles01Bytes, err := bundles[0][1].Serialize() + require.NoError(t, err) + bundles10Bytes, err := bundles[1][0].Serialize() + require.NoError(t, err) + bundles11Bytes, err := bundles[1][1].Serialize() + require.NoError(t, err) + bundles21Bytes, err := bundles[2][1].Serialize() + require.NoError(t, err) + bundles22Bytes, err := bundles[2][2].Serialize() + require.NoError(t, err) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 4) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) }) - assert.ErrorContains(t, err, "not implemented") + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{bundles10Bytes, bundles11Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 2) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) + }) + c.store.On("StoreBatch", batch, mock.Anything).Return(nil, errors.New("error")) + reply, err := c.server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ + Batch: batchProto, + }) + require.Nil(t, reply.GetSignature()) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.Internal) +} + +func TestV2StoreChunksValidationFailure(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + + blobKeys, batch, bundles := nodemock.MockBatch(t) + batchProto, err := batch.ToProtobuf() + require.NoError(t, err) + + c.validator.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) + c.validator.On("ValidateBatchHeader", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + bundles00Bytes, err := bundles[0][0].Serialize() + require.NoError(t, err) + bundles01Bytes, err := bundles[0][1].Serialize() + require.NoError(t, err) + bundles10Bytes, err := bundles[1][0].Serialize() + require.NoError(t, err) + bundles11Bytes, err := bundles[1][1].Serialize() + require.NoError(t, err) + bundles21Bytes, err := bundles[2][1].Serialize() + require.NoError(t, err) + bundles22Bytes, err := bundles[2][2].Serialize() + require.NoError(t, err) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 4) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) + }) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{bundles10Bytes, bundles11Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 2) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) + }) + c.store.On("StoreBatch", batch, mock.Anything).Return([]kvstore.Key{mockKey{}}, nil) + c.store.On("DeleteKeys", mock.Anything, mock.Anything).Return(nil) + reply, err := c.server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ + Batch: batchProto, + }) + require.Nil(t, reply.GetSignature()) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.Internal) + + c.store.AssertCalled(t, "DeleteKeys", mock.Anything, mock.Anything) +} + +func TestV2GetChunksInputValidation(t *testing.T) { + c := newTestComponents(t, makeConfig(t)) + ctx := context.Background() + req := &pbv2.GetChunksRequest{ + BlobKey: []byte{0}, + } + _, err := c.server.GetChunks(ctx, req) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.InvalidArgument) + + bk := [32]byte{0} + maxUInt32 := uint32(0xFFFFFFFF) + req = &pbv2.GetChunksRequest{ + BlobKey: bk[:], + QuorumId: maxUInt32, + } + _, err = c.server.GetChunks(ctx, req) + require.Error(t, err) + s, ok = status.FromError(err) + require.True(t, ok) + assert.Equal(t, s.Code(), codes.InvalidArgument) +} + +type mockKey struct{} +type mockKeyBuilder struct{} + +var _ kvstore.Key = mockKey{} +var _ kvstore.KeyBuilder = mockKeyBuilder{} + +func (mockKey) Bytes() []byte { + return []byte{0} +} + +func (mockKey) Raw() []byte { + return []byte{0} +} + +func (mockKey) Builder() kvstore.KeyBuilder { + return &mockKeyBuilder{} +} + +func (mockKeyBuilder) TableName() string { + return "tableName" +} + +func (mockKeyBuilder) Key(data []byte) kvstore.Key { + return mockKey{} } diff --git a/node/mock/store_v2.go b/node/mock/store_v2.go new file mode 100644 index 0000000000..e7d473c3d4 --- /dev/null +++ b/node/mock/store_v2.go @@ -0,0 +1,41 @@ +package mock + +import ( + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/node" + "github.com/stretchr/testify/mock" +) + +// MockStoreV2 is a mock implementation ofStoreV2 +type MockStoreV2 struct { + mock.Mock +} + +var _ node.StoreV2 = (*MockStoreV2)(nil) + +func NewMockStoreV2() *MockStoreV2 { + return &MockStoreV2{} +} + +func (m *MockStoreV2) StoreBatch(batch *corev2.Batch, rawBundles []*node.RawBundles) ([]kvstore.Key, error) { + args := m.Called(batch, rawBundles) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]kvstore.Key), args.Error(1) +} + +func (m *MockStoreV2) DeleteKeys(keys []kvstore.Key) error { + args := m.Called(keys) + return args.Error(0) +} + +func (m *MockStoreV2) GetChunks(blobKey corev2.BlobKey, quorum core.QuorumID) ([][]byte, error) { + args := m.Called(blobKey, quorum) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([][]byte), args.Error(1) +} diff --git a/node/mock/testdata.go b/node/mock/testdata.go new file mode 100644 index 0000000000..ae48051429 --- /dev/null +++ b/node/mock/testdata.go @@ -0,0 +1,201 @@ +package mock + +import ( + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/consensys/gnark-crypto/ecc/bn254/fr" + "github.com/stretchr/testify/require" +) + +func MockBatch(t *testing.T) ([]v2.BlobKey, *v2.Batch, []map[core.QuorumID]core.Bundle) { + commitments := MockCommitment(t) + bh0 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + bh1 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 6, + CumulativePayment: big.NewInt(200), + }, + Signature: []byte{1, 2, 3}, + } + bh2 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{1, 2}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x789", + BinIndex: 7, + CumulativePayment: big.NewInt(300), + }, + Signature: []byte{1, 2, 3}, + } + blobKey0, err := bh0.BlobKey() + require.NoError(t, err) + blobKey1, err := bh1.BlobKey() + require.NoError(t, err) + blobKey2, err := bh2.BlobKey() + require.NoError(t, err) + + // blobCert 0 and blobCert 2 will be downloaded from relay 0 + // blobCert 1 will be downloaded from relay 1 + blobCert0 := &v2.BlobCertificate{ + BlobHeader: bh0, + RelayKeys: []v2.RelayKey{0}, + } + blobCert1 := &v2.BlobCertificate{ + BlobHeader: bh1, + RelayKeys: []v2.RelayKey{1}, + } + blobCert2 := &v2.BlobCertificate{ + BlobHeader: bh2, + RelayKeys: []v2.RelayKey{0}, + } + + bundles0 := map[core.QuorumID]core.Bundle{ + 0: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(1), big.NewInt(2)).G1Affine), + Coeffs: []fr.Element{ + {1, 2, 3, 4}, + {5, 6, 7, 8}, + }, + }, + }, + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(3), big.NewInt(4)).G1Affine), + Coeffs: []fr.Element{ + {9, 10, 11, 12}, + {13, 14, 15, 16}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(5), big.NewInt(6)).G1Affine), + Coeffs: []fr.Element{ + {17, 18, 19, 20}, + {21, 22, 23, 24}, + }, + }, + }, + } + bundles1 := map[core.QuorumID]core.Bundle{ + 0: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(7), big.NewInt(8)).G1Affine), + Coeffs: []fr.Element{ + {25, 26, 27, 28}, + {29, 30, 31, 32}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(9), big.NewInt(10)).G1Affine), + Coeffs: []fr.Element{ + {33, 34, 35, 36}, + {37, 38, 39, 40}, + }, + }, + }, + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(11), big.NewInt(12)).G1Affine), + Coeffs: []fr.Element{ + {41, 42, 43, 44}, + {45, 46, 47, 48}, + }, + }, + }, + } + bundles2 := map[core.QuorumID]core.Bundle{ + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(13), big.NewInt(14)).G1Affine), + Coeffs: []fr.Element{ + {49, 50, 51, 52}, + {53, 54, 55, 56}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(15), big.NewInt(16)).G1Affine), + Coeffs: []fr.Element{ + {57, 58, 59, 60}, + {61, 62, 63, 64}, + }, + }, + }, + 2: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(17), big.NewInt(18)).G1Affine), + Coeffs: []fr.Element{ + {65, 66, 67, 68}, + {69, 70, 71, 72}, + }, + }, + }, + } + + certs := []*v2.BlobCertificate{blobCert0, blobCert1, blobCert2} + tree, err := v2.BuildMerkleTree(certs) + require.NoError(t, err) + var root [32]byte + copy(root[:], tree.Root()) + return []v2.BlobKey{blobKey0, blobKey1, blobKey2}, &v2.Batch{ + BatchHeader: &v2.BatchHeader{ + BatchRoot: root, + ReferenceBlockNumber: 100, + }, + BlobCertificates: certs, + }, []map[core.QuorumID]core.Bundle{bundles0, bundles1, bundles2} +} + +func MockCommitment(t *testing.T) encoding.BlobCommitments { + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err := lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + require.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + require.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + require.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + require.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + return encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 10, + } +} diff --git a/node/node.go b/node/node.go index d2061ff2ae..547f8bfc90 100644 --- a/node/node.go +++ b/node/node.go @@ -28,6 +28,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/indexer" + corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/metrics" rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls" @@ -56,8 +57,10 @@ type Node struct { Metrics *Metrics NodeApi *nodeapi.NodeApi Store *Store + StoreV2 StoreV2 ChainState core.ChainState Validator core.ShardValidator + ValidatorV2 corev2.ShardValidator Transactor core.Writer PubIPProvider pubip.Provider OperatorSocketsFilterer indexer.OperatorSocketsFilterer @@ -130,6 +133,7 @@ func NewNode( } asgn := &core.StdAssignmentCoordinator{} validator := core.NewShardValidator(v, asgn, cst, config.ID) + validatorV2 := corev2.NewShardValidator(v, config.ID) // Resolve the BLOCK_STALE_MEASURE and STORE_DURATION_BLOCKS. var blockStaleMeasure, storeDurationBlocks uint32 @@ -177,9 +181,11 @@ func NewNode( Metrics: metrics, NodeApi: nodeApi, Store: store, + StoreV2: nil, ChainState: cst, Transactor: tx, Validator: validator, + ValidatorV2: validatorV2, PubIPProvider: pubIPProvider, OperatorSocketsFilterer: socketsFilterer, ChainID: chainID, diff --git a/node/node_v2.go b/node/node_v2.go index bd1701fb4b..5dd91992a5 100644 --- a/node/node_v2.go +++ b/node/node_v2.go @@ -33,16 +33,11 @@ type RawBundles struct { Bundles map[core.QuorumID][]byte } -func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch) ([]*corev2.BlobShard, []*RawBundles, error) { +func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch, operatorState *core.OperatorState) ([]*corev2.BlobShard, []*RawBundles, error) { if n.RelayClient == nil { return nil, nil, fmt.Errorf("relay client is not set") } - operatorState, err := n.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), n.Config.ID) - if err != nil { - return nil, nil, err - } - blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates)) rawBundles := make([]*RawBundles, len(batch.BlobCertificates)) requests := make(map[corev2.RelayKey]*relayRequest) @@ -117,6 +112,7 @@ func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch) ([]*cor } pool.StopWait() + var err error for i := 0; i < len(requests); i++ { resp := <-bundleChan if resp.err != nil { @@ -134,3 +130,20 @@ func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch) ([]*cor return blobShards, rawBundles, nil } + +func (n *Node) ValidateBatchV2( + ctx context.Context, + batch *corev2.Batch, + blobShards []*corev2.BlobShard, + operatorState *core.OperatorState, +) error { + if n.ValidatorV2 == nil { + return fmt.Errorf("store v2 is not set") + } + + if err := n.ValidatorV2.ValidateBatchHeader(ctx, batch.BatchHeader, batch.BlobCertificates); err != nil { + return fmt.Errorf("failed to validate batch header: %v", err) + } + pool := workerpool.New(n.Config.NumBatchValidators) + return n.ValidatorV2.ValidateBlobs(ctx, blobShards, pool, operatorState) +} diff --git a/node/node_v2_test.go b/node/node_v2_test.go index 36d6e60ce2..3fc9795dde 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -3,172 +3,21 @@ package node_test import ( "context" "fmt" - "math/big" "testing" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/core" v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" - "github.com/consensys/gnark-crypto/ecc/bn254" - "github.com/consensys/gnark-crypto/ecc/bn254/fp" - "github.com/consensys/gnark-crypto/ecc/bn254/fr" + nodemock "github.com/Layr-Labs/eigenda/node/mock" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func mockBatch(t *testing.T) ([]v2.BlobKey, *v2.Batch, []map[core.QuorumID]core.Bundle) { - commitments := mockCommitment(t) - bh0 := &v2.BlobHeader{ - BlobVersion: 0, - BlobCommitments: commitments, - QuorumNumbers: []core.QuorumID{0, 1}, - PaymentMetadata: core.PaymentMetadata{ - AccountID: "0x123", - BinIndex: 5, - CumulativePayment: big.NewInt(100), - }, - Signature: []byte{1, 2, 3}, - } - bh1 := &v2.BlobHeader{ - BlobVersion: 0, - BlobCommitments: commitments, - QuorumNumbers: []core.QuorumID{0, 1}, - PaymentMetadata: core.PaymentMetadata{ - AccountID: "0x456", - BinIndex: 6, - CumulativePayment: big.NewInt(200), - }, - Signature: []byte{1, 2, 3}, - } - bh2 := &v2.BlobHeader{ - BlobVersion: 0, - BlobCommitments: commitments, - QuorumNumbers: []core.QuorumID{1, 2}, - PaymentMetadata: core.PaymentMetadata{ - AccountID: "0x789", - BinIndex: 7, - CumulativePayment: big.NewInt(300), - }, - Signature: []byte{1, 2, 3}, - } - blobKey0, err := bh0.BlobKey() - require.NoError(t, err) - blobKey1, err := bh1.BlobKey() - require.NoError(t, err) - blobKey2, err := bh2.BlobKey() - require.NoError(t, err) - - // blobCert 0 and blobCert 2 will be downloaded from relay 0 - // blobCert 1 will be downloaded from relay 1 - blobCert0 := &v2.BlobCertificate{ - BlobHeader: bh0, - RelayKeys: []v2.RelayKey{0}, - } - blobCert1 := &v2.BlobCertificate{ - BlobHeader: bh1, - RelayKeys: []v2.RelayKey{1}, - } - blobCert2 := &v2.BlobCertificate{ - BlobHeader: bh2, - RelayKeys: []v2.RelayKey{0}, - } - - bundles0 := map[core.QuorumID]core.Bundle{ - 0: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(1), big.NewInt(2)).G1Affine), - Coeffs: []fr.Element{ - {1, 2, 3, 4}, - {5, 6, 7, 8}, - }, - }, - }, - 1: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(3), big.NewInt(4)).G1Affine), - Coeffs: []fr.Element{ - {9, 10, 11, 12}, - {13, 14, 15, 16}, - }, - }, - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(5), big.NewInt(6)).G1Affine), - Coeffs: []fr.Element{ - {17, 18, 19, 20}, - {21, 22, 23, 24}, - }, - }, - }, - } - bundles1 := map[core.QuorumID]core.Bundle{ - 0: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(7), big.NewInt(8)).G1Affine), - Coeffs: []fr.Element{ - {25, 26, 27, 28}, - {29, 30, 31, 32}, - }, - }, - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(9), big.NewInt(10)).G1Affine), - Coeffs: []fr.Element{ - {33, 34, 35, 36}, - {37, 38, 39, 40}, - }, - }, - }, - 1: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(11), big.NewInt(12)).G1Affine), - Coeffs: []fr.Element{ - {41, 42, 43, 44}, - {45, 46, 47, 48}, - }, - }, - }, - } - bundles2 := map[core.QuorumID]core.Bundle{ - 1: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(13), big.NewInt(14)).G1Affine), - Coeffs: []fr.Element{ - {49, 50, 51, 52}, - {53, 54, 55, 56}, - }, - }, - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(15), big.NewInt(16)).G1Affine), - Coeffs: []fr.Element{ - {57, 58, 59, 60}, - {61, 62, 63, 64}, - }, - }, - }, - 2: { - { - Proof: encoding.Proof(*core.NewG1Point(big.NewInt(17), big.NewInt(18)).G1Affine), - Coeffs: []fr.Element{ - {65, 66, 67, 68}, - {69, 70, 71, 72}, - }, - }, - }, - } - - return []v2.BlobKey{blobKey0, blobKey1, blobKey2}, &v2.Batch{ - BatchHeader: &v2.BatchHeader{ - BatchRoot: [32]byte{1, 1, 1}, - ReferenceBlockNumber: 100, - }, - BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1, blobCert2}, - }, []map[core.QuorumID]core.Bundle{bundles0, bundles1, bundles2} -} - func TestDownloadBundles(t *testing.T) { c := newComponents(t) ctx := context.Background() - blobKeys, batch, bundles := mockBatch(t) + blobKeys, batch, bundles := nodemock.MockBatch(t) blobCerts := batch.BlobCertificates bundles00Bytes, err := bundles[0][0].Serialize() @@ -197,7 +46,9 @@ func TestDownloadBundles(t *testing.T) { require.Equal(t, blobKeys[1], requests[0].BlobKey) require.Equal(t, blobKeys[1], requests[1].BlobKey) }) - blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch) + state, err := c.node.ChainState.GetOperatorState(ctx, uint(10), []core.QuorumID{0, 1, 2}) + require.NoError(t, err) + blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch, state) require.NoError(t, err) require.Len(t, blobShards, 3) require.Equal(t, blobCerts[0], blobShards[0].BlobCertificate) @@ -238,7 +89,7 @@ func TestDownloadBundles(t *testing.T) { func TestDownloadBundlesFail(t *testing.T) { c := newComponents(t) ctx := context.Background() - blobKeys, batch, bundles := mockBatch(t) + blobKeys, batch, bundles := nodemock.MockBatch(t) bundles00Bytes, err := bundles[0][0].Serialize() require.NoError(t, err) @@ -263,47 +114,14 @@ func TestDownloadBundlesFail(t *testing.T) { require.Equal(t, blobKeys[1], requests[0].BlobKey) require.Equal(t, blobKeys[1], requests[1].BlobKey) }) - - blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch) + state, err := c.node.ChainState.GetOperatorState(ctx, uint(10), []core.QuorumID{0, 1, 2}) + require.NoError(t, err) + blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch, state) require.Error(t, err) require.Nil(t, blobShards) require.Nil(t, rawBundles) } -func mockCommitment(t *testing.T) encoding.BlobCommitments { - var X1, Y1 fp.Element - X1 = *X1.SetBigInt(big.NewInt(1)) - Y1 = *Y1.SetBigInt(big.NewInt(2)) - - var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element - _, err := lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") - require.NoError(t, err) - _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") - require.NoError(t, err) - _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") - require.NoError(t, err) - _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") - require.NoError(t, err) - - var lengthProof, lengthCommitment bn254.G2Affine - lengthProof.X.A0 = lengthXA0 - lengthProof.X.A1 = lengthXA1 - lengthProof.Y.A0 = lengthYA0 - lengthProof.Y.A1 = lengthYA1 - - lengthCommitment = lengthProof - - return encoding.BlobCommitments{ - Commitment: &encoding.G1Commitment{ - X: X1, - Y: Y1, - }, - LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), - LengthProof: (*encoding.G2Commitment)(&lengthProof), - Length: 10, - } -} - func bundleEqual(t *testing.T, expected, actual core.Bundle) { for i := range expected { frameEqual(t, expected[i], actual[i]) diff --git a/node/store_v2.go b/node/store_v2.go index c7b02ac6e8..c5979e72fe 100644 --- a/node/store_v2.go +++ b/node/store_v2.go @@ -2,7 +2,6 @@ package node import ( "bytes" - "context" "encoding/binary" "fmt" "time" @@ -19,40 +18,53 @@ const ( BundleTableName = "bundles" ) -type StoreV2 struct { +type StoreV2 interface { + StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, error) + DeleteKeys(keys []kvstore.Key) error + GetChunks(blobKey corev2.BlobKey, quorum core.QuorumID) ([][]byte, error) +} + +type storeV2 struct { db kvstore.TableStore logger logging.Logger ttl time.Duration } -func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger) *StoreV2 { - return &StoreV2{ +var _ StoreV2 = &storeV2{} + +func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger) *storeV2 { + return &storeV2{ db: db, logger: logger, } } -func (s *StoreV2) StoreBatch(ctx context.Context, batch *corev2.Batch, rawBundles []*RawBundles) error { +func (s *storeV2) StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, error) { dbBatch := s.db.NewTTLBatch() + keys := make([]kvstore.Key, 0) batchHeaderKeyBuilder, err := s.db.GetKeyBuilder(BatchHeaderTableName) if err != nil { - return fmt.Errorf("failed to get key builder for batch header: %v", err) + return nil, fmt.Errorf("failed to get key builder for batch header: %v", err) } batchHeaderHash, err := batch.BatchHeader.Hash() if err != nil { - return fmt.Errorf("failed to hash batch header: %v", err) + return nil, fmt.Errorf("failed to hash batch header: %v", err) } // Store batch header + batchHeaderKey := batchHeaderKeyBuilder.Key(batchHeaderHash[:]) + if _, err = s.db.Get(batchHeaderKey); err == nil { + return nil, ErrBatchAlreadyExist + } batchHeaderBytes, err := batch.BatchHeader.Serialize() if err != nil { - return fmt.Errorf("failed to serialize batch header: %v", err) + return nil, fmt.Errorf("failed to serialize batch header: %v", err) } - batchHeaderKey := batchHeaderKeyBuilder.Key(batchHeaderHash[:]) + keys = append(keys, batchHeaderKey) dbBatch.PutWithTTL(batchHeaderKey, batchHeaderBytes, s.ttl) // Store blob shards @@ -60,38 +72,76 @@ func (s *StoreV2) StoreBatch(ctx context.Context, batch *corev2.Batch, rawBundle // Store blob certificate blobCertificateKeyBuilder, err := s.db.GetKeyBuilder(BlobCertificateTableName) if err != nil { - return fmt.Errorf("failed to get key builder for blob certificate: %v", err) + return nil, fmt.Errorf("failed to get key builder for blob certificate: %v", err) } blobKey, err := bundles.BlobCertificate.BlobHeader.BlobKey() if err != nil { - return fmt.Errorf("failed to get blob key: %v", err) + return nil, fmt.Errorf("failed to get blob key: %v", err) } blobCertificateKey := blobCertificateKeyBuilder.Key(blobKey[:]) blobCertificateBytes, err := bundles.BlobCertificate.Serialize() if err != nil { - return fmt.Errorf("failed to serialize blob certificate: %v", err) + return nil, fmt.Errorf("failed to serialize blob certificate: %v", err) } + keys = append(keys, blobCertificateKey) dbBatch.PutWithTTL(blobCertificateKey, blobCertificateBytes, s.ttl) // Store bundles for quorum, bundle := range bundles.Bundles { bundlesKeyBuilder, err := s.db.GetKeyBuilder(BundleTableName) if err != nil { - return fmt.Errorf("failed to get key builder for bundles: %v", err) + return nil, fmt.Errorf("failed to get key builder for bundles: %v", err) } k, err := BundleKey(blobKey, quorum) if err != nil { - return fmt.Errorf("failed to get key for bundles: %v", err) + return nil, fmt.Errorf("failed to get key for bundles: %v", err) } + keys = append(keys, bundlesKeyBuilder.Key(k)) dbBatch.PutWithTTL(bundlesKeyBuilder.Key(k), bundle, s.ttl) } } + if err := dbBatch.Apply(); err != nil { + return nil, fmt.Errorf("failed to apply batch: %v", err) + } + + return keys, nil +} + +func (s *storeV2) DeleteKeys(keys []kvstore.Key) error { + dbBatch := s.db.NewTTLBatch() + for _, key := range keys { + dbBatch.Delete(key) + } return dbBatch.Apply() } +func (s *storeV2) GetChunks(blobKey corev2.BlobKey, quorum core.QuorumID) ([][]byte, error) { + bundlesKeyBuilder, err := s.db.GetKeyBuilder(BundleTableName) + if err != nil { + return nil, fmt.Errorf("failed to get key builder for bundles: %v", err) + } + + k, err := BundleKey(blobKey, quorum) + if err != nil { + return nil, fmt.Errorf("failed to get key for bundles: %v", err) + } + + bundle, err := s.db.Get(bundlesKeyBuilder.Key(k)) + if err != nil { + return nil, fmt.Errorf("failed to get bundle: %v", err) + } + + chunks, _, err := DecodeChunks(bundle) + if err != nil { + return nil, fmt.Errorf("failed to decode chunks: %v", err) + } + + return chunks, nil +} + func BundleKey(blobKey corev2.BlobKey, quorumID core.QuorumID) ([]byte, error) { buf := bytes.NewBuffer(blobKey[:]) err := binary.Write(buf, binary.LittleEndian, quorumID) diff --git a/node/store_v2_test.go b/node/store_v2_test.go index b64b936ecb..2a78c80800 100644 --- a/node/store_v2_test.go +++ b/node/store_v2_test.go @@ -1,7 +1,6 @@ package node_test import ( - "context" "testing" "github.com/Layr-Labs/eigenda/common/kvstore" @@ -9,14 +8,14 @@ import ( "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/node" + nodemock "github.com/Layr-Labs/eigenda/node/mock" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestStoreBatchV2(t *testing.T) { - ctx := context.Background() - _, batch, bundles := mockBatch(t) + _, batch, bundles := nodemock.MockBatch(t) blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates)) rawBundles := make([]*node.RawBundles, len(batch.BlobCertificates)) @@ -42,8 +41,9 @@ func TestStoreBatchV2(t *testing.T) { defer func() { _ = db.Shutdown() }() - err := s.StoreBatch(ctx, batch, rawBundles) + keys, err := s.StoreBatch(batch, rawBundles) require.NoError(t, err) + require.Len(t, keys, 10) tables := db.GetTables() require.ElementsMatch(t, []string{node.BatchHeaderTableName, node.BlobCertificateTableName, node.BundleTableName}, tables) @@ -90,9 +90,100 @@ func TestStoreBatchV2(t *testing.T) { require.Equal(t, bundle, bundleBytes) } } + + // Try to store the same batch again + _, err = s.StoreBatch(batch, rawBundles) + require.ErrorIs(t, err, node.ErrBatchAlreadyExist) + + // Check deletion + err = s.DeleteKeys(keys) + require.NoError(t, err) + + bhhBytes, err = db.Get(batchHeaderKeyBuilder.Key(bhh[:])) + require.Error(t, err) + require.Empty(t, bhhBytes) + for _, cert := range batch.BlobCertificates { + blobKey, err := cert.BlobHeader.BlobKey() + require.NoError(t, err) + blobCertKey := blobCertKeyBuilder.Key(blobKey[:]) + blobCertBytes, err := db.Get(blobCertKey) + require.Error(t, err) + require.Empty(t, blobCertBytes) + } + + for _, bundles := range rawBundles { + blobKey, err := bundles.BlobCertificate.BlobHeader.BlobKey() + require.NoError(t, err) + for quorum := range bundles.Bundles { + k, err := node.BundleKey(blobKey, quorum) + require.NoError(t, err) + bundleBytes, err := db.Get(bundleKeyBuilder.Key(k)) + require.Error(t, err) + require.Empty(t, bundleBytes) + } + } +} + +func TestGetChunks(t *testing.T) { + blobKeys, batch, bundles := nodemock.MockBatch(t) + + blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates)) + rawBundles := make([]*node.RawBundles, len(batch.BlobCertificates)) + for i, cert := range batch.BlobCertificates { + blobShards[i] = &corev2.BlobShard{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID]core.Bundle), + } + rawBundles[i] = &node.RawBundles{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID][]byte), + } + + for quorum, bundle := range bundles[i] { + blobShards[i].Bundles[quorum] = bundle + bundleBytes, err := bundle.Serialize() + assert.NoError(t, err) + rawBundles[i].Bundles[quorum] = bundleBytes + } + } + + s, db := createStoreV2(t) + defer func() { + _ = db.Shutdown() + }() + _, err := s.StoreBatch(batch, rawBundles) + require.NoError(t, err) + + chunks, err := s.GetChunks(blobKeys[0], 0) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[0][0])) + + chunks, err = s.GetChunks(blobKeys[0], 1) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[0][1])) + + chunks, err = s.GetChunks(blobKeys[1], 0) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[1][0])) + + chunks, err = s.GetChunks(blobKeys[1], 1) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[1][1])) + + chunks, err = s.GetChunks(blobKeys[2], 1) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[2][1])) + + chunks, err = s.GetChunks(blobKeys[2], 2) + require.NoError(t, err) + require.Len(t, chunks, len(bundles[2][2])) + + // wrong quorum + _, err = s.GetChunks(blobKeys[0], 2) + require.Error(t, err) } -func createStoreV2(t *testing.T) (*node.StoreV2, kvstore.TableStore) { +func createStoreV2(t *testing.T) (node.StoreV2, kvstore.TableStore) { logger := logging.NewNoopLogger() config := tablestore.DefaultLevelDBConfig(t.TempDir()) config.Schema = []string{node.BatchHeaderTableName, node.BlobCertificateTableName, node.BundleTableName}