Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate out client v2 package #986

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ import (
"google.golang.org/grpc"
)

type DisperserClientV2Config struct {
type DisperserClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type DisperserClientV2 interface {
type DisperserClient interface {
Comment on lines +19 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep the package name clients, then we could simplify these to clients.DisperserConfig and clients.Disperser.

Close() error
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID, salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error)
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
}

type disperserClientV2 struct {
config *DisperserClientV2Config
type disperserClient struct {
config *DisperserClientConfig
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
Expand All @@ -39,18 +39,18 @@ type disperserClientV2 struct {
accountant *Accountant
}

var _ DisperserClientV2 = &disperserClientV2{}
var _ DisperserClient = &disperserClient{}

// DisperserClientV2 maintains a single underlying grpc connection to the disperser server,
// DisperserClient maintains a single underlying grpc connection to the disperser server,
// through which it sends requests to disperse blobs and get blob status.
// The connection is established lazily on the first method call. Don't forget to call Close(),
// which is safe to call even if the connection was never established.
//
// DisperserClientV2 is safe to be used concurrently by multiple goroutines.
// DisperserClient is safe to be used concurrently by multiple goroutines.
//
// Example usage:
//
// client := NewDisperserClientV2(config, signer)
// client := NewDisperserClient(config, signer)
// defer client.Close()
//
// // The connection will be established on the first call
Expand All @@ -61,7 +61,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
//
// // Subsequent calls will use the existing connection
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClientV2, error) {
func NewDisperserClient(config *DisperserClientConfig, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClient, error) {
if config == nil {
return nil, api.NewErrorInvalidArg("config must be provided")
}
Expand All @@ -75,7 +75,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
return nil, api.NewErrorInvalidArg("signer must be provided")
}

return &disperserClientV2{
return &disperserClient{
config: config,
signer: signer,
prover: prover,
Expand All @@ -85,7 +85,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
}

// PopulateAccountant populates the accountant with the payment state from the disperser.
func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {
func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
paymentState, err := c.GetPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
Expand All @@ -100,7 +100,7 @@ func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *disperserClientV2) Close() error {
func (c *disperserClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -110,7 +110,7 @@ func (c *disperserClientV2) Close() error {
return nil
}

func (c *disperserClientV2) DisperseBlob(
func (c *disperserClient) DisperseBlob(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *disperserClientV2) DisperseBlob(
}

// GetBlobStatus returns the status of a blob with the given blob key.
func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
func (c *disperserClient) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -230,7 +230,7 @@ func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.Bl
}

// GetPaymentState returns the payment state of the disperser client
func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
func (c *disperserClient) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -257,7 +257,7 @@ func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc
// While the blob commitment can be calculated by anyone, it requires SRS points to
// be loaded. For service that does not have access to SRS points, this method can be
// used to calculate the blob commitment in blob header, which is required for dispersal.
func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
func (c *disperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -271,7 +271,7 @@ func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte)

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClientV2) initOnceGrpcConnection() error {
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockNodeClientV2 struct {
type MockNodeClient struct {
mock.Mock
}

var _ clients.NodeClientV2 = (*MockNodeClientV2)(nil)
var _ clients.NodeClient = (*MockNodeClient)(nil)

func NewNodeClientV2() *MockNodeClientV2 {
return &MockNodeClientV2{}
func NewNodeClient() *MockNodeClient {
return &MockNodeClient{}
}

func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
func (c *MockNodeClient) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
args := c.Called()
var signature *core.Signature
if args.Get(0) != nil {
Expand All @@ -28,7 +28,7 @@ func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch)
return signature, args.Error(1)
}

func (c *MockNodeClientV2) Close() error {
func (c *MockNodeClient) Close() error {
args := c.Called()
return args.Error(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)
Expand Down
20 changes: 10 additions & 10 deletions api/clients/node_client_v2.go → api/clients/v2/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,37 @@ import (
"google.golang.org/grpc"
)

type NodeClientV2Config struct {
type NodeClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type NodeClientV2 interface {
type NodeClient interface {
StoreChunks(ctx context.Context, certs *corev2.Batch) (*core.Signature, error)
Close() error
}

type nodeClientV2 struct {
config *NodeClientV2Config
type nodeClient struct {
config *NodeClientConfig
initOnce sync.Once
conn *grpc.ClientConn

dispersalClient nodegrpc.DispersalClient
}

var _ NodeClientV2 = (*nodeClientV2)(nil)
var _ NodeClient = (*nodeClient)(nil)

func NewNodeClientV2(config *NodeClientV2Config) (*nodeClientV2, error) {
func NewNodeClient(config *NodeClientConfig) (*nodeClient, error) {
if config == nil || config.Hostname == "" || config.Port == "" {
return nil, fmt.Errorf("invalid config: %v", config)
}
return &nodeClientV2{
return &nodeClient{
config: config,
}, nil
}

func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
func (c *nodeClient) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
if len(batch.BlobCertificates) == 0 {
return nil, fmt.Errorf("no blob certificates in the batch")
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*c

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *nodeClientV2) Close() error {
func (c *nodeClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -99,7 +99,7 @@ func (c *nodeClientV2) Close() error {
return nil
}

func (c *nodeClientV2) initOnceGrpcConnection() error {
func (c *nodeClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

"github.com/Layr-Labs/eigenda/api/clients"
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand All @@ -16,30 +17,30 @@ import (
"github.com/gammazero/workerpool"
)

// RetrievalClientV2 is an object that can retrieve blobs from the DA nodes.
// RetrievalClient is an object that can retrieve blobs from the DA nodes.
// To retrieve a blob from the relay, use RelayClient instead.
type RetrievalClientV2 interface {
type RetrievalClient interface {
// GetBlob downloads chunks of a blob from operator network and reconstructs the blob.
GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error)
}

type retrievalClientV2 struct {
type retrievalClient struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
}

// NewRetrievalClientV2 creates a new retrieval client.
func NewRetrievalClientV2(
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClientV2 {
return &retrievalClientV2{
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
Expand All @@ -48,7 +49,7 @@ func NewRetrievalClientV2(
}
}

func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
blobKey, err := blobHeader.BlobKey()
if err != nil {
return nil, err
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
}

// Fetch chunks from all operators
chunksChan := make(chan RetrievedChunks, len(operators))
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
Expand Down Expand Up @@ -139,13 +140,13 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
)
}

func (r *retrievalClientV2) getChunksFromOperator(
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan RetrievedChunks,
chunksChan chan clients.RetrievedChunks,
) {
conn, err := grpc.NewClient(
core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
Expand All @@ -158,7 +159,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
}
}()
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -174,7 +175,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

reply, err := n.GetChunks(ctx, request)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -187,7 +188,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
var chunk *encoding.Frame
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -197,7 +198,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

chunks[i] = chunk
}
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: chunks,
Expand Down
19 changes: 19 additions & 0 deletions api/clients/v2/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package clients

import (
"crypto/tls"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
options := []grpc.DialOption{}
if useSecureGrpcFlag {
options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
} else {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return options
}
Loading
Loading