Skip to content

Commit

Permalink
Add graph indexer to retriever (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Dec 15, 2023
1 parent 1c9add1 commit 459b0fb
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 31 deletions.
16 changes: 3 additions & 13 deletions clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (
"github.com/gammazero/workerpool"
"github.com/wealdtech/go-merkletree"
"github.com/wealdtech/go-merkletree/keccak256"

coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/indexer"
)

type RetrievalClient interface {
Expand All @@ -38,23 +35,16 @@ var _ RetrievalClient = (*retrievalClient)(nil)

func NewRetrievalClient(
logger common.Logger,
chainState core.ChainState,
indexer indexer.Indexer,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
encoder core.Encoder,
numConnections int,
) (*retrievalClient, error) {
indexedState, err := coreindexer.NewIndexedChainState(
chainState,
indexer,
)
if err != nil {
return nil, err
}

return &retrievalClient{
logger: logger,
indexedChainState: indexedState,
indexedChainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
encoder: encoder,
Expand Down
8 changes: 7 additions & 1 deletion clients/tests/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ func setup(t *testing.T) {

indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, indexer, coordinator, nodeClient, encoder, 2)

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
if err != nil {
panic("failed to create a new indexed chain state")
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, encoder, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
Expand Down
7 changes: 6 additions & 1 deletion inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
return err
}

retrievalClient, err = clients.NewRetrievalClient(logger, cs, indexer, agn, nodeClient, encoder, 10)
ics, err := coreindexer.NewIndexedChainState(cs, indexer)
if err != nil {
return err
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, agn, nodeClient, encoder, 10)
if err != nil {
return err
}
Expand Down
43 changes: 27 additions & 16 deletions retriever/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/Layr-Labs/eigenda/core/encoding"
"github.com/Layr-Labs/eigenda/core/eth"
coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/retriever"
retrivereth "github.com/Layr-Labs/eigenda/retriever/eth"
"github.com/Layr-Labs/eigenda/retriever/flags"
"github.com/ethereum/go-ethereum/rpc"
"github.com/shurcooL/graphql"
"github.com/urfave/cli"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -98,30 +100,39 @@ func RetrieverMain(ctx *cli.Context) error {
log.Fatalln("could not start tcp listener", err)
}

indexer, err := coreindexer.CreateNewIndexer(
&config.IndexerConfig,
gethClient,
rpcClient,
config.EigenDAServiceManagerAddr,
logger,
)
if err != nil {
log.Fatalln("could not start tcp listener", err)
var ics core.IndexedChainState
if config.UseGraph {
logger.Info("Using graph node")
querier := graphql.NewClient(config.GraphUrl, nil)
logger.Info("Connecting to subgraph", "url", config.GraphUrl)
ics = thegraph.NewIndexedChainState(cs, querier, logger)
} else {
logger.Info("Using built-in indexer")

indexer, err := coreindexer.CreateNewIndexer(
&config.IndexerConfig,
gethClient,
rpcClient,
config.EigenDAServiceManagerAddr,
logger,
)
if err != nil {
return err
}
ics, err = coreindexer.NewIndexedChainState(cs, indexer)
if err != nil {
return err
}
}

agn := &core.StdAssignmentCoordinator{}
retrievalClient, err := clients.NewRetrievalClient(logger, cs, indexer, agn, nodeClient, encoder, config.NumConnections)
if err != nil {
log.Fatalln("could not start tcp listener", err)
}

indexedState, err := coreindexer.NewIndexedChainState(cs, indexer)
retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, encoder, config.NumConnections)
if err != nil {
log.Fatalln("could not start tcp listener", err)
}

chainClient := retrivereth.NewChainClient(gethClient, logger)
retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, encoder, indexedState, chainClient)
retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, encoder, ics, chainClient)
if err = retrieverServiceServer.Start(context.Background()); err != nil {
log.Fatalln("failed to start retriever service server", err)
}
Expand Down
4 changes: 4 additions & 0 deletions retriever/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Config struct {
NumConnections int
BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
GraphUrl string
UseGraph bool
}

func NewConfig(ctx *cli.Context) *Config {
Expand All @@ -39,5 +41,7 @@ func NewConfig(ctx *cli.Context) *Config {
NumConnections: ctx.Int(flags.NumConnectionsFlag.Name),
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
GraphUrl: ctx.GlobalString(flags.GraphUrlFlag.Name),
UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name),
}
}
14 changes: 14 additions & 0 deletions retriever/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ var (
Value: "9100",
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
GraphUrlFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "graph-url"),
Usage: "The url of the graph node",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_URL"),
}
UseGraphFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "use-graph"),
Usage: "Whether to use the graph node",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "USE_GRAPH"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -82,6 +94,8 @@ var optionalFlags = []cli.Flag{
NumConnectionsFlag,
IndexerDataDirFlag,
MetricsHTTPPortFlag,
GraphUrlFlag,
UseGraphFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down

0 comments on commit 459b0fb

Please sign in to comment.