Add retry for graph node (#471)
mooselumph authored Apr 12, 2024
1 parent ac6f3c1 commit b215782
Showing 15 changed files with 243 additions and 49 deletions.
53 changes: 53 additions & 0 deletions core/thegraph/config.go
@@ -0,0 +1,53 @@
package thegraph

import (
	"time"

	"github.com/Layr-Labs/eigenda/common"
	"github.com/urfave/cli"
)

const (
	EndpointFlagName   = "thegraph.endpoint"
	BackoffFlagName    = "thegraph.backoff"
	MaxRetriesFlagName = "thegraph.max_retries"
)

type Config struct {
	Endpoint     string        // The Graph endpoint
	PullInterval time.Duration // The interval to pull data from The Graph
	MaxRetries   int           // The maximum number of retries to pull data from The Graph
}

func CLIFlags(envPrefix string) []cli.Flag {
	return []cli.Flag{
		cli.StringFlag{
			Name:     EndpointFlagName,
			Usage:    "The Graph endpoint",
			Required: true,
			EnvVar:   common.PrefixEnvVar(envPrefix, "GRAPH_URL"),
		},
		cli.DurationFlag{
			Name:   BackoffFlagName,
			Usage:  "Backoff for retries",
			Value:  100 * time.Millisecond,
			EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_BACKOFF"),
		},
		cli.UintFlag{
			Name:   MaxRetriesFlagName,
			Usage:  "The maximum number of retries",
			Value:  5,
			EnvVar: common.PrefixEnvVar(envPrefix, "GRAPH_MAX_RETRIES"),
		},
	}
}

func ReadCLIConfig(ctx *cli.Context) Config {

	return Config{
		Endpoint:     ctx.String(EndpointFlagName),
		PullInterval: ctx.Duration(BackoffFlagName),
		MaxRetries:   ctx.Int(MaxRetriesFlagName),
	}

}
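These flags can be registered on a urfave/cli app and read back with ReadCLIConfig. The following is a minimal wiring sketch, not part of the commit; the "DEMO" env prefix, app name, and endpoint URL are placeholder assumptions:

package main

import (
	"log"
	"os"

	"github.com/Layr-Labs/eigenda/core/thegraph"
	"github.com/urfave/cli"
)

func main() {
	app := cli.NewApp()
	app.Name = "thegraph-config-demo" // hypothetical demo binary
	// Registers --thegraph.endpoint, --thegraph.backoff and --thegraph.max_retries,
	// backed by env vars derived from the "DEMO" prefix.
	app.Flags = thegraph.CLIFlags("DEMO")
	app.Action = func(ctx *cli.Context) error {
		cfg := thegraph.ReadCLIConfig(ctx)
		// PullInterval carries the backoff value read from --thegraph.backoff.
		log.Printf("endpoint=%s backoff=%s maxRetries=%d", cfg.Endpoint, cfg.PullInterval, cfg.MaxRetries)
		return nil
	}
	// e.g. go run . --thegraph.endpoint http://localhost:8000/subgraphs/name/example --thegraph.backoff 250ms
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}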
48 changes: 48 additions & 0 deletions core/thegraph/querier.go
@@ -0,0 +1,48 @@
package thegraph

import (
	"context"
	"errors"
	"time"
)

type RetryQuerier struct {
	GraphQLQuerier
	Backoff    time.Duration
	MaxRetries int
}

var _ GraphQLQuerier = (*RetryQuerier)(nil)

func NewRetryQuerier(q GraphQLQuerier, backoff time.Duration, maxRetries int) *RetryQuerier {
	return &RetryQuerier{
		GraphQLQuerier: q,
		Backoff:        backoff,
		MaxRetries:     maxRetries,
	}
}

func (q *RetryQuerier) Query(ctx context.Context, query any, variables map[string]any) error {

	retryCount := 0
	backoff := q.Backoff
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			if retryCount > q.MaxRetries {
				return errors.New("max retries exceeded")
			}
			retryCount++

			err := q.GraphQLQuerier.Query(ctx, query, variables)
			if err == nil {
				return nil
			}

			time.Sleep(backoff)
			backoff *= 2
		}
	}
}
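Elsewhere in this commit (see MakeIndexedChainState below) the RetryQuerier wraps the shurcooL/graphql client. A standalone sketch of that usage follows; the endpoint URL and The Graph's standard _meta query are illustrative placeholders, not taken from the repo:

package main

import (
	"context"
	"log"
	"time"

	"github.com/Layr-Labs/eigenda/core/thegraph"
	"github.com/shurcooL/graphql"
)

func main() {
	// Hypothetical subgraph endpoint; substitute a real graph node URL.
	client := graphql.NewClient("http://localhost:8000/subgraphs/name/example", nil)

	// Retry up to 5 times, starting at 100ms and doubling after each failure
	// (100ms, 200ms, 400ms, ...); these match the flag defaults in config.go.
	querier := thegraph.NewRetryQuerier(client, 100*time.Millisecond, 5)

	// _meta reports the latest block indexed by the subgraph.
	var q struct {
		Meta struct {
			Block struct {
				Number graphql.Int
			}
		} `graphql:"_meta"`
	}

	if err := querier.Query(context.Background(), &q, nil); err != nil {
		log.Fatalf("query failed after retries: %v", err)
	}
	log.Printf("latest indexed block: %d", q.Meta.Block.Number)
}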
75 changes: 75 additions & 0 deletions core/thegraph/querier_test.go
@@ -0,0 +1,75 @@
package thegraph_test

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/Layr-Labs/eigenda/core/thegraph"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

type MockGraphQLQuerier struct {
	mock.Mock
}

func (m *MockGraphQLQuerier) Query(ctx context.Context, query any, variables map[string]any) error {
	args := m.Called(ctx, query, variables)
	return args.Error(0)
}

func TestRetryQuerier_Query(t *testing.T) {
	ctx := context.Background()
	query := "query"
	variables := map[string]any{"key": "value"}

	mockQuerier := new(MockGraphQLQuerier)
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(nil)

	retryQuerier := thegraph.NewRetryQuerier(mockQuerier, time.Millisecond, 2)

	err := retryQuerier.Query(ctx, query, variables)
	assert.NoError(t, err)

	mockQuerier.AssertExpectations(t)
}

func TestRetryQuerier_ExceedMaxRetries(t *testing.T) {
	ctx := context.Background()
	query := "query"
	variables := map[string]any{"key": "value"}

	mockQuerier := new(MockGraphQLQuerier)
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()

	retryQuerier := thegraph.NewRetryQuerier(mockQuerier, time.Millisecond, 2)

	err := retryQuerier.Query(ctx, query, variables)
	assert.ErrorContains(t, err, "max retries exceeded")

	mockQuerier.AssertExpectations(t)
}

func TestRetryQuerier_Timeout(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	query := "query"
	variables := map[string]any{"key": "value"}

	mockQuerier := new(MockGraphQLQuerier)
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(errors.New("query error")).Once()
	mockQuerier.On("Query", ctx, query, variables).Return(nil)

	retryQuerier := thegraph.NewRetryQuerier(mockQuerier, 100*time.Millisecond, 2)

	err := retryQuerier.Query(ctx, query, variables)
	assert.ErrorContains(t, err, "context deadline exceeded")

}
11 changes: 11 additions & 0 deletions core/thegraph/state.go
@@ -87,6 +87,17 @@ type (

var _ IndexedChainState = (*indexedChainState)(nil)

func MakeIndexedChainState(config Config, cs core.ChainState, logger logging.Logger) *indexedChainState {

	logger.Info("Using graph node")
	querier := graphql.NewClient(config.Endpoint, nil)

	// RetryQuerier is a wrapper around the GraphQLQuerier that retries queries on failure
	retryQuerier := NewRetryQuerier(querier, config.PullInterval, config.MaxRetries)

	return NewIndexedChainState(cs, retryQuerier, logger)
}

func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger logging.Logger) *indexedChainState {
	return &indexedChainState{
		ChainState: cs,
2 changes: 2 additions & 0 deletions disperser/batcher/batcher.go
@@ -44,6 +44,7 @@ type TimeoutConfig struct {
	AttestationTimeout time.Duration
	ChainReadTimeout   time.Duration
	ChainWriteTimeout  time.Duration
	ChainStateTimeout  time.Duration
}

type Config struct {
@@ -114,6 +115,7 @@ func NewBatcher(
		TargetNumChunks:          config.TargetNumChunks,
		MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
		FinalizationBlockDelay:   config.FinalizationBlockDelay,
		ChainStateTimeout:        timeoutConfig.ChainStateTimeout,
	}
	encodingWorkerPool := workerpool.New(config.NumConnections)
	encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
13 changes: 11 additions & 2 deletions disperser/batcher/encoding_streamer.go
@@ -38,6 +38,9 @@ type StreamerConfig struct {
	// EncodingRequestTimeout is the timeout for each encoding request
	EncodingRequestTimeout time.Duration

	// ChainStateTimeout is the timeout used for getting the chainstate
	ChainStateTimeout time.Duration

	// EncodingQueueLimit is the maximum number of encoding requests that can be queued
	EncodingQueueLimit int

@@ -249,7 +252,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
	e.logger.Debug("new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))

	// Get the operator state
	state, err := e.getOperatorState(ctx, metadatas, referenceBlockNumber)

	timeoutCtx, cancel := context.WithTimeout(ctx, e.ChainStateTimeout)
	defer cancel()
	state, err := e.getOperatorState(timeoutCtx, metadatas, referenceBlockNumber)
	if err != nil {
		return fmt.Errorf("error getting operator state: %w", err)
	}
@@ -532,7 +538,10 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
		i++
	}

	state, err := e.getOperatorState(context.Background(), metadatas, e.ReferenceBlockNumber)
	timeoutCtx, cancel := context.WithTimeout(context.Background(), e.ChainStateTimeout)
	defer cancel()

	state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
	if err != nil {
		return nil, err
	}
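Combined with the RetryQuerier, the new ChainStateTimeout bounds how long these getOperatorState calls can keep retrying: once the per-call context expires, the retry loop returns ctx.Err() instead of backing off again. A small self-contained sketch of that interaction (the failingQuerier type and the chosen durations are illustrative only, not from the repo):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Layr-Labs/eigenda/core/thegraph"
)

// failingQuerier always errors, standing in for an unreachable graph node.
type failingQuerier struct{}

func (failingQuerier) Query(ctx context.Context, query any, variables map[string]any) error {
	return errors.New("graph node unavailable")
}

func main() {
	// A generous retry budget: 50ms initial backoff, doubling, up to 10 retries
	// (roughly 51s of sleeping if allowed to run to completion).
	q := thegraph.NewRetryQuerier(failingQuerier{}, 50*time.Millisecond, 10)

	// A 200ms chain-state timeout cuts the retries short instead.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := q.Query(ctx, nil, nil)
	fmt.Println(err) // prints "context deadline exceeded" once the deadline passes
}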
6 changes: 4 additions & 2 deletions disperser/cmd/batcher/config.go
@@ -4,6 +4,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
@@ -23,7 +24,7 @@ type Config struct {
	MetricsConfig    batcher.MetricsConfig
	IndexerConfig    indexer.Config
	FireblocksConfig common.FireblocksConfig
	GraphUrl         string
	ChainStateConfig thegraph.Config
	UseGraph         bool

	IndexerDataDir string
@@ -70,13 +71,14 @@ func NewConfig(ctx *cli.Context) (Config, error) {
			AttestationTimeout: ctx.GlobalDuration(flags.AttestationTimeoutFlag.Name),
			ChainReadTimeout:   ctx.GlobalDuration(flags.ChainReadTimeoutFlag.Name),
			ChainWriteTimeout:  ctx.GlobalDuration(flags.ChainWriteTimeoutFlag.Name),
			ChainStateTimeout:  ctx.GlobalDuration(flags.ChainStateTimeoutFlag.Name),
		},
		MetricsConfig: batcher.MetricsConfig{
			HTTPPort:      ctx.GlobalString(flags.MetricsHTTPPort.Name),
			EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
		},
		ChainStateConfig:              thegraph.ReadCLIConfig(ctx),
		UseGraph:                      ctx.Bool(flags.UseGraphFlag.Name),
		GraphUrl:                      ctx.GlobalString(flags.GraphUrlFlag.Name),
		BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
		EigenDAServiceManagerAddr:     ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
		IndexerDataDir:                ctx.GlobalString(flags.IndexerDataDirFlag.Name),
17 changes: 10 additions & 7 deletions disperser/cmd/batcher/flags/flags.go
@@ -6,6 +6,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)
@@ -59,12 +60,6 @@ var (
		Required: true,
		EnvVar:   common.PrefixEnvVar(envVarPrefix, "ENABLE_METRICS"),
	}
	GraphUrlFlag = cli.StringFlag{
		Name:     common.PrefixFlag(FlagPrefix, "graph-url"),
		Usage:    "The url of the graph node",
		Required: true,
		EnvVar:   common.PrefixEnvVar(envVarPrefix, "GRAPH_URL"),
	}
	UseGraphFlag = cli.BoolFlag{
		Name:  common.PrefixFlag(FlagPrefix, "use-graph"),
		Usage: "Whether to use the graph node",
@@ -119,6 +114,13 @@ var (
		Value:  90 * time.Second,
		EnvVar: common.PrefixEnvVar(envVarPrefix, "CHAIN_WRITE_TIMEOUT"),
	}
	ChainStateTimeoutFlag = cli.DurationFlag{
		Name:     "chain-state-timeout",
		Usage:    "connection timeout to read state from chain",
		Required: false,
		Value:    15 * time.Second,
		EnvVar:   common.PrefixEnvVar(envVarPrefix, "CHAIN_STATE_TIMEOUT"),
	}
	NumConnectionsFlag = cli.IntFlag{
		Name:  "num-connections",
		Usage: "maximum number of connections to encoders (defaults to 256)",
@@ -194,7 +196,6 @@ var requiredFlags = []cli.Flag{
	EigenDAServiceManagerFlag,
	EncoderSocket,
	EnableMetrics,
	GraphUrlFlag,
	BatchSizeLimitFlag,
	UseGraphFlag,
	SRSOrderFlag,
@@ -207,6 +208,7 @@ var optionalFlags = []cli.Flag{
	AttestationTimeoutFlag,
	ChainReadTimeoutFlag,
	ChainWriteTimeoutFlag,
	ChainStateTimeoutFlag,
	NumConnectionsFlag,
	FinalizerIntervalFlag,
	FinalizerPoolSizeFlag,
@@ -227,4 +229,5 @@ func init() {
	Flags = append(Flags, indexer.CLIFlags(envVarPrefix)...)
	Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...)
	Flags = append(Flags, common.FireblocksCLIFlags(envVarPrefix, FlagPrefix)...)
	Flags = append(Flags, thegraph.CLIFlags(envVarPrefix)...)
}
8 changes: 3 additions & 5 deletions disperser/cmd/batcher/main.go
@@ -8,8 +8,6 @@ import (
"os"
"time"

"github.com/shurcooL/graphql"

"github.com/Layr-Labs/eigenda/common"
coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
@@ -140,9 +138,9 @@ func RunBatcher(ctx *cli.Context) error {
	var ics core.IndexedChainState
	if config.UseGraph {
		logger.Info("Using graph node")
		querier := graphql.NewClient(config.GraphUrl, nil)
		logger.Info("Connecting to subgraph", "url", config.GraphUrl)
		ics = thegraph.NewIndexedChainState(cs, querier, logger)

		logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
		ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)
	} else {
		logger.Info("Using built-in indexer")

8 changes: 5 additions & 3 deletions operators/churner/cmd/main.go
@@ -16,7 +16,6 @@ import (
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigenda/operators/churner/flags"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/shurcooL/graphql"
"github.com/urfave/cli"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
@@ -82,8 +81,11 @@ func run(ctx *cli.Context) error {

	cs := coreeth.NewChainState(tx, gethClient)

	querier := graphql.NewClient(config.GraphUrl, nil)
	indexer := thegraph.NewIndexedChainState(cs, querier, logger)
	logger.Info("Using graph node")

	logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
	indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)

	metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)

	cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)