Skip to content

Commit

Permalink
RPCDaemon: don't open uncomplete files at startup (#12332)
Browse files Browse the repository at this point in the history
Probably right way is: check `StageSnapshots > 0` as "all downloads
done". Because:
- --prune.mode=minimal has not much files, but `preverified.toml` stays
the same
- downloader has logic to of prohibit_new_downloads.lock - downloading
consider doen't even if `preverified.toml` has more files
- small files may get merged - while `preverified.toml` stays the same
- developer may generate new version of file (hash or len changed) -
while `preverified.toml` stays the same
- Downloader's `Complete` flag can't be used - because e3 does download
headers and bodies first, then Complete=true happens, and then we
download transactions (Complete=false happens).
  • Loading branch information
AskAlexSharov authored Oct 18, 2024
1 parent ea2ca63 commit 200c85b
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 152 deletions.
53 changes: 33 additions & 20 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
grpcHealth "google.golang.org/grpc/health"
Expand Down Expand Up @@ -408,36 +409,48 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))

agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

db.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
defer aggTx.Close()
aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
_, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress, err
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(rwKv)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
if allSegmentsDownloadComplete {
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()

allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

db.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
defer aggTx.Close()
aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
_, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress, err
})
return nil
})
return nil
})
} else {
log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)")
}

wg := errgroup.Group{}
wg.SetLimit(1)
onNewSnapshot = func() {
go func() { // don't block events processing by network communication
wg.Go(func() error { // don't block events processing by network communication
reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
if err != nil {
logger.Warn("[snapshots] reopen", "err", err)
return
return nil
}
if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[snapshots] reopen", "err", err)
Expand All @@ -450,7 +463,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:reopen")
}

//if err = agg.openList(reply.HistoryFiles, true); err != nil {
if err = agg.OpenFolder(); err != nil {
logger.Error("[snapshots] reopen", "err", err)
} else {
Expand All @@ -464,7 +476,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
return nil
})
}
}()
return nil
})
}
onNewSnapshot()

Expand Down
4 changes: 1 addition & 3 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ import (

"github.com/gballet/go-verkle"

"github.com/erigontech/erigon-lib/log/v3"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/common/length"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/dbutils"
"github.com/erigontech/erigon-lib/kv/rawdbv3"

"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/ethdb/cbor"
"github.com/erigontech/erigon/rlp"
Expand Down
19 changes: 14 additions & 5 deletions core/rawdb/accessors_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
package rawdb

import (
"context"
"encoding/json"
"fmt"

"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"github.com/erigontech/erigon/polygon/bor/borcfg"

"github.com/erigontech/erigon-lib/chain"
Expand Down Expand Up @@ -81,11 +83,6 @@ func WriteChainConfig(db kv.Putter, hash libcommon.Hash, cfg *chain.Config) erro
return nil
}

// DeleteChainConfig retrieves the consensus settings based on the given genesis hash.
func DeleteChainConfig(db kv.Deleter, hash libcommon.Hash) error {
return db.Delete(kv.ConfigTable, hash[:])
}

func WriteGenesisIfNotExist(db kv.RwTx, g *types.Genesis) error {
has, err := db.Has(kv.ConfigTable, kv.GenesisKey)
if err != nil {
Expand Down Expand Up @@ -117,3 +114,15 @@ func ReadGenesis(db kv.Getter) (*types.Genesis, error) {
}
return &g, nil
}

func AllSegmentsDownloadComplete(tx kv.Getter) (allSegmentsDownloadComplete bool, err error) {
snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots)
return snapshotsStageProgress > 0, err
}
func AllSegmentsDownloadCompleteFromDB(db kv.RoDB) (allSegmentsDownloadComplete bool, err error) {
err = db.View(context.Background(), func(tx kv.Tx) error {
allSegmentsDownloadComplete, err = AllSegmentsDownloadComplete(tx)
return err
})
return allSegmentsDownloadComplete, err
}
52 changes: 13 additions & 39 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -58,6 +57,7 @@ import (
"github.com/erigontech/erigon-lib/common/dir"
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/downloader/downloadercfg"
"github.com/erigontech/erigon-lib/downloader/downloaderrawdb"
"github.com/erigontech/erigon-lib/downloader/snaptype"
prototypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto"
"github.com/erigontech/erigon-lib/kv"
Expand Down Expand Up @@ -725,23 +725,10 @@ func localHashBytes(ctx context.Context, fileInfo snaptype.FileInfo, db kv.RoDB,

if db != nil {
err := db.View(ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileInfo.Name()))

hashBytes, err = downloaderrawdb.ReadTorrentInfoHash(tx, fileInfo.Name())
if err != nil {
return err
}

if len(infoBytes) == 20 {
hashBytes = infoBytes
return nil
}

var info torrentInfo

if err = json.Unmarshal(infoBytes, &info); err == nil {
hashBytes = info.Hash
}

return nil
})

Expand Down Expand Up @@ -1503,18 +1490,14 @@ func logSeedHashMismatches(torrentHash infohash.T, name string, seedHashMismatch
}
}

