diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index f886a58e5ec..398772ecfac 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -18,6 +18,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/temporal" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc" grpcHealth "google.golang.org/grpc/health" @@ -386,31 +387,43 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) - // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down - // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSnapshots.OptimisticReopenWithDB(db) - allBorSnapshots.OptimisticalyReopenWithDB(db) - allSnapshots.LogStat("remote") - allBorSnapshots.LogStat("bor:remote") - if agg, err = libstate.NewAggregator(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, config3.HistoryV3AggregationStep, db, logger); err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) } - _ = agg.OpenFolder() - db.View(context.Background(), func(tx kv.Tx) error { - agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { - _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) - return histBlockNumProgress + // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down + // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, ff, nil, err + } + if allSegmentsDownloadComplete { + allSnapshots.OptimisticReopenWithDB(db) + allBorSnapshots.OptimisticalyReopenWithDB(db) + allSnapshots.LogStat("remote") + allBorSnapshots.LogStat("bor:remote") + + _ = agg.OpenFolder() + + db.View(context.Background(), func(tx kv.Tx) error { + agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { + _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) + return histBlockNumProgress + }) + return nil }) - return nil - }) + } else { + logger.Debug("[rpc] download of segments not complete yet. please wait StageSnapshots to finish") + } + + wg := errgroup.Group{} + wg.SetLimit(1) onNewSnapshot = func() { - go func() { // don't block events processing by network communication + wg.Go(func() error { // don't block events processing by network communication reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) if err != nil { logger.Warn("[snapshots] reopen", "err", err) - return + return nil } if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { logger.Error("[snapshots] reopen", "err", err) @@ -436,7 +449,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil }) } - }() + return nil + }) } onNewSnapshot() blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index ad5a372ab90..ff75ed802c4 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -22,7 +22,7 @@ func main() { rootCtx, rootCancel := common.RootContext() cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - logger := debug.SetupCobra(cmd, "sentry") + logger := debug.SetupCobra(cmd, "rpcdaemon") db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel) if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index 73292258960..c3b0500228b 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -17,8 +17,11 @@ package rawdb import ( + "context" "encoding/json" "fmt" + + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/polygon/bor/borcfg" "github.com/ledgerwatch/erigon-lib/chain" @@ -81,3 +84,15 @@ func WriteChainConfig(db kv.Putter, hash libcommon.Hash, cfg *chain.Config) erro func DeleteChainConfig(db kv.Deleter, hash libcommon.Hash) error { return db.Delete(kv.ConfigTable, hash[:]) } + +func AllSegmentsDownloadComplete(tx kv.Getter) (allSegmentsDownloadComplete bool, err error) { + snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) + return snapshotsStageProgress > 0, err +} +func AllSegmentsDownloadCompleteFromDB(db kv.RoDB) (allSegmentsDownloadComplete bool, err error) { + err = db.View(context.Background(), func(tx kv.Tx) error { + allSegmentsDownloadComplete, err = AllSegmentsDownloadComplete(tx) + return err + }) + return allSegmentsDownloadComplete, err +} diff --git a/eth/backend.go b/eth/backend.go index 4317397f635..355b4346297 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -34,16 +34,9 @@ import ( "sync/atomic" "time" - "github.com/ledgerwatch/erigon-lib/common/dir" - "github.com/ledgerwatch/erigon-lib/common/disk" - "github.com/ledgerwatch/erigon-lib/common/mem" - "github.com/ledgerwatch/erigon-lib/diagnostics" - "github.com/erigontech/mdbx-go/mdbx" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" - "github.com/ledgerwatch/erigon-lib/config3" - "github.com/ledgerwatch/erigon-lib/kv/temporal" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -54,6 +47,11 @@ import ( "github.com/ledgerwatch/erigon-lib/chain/snapcfg" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/common/dir" + "github.com/ledgerwatch/erigon-lib/common/disk" + "github.com/ledgerwatch/erigon-lib/common/mem" + "github.com/ledgerwatch/erigon-lib/config3" + "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/downloader" "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" @@ -70,6 +68,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/kvcache" "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon-lib/kv/remotedbserver" + "github.com/ledgerwatch/erigon-lib/kv/temporal" libstate "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg" @@ -1373,18 +1372,6 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) } - var err error - if snConfig.Snapshot.NoDownloader { - allSnapshots.ReopenFolder() - if isBor { - allBorSnapshots.ReopenFolder() - } - } else { - allSnapshots.OptimisticalyReopenWithDB(db) - if isBor { - allBorSnapshots.OptimisticalyReopenWithDB(db) - } - } blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) blockWriter := blockio.NewBlockWriter(histV3) @@ -1392,9 +1379,30 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf if err != nil { return nil, nil, nil, nil, nil, err } - if err = agg.OpenFolder(); err != nil { + + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db) + if err != nil { return nil, nil, nil, nil, nil, err } + if allSegmentsDownloadComplete { + if snConfig.Snapshot.NoDownloader { + allSnapshots.ReopenFolder() + if isBor { + allBorSnapshots.ReopenFolder() + } + } else { + allSnapshots.OptimisticalyReopenWithDB(db) + if isBor { + allBorSnapshots.OptimisticalyReopenWithDB(db) + } + } + if err = agg.OpenFolder(); err != nil { + return nil, nil, nil, nil, nil, err + } + } else { + logger.Debug("[rpc] download of segments not complete yet. please wait StageSnapshots to finish") + } + return blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, nil }