Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPCDaemon and Erigon: don't open files at startup if download is not finished (StageSnapshots == 0) #12332

Merged
merged 33 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/erigontech/erigon-lib/chain"
"github.com/erigontech/erigon-lib/chain/snapcfg"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/config3"
"github.com/erigontech/erigon-lib/direct"
"github.com/erigontech/erigon-lib/downloader/downloadercfg"
"github.com/erigontech/erigon-lib/downloader/downloaderrawdb"
"github.com/erigontech/erigon-lib/gointerfaces"
"github.com/erigontech/erigon-lib/gointerfaces/grpcutil"
remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
Expand Down Expand Up @@ -404,13 +407,30 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db")
}

doOptimisticOpen := false
snapcfg.LoadRemotePreverified()
if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil {
allFilesDownloadComplete, lastUncomplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
if !allFilesDownloadComplete {
log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)", "example_uncomplete_file", lastUncomplete)
}
doOptimisticOpen = allFilesDownloadComplete
} else {
log.Warn("[rpc] download of segments not complete yet")
}

// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()
if doOptimisticOpen {
allSnapshots.OptimisticalyReopenFolder()
allBorSnapshots.OptimisticalyReopenFolder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the naming is OptimisticallReopenFolder, but the ReopenFolder function it called doesn't open file in optimistic way.

}
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
Expand All @@ -420,7 +440,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`
if doOptimisticOpen {
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`
}

db.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
Expand Down
52 changes: 13 additions & 39 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/c2h5oh/datasize"
"github.com/erigontech/erigon-lib/downloader/downloaderrawdb"
"github.com/tidwall/btree"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -725,23 +725,10 @@ func localHashBytes(ctx context.Context, fileInfo snaptype.FileInfo, db kv.RoDB,

if db != nil {
err := db.View(ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileInfo.Name()))

hashBytes, err = downloaderrawdb.ReadTorrentInfoHash(tx, fileInfo.Name())
if err != nil {
return err
}

if len(infoBytes) == 20 {
hashBytes = infoBytes
return nil
}

var info torrentInfo

if err = json.Unmarshal(infoBytes, &info); err == nil {
hashBytes = info.Hash
}

return nil
})

Expand Down Expand Up @@ -1503,18 +1490,14 @@ func logSeedHashMismatches(torrentHash infohash.T, name string, seedHashMismatch
}
}

func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) {
if info, err := d.torrentInfo(name); err == nil {
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(d.SnapDir(), name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
func (d *Downloader) checkComplete(name string) (complete bool, fileLen int64, completedAt *time.Time) {
if err := d.db.View(d.ctx, func(tx kv.Tx) error {
complete, fileLen, completedAt = downloaderrawdb.CheckFileComplete(tx, name, d.SnapDir())
return nil
}); err != nil {
return false, 0, nil
}

return false, 0, nil
return
}

func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) {
Expand Down Expand Up @@ -1934,28 +1917,19 @@ func availableTorrents(ctx context.Context, pending []*torrent.Torrent, download

func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap }

func (d *Downloader) torrentInfo(name string) (*torrentInfo, error) {
var info torrentInfo

func (d *Downloader) torrentInfo(name string) (*downloaderrawdb.TorrentInfo, error) {
var info *downloaderrawdb.TorrentInfo
err := d.db.View(d.ctx, func(tx kv.Tx) (err error) {
infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(name))

info, err = downloaderrawdb.ReadTorrentInfo(tx, name)
if err != nil {
return err
}

if err = json.Unmarshal(infoBytes, &info); err != nil {
return err
}

return nil
})

if err != nil {
return nil, err
}

return &info, nil
return info, nil
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
Expand Down
40 changes: 27 additions & 13 deletions erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,36 +219,50 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up
}

// setup snapcfg
if err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName); err != nil {
preverifiedCfg, err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName)
if err != nil {
return nil, err
}

return &Cfg{Dirs: dirs, ChainName: chainName,
ClientConfig: torrentConfig, DownloadSlots: downloadSlots,
WebSeedUrls: webseedHttpProviders, WebSeedFileProviders: webseedFileProviders,
DownloadTorrentFilesFromWebseed: true, AddTorrentsFromDisk: true, SnapshotLock: lockSnapshots,
SnapshotConfig: snapcfg.KnownCfg(chainName),
SnapshotConfig: preverifiedCfg,
MdbxWriteMap: mdbxWriteMap,
}, nil
}

func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) error {
func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg {
preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml")

exists, err := dir.FileExist(preverifiedToml)
if err != nil {
return err
panic(err)
return nil
}
if exists {
// Read the preverified.toml and load the snapshots
haveToml, err := os.ReadFile(preverifiedToml)
if err != nil {
return err
}
snapcfg.SetToml(chainName, haveToml)
if !exists {
return nil
}
return dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644)
// Read the preverified.toml and load the snapshots
haveToml, err := os.ReadFile(preverifiedToml)
if err != nil {
panic(err)
return nil
}
snapcfg.SetToml(chainName, haveToml)
return snapcfg.KnownCfg(chainName)
}

func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) {
if cfg := ReadPreverifiedToml(dirs, chainName); cfg != nil {
return cfg, nil
}
preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml")
if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil {
panic(err)
return nil, err
}
return snapcfg.KnownCfg(preverifiedToml), nil
}

