diff --git a/README.md b/README.md index 25fcc9c..0a28762 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,61 @@ committer: blocksPerCommit: 1000 ``` +#### Reorg Handler +Whether to enable the reorg handler. Default is `true`. + +cmd: `--reorgHandler-enabled` +env: `REORGHANDLER_ENABLED` +yaml: +```yaml +reorgHandler: + enabled: true +``` + +#### Reorg Handler Interval +Reorg handler trigger interval in milliseconds. Default is `1000`. + +cmd: `--reorgHandler-interval` +env: `REORGHANDLER_INTERVAL` +yaml: +```yaml +reorgHandler: + interval: 3000 +``` + +#### Reorg Handler Blocks Per Scan +How many blocks to scan for reorgs. Default is `100`. + +cmd: `--reorgHandler-blocks-per-scan` +env: `REORGHANDLER_BLOCKSPERSCAN` +yaml: +```yaml +reorgHandler: + blocksPerScan: 1000 +``` + +#### Reorg Handler From Block +From which block to start scanning for reorgs. Default is `0`. + +cmd: `--reorgHandler-from-block` +env: `REORGHANDLER_FROMBLOCK` +yaml: +```yaml +reorgHandler: + fromBlock: 20000000 +``` + +#### Reorg Handler Force From Block +Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`. Default is `false`. + +cmd: `--reorgHandler-force-from-block` +env: `REORGHANDLER_FORCEFROMBLOCK` +yaml: +```yaml +reorgHandler: + forceFromBlock: true +``` + #### Failure Recoverer Whether to enable the failure recoverer. Default is `true`. diff --git a/cmd/root.go b/cmd/root.go index 0ab2b89..2f1c4e6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -55,6 +55,11 @@ func init() { rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer") rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval") rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds") + rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler") + rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds") + rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs") + rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs") + rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`") rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer") rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for") rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds") @@ -97,6 +102,11 @@ func init() { viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled")) viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit")) viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval")) + viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled")) + viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval")) + viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan")) + viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block")) + viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block")) viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled")) viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run")) viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval")) diff --git a/configs/config.go b/configs/config.go index 90256d9..7084efe 100644 --- a/configs/config.go +++ b/configs/config.go @@ -28,6 +28,14 @@ type CommitterConfig struct { BlocksPerCommit int `mapstructure:"blocksPerCommit"` } +type ReorgHandlerConfig struct { + Enabled bool `mapstructure:"enabled"` + Interval int `mapstructure:"interval"` + BlocksPerScan int `mapstructure:"blocksPerScan"` + FromBlock int `mapstructure:"fromBlock"` + ForceFromBlock bool `mapstructure:"forceFromBlock"` +} + type FailureRecovererConfig struct { Enabled bool `mapstructure:"enabled"` Interval int `mapstructure:"interval"` @@ -96,6 +104,7 @@ type Config struct { Poller PollerConfig `mapstructure:"poller"` Committer CommitterConfig `mapstructure:"committer"` FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"` + ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"` Storage StorageConfig `mapstructure:"storage"` } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 1faf4b2..704e993 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -69,3 +69,16 @@ var ( Help: "The first block number in the failure recoverer batch", }) ) + +// Reorg Handler Metrics +var ( + ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "reorg_handler_last_checked_block", + Help: "The last block number that the reorg handler checked", + }) + + ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reorg_handler_reorg_counter", + Help: "The number of reorgs detected", + }) +) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index e4cffa3..41eadab 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -14,6 +14,7 @@ type Orchestrator struct { pollerEnabled bool failureRecovererEnabled bool committerEnabled bool + reorgHandlerEnabled bool } func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) { @@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) { pollerEnabled: config.Cfg.Poller.Enabled, failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled, committerEnabled: config.Cfg.Committer.Enabled, + reorgHandlerEnabled: config.Cfg.ReorgHandler.Enabled, }, nil } @@ -61,6 +63,15 @@ func (o *Orchestrator) Start() { }() } + if o.reorgHandlerEnabled { + wg.Add(1) + go func() { + defer wg.Done() + reorgHandler := NewReorgHandler(o.rpc, o.storage) + reorgHandler.Start() + }() + } + // The chain tracker is always running wg.Add(1) go func() { diff --git a/internal/orchestrator/reorg_handler.go b/internal/orchestrator/reorg_handler.go new file mode 100644 index 0000000..0a7309e --- /dev/null +++ b/internal/orchestrator/reorg_handler.go @@ -0,0 +1,200 @@ +package orchestrator + +import ( + "fmt" + "math/big" + "time" + + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" + "github.com/thirdweb-dev/indexer/internal/metrics" + "github.com/thirdweb-dev/indexer/internal/rpc" + "github.com/thirdweb-dev/indexer/internal/storage" + "github.com/thirdweb-dev/indexer/internal/worker" +) + +type ReorgHandler struct { + rpc rpc.Client + storage storage.IStorage + triggerInterval int + blocksPerScan int + lastCheckedBlock *big.Int + worker *worker.Worker +} + +const DEFAULT_REORG_HANDLER_INTERVAL = 1000 +const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100 + +func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler { + triggerInterval := config.Cfg.ReorgHandler.Interval + if triggerInterval == 0 { + triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL + } + blocksPerScan := config.Cfg.ReorgHandler.BlocksPerScan + if blocksPerScan == 0 { + blocksPerScan = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN + } + return &ReorgHandler{ + rpc: rpc, + storage: storage, + worker: worker.NewWorker(rpc), + triggerInterval: triggerInterval, + blocksPerScan: blocksPerScan, + lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID), + } +} + +func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int { + bn := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock)) + if !config.Cfg.ReorgHandler.ForceFromBlock { + storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId) + if err != nil { + log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", bn) + return bn + } + if storedFromBlock.Sign() <= 0 { + log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", bn) + return bn + } + log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock) + return storedFromBlock + } + log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", bn) + return bn +} + +func (rh *ReorgHandler) Start() { + interval := time.Duration(rh.triggerInterval) * time.Millisecond + ticker := time.NewTicker(interval) + + log.Debug().Msgf("Reorg handler running") + go func() { + for range ticker.C { + lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan))) + blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom) + if err != nil { + log.Error().Err(err).Msg("Error getting recent block headers") + continue + } + if len(blockHeaders) == 0 { + log.Warn().Msg("No block headers found") + continue + } + mostRecentBlockHeader := blockHeaders[0] + reorgEndIndex := findReorgEndIndex(blockHeaders) + if reorgEndIndex == -1 { + rh.lastCheckedBlock = mostRecentBlockHeader.Number + rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number) + metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64())) + continue + } + metrics.ReorgCounter.Inc() + forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:]) + if err != nil { + log.Error().Err(err).Msg("Error while finding fork point") + continue + } + err = rh.handleReorg(forkPoint, lookbackFrom) + if err != nil { + log.Error().Err(err).Msg("Error while handling reorg") + continue + } + rh.lastCheckedBlock = mostRecentBlockHeader.Number + rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number) + metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64())) + } + }() + + // Keep the program running (otherwise it will exit) + select {} +} + +func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) { + for i := 0; i < len(reversedBlockHeaders)-1; i++ { + currentBlock := reversedBlockHeaders[i] + previousBlock := reversedBlockHeaders[i+1] + + if currentBlock.ParentHash != previousBlock.Hash { + log.Debug(). + Str("currentBlockNumber", currentBlock.Number.String()). + Str("currentBlockHash", currentBlock.Hash). + Str("currentBlockParentHash", currentBlock.ParentHash). + Str("previousBlockNumber", previousBlock.Number.String()). + Str("previousBlockHash", previousBlock.Hash). + Msg("Reorg detected: parent hash mismatch") + return i + } + } + return -1 +} + +func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) { + newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders) + if err != nil { + return nil, err + } + + for i := 0; i < len(reversedBlockHeaders)-1; i++ { + blockHeader := reversedBlockHeaders[i] + block, ok := (*newBlocksByNumber)[blockHeader.Number.String()] + if !ok { + return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String()) + } + if block.Hash == blockHeader.Hash { + previousBlock := reversedBlockHeaders[i+1] + return previousBlock.Number, nil + } + } + lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number + nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom) + if err != nil { + return nil, fmt.Errorf("error getting next headers batch: %w", err) + } + return rh.findForkPoint(nextHeadersBatch) +} + +func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) { + blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders)) + for _, header := range reversedBlockHeaders { + blockNumbers = append(blockNumbers, header.Number) + } + blockResults := rh.rpc.GetBlocks(blockNumbers) + fetchedBlocksByNumber := make(map[string]common.Block) + for _, blockResult := range blockResults { + if blockResult.Error != nil { + return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error) + } + fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data + } + return &fetchedBlocksByNumber, nil +} + +func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error { + blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64()) + for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) { + blockRange = append(blockRange, new(big.Int).Set(i)) + } + + results := rh.worker.Run(blockRange) + data := make([]common.BlockData, 0, len(results)) + for _, result := range results { + if result.Error != nil { + return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error) + } + data = append(data, common.BlockData{ + Block: result.Data.Block, + Logs: result.Data.Logs, + Transactions: result.Data.Transactions, + Traces: result.Data.Traces, + }) + } + // TODO make delete and insert atomic + if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil { + return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err) + } + if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil { + return fmt.Errorf("error saving data to main storage: %w", err) + } + return nil +}