From 9ebf2a6bccf06e14ac687212e285b9feb2ec95c7 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Wed, 18 Dec 2024 20:25:49 -0800 Subject: [PATCH] v2 retriever --- api/clients/v2/mock/retrieval_client.go | 27 +++ api/docs/eigenda-protos.html | 120 ++++++++++ api/docs/eigenda-protos.md | 76 +++++++ api/docs/retriever.html | 38 ++-- api/docs/retriever.md | 35 ++- api/grpc/retriever/v2/retriever.pb.go | 248 +++++++++++++++++++++ api/grpc/retriever/v2/retriever_grpc.pb.go | 113 ++++++++++ api/proto/retriever/v2/retriever.proto | 39 ++++ core/data.go | 4 + core/v2/types.go | 3 + go.mod | 1 + retriever/cmd/main.go | 94 ++++---- retriever/config.go | 37 ++- retriever/flags/flags.go | 20 +- retriever/v2/server.go | 85 +++++++ retriever/v2/server_test.go | 134 +++++++++++ tools/traffic/config/config.go | 8 +- 17 files changed, 955 insertions(+), 127 deletions(-) create mode 100644 api/clients/v2/mock/retrieval_client.go create mode 100644 api/grpc/retriever/v2/retriever.pb.go create mode 100644 api/grpc/retriever/v2/retriever_grpc.pb.go create mode 100644 api/proto/retriever/v2/retriever.proto create mode 100644 retriever/v2/server.go create mode 100644 retriever/v2/server_test.go diff --git a/api/clients/v2/mock/retrieval_client.go b/api/clients/v2/mock/retrieval_client.go new file mode 100644 index 0000000000..6f0e23b723 --- /dev/null +++ b/api/clients/v2/mock/retrieval_client.go @@ -0,0 +1,27 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/stretchr/testify/mock" +) + +type MockRetrievalClient struct { + mock.Mock +} + +var _ clients.RetrievalClient = (*MockRetrievalClient)(nil) + +func NewRetrievalClient() *MockRetrievalClient { + return &MockRetrievalClient{} +} + +func (c *MockRetrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) { + args := c.Called() + + result := args.Get(0) + return result.([]byte), args.Error(1) +} diff --git a/api/docs/eigenda-protos.html b/api/docs/eigenda-protos.html index 44474db3c8..3981fbfb15 100644 --- a/api/docs/eigenda-protos.html +++ b/api/docs/eigenda-protos.html @@ -624,6 +624,29 @@

Table of Contents

