Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPCDaemon and Erigon: don't open files at startup if download is not finished (StageSnapshots == 0) #12332

Merged
merged 33 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/config3"
"github.com/erigontech/erigon-lib/direct"
"github.com/erigontech/erigon-lib/downloader/downloaderrawdb"
"github.com/erigontech/erigon-lib/gointerfaces"
"github.com/erigontech/erigon-lib/gointerfaces/grpcutil"
remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
Expand Down Expand Up @@ -404,33 +405,46 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db")
}

allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
if !allSegmentsDownloadComplete {
log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)")
}

// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))

agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

db.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
defer aggTx.Close()
aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
_, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress, err

// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
if allSegmentsDownloadComplete {
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()

allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

db.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
defer aggTx.Close()
aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
_, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress, err
})
return nil
})
return nil
})
}

onNewSnapshot = func() {
go func() { // don't block events processing by network communication
reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
Expand All @@ -449,7 +463,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:reopen")
}

//if err = agg.openList(reply.HistoryFiles, true); err != nil {
if err = agg.OpenFolder(); err != nil {
logger.Error("[snapshots] reopen", "err", err)
} else {
Expand Down
63 changes: 24 additions & 39 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/c2h5oh/datasize"
"github.com/erigontech/erigon-lib/downloader/downloaderrawdb"
"github.com/tidwall/btree"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -725,23 +725,10 @@ func localHashBytes(ctx context.Context, fileInfo snaptype.FileInfo, db kv.RoDB,

if db != nil {
err := db.View(ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileInfo.Name()))

hashBytes, err = downloaderrawdb.ReadTorrentInfoHash(tx, fileInfo.Name())
if err != nil {
return err
}

if len(infoBytes) == 20 {
hashBytes = infoBytes
return nil
}

var info torrentInfo

if err = json.Unmarshal(infoBytes, &info); err == nil {
hashBytes = info.Hash
}

return nil
})

Expand Down Expand Up @@ -1503,18 +1490,14 @@ func logSeedHashMismatches(torrentHash infohash.T, name string, seedHashMismatch
}
}

