Skip to content

Commit

Permalink
separate out client v2 package
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Dec 12, 2024
1 parent 1f84a21 commit 2ae40a7
Show file tree
Hide file tree
Showing 19 changed files with 95 additions and 73 deletions.
2 changes: 1 addition & 1 deletion api/clients/accountant.go → api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package v2

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package v2

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package v2

import (
"context"
Expand All @@ -16,21 +16,21 @@ import (
"google.golang.org/grpc"
)

type DisperserClientV2Config struct {
type DisperserClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type DisperserClientV2 interface {
type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID, salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error)
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
}

type disperserClientV2 struct {
config *DisperserClientV2Config
type disperserClient struct {
config *DisperserClientConfig
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
Expand All @@ -39,18 +39,18 @@ type disperserClientV2 struct {
accountant *Accountant
}

var _ DisperserClientV2 = &disperserClientV2{}
var _ DisperserClient = &disperserClient{}

// DisperserClientV2 maintains a single underlying grpc connection to the disperser server,
// DisperserClient maintains a single underlying grpc connection to the disperser server,
// through which it sends requests to disperse blobs and get blob status.
// The connection is established lazily on the first method call. Don't forget to call Close(),
// which is safe to call even if the connection was never established.
//
// DisperserClientV2 is safe to be used concurrently by multiple goroutines.
// DisperserClient is safe to be used concurrently by multiple goroutines.
//
// Example usage:
//
// client := NewDisperserClientV2(config, signer)
// client := NewDisperserClient(config, signer)
// defer client.Close()
//
// // The connection will be established on the first call
Expand All @@ -61,7 +61,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
//
// // Subsequent calls will use the existing connection
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClientV2, error) {
func NewDisperserClient(config *DisperserClientConfig, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClient, error) {
if config == nil {
return nil, api.NewErrorInvalidArg("config must be provided")
}
Expand All @@ -75,7 +75,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
return nil, api.NewErrorInvalidArg("signer must be provided")
}

return &disperserClientV2{
return &disperserClient{
config: config,
signer: signer,
prover: prover,
Expand All @@ -85,7 +85,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
}

// PopulateAccountant populates the accountant with the payment state from the disperser.
func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {
func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
paymentState, err := c.GetPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
Expand All @@ -100,7 +100,7 @@ func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *disperserClientV2) Close() error {
func (c *disperserClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -110,7 +110,7 @@ func (c *disperserClientV2) Close() error {
return nil
}

func (c *disperserClientV2) DisperseBlob(
func (c *disperserClient) DisperseBlob(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *disperserClientV2) DisperseBlob(
}

// GetBlobStatus returns the status of a blob with the given blob key.
func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
func (c *disperserClient) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -230,7 +230,7 @@ func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.Bl
}

// GetPaymentState returns the payment state of the disperser client
func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
func (c *disperserClient) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -257,7 +257,7 @@ func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc
// While the blob commitment can be calculated by anyone, it requires SRS points to
// be loaded. For service that does not have access to SRS points, this method can be
// used to calculate the blob commitment in blob header, which is required for dispersal.
func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
func (c *disperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -271,7 +271,7 @@ func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte)

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClientV2) initOnceGrpcConnection() error {
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
clients "github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
Expand All @@ -13,7 +13,7 @@ type MockNodeClientV2 struct {
mock.Mock
}

var _ clients.NodeClientV2 = (*MockNodeClientV2)(nil)
var _ clients.NodeClient = (*MockNodeClientV2)(nil)

func NewNodeClientV2() *MockNodeClientV2 {
return &MockNodeClientV2{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
clients "github.com/Layr-Labs/eigenda/api/clients/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)
Expand Down
22 changes: 11 additions & 11 deletions api/clients/node_client_v2.go → api/clients/v2/node_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package v2

import (
"context"
Expand All @@ -12,37 +12,37 @@ import (
"google.golang.org/grpc"
)

type NodeClientV2Config struct {
type NodeClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type NodeClientV2 interface {
type NodeClient interface {
StoreChunks(ctx context.Context, certs *corev2.Batch) (*core.Signature, error)
Close() error
}

type nodeClientV2 struct {
config *NodeClientV2Config
type nodeClient struct {
config *NodeClientConfig
initOnce sync.Once
conn *grpc.ClientConn

dispersalClient nodegrpc.DispersalClient
}

var _ NodeClientV2 = (*nodeClientV2)(nil)
var _ NodeClient = (*nodeClient)(nil)

func NewNodeClientV2(config *NodeClientV2Config) (*nodeClientV2, error) {
func NewNodeClient(config *NodeClientConfig) (*nodeClient, error) {
if config == nil || config.Hostname == "" || config.Port == "" {
return nil, fmt.Errorf("invalid config: %v", config)
}
return &nodeClientV2{
return &nodeClient{
config: config,
}, nil
}

func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
func (c *nodeClient) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
if len(batch.BlobCertificates) == 0 {
return nil, fmt.Errorf("no blob certificates in the batch")
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*c

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *nodeClientV2) Close() error {
func (c *nodeClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -99,7 +99,7 @@ func (c *nodeClientV2) Close() error {
return nil
}

func (c *nodeClientV2) initOnceGrpcConnection() error {
func (c *nodeClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package v2

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package clients
package v2

import (
"context"
"errors"
"fmt"

"github.com/Layr-Labs/eigenda/api/clients"
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand All @@ -16,30 +17,30 @@ import (
"github.com/gammazero/workerpool"
)

// RetrievalClientV2 is an object that can retrieve blobs from the DA nodes.
// RetrievalClient is an object that can retrieve blobs from the DA nodes.
// To retrieve a blob from the relay, use RelayClient instead.
type RetrievalClientV2 interface {
type RetrievalClient interface {
// GetBlob downloads chunks of a blob from operator network and reconstructs the blob.
GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error)
}

type retrievalClientV2 struct {
type retrievalClient struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
}

// NewRetrievalClientV2 creates a new retrieval client.
func NewRetrievalClientV2(
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClientV2 {
return &retrievalClientV2{
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
Expand All @@ -48,7 +49,7 @@ func NewRetrievalClientV2(
}
}

func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
blobKey, err := blobHeader.BlobKey()
if err != nil {
return nil, err
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
}

// Fetch chunks from all operators
chunksChan := make(chan RetrievedChunks, len(operators))
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
Expand Down Expand Up @@ -139,13 +140,13 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
)
}

func (r *retrievalClientV2) getChunksFromOperator(
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan RetrievedChunks,
chunksChan chan clients.RetrievedChunks,
) {
conn, err := grpc.NewClient(
core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
Expand All @@ -158,7 +159,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
}
}()
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -174,7 +175,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

reply, err := n.GetChunks(ctx, request)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -187,7 +188,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
var chunk *encoding.Frame
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -197,7 +198,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

chunks[i] = chunk
}
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: chunks,
Expand Down
19 changes: 19 additions & 0 deletions api/clients/v2/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package v2

import (
"crypto/tls"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
options := []grpc.DialOption{}
if useSecureGrpcFlag {
options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
} else {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return options
}
Loading

0 comments on commit 2ae40a7

Please sign in to comment.