+ +
  • + retriever/v2/retriever.proto + +
  • +
  • Scalar Value Types
  • @@ -3955,6 +3978,103 @@

    Retriever

    + +
    +

    retriever/v2/retriever.proto

    Top +
    +

    + + +

    BlobReply

    +

    + + + + + + + + + + + + + + + + +
    FieldTypeLabelDescription
    databytes

    The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest.

    + + + + + +

    BlobRequest

    +

    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    FieldTypeLabelDescription
    blob_headercommon.v2.BlobHeader

    header of the blob to be retrieved

    reference_block_numberuint32

    The Ethereum block number at which the batch for this blob was constructed.

    quorum_iduint32

    Which quorum of the blob this is requesting for (note a blob can participate in +multiple quorums).

    + + + + + + + + + + + +

    Retriever

    +

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA V2:

    1) Retrieve from the relay that the blob is assigned to: the API

    is Relay.GetBlob() as defined in api/proto/relay/relay.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the

    relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the relay, with the downside of

    worse cost and performance.

    + + + + + + + + + + + + + + +
    Method NameRequest TypeResponse TypeDescription
    RetrieveBlobBlobRequestBlobReply

    This fans out request to EigenDA Nodes to retrieve the chunks and returns the +reconstructed original blob in response.

    + + +

    Scalar Value Types

    diff --git a/api/docs/eigenda-protos.md b/api/docs/eigenda-protos.md index 60d2a21c90..69f4413969 100644 --- a/api/docs/eigenda-protos.md +++ b/api/docs/eigenda-protos.md @@ -119,6 +119,12 @@ - [Retriever](#retriever-Retriever) +- [retriever/v2/retriever.proto](#retriever_v2_retriever-proto) + - [BlobReply](#retriever-v2-BlobReply) + - [BlobRequest](#retriever-v2-BlobRequest) + + - [Retriever](#retriever-v2-Retriever) + - [Scalar Value Types](#scalar-value-types) @@ -1737,6 +1743,76 @@ worse cost and performance. + +

    Top

    + +## retriever/v2/retriever.proto + + + + + +### BlobReply + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| data | [bytes](#bytes) | | The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. | + + + + + + + + +### BlobRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| blob_header | [common.v2.BlobHeader](#common-v2-BlobHeader) | | header of the blob to be retrieved | +| reference_block_number | [uint32](#uint32) | | The Ethereum block number at which the batch for this blob was constructed. | +| quorum_id | [uint32](#uint32) | | Which quorum of the blob this is requesting for (note a blob can participate in multiple quorums). | + + + + + + + + + + + + + + +### Retriever +The Retriever is a service for retrieving chunks corresponding to a blob from +the EigenDA operator nodes and reconstructing the original blob from the chunks. +This is a client-side library that the users are supposed to operationalize. + +Note: Users generally have two ways to retrieve a blob from EigenDA V2: + 1) Retrieve from the relay that the blob is assigned to: the API + is Relay.GetBlob() as defined in api/proto/relay/relay.proto + 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. + +The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +(the 2nd approach here) removes the need to trust the relay, with the downside of +worse cost and performance. + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| RetrieveBlob | [BlobRequest](#retriever-v2-BlobRequest) | [BlobReply](#retriever-v2-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | + + + + + ## Scalar Value Types | .proto Type | Notes | C++ | Java | Python | Go | C# | PHP | Ruby | diff --git a/api/docs/retriever.html b/api/docs/retriever.html index 07a0cef13d..c5de205af5 100644 --- a/api/docs/retriever.html +++ b/api/docs/retriever.html @@ -175,22 +175,22 @@

    Table of Contents

  • - retriever/retriever.proto + retriever/v2/retriever.proto @@ -203,12 +203,12 @@

    Table of Contents

    -

    retriever/retriever.proto

    Top +

    retriever/v2/retriever.proto

    Top

    -

    BlobReply

    +

    BlobReply

    @@ -232,7 +232,7 @@

    BlobReply

    -

    BlobRequest

    +

    BlobRequest

    @@ -243,20 +243,10 @@

    BlobRequest

  • - - - - - - - - - + + - + @@ -287,8 +277,8 @@

    BlobRequest

    -

    Retriever

    -

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA:

    1) Retrieve from the Disperser that the user initially used for dispersal: the API

    is Disperser.RetrieveBlob() as defined in api/proto/disperser/disperser.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Disperser.RetrieveBlob() (the 1st approach) is generally faster and cheaper as the

    Disperser manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the Disperser, with the downside of

    worse cost and performance.

    +

    Retriever

    +

    The Retriever is a service for retrieving chunks corresponding to a blob from

    the EigenDA operator nodes and reconstructing the original blob from the chunks.

    This is a client-side library that the users are supposed to operationalize.

    Note: Users generally have two ways to retrieve a blob from EigenDA V2:

    1) Retrieve from the relay that the blob is assigned to: the API

    is Relay.GetBlob() as defined in api/proto/relay/relay.proto

    2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever.

    The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the

    relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob()

    (the 2nd approach here) removes the need to trust the relay, with the downside of

    worse cost and performance.

    batch_header_hashbytes

    The hash of the ReducedBatchHeader defined onchain, see: -https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 -This identifies the batch that this blob belongs to.

    blob_indexuint32blob_headercommon.v2.BlobHeader

    Which blob in the batch this is requesting for (note: a batch is logically an -ordered list of blobs).

    header of the blob to be retrieved

    @@ -297,8 +287,8 @@

    Retriever

    - - + + diff --git a/api/docs/retriever.md b/api/docs/retriever.md index d78b27fe97..27452d2f43 100644 --- a/api/docs/retriever.md +++ b/api/docs/retriever.md @@ -3,24 +3,24 @@ ## Table of Contents -- [retriever/retriever.proto](#retriever_retriever-proto) - - [BlobReply](#retriever-BlobReply) - - [BlobRequest](#retriever-BlobRequest) +- [retriever/v2/retriever.proto](#retriever_v2_retriever-proto) + - [BlobReply](#retriever-v2-BlobReply) + - [BlobRequest](#retriever-v2-BlobRequest) - - [Retriever](#retriever-Retriever) + - [Retriever](#retriever-v2-Retriever) - [Scalar Value Types](#scalar-value-types) - +

    Top

    -## retriever/retriever.proto +## retriever/v2/retriever.proto - + ### BlobReply @@ -35,7 +35,7 @@ - + ### BlobRequest @@ -43,8 +43,7 @@ | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| batch_header_hash | [bytes](#bytes) | | The hash of the ReducedBatchHeader defined onchain, see: https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43 This identifies the batch that this blob belongs to. | -| blob_index | [uint32](#uint32) | | Which blob in the batch this is requesting for (note: a batch is logically an ordered list of blobs). | +| blob_header | [common.v2.BlobHeader](#common-v2-BlobHeader) | | header of the blob to be retrieved | | reference_block_number | [uint32](#uint32) | | The Ethereum block number at which the batch for this blob was constructed. | | quorum_id | [uint32](#uint32) | | Which quorum of the blob this is requesting for (note a blob can participate in multiple quorums). | @@ -59,26 +58,26 @@ - + ### Retriever The Retriever is a service for retrieving chunks corresponding to a blob from the EigenDA operator nodes and reconstructing the original blob from the chunks. This is a client-side library that the users are supposed to operationalize. -Note: Users generally have two ways to retrieve a blob from EigenDA: - 1) Retrieve from the Disperser that the user initially used for dispersal: the API - is Disperser.RetrieveBlob() as defined in api/proto/disperser/disperser.proto +Note: Users generally have two ways to retrieve a blob from EigenDA V2: + 1) Retrieve from the relay that the blob is assigned to: the API + is Relay.GetBlob() as defined in api/proto/relay/relay.proto 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. -The Disperser.RetrieveBlob() (the 1st approach) is generally faster and cheaper as the -Disperser manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() -(the 2nd approach here) removes the need to trust the Disperser, with the downside of +The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +(the 2nd approach here) removes the need to trust the relay, with the downside of worse cost and performance. | Method Name | Request Type | Response Type | Description | | ----------- | ------------ | ------------- | ------------| -| RetrieveBlob | [BlobRequest](#retriever-BlobRequest) | [BlobReply](#retriever-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | +| RetrieveBlob | [BlobRequest](#retriever-v2-BlobRequest) | [BlobReply](#retriever-v2-BlobReply) | This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response. | diff --git a/api/grpc/retriever/v2/retriever.pb.go b/api/grpc/retriever/v2/retriever.pb.go new file mode 100644 index 0000000000..d9ef75061c --- /dev/null +++ b/api/grpc/retriever/v2/retriever.pb.go @@ -0,0 +1,248 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.23.4 +// source: retriever/v2/retriever.proto + +package v2 + +import ( + v2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type BlobRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // header of the blob to be retrieved + BlobHeader *v2.BlobHeader `protobuf:"bytes,1,opt,name=blob_header,json=blobHeader,proto3" json:"blob_header,omitempty"` + // The Ethereum block number at which the batch for this blob was constructed. + ReferenceBlockNumber uint32 `protobuf:"varint,2,opt,name=reference_block_number,json=referenceBlockNumber,proto3" json:"reference_block_number,omitempty"` + // Which quorum of the blob this is requesting for (note a blob can participate in + // multiple quorums). + QuorumId uint32 `protobuf:"varint,3,opt,name=quorum_id,json=quorumId,proto3" json:"quorum_id,omitempty"` +} + +func (x *BlobRequest) Reset() { + *x = BlobRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_retriever_v2_retriever_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlobRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobRequest) ProtoMessage() {} + +func (x *BlobRequest) ProtoReflect() protoreflect.Message { + mi := &file_retriever_v2_retriever_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobRequest.ProtoReflect.Descriptor instead. +func (*BlobRequest) Descriptor() ([]byte, []int) { + return file_retriever_v2_retriever_proto_rawDescGZIP(), []int{0} +} + +func (x *BlobRequest) GetBlobHeader() *v2.BlobHeader { + if x != nil { + return x.BlobHeader + } + return nil +} + +func (x *BlobRequest) GetReferenceBlockNumber() uint32 { + if x != nil { + return x.ReferenceBlockNumber + } + return 0 +} + +func (x *BlobRequest) GetQuorumId() uint32 { + if x != nil { + return x.QuorumId + } + return 0 +} + +type BlobReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *BlobReply) Reset() { + *x = BlobReply{} + if protoimpl.UnsafeEnabled { + mi := &file_retriever_v2_retriever_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlobReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobReply) ProtoMessage() {} + +func (x *BlobReply) ProtoReflect() protoreflect.Message { + mi := &file_retriever_v2_retriever_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobReply.ProtoReflect.Descriptor instead. +func (*BlobReply) Descriptor() ([]byte, []int) { + return file_retriever_v2_retriever_proto_rawDescGZIP(), []int{1} +} + +func (x *BlobReply) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_retriever_v2_retriever_proto protoreflect.FileDescriptor + +var file_retriever_v2_retriever_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x72, + 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x1a, 0x16, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x98, 0x01, 0x0a, 0x0b, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x52, 0x0a, 0x62, 0x6c, 0x6f, 0x62, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x34, 0x0a, 0x16, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, + 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x14, 0x72, 0x65, + 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, + 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x5f, 0x69, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x71, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x49, 0x64, 0x22, + 0x1f, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x32, 0x51, 0x0a, 0x09, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x12, 0x44, 0x0a, + 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x19, 0x2e, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, + 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x69, + 0x65, 0x76, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, + 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_retriever_v2_retriever_proto_rawDescOnce sync.Once + file_retriever_v2_retriever_proto_rawDescData = file_retriever_v2_retriever_proto_rawDesc +) + +func file_retriever_v2_retriever_proto_rawDescGZIP() []byte { + file_retriever_v2_retriever_proto_rawDescOnce.Do(func() { + file_retriever_v2_retriever_proto_rawDescData = protoimpl.X.CompressGZIP(file_retriever_v2_retriever_proto_rawDescData) + }) + return file_retriever_v2_retriever_proto_rawDescData +} + +var file_retriever_v2_retriever_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_retriever_v2_retriever_proto_goTypes = []interface{}{ + (*BlobRequest)(nil), // 0: retriever.v2.BlobRequest + (*BlobReply)(nil), // 1: retriever.v2.BlobReply + (*v2.BlobHeader)(nil), // 2: common.v2.BlobHeader +} +var file_retriever_v2_retriever_proto_depIdxs = []int32{ + 2, // 0: retriever.v2.BlobRequest.blob_header:type_name -> common.v2.BlobHeader + 0, // 1: retriever.v2.Retriever.RetrieveBlob:input_type -> retriever.v2.BlobRequest + 1, // 2: retriever.v2.Retriever.RetrieveBlob:output_type -> retriever.v2.BlobReply + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_retriever_v2_retriever_proto_init() } +func file_retriever_v2_retriever_proto_init() { + if File_retriever_v2_retriever_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_retriever_v2_retriever_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlobRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_retriever_v2_retriever_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlobReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_retriever_v2_retriever_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_retriever_v2_retriever_proto_goTypes, + DependencyIndexes: file_retriever_v2_retriever_proto_depIdxs, + MessageInfos: file_retriever_v2_retriever_proto_msgTypes, + }.Build() + File_retriever_v2_retriever_proto = out.File + file_retriever_v2_retriever_proto_rawDesc = nil + file_retriever_v2_retriever_proto_goTypes = nil + file_retriever_v2_retriever_proto_depIdxs = nil +} diff --git a/api/grpc/retriever/v2/retriever_grpc.pb.go b/api/grpc/retriever/v2/retriever_grpc.pb.go new file mode 100644 index 0000000000..31fa39a90b --- /dev/null +++ b/api/grpc/retriever/v2/retriever_grpc.pb.go @@ -0,0 +1,113 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 +// source: retriever/v2/retriever.proto + +package v2 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Retriever_RetrieveBlob_FullMethodName = "/retriever.v2.Retriever/RetrieveBlob" +) + +// RetrieverClient is the client API for Retriever service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RetrieverClient interface { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + RetrieveBlob(ctx context.Context, in *BlobRequest, opts ...grpc.CallOption) (*BlobReply, error) +} + +type retrieverClient struct { + cc grpc.ClientConnInterface +} + +func NewRetrieverClient(cc grpc.ClientConnInterface) RetrieverClient { + return &retrieverClient{cc} +} + +func (c *retrieverClient) RetrieveBlob(ctx context.Context, in *BlobRequest, opts ...grpc.CallOption) (*BlobReply, error) { + out := new(BlobReply) + err := c.cc.Invoke(ctx, Retriever_RetrieveBlob_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RetrieverServer is the server API for Retriever service. +// All implementations must embed UnimplementedRetrieverServer +// for forward compatibility +type RetrieverServer interface { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + RetrieveBlob(context.Context, *BlobRequest) (*BlobReply, error) + mustEmbedUnimplementedRetrieverServer() +} + +// UnimplementedRetrieverServer must be embedded to have forward compatible implementations. +type UnimplementedRetrieverServer struct { +} + +func (UnimplementedRetrieverServer) RetrieveBlob(context.Context, *BlobRequest) (*BlobReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method RetrieveBlob not implemented") +} +func (UnimplementedRetrieverServer) mustEmbedUnimplementedRetrieverServer() {} + +// UnsafeRetrieverServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RetrieverServer will +// result in compilation errors. +type UnsafeRetrieverServer interface { + mustEmbedUnimplementedRetrieverServer() +} + +func RegisterRetrieverServer(s grpc.ServiceRegistrar, srv RetrieverServer) { + s.RegisterService(&Retriever_ServiceDesc, srv) +} + +func _Retriever_RetrieveBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BlobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RetrieverServer).RetrieveBlob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Retriever_RetrieveBlob_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RetrieverServer).RetrieveBlob(ctx, req.(*BlobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Retriever_ServiceDesc is the grpc.ServiceDesc for Retriever service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Retriever_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "retriever.v2.Retriever", + HandlerType: (*RetrieverServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RetrieveBlob", + Handler: _Retriever_RetrieveBlob_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "retriever/v2/retriever.proto", +} diff --git a/api/proto/retriever/v2/retriever.proto b/api/proto/retriever/v2/retriever.proto new file mode 100644 index 0000000000..87a758ec12 --- /dev/null +++ b/api/proto/retriever/v2/retriever.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; + +import "common/v2/common.proto"; +option go_package = "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2"; +package retriever.v2; + +// The Retriever is a service for retrieving chunks corresponding to a blob from +// the EigenDA operator nodes and reconstructing the original blob from the chunks. +// This is a client-side library that the users are supposed to operationalize. +// +// Note: Users generally have two ways to retrieve a blob from EigenDA V2: +// 1) Retrieve from the relay that the blob is assigned to: the API +// is Relay.GetBlob() as defined in api/proto/relay/relay.proto +// 2) Retrieve directly from the EigenDA Nodes, which is supported by this Retriever. +// +// The Relay.GetBlob() (the 1st approach) is generally faster and cheaper as the +// relay manages the blobs that it has processed, whereas the Retriever.RetrieveBlob() +// (the 2nd approach here) removes the need to trust the relay, with the downside of +// worse cost and performance. +service Retriever { + // This fans out request to EigenDA Nodes to retrieve the chunks and returns the + // reconstructed original blob in response. + rpc RetrieveBlob(BlobRequest) returns (BlobReply) {} +} + +message BlobRequest { + // header of the blob to be retrieved + common.v2.BlobHeader blob_header = 1; + // The Ethereum block number at which the batch for this blob was constructed. + uint32 reference_block_number = 2; + // Which quorum of the blob this is requesting for (note a blob can participate in + // multiple quorums). + uint32 quorum_id = 3; +} + +message BlobReply { + // The blob retrieved and reconstructed from the EigenDA Nodes per BlobRequest. + bytes data = 1; +} diff --git a/core/data.go b/core/data.go index 367faad32d..d68e598a88 100644 --- a/core/data.go +++ b/core/data.go @@ -596,6 +596,10 @@ func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader { // ConvertToProtoPaymentHeader converts a PaymentMetadata to a protobuf payment header func ConvertToPaymentMetadata(ph *commonpb.PaymentHeader) *PaymentMetadata { + if ph == nil { + return nil + } + return &PaymentMetadata{ AccountID: ph.AccountId, ReservationPeriod: ph.ReservationPeriod, diff --git a/core/v2/types.go b/core/v2/types.go index 3ff9411081..41f58c6772 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -109,6 +109,9 @@ func BlobHeaderFromProtobuf(proto *commonpb.BlobHeader) (*BlobHeader, error) { } paymentMetadata := core.ConvertToPaymentMetadata(proto.GetPaymentHeader()) + if paymentMetadata == nil { + return nil, errors.New("payment metadata is nil") + } return &BlobHeader{ BlobVersion: BlobVersion(proto.GetVersion()), diff --git a/go.mod b/go.mod index f14a3556e6..c306d29bfd 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/fxamacker/cbor/v2 v2.5.0 github.com/gin-contrib/logger v0.2.6 github.com/gin-gonic/gin v1.9.1 + github.com/go-errors/errors v1.4.2 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/hashicorp/go-multierror v1.1.1 github.com/ingonyama-zk/icicle/v3 v3.1.0 diff --git a/retriever/cmd/main.go b/retriever/cmd/main.go index 8ea7b0429a..6c13e7f899 100644 --- a/retriever/cmd/main.go +++ b/retriever/cmd/main.go @@ -8,20 +8,22 @@ import ( "os" "github.com/Layr-Labs/eigenda/api/clients" + clientsv2 "github.com/Layr-Labs/eigenda/api/clients/v2" pb "github.com/Layr-Labs/eigenda/api/grpc/retriever" + pbv2 "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" - coreindexer "github.com/Layr-Labs/eigenda/core/indexer" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" "github.com/Layr-Labs/eigenda/retriever" retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" "github.com/Layr-Labs/eigenda/retriever/flags" + retrieverv2 "github.com/Layr-Labs/eigenda/retriever/v2" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rpc" + "github.com/go-errors/errors" "github.com/urfave/cli" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -89,69 +91,65 @@ func RetrieverMain(ctx *cli.Context) error { log.Fatalln("could not start tcp listener", err) } - // TODO(ian-shim): uncomment when https://github.com/Layr-Labs/eigenda-internal/issues/77 is done - // store, err := leveldb.NewHeaderStore(config.IndexerDataDir) - // if err != nil { - // return err - // } - tx, err := eth.NewReader(logger, gethClient, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) if err != nil { log.Fatalln("could not start tcp listener", err) } cs := eth.NewChainState(tx, gethClient) - rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0]) if err != nil { log.Fatalln("could not start tcp listener", err) } - var ics core.IndexedChainState - if config.UseGraph { - logger.Info("Using graph node") - - logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) - ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - } else { - logger.Info("Using built-in indexer") - - indexer, err := coreindexer.CreateNewIndexer( - &config.IndexerConfig, - gethClient, - rpcClient, - config.EigenDAServiceManagerAddr, - logger, - ) + logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) + ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + + if config.EigenDAVersion == 1 { + agn := &core.StdAssignmentCoordinator{} + retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections) if err != nil { - return err + log.Fatalln("could not start tcp listener", err) } - ics, err = coreindexer.NewIndexedChainState(cs, indexer) - if err != nil { - return err + + chainClient := retrivereth.NewChainClient(gethClient, logger) + retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient) + if err = retrieverServiceServer.Start(context.Background()); err != nil { + log.Fatalln("failed to start retriever service server", err) } - } - agn := &core.StdAssignmentCoordinator{} - retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections) - if err != nil { - log.Fatalln("could not start tcp listener", err) - } + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + pb.RegisterRetrieverServer(gs, retrieverServiceServer) - chainClient := retrivereth.NewChainClient(gethClient, logger) - retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient) - if err = retrieverServiceServer.Start(context.Background()); err != nil { - log.Fatalln("failed to start retriever service server", err) + // Register Server for Health Checks + name := pb.Retriever_ServiceDesc.ServiceName + healthcheck.RegisterHealthServer(name, gs) + + log.Printf("server listening at %s", addr) + return gs.Serve(listener) } - // Register reflection service on gRPC server - // This makes "grpcurl -plaintext localhost:9000 list" command work - reflection.Register(gs) + if config.EigenDAVersion == 2 { + retrievalClient := clientsv2.NewRetrievalClient(logger, tx, ics, v, config.NumConnections) + retrieverServiceServer := retrieverv2.NewServer(config, logger, retrievalClient, ics) + if err = retrieverServiceServer.Start(context.Background()); err != nil { + log.Fatalln("failed to start retriever service server", err) + } - pb.RegisterRetrieverServer(gs, retrieverServiceServer) + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) - // Register Server for Health Checks - name := pb.Retriever_ServiceDesc.ServiceName - healthcheck.RegisterHealthServer(name, gs) + pbv2.RegisterRetrieverServer(gs, retrieverServiceServer) + + // Register Server for Health Checks + name := pb.Retriever_ServiceDesc.ServiceName + healthcheck.RegisterHealthServer(name, gs) + + log.Printf("server listening at %s", addr) + return gs.Serve(listener) + } - log.Printf("server listening at %s", addr) - return gs.Serve(listener) + return errors.New("invalid EigenDA version") } diff --git a/retriever/config.go b/retriever/config.go index c317980954..6476fa3537 100644 --- a/retriever/config.go +++ b/retriever/config.go @@ -1,13 +1,13 @@ package retriever import ( + "errors" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" "github.com/Layr-Labs/eigenda/retriever/flags" "github.com/urfave/cli" ) @@ -16,44 +16,39 @@ type Config struct { EncoderConfig kzg.KzgConfig EthClientConfig geth.EthClientConfig LoggerConfig common.LoggerConfig - IndexerConfig indexer.Config MetricsConfig MetricsConfig ChainStateConfig thegraph.Config - IndexerDataDir string Timeout time.Duration NumConnections int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string - UseGraph bool + + EigenDAVersion int } -func ReadRetrieverConfig(ctx *cli.Context) *Config { +func NewConfig(ctx *cli.Context) (*Config, error) { + version := ctx.GlobalInt(flags.EigenDAVersionFlag.Name) + if version != 1 && version != 2 { + return nil, errors.New("invalid EigenDA version") + } + loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) + if err != nil { + return nil, err + } + return &Config{ + LoggerConfig: *loggerConfig, EncoderConfig: kzg.ReadCLIConfig(ctx), EthClientConfig: geth.ReadEthClientConfig(ctx), - IndexerConfig: indexer.ReadIndexerConfig(ctx), MetricsConfig: MetricsConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPortFlag.Name), }, ChainStateConfig: thegraph.ReadCLIConfig(ctx), - IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name), Timeout: ctx.Duration(flags.TimeoutFlag.Name), NumConnections: ctx.Int(flags.NumConnectionsFlag.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), - UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name), - } -} - -func NewConfig(ctx *cli.Context) (*Config, error) { - loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) - if err != nil { - return nil, err - } - - config := ReadRetrieverConfig(ctx) - config.LoggerConfig = *loggerConfig - - return config, nil + EigenDAVersion: version, + }, nil } diff --git a/retriever/flags/flags.go b/retriever/flags/flags.go index c8f1276a7d..e8f07cba85 100644 --- a/retriever/flags/flags.go +++ b/retriever/flags/flags.go @@ -5,7 +5,6 @@ import ( "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg" - "github.com/Layr-Labs/eigenda/indexer" "github.com/urfave/cli" ) @@ -55,12 +54,6 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONNECTIONS"), Value: 20, } - IndexerDataDirFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "indexer-data-dir"), - Usage: "the data directory for the indexer", - EnvVar: common.PrefixEnvVar(envPrefix, "DATA_DIR"), - Value: "./data/retriever", - } MetricsHTTPPortFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), Usage: "the http port which the metrics prometheus server is listening", @@ -68,11 +61,12 @@ var ( Value: "9100", EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } - UseGraphFlag = cli.BoolFlag{ - Name: common.PrefixFlag(FlagPrefix, "use-graph"), - Usage: "Whether to use the graph node", + EigenDAVersionFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-version"), + Usage: "EigenDA version: currently supports 1 and 2", Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "USE_GRAPH"), + EnvVar: common.PrefixEnvVar(envPrefix, "EIGENDA_VERSION"), + Value: 1, } ) @@ -84,9 +78,8 @@ func RetrieverFlags(envPrefix string) []cli.Flag { BlsOperatorStateRetrieverFlag, EigenDAServiceManagerFlag, NumConnectionsFlag, - IndexerDataDirFlag, MetricsHTTPPortFlag, - UseGraphFlag, + EigenDAVersionFlag, } } @@ -98,6 +91,5 @@ func init() { Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, geth.EthClientFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) - Flags = append(Flags, indexer.CLIFlags(envPrefix)...) Flags = append(Flags, thegraph.CLIFlags(envPrefix)...) } diff --git a/retriever/v2/server.go b/retriever/v2/server.go new file mode 100644 index 0000000000..f212a68cef --- /dev/null +++ b/retriever/v2/server.go @@ -0,0 +1,85 @@ +package v2 + +import ( + "bytes" + "context" + "encoding/hex" + "errors" + + "github.com/Layr-Labs/eigenda/api/clients/v2" + pb "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/retriever" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +type Config = retriever.Config + +type Server struct { + pb.UnimplementedRetrieverServer + + config *Config + retrievalClient clients.RetrievalClient + indexedState core.IndexedChainState + logger logging.Logger + metrics *retriever.Metrics +} + +func NewServer( + config *Config, + logger logging.Logger, + retrievalClient clients.RetrievalClient, + indexedState core.IndexedChainState, +) *Server { + metrics := retriever.NewMetrics(config.MetricsConfig.HTTPPort, logger) + + return &Server{ + config: config, + retrievalClient: retrievalClient, + indexedState: indexedState, + logger: logger.With("component", "RetrieverServer"), + metrics: metrics, + } +} + +func (s *Server) Start(ctx context.Context) error { + s.metrics.Start(ctx) + return s.indexedState.Start(ctx) +} + +func (s *Server) RetrieveBlob(ctx context.Context, req *pb.BlobRequest) (*pb.BlobReply, error) { + if req.GetBlobHeader() == nil { + return nil, errors.New("blob header is nil") + } + if req.GetReferenceBlockNumber() == 0 { + return nil, errors.New("reference block number is 0") + } + + blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader()) + if err != nil { + return nil, err + } + + blobKey, err := blobHeader.BlobKey() + if err != nil { + return nil, err + } + + s.logger.Info("Received request: ", "blobKey", hex.EncodeToString(blobKey[:]), "referenceBlockNumber", req.GetReferenceBlockNumber(), "quorumId", req.GetQuorumId()) + s.metrics.IncrementRetrievalRequestCounter() + + ctxWithTimeout, cancel := context.WithTimeout(ctx, s.config.Timeout) + defer cancel() + data, err := s.retrievalClient.GetBlob(ctxWithTimeout, blobHeader, uint64(req.GetReferenceBlockNumber()), core.QuorumID(req.GetQuorumId())) + if err != nil { + return nil, err + } + restored := bytes.TrimRight(data, "\x00") + restored = codec.RemoveEmptyByteFromPaddedBytes(restored) + + return &pb.BlobReply{ + Data: restored, + }, nil +} diff --git a/retriever/v2/server_test.go b/retriever/v2/server_test.go new file mode 100644 index 0000000000..5585c9d4f3 --- /dev/null +++ b/retriever/v2/server_test.go @@ -0,0 +1,134 @@ +package v2_test + +import ( + "context" + "math/big" + "runtime" + "testing" + + clientsmock "github.com/Layr-Labs/eigenda/api/clients/v2/mock" + commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + commonpbv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + pb "github.com/Layr-Labs/eigenda/api/grpc/retriever/v2" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/kzg" + "github.com/Layr-Labs/eigenda/encoding/kzg/prover" + "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/retriever/mock" + retriever "github.com/Layr-Labs/eigenda/retriever/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/stretchr/testify/require" +) + +const numOperators = 10 + +var ( + indexedChainState core.IndexedChainState + retrievalClient *clientsmock.MockRetrievalClient + chainClient *mock.MockChainClient + gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.") +) + +func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { + config := &kzg.KzgConfig{ + G1Path: "../../inabox/resources/kzg/g1.point", + G2Path: "../../inabox/resources/kzg/g2.point", + CacheDir: "../../inabox/resources/kzg/SRSTables", + SRSOrder: 3000, + SRSNumberToLoad: 3000, + NumWorker: uint64(runtime.GOMAXPROCS(0)), + LoadG2Points: true, + } + + p, err := prover.NewProver(config, nil) + if err != nil { + return nil, nil, err + } + + v, err := verifier.NewVerifier(config, nil) + if err != nil { + return nil, nil, err + } + + return p, v, nil +} + +func newTestServer(t *testing.T) *retriever.Server { + var err error + config := &retriever.Config{} + + logger := logging.NewNoopLogger() + + indexedChainState, err = coremock.MakeChainDataMock(map[uint8]int{ + 0: numOperators, + 1: numOperators, + 2: numOperators, + }) + require.NoError(t, err) + + _, _, err = makeTestComponents() + require.NoError(t, err) + + retrievalClient = &clientsmock.MockRetrievalClient{} + chainClient = mock.NewMockChainClient() + return retriever.NewServer(config, logger, retrievalClient, indexedChainState) +} + +func TestRetrieveBlob(t *testing.T) { + server := newTestServer(t) + data := codec.ConvertByPaddingEmptyByte(gettysburgAddressBytes) + retrievalClient.On("GetBlob").Return(data, nil) + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err := lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + require.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + require.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + require.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + require.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment := encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 16, + } + c, err := mockCommitment.ToProtobuf() + require.NoError(t, err) + retrievalReply, err := server.RetrieveBlob(context.Background(), &pb.BlobRequest{ + BlobHeader: &commonpbv2.BlobHeader{ + Version: 0, + QuorumNumbers: []uint32{0}, + Commitment: c, + PaymentHeader: &commonpb.PaymentHeader{ + AccountId: "account_id", + }, + }, + ReferenceBlockNumber: 100, + QuorumId: 0, + }) + require.NoError(t, err) + require.Equal(t, gettysburgAddressBytes, retrievalReply.Data) +} diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 2b147f1a49..9702c7be86 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -3,10 +3,11 @@ package config import ( "errors" "fmt" + "time" + "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/retriever" - "time" "github.com/Layr-Labs/eigenda/common" "github.com/urfave/cli" @@ -55,7 +56,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { customQuorumsUint8[i] = uint8(q) } - retrieverConfig := retriever.ReadRetrieverConfig(ctx) + retrieverConfig, err := retriever.NewConfig(ctx) + if err != nil { + return nil, err + } config := &Config{ DisperserClientConfig: &clients.Config{
    Method NameRequest TypeResponse TypeDescription
    RetrieveBlobBlobRequestBlobReplyBlobRequestBlobReply

    This fans out request to EigenDA Nodes to retrieve the chunks and returns the reconstructed original blob in response.