func getIpv6Enabled() bool {
Expand Down
105 changes: 105 additions & 0 deletions erigon-lib/downloader/downloaderrawdb/accessors_downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package downloaderrawdb

import (
"context"
"encoding/json"
"os"
"path/filepath"
"time"

"github.com/erigontech/erigon-lib/chain/snapcfg"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/kv"
kv2 "github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
"golang.org/x/sync/semaphore"
)

type TorrentInfo struct {
Name string `json:"name"`
Hash []byte `json:"hash"`
Length *int64 `json:"length,omitempty"`
Created *time.Time `json:"created,omitempty"`
Completed *time.Time `json:"completed,omitempty"`
}

func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) {
var info TorrentInfo
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}
if err = json.Unmarshal(infoBytes, &info); err != nil {
return nil, err
}
return &info, nil
}

func ReadTorrentInfoHash(downloaderDBTx kv.Tx, name string) (hashBytes []byte, err error) {
infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name))
if err != nil {
return nil, err
}

if len(infoBytes) == 20 {
return infoBytes, nil
}

var info TorrentInfo
if err = json.Unmarshal(infoBytes, &info); err == nil {
return info.Hash, nil
}
return nil, nil
}

func WriteTorrentInfo(tx kv.RwTx, info *TorrentInfo) error {
infoBytes, err := json.Marshal(info)
if err != nil {
return err
}
return tx.Put(kv.BittorrentInfo, []byte(info.Name), infoBytes)
}

func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *time.Time) {
info, err := ReadTorrentInfo(tx, name)
if err != nil {
return false, 0, nil
}
if info.Completed != nil && info.Completed.Before(time.Now()) {
if info.Length != nil {
if fi, err := os.Stat(filepath.Join(snapDir, name)); err == nil {
return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed
}
}
}
return false, 0, nil
}

func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string) {
for _, p := range preverifiedCfg.Preverified {
complete, _, _ := CheckFileComplete(tx, p.Name, dirs.Snap)
if !complete {
return false, p.Name
}
}
return true, ""
}

func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string, err error) {
limiter := semaphore.NewWeighted(9_000)
downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TablesCfgByLabel(kv.DownloaderDB)
}).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background())
if err != nil {
return false, "", err
}
defer downloaderDB.Close()

if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error {
allFilesDownloadComplete, lastUncomplete = allFilesComplete(tx, preverifiedCfg, dirs)
return nil
}); err != nil {
return false, "", err
}
return allFilesDownloadComplete, lastUncomplete, nil
}
Loading
Loading