func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) {
if info, err := d.torrentInfo(name); err == nil {
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(d.SnapDir(), name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
func (d *Downloader) checkComplete(name string) (complete bool, fileLen int64, completedAt *time.Time) {
if err := d.db.View(d.ctx, func(tx kv.Tx) error {
complete, fileLen, completedAt = downloaderrawdb.CheckFileComplete(tx, name, d.SnapDir())
return nil
}); err != nil {
return false, 0, nil
}

return false, 0, nil
return
}

func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) {
Expand Down Expand Up @@ -1934,28 +1917,19 @@ func availableTorrents(ctx context.Context, pending []*torrent.Torrent, download

func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap }

func (d *Downloader) torrentInfo(name string) (*torrentInfo, error) {
var info torrentInfo

func (d *Downloader) torrentInfo(name string) (*downloaderrawdb.TorrentInfo, error) {
var info *downloaderrawdb.TorrentInfo
err := d.db.View(d.ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(name))

info, err = downloaderrawdb.ReadTorrentInfo(tx, name)
if err != nil {
return err
}

if err = json.Unmarshal(infoBytes, &info); err != nil {
return err
}

return nil
})

if err != nil {
return nil, err
}

return &info, nil
return info, nil
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
Expand Down Expand Up @@ -2319,6 +2293,11 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {

d.lock.Unlock()

if stats.Completed {
logger.Warn("[dbg] completed", "completed", stats.Completed, "len", len(torrents))
d.saveAllCompleteFlag()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There're two steps of downloading: 1. header chain segments(headers and bodies) 2. other segments(transactions...)
stats.Completed is true after the first step completed, but will be re-written to false once step2 starts.

should we wait all types of segments complete and then set the flag?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some logs locally and here's logs test on sepolia:

Stats.Completed false
[INFO] [10-17|23:03:55.301] [1/6 OtterSync] Downloading              progress="(228/228 files) 99.71% - 2.7GB/2.7GB" rate=5.9MB/s time-left=0hrs:0m total-time=1m40s download-rate=5.9MB/s completion-rate=5.9MB/s alloc=552.7MB sys=653.8MB
Stats.Completed true
header and body snapshots downloaded completely
other snapshots downloading started
[INFO] [10-17|23:03:55.810] [snapshots] no metadata yet              files=90 list=domain/v1-accounts.0-128.bt,domain/v1-receipt.200-202.kv,v1-006710-006720-transactions.idx,v1-006730-006740-transactions.seg,domain/v1-accounts.192-200.bt,...
Stats.Completed false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And i'm a little confused that the flag seems to work the same way as prohibit_new_downloads.lock file, cause the file is written in right after specific types of segments complete downloading and stats.Completed turns true. However you said In the past i used prohibit_new_downloads.lock file for this - but now it created before download completion (and it’s fine).

If the prohibit_new_downloads file cannot mark the completion of download, then the flag cannot too.

If above is right, we can still use the way used by previous version of the PR , this function should work:
e729c1f#diff-f98a3c65d957c3d92bee0d12aeed5ec1ee902de014f1cda287f430808ea1aa2eR63

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You right. Maybe i must use “StageSnapshots progress > 0”

}

if !stats.Completed {
logger.Debug("[snapshots] downloading",
"len", len(torrents),
Expand Down Expand Up @@ -2959,6 +2938,12 @@ func (d *Downloader) Completed() bool {
return d.stats.Completed
}

func (d *Downloader) saveAllCompleteFlag() {
if err := d.db.Update(d.ctx, downloaderrawdb.WriteAllCompleteFlag); err != nil {
d.logger.Debug("[snapshots] Can't update 'all complete' flag", "err", err)
}
}

// Store completed torrents in order to notify GrpcServer subscribers when they subscribe and there is already downloaded files
func (d *Downloader) torrentCompleted(tName string, tHash metainfo.Hash) {
d.lock.Lock()
Expand Down
40 changes: 27 additions & 13 deletions erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,36 +219,50 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up
}

// setup snapcfg
if err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName); err != nil {
preverifiedCfg, err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName)
if err != nil {
return nil, err
}

return &Cfg{Dirs: dirs, ChainName: chainName,
ClientConfig: torrentConfig, DownloadSlots: downloadSlots,
WebSeedUrls: webseedHttpProviders, WebSeedFileProviders: webseedFileProviders,
DownloadTorrentFilesFromWebseed: true, AddTorrentsFromDisk: true, SnapshotLock: lockSnapshots,
SnapshotConfig: snapcfg.KnownCfg(chainName),
SnapshotConfig: preverifiedCfg,
MdbxWriteMap: mdbxWriteMap,
}, nil
}

func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) error {
func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg {
preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml")

exists, err := dir.FileExist(preverifiedToml)
if err != nil {
return err
panic(err)
return nil
}
if exists {
// Read the preverified.toml and load the snapshots
haveToml, err := os.ReadFile(preverifiedToml)
if err != nil {
return err
}
snapcfg.SetToml(chainName, haveToml)
if !exists {
return nil
}
return dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644)
// Read the preverified.toml and load the snapshots
haveToml, err := os.ReadFile(preverifiedToml)
if err != nil {
panic(err)
return nil
}
snapcfg.SetToml(chainName, haveToml)
return snapcfg.KnownCfg(chainName)
}

func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) {
if cfg := ReadPreverifiedToml(dirs, chainName); cfg != nil {
return cfg, nil
}
preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml")
if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil {
panic(err)
return nil, err
}
return snapcfg.KnownCfg(preverifiedToml), nil
}

func getIpv6Enabled() bool {
Expand Down
113 changes: 113 additions & 0 deletions erigon-lib/downloader/downloaderrawdb/accessors_downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package downloaderrawdb

import (
"context"
"encoding/json"
"os"
"path/filepath"
"time"

"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/kv"
kv2 "github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
"golang.org/x/sync/semaphore"
)

type TorrentInfo struct {
Name string `json:"name"`
Hash []byte `json:"hash"`
Length *int64 `json:"length,omitempty"`
Created *time.Time `json:"created,omitempty"`
Completed *time.Time `json:"completed,omitempty"`
}

func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) {
var info TorrentInfo
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}
if len(infoBytes) == 0 {
return &info, nil
}
if err = json.Unmarshal(infoBytes, &info); err != nil {
return nil, err
}
return &info, nil
}

func ReadTorrentInfoHash(downloaderDBTx kv.Tx, name string) (hashBytes []byte, err error) {
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}

if len(infoBytes) == 20 {
return infoBytes, nil
}

var info TorrentInfo
if err = json.Unmarshal(infoBytes, &info); err == nil {
return info.Hash, nil
}
return nil, nil
}

func WriteTorrentInfo(tx kv.RwTx, info *TorrentInfo) error {
infoBytes, err := json.Marshal(info)
if err != nil {
return err
}
return tx.Put(kv.BittorrentInfo, []byte(info.Name), infoBytes)
}

func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *time.Time) {
info, err := ReadTorrentInfo(tx, name)
if err != nil {
return false, 0, nil
}
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(snapDir, name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
return false, 0, nil
}

func AllSegmentsDownloadCompleteFlag(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) {
limiter := semaphore.NewWeighted(9_000)
downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TablesCfgByLabel(kv.DownloaderDB)
}).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background())
if err != nil {
return false, err
}
defer downloaderDB.Close()

if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error {
allFilesDownloadComplete, err = ReadAllCompleteFlag(tx)
return err
}); err != nil {
return false, err
}
return allFilesDownloadComplete, nil
}

var AllCompleteFlagKey = []byte("all_complete")

func WriteAllCompleteFlag(tx kv.RwTx) error {
return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1})
}
func ReadAllCompleteFlag(tx kv.Tx) (bool, error) {
v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey)
if err != nil {
return false, err
}
if len(v) == 0 {
return false, nil
}
return v[0] == 1, nil
}
Loading
Loading