func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) {
if info, err := d.torrentInfo(name); err == nil {
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(d.SnapDir(), name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
func (d *Downloader) checkComplete(name string) (complete bool, fileLen int64, completedAt *time.Time) {
if err := d.db.View(d.ctx, func(tx kv.Tx) error {
complete, fileLen, completedAt = downloaderrawdb.CheckFileComplete(tx, name, d.SnapDir())
return nil
}); err != nil {
return false, 0, nil
}

return false, 0, nil
return
}

func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) {
Expand Down Expand Up @@ -1934,28 +1917,19 @@ func availableTorrents(ctx context.Context, pending []*torrent.Torrent, download

func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap }

func (d *Downloader) torrentInfo(name string) (*torrentInfo, error) {
var info torrentInfo

func (d *Downloader) torrentInfo(name string) (*downloaderrawdb.TorrentInfo, error) {
var info *downloaderrawdb.TorrentInfo
err := d.db.View(d.ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(name))

info, err = downloaderrawdb.ReadTorrentInfo(tx, name)
if err != nil {
return err
}

if err = json.Unmarshal(infoBytes, &info); err != nil {
return err
}

return nil
})

if err != nil {
return nil, err
}

return &info, nil
return info, nil
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
Expand Down
17 changes: 10 additions & 7 deletions erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,36 +219,39 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up
}

// setup snapcfg
if err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName); err != nil {
preverifiedCfg, err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName)
if err != nil {
return nil, err
}

return &Cfg{Dirs: dirs, ChainName: chainName,
ClientConfig: torrentConfig, DownloadSlots: downloadSlots,
WebSeedUrls: webseedHttpProviders, WebSeedFileProviders: webseedFileProviders,
DownloadTorrentFilesFromWebseed: true, AddTorrentsFromDisk: true, SnapshotLock: lockSnapshots,
SnapshotConfig: snapcfg.KnownCfg(chainName),
SnapshotConfig: preverifiedCfg,
MdbxWriteMap: mdbxWriteMap,
}, nil
}

func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) error {
func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) {
preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml")

exists, err := dir.FileExist(preverifiedToml)
if err != nil {
return err
return nil, err
}
if exists {
// Read the preverified.toml and load the snapshots
haveToml, err := os.ReadFile(preverifiedToml)
if err != nil {
return err
return nil, err
}
snapcfg.SetToml(chainName, haveToml)
return nil
}
return dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644)
if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil {
return nil, err
}
return snapcfg.KnownCfg(preverifiedToml), nil
}

func getIpv6Enabled() bool {
Expand Down
73 changes: 73 additions & 0 deletions erigon-lib/downloader/downloaderrawdb/accessors_downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package downloaderrawdb

import (
"encoding/json"
"os"
"path/filepath"
"time"

"github.com/erigontech/erigon-lib/kv"
)

type TorrentInfo struct {
Name string `json:"name"`
Hash []byte `json:"hash"`
Length *int64 `json:"length,omitempty"`
Created *time.Time `json:"created,omitempty"`
Completed *time.Time `json:"completed,omitempty"`
}

func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) {
var info TorrentInfo
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}
if len(infoBytes) == 0 {
return &info, nil
}
if err = json.Unmarshal(infoBytes, &info); err != nil {
return nil, err
}
return &info, nil
}

func ReadTorrentInfoHash(downloaderDBTx kv.Tx, name string) (hashBytes []byte, err error) {
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}

if len(infoBytes) == 20 {
return infoBytes, nil
}

var info TorrentInfo
if err = json.Unmarshal(infoBytes, &info); err == nil {
return info.Hash, nil
}
return nil, nil
}

func WriteTorrentInfo(tx kv.RwTx, info *TorrentInfo) error {
infoBytes, err := json.Marshal(info)
if err != nil {
return err
}
return tx.Put(kv.BittorrentInfo, []byte(info.Name), infoBytes)
}

func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *time.Time) {
info, err := ReadTorrentInfo(tx, name)
if err != nil {
return false, 0, nil
}
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(snapDir, name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
return false, 0, nil
}
Loading

0 comments on commit 200c85b

Please sign in to comment.