From e729c1f57ae582697aaef567384a70e30166fc81 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 16:16:47 +0700 Subject: [PATCH 01/36] save --- cmd/rpcdaemon/cli/config.go | 31 +++++- erigon-lib/downloader/downloader.go | 52 +++------ .../downloader/downloadercfg/downloadercfg.go | 37 ++++--- .../downloaderrawdb/accessors_downloader.go | 103 ++++++++++++++++++ erigon-lib/downloader/util.go | 36 +----- 5 files changed, 173 insertions(+), 86 deletions(-) create mode 100644 erigon-lib/downloader/downloaderrawdb/accessors_downloader.go diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 02de6c062e0..1d8d0e6fc42 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -30,6 +30,10 @@ import ( "strings" "time" + "github.com/erigontech/erigon-lib/chain/snapcfg" + "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" + "github.com/erigontech/erigon/cmd/hack/tool" "github.com/spf13/cobra" "golang.org/x/sync/semaphore" "google.golang.org/grpc" @@ -404,13 +408,32 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db") } + doOptimisticOpen := false + + { + cc := tool.ChainConfigFromDB(db) + if cc != nil { + snapcfg.LoadRemotePreverified() + preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName) + if preverifiedCfg != nil { + allFilesDownloadComplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } + doOptimisticOpen = allFilesDownloadComplete + } + } + } + // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSnapshots.OptimisticalyReopenFolder() - allBorSnapshots.OptimisticalyReopenFolder() + if doOptimisticOpen { + allSnapshots.OptimisticalyReopenFolder() + allBorSnapshots.OptimisticalyReopenFolder() + } allSnapshots.LogStat("remote") allBorSnapshots.LogStat("bor:remote") blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) @@ -420,7 +443,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } - _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` + if doOptimisticOpen { + _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` + } db.View(context.Background(), func(tx kv.Tx) error { aggTx := agg.BeginFilesRo() diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 828532c1102..4a8d6d6c3bc 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" "errors" "fmt" "math" @@ -46,6 +45,7 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/types/infohash" "github.com/c2h5oh/datasize" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/tidwall/btree" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -725,23 +725,10 @@ func localHashBytes(ctx context.Context, fileInfo snaptype.FileInfo, db kv.RoDB, if db != nil { err := db.View(ctx, func(tx kv.Tx) (err error) { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileInfo.Name())) - + hashBytes, err = downloaderrawdb.ReadTorrentInfoHash(tx, fileInfo.Name()) if err != nil { return err } - - if len(infoBytes) == 20 { - hashBytes = infoBytes - return nil - } - - var info torrentInfo - - if err = json.Unmarshal(infoBytes, &info); err == nil { - hashBytes = info.Hash - } - return nil }) @@ -1503,18 +1490,14 @@ func logSeedHashMismatches(torrentHash infohash.T, name string, seedHashMismatch } } -func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) { - if info, err := d.torrentInfo(name); err == nil { - if info.Completed != nil && info.Completed.Before(time.Now()) { - if info.Length != nil { - if fi, err := os.Stat(filepath.Join(d.SnapDir(), name)); err == nil { - return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed - } - } - } +func (d *Downloader) checkComplete(name string) (complete bool, fileLen int64, completedAt *time.Time) { + if err := d.db.View(d.ctx, func(tx kv.Tx) error { + complete, fileLen, completedAt = downloaderrawdb.CheckFileComplete(tx, name, d.SnapDir()) + return nil + }); err != nil { + return false, 0, nil } - - return false, 0, nil + return } func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) { @@ -1934,28 +1917,19 @@ func availableTorrents(ctx context.Context, pending []*torrent.Torrent, download func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap } -func (d *Downloader) torrentInfo(name string) (*torrentInfo, error) { - var info torrentInfo - +func (d *Downloader) torrentInfo(name string) (*downloaderrawdb.TorrentInfo, error) { + var info *downloaderrawdb.TorrentInfo err := d.db.View(d.ctx, func(tx kv.Tx) (err error) { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(name)) - + info, err = downloaderrawdb.ReadTorrentInfo(tx, name) if err != nil { return err } - - if err = json.Unmarshal(infoBytes, &info); err != nil { - return err - } - return nil }) - if err != nil { return nil, err } - - return &info, nil + return info, nil } func (d *Downloader) ReCalcStats(interval time.Duration) { diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index e093aec8058..8c3710fc887 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -219,7 +219,8 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up } // setup snapcfg - if err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName); err != nil { + preverifiedCfg, err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName) + if err != nil { return nil, err } @@ -227,28 +228,38 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up ClientConfig: torrentConfig, DownloadSlots: downloadSlots, WebSeedUrls: webseedHttpProviders, WebSeedFileProviders: webseedFileProviders, DownloadTorrentFilesFromWebseed: true, AddTorrentsFromDisk: true, SnapshotLock: lockSnapshots, - SnapshotConfig: snapcfg.KnownCfg(chainName), + SnapshotConfig: preverifiedCfg, MdbxWriteMap: mdbxWriteMap, }, nil } -func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) error { +func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") - exists, err := dir.FileExist(preverifiedToml) if err != nil { - return err + return nil } - if exists { - // Read the preverified.toml and load the snapshots - haveToml, err := os.ReadFile(preverifiedToml) - if err != nil { - return err - } - snapcfg.SetToml(chainName, haveToml) + if !exists { return nil } - return dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644) + // Read the preverified.toml and load the snapshots + haveToml, err := os.ReadFile(preverifiedToml) + if err != nil { + return nil + } + snapcfg.SetToml(chainName, haveToml) + return snapcfg.KnownCfg(preverifiedToml) +} + +func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) { + if cfg := ReadPreverifiedToml(dirs, chainName); cfg != nil { + return cfg, nil + } + preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") + if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil { + return nil, err + } + return snapcfg.KnownCfg(preverifiedToml), nil } func getIpv6Enabled() bool { diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go new file mode 100644 index 00000000000..8231e000cd8 --- /dev/null +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -0,0 +1,103 @@ +package downloaderrawdb + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "time" + + "github.com/erigontech/erigon-lib/chain/snapcfg" + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/kv" + kv2 "github.com/erigontech/erigon-lib/kv/mdbx" + "github.com/erigontech/erigon-lib/log/v3" + "golang.org/x/sync/semaphore" +) + +type TorrentInfo struct { + Name string `json:"name"` + Hash []byte `json:"hash"` + Length *int64 `json:"length,omitempty"` + Created *time.Time `json:"created,omitempty"` + Completed *time.Time `json:"completed,omitempty"` +} + +func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) { + var info TorrentInfo + infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name)) + if err != nil { + return nil, err + } + if err = json.Unmarshal(infoBytes, &info); err != nil { + return nil, err + } + return &info, nil +} + +func ReadTorrentInfoHash(downloaderDBTx kv.Tx, name string) (hashBytes []byte, err error) { + infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name)) + if err != nil { + return nil, err + } + + if len(infoBytes) == 20 { + return infoBytes, nil + } + + var info TorrentInfo + if err = json.Unmarshal(infoBytes, &info); err == nil { + return info.Hash, nil + } + return nil, nil +} + +func WriteTorrentInfo(tx kv.RwTx, info *TorrentInfo) error { + infoBytes, err := json.Marshal(info) + if err != nil { + return err + } + return tx.Put(kv.BittorrentInfo, []byte(info.Name), infoBytes) +} + +func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *time.Time) { + info, err := ReadTorrentInfo(tx, name) + if err != nil { + return false, 0, nil + } + if info.Completed != nil && info.Completed.Before(time.Now()) { + if info.Length != nil { + if fi, err := os.Stat(filepath.Join(snapDir, name)); err == nil { + return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed + } + } + } + return false, 0, nil +} + +func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool) { + for _, p := range preverifiedCfg.Preverified { + complete, _, _ := CheckFileComplete(tx, p.Name, dirs.Snap) + if !complete { + return false + } + } + return true +} + +func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { + limiter := semaphore.NewWeighted(9_000) + downloaderDB, err := kv2.NewMDBX(log.Root()).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) + if err != nil { + return false, err + } + defer downloaderDB.Close() + + if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error { + allFilesDownloadComplete = allFilesComplete(tx, preverifiedCfg, dirs) + return nil + }); err != nil { + return false, err + } + return allFilesDownloadComplete, nil +} diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go index e47655fecab..5be5b7901aa 100644 --- a/erigon-lib/downloader/util.go +++ b/erigon-lib/downloader/util.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "crypto/sha1" - "encoding/json" "errors" "fmt" "io" @@ -34,6 +33,7 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "golang.org/x/sync/errgroup" "github.com/erigontech/erigon-lib/chain/snapcfg" @@ -67,14 +67,6 @@ var Trackers = [][]string{ //websocketTrackers // TODO: Ws protocol producing too many errors and flooding logs. But it's also very fast and reactive. } -type torrentInfo struct { - Name string `json:"name"` - Hash []byte `json:"hash"` - Length *int64 `json:"length,omitempty"` - Created *time.Time `json:"created,omitempty"` - Completed *time.Time `json:"completed,omitempty"` -} - func seedableSegmentFiles(dir string, chainName string, skipSeedableCheck bool) ([]string, error) { extensions := snaptype.SeedableV2Extensions() if skipSeedableCheck { @@ -388,16 +380,11 @@ func _addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient func torrentInfoUpdater(fileName string, infoHash []byte, length int64, completionTime *time.Time) func(tx kv.RwTx) error { return func(tx kv.RwTx) error { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileName)) - + info, err := downloaderrawdb.ReadTorrentInfo(tx, fileName) if err != nil { return err } - var info torrentInfo - - err = json.Unmarshal(infoBytes, &info) - changed := false if err != nil || (len(infoHash) > 0 && !bytes.Equal(info.Hash, infoHash)) { @@ -423,13 +410,7 @@ func torrentInfoUpdater(fileName string, infoHash []byte, length int64, completi return nil } - infoBytes, err = json.Marshal(info) - - if err != nil { - return err - } - - return tx.Put(kv.BittorrentInfo, []byte(fileName), infoBytes) + return downloaderrawdb.WriteTorrentInfo(tx, info) } } @@ -437,7 +418,7 @@ func torrentInfoReset(fileName string, infoHash []byte, length int64) func(tx kv return func(tx kv.RwTx) error { now := time.Now() - info := torrentInfo{ + info := downloaderrawdb.TorrentInfo{ Name: fileName, Hash: infoHash, Created: &now, @@ -446,14 +427,7 @@ func torrentInfoReset(fileName string, infoHash []byte, length int64) func(tx kv if length > 0 { info.Length = &length } - - infoBytes, err := json.Marshal(info) - - if err != nil { - return err - } - - return tx.Put(kv.BittorrentInfo, []byte(fileName), infoBytes) + return downloaderrawdb.WriteTorrentInfo(tx, &info) } } From 9c0fe92b53db7d0780e16abbb77766498ac82996 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 16:18:03 +0700 Subject: [PATCH 02/36] save --- cmd/rpcdaemon/cli/config.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 1d8d0e6fc42..4ab878db574 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -409,19 +409,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } doOptimisticOpen := false - - { - cc := tool.ChainConfigFromDB(db) - if cc != nil { - snapcfg.LoadRemotePreverified() - preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName) - if preverifiedCfg != nil { - allFilesDownloadComplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - doOptimisticOpen = allFilesDownloadComplete + if cc := tool.ChainConfigFromDB(db); cc != nil { + snapcfg.LoadRemotePreverified() + preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName) + if preverifiedCfg != nil { + allFilesDownloadComplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } + doOptimisticOpen = allFilesDownloadComplete } } From 8c3eeb4e99d4aed9507ea4c06f437b68f4153c38 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 16:18:40 +0700 Subject: [PATCH 03/36] save --- cmd/rpcdaemon/cli/config.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 4ab878db574..1206d04a24e 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -411,8 +411,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger doOptimisticOpen := false if cc := tool.ChainConfigFromDB(db); cc != nil { snapcfg.LoadRemotePreverified() - preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName) - if preverifiedCfg != nil { + if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { allFilesDownloadComplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) if err != nil { return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err From 30be59cf86d3245d24f8e4f8e0d65cc566638f1d Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 17:26:12 +0700 Subject: [PATCH 04/36] save --- cmd/rpcdaemon/cli/config.go | 26 ++++++++++--------- .../downloader/downloadercfg/downloadercfg.go | 5 +++- .../downloaderrawdb/accessors_downloader.go | 19 +++++++------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 1206d04a24e..0eb11f9ab0f 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -30,10 +30,6 @@ import ( "strings" "time" - "github.com/erigontech/erigon-lib/chain/snapcfg" - "github.com/erigontech/erigon-lib/downloader/downloadercfg" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" - "github.com/erigontech/erigon/cmd/hack/tool" "github.com/spf13/cobra" "golang.org/x/sync/semaphore" "google.golang.org/grpc" @@ -41,11 +37,14 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/erigontech/erigon-lib/chain" + "github.com/erigontech/erigon-lib/chain/snapcfg" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/hexutility" "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/direct" + "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" @@ -409,15 +408,18 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } doOptimisticOpen := false - if cc := tool.ChainConfigFromDB(db); cc != nil { - snapcfg.LoadRemotePreverified() - if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { - allFilesDownloadComplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - doOptimisticOpen = allFilesDownloadComplete + snapcfg.LoadRemotePreverified() + if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { + allFilesDownloadComplete, lastUncomplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } + if !allFilesDownloadComplete { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)", "example_uncomplete_file", lastUncomplete) + } + doOptimisticOpen = allFilesDownloadComplete + } else { + log.Warn("[rpc] download of segments not complete yet") } // Configure sapshots diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index 8c3710fc887..7172c6698b6 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -237,6 +237,7 @@ func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") exists, err := dir.FileExist(preverifiedToml) if err != nil { + panic(err) return nil } if !exists { @@ -245,10 +246,11 @@ func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { // Read the preverified.toml and load the snapshots haveToml, err := os.ReadFile(preverifiedToml) if err != nil { + panic(err) return nil } snapcfg.SetToml(chainName, haveToml) - return snapcfg.KnownCfg(preverifiedToml) + return snapcfg.KnownCfg(chainName) } func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) { @@ -257,6 +259,7 @@ func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (* } preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil { + panic(err) return nil, err } return snapcfg.KnownCfg(preverifiedToml), nil diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 8231e000cd8..1aa6a88fd89 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -75,29 +75,30 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim return false, 0, nil } -func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool) { +func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string) { + log.Warn("see", "l", len(preverifiedCfg.Preverified)) for _, p := range preverifiedCfg.Preverified { complete, _, _ := CheckFileComplete(tx, p.Name, dirs.Snap) if !complete { - return false + return false, p.Name } } - return true + return true, "" } -func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { +func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string, err error) { limiter := semaphore.NewWeighted(9_000) - downloaderDB, err := kv2.NewMDBX(log.Root()).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) + downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) if err != nil { - return false, err + return false, "", err } defer downloaderDB.Close() if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error { - allFilesDownloadComplete = allFilesComplete(tx, preverifiedCfg, dirs) + allFilesDownloadComplete, lastUncomplete = allFilesComplete(tx, preverifiedCfg, dirs) return nil }); err != nil { - return false, err + return false, "", err } - return allFilesDownloadComplete, nil + return allFilesDownloadComplete, lastUncomplete, nil } From 152632f6f7e8e8a2d94b7a2186b783b2252276c2 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 17:26:19 +0700 Subject: [PATCH 05/36] save --- erigon-lib/downloader/downloaderrawdb/accessors_downloader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 1aa6a88fd89..63ca1d77cce 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -76,7 +76,6 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim } func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string) { - log.Warn("see", "l", len(preverifiedCfg.Preverified)) for _, p := range preverifiedCfg.Preverified { complete, _, _ := CheckFileComplete(tx, p.Name, dirs.Snap) if !complete { From d9034307979fd1c451d20c6f7716e934325b9113 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 17:27:23 +0700 Subject: [PATCH 06/36] save --- erigon-lib/kv/mdbx/kv_mdbx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go index 4bdfcd87c4d..ea51e6a82fd 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx.go +++ b/erigon-lib/kv/mdbx/kv_mdbx.go @@ -1013,7 +1013,7 @@ func (tx *MdbxTx) CreateBucket(name string) error { dbi, err = tx.tx.OpenDBI(name, nativeFlags, nil, nil) if err != nil { - return fmt.Errorf("db-talbe doesn't exists: %s, %w. Tip: try run `integration run_migrations` to create non-existing tables", name, err) + return fmt.Errorf("db-talbe doesn't exists: %s, lable: %s, %w. Tip: try run `integration run_migrations` to create non-existing tables", name, tx.db.opts.label, err) } cnfCopy.DBI = kv.DBI(dbi) From 333a4feb80464c8e99fa839fca9cfe9ba35ff25c Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 16 Oct 2024 17:39:55 +0700 Subject: [PATCH 07/36] save --- erigon-lib/downloader/downloaderrawdb/accessors_downloader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 63ca1d77cce..f0e67e8ffdc 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -87,7 +87,9 @@ func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string, err error) { limiter := semaphore.NewWeighted(9_000) - downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) + downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.TablesCfgByLabel(kv.DownloaderDB) + }).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) if err != nil { return false, "", err } From 74161df76b52dd937b1c39c735f15cab476f3dbc Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 14:03:02 +0700 Subject: [PATCH 08/36] save --- erigon-lib/downloader/downloader.go | 12 ++++++++++++ .../downloaderrawdb/accessors_downloader.go | 17 +++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 4a8d6d6c3bc..5da0b4bd0f2 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2293,6 +2293,12 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { d.lock.Unlock() + if stats.Completed { + logger.Warn("[dbg] completed", "completed", stats.Completed, "len", len(torrents)) + //d.save() + //d.writeCompletionMarker() + } + if !stats.Completed { logger.Debug("[snapshots] downloading", "len", len(torrents), @@ -2933,6 +2939,12 @@ func (d *Downloader) Completed() bool { return d.stats.Completed } +func (d *Downloader) saveAllCompleteFlag() { + if err := d.db.Update(d.ctx, downloaderrawdb.WriteAllCompleteFlag); err != nil { + d.logger.Debug("[snapshots] Can't update 'all complete' flag", "err", err) + } +} + // Store completed torrents in order to notify GrpcServer subscribers when they subscribe and there is already downloaded files func (d *Downloader) torrentCompleted(tName string, tHash metainfo.Hash) { d.lock.Lock() diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index f0e67e8ffdc..af61c70b676 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -29,6 +29,9 @@ func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) { if err != nil { return nil, err } + if len(infoBytes) == 0 { + return &info, nil + } if err = json.Unmarshal(infoBytes, &info); err != nil { return nil, err } @@ -103,3 +106,17 @@ func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesD } return allFilesDownloadComplete, lastUncomplete, nil } + +func WriteAllCompleteFlag(tx kv.RwTx) error { + return tx.Put(kv.BittorrentInfo, []byte("all_complete"), []byte{1}) +} +func ReadAllCompleteFlag(tx kv.RwTx) (bool, error) { + v, err := tx.GetOne(kv.BittorrentInfo, []byte("all_complete")) + if err != nil { + return false, err + } + if len(v) == 0 { + return false, nil + } + return v[0] == 1, nil +} From 9e2c1c18c70669b053e2647de043fd606e37c205 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 14:04:55 +0700 Subject: [PATCH 09/36] save --- .../downloader/downloaderrawdb/accessors_downloader.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index af61c70b676..7b408f715f4 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -107,11 +107,13 @@ func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesD return allFilesDownloadComplete, lastUncomplete, nil } +var AllCompleteFlagKey = []byte("all_complete") + func WriteAllCompleteFlag(tx kv.RwTx) error { - return tx.Put(kv.BittorrentInfo, []byte("all_complete"), []byte{1}) + return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1}) } func ReadAllCompleteFlag(tx kv.RwTx) (bool, error) { - v, err := tx.GetOne(kv.BittorrentInfo, []byte("all_complete")) + v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey) if err != nil { return false, err } From f5827124fb370d5eb61ced7f118c92eb10a0ff97 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 14:35:57 +0700 Subject: [PATCH 10/36] save --- cmd/rpcdaemon/cli/config.go | 29 ++++++++++--------- .../downloaderrawdb/accessors_downloader.go | 25 +++++----------- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 0eb11f9ab0f..e6f40bdb79d 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -407,19 +407,20 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db") } - doOptimisticOpen := false - snapcfg.LoadRemotePreverified() - if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { - allFilesDownloadComplete, lastUncomplete, err := downloaderrawdb.AllFilesComplete(preverifiedCfg, cfg.Dirs) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - if !allFilesDownloadComplete { - log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)", "example_uncomplete_file", lastUncomplete) + allSegmentsDownloadComplete := false + { + snapcfg.LoadRemotePreverified() + if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { + allSegmentsDownloadComplete, err = downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } + if !allSegmentsDownloadComplete { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") + } + } else { + log.Warn("[rpc] download of segments not complete yet") } - doOptimisticOpen = allFilesDownloadComplete - } else { - log.Warn("[rpc] download of segments not complete yet") } // Configure sapshots @@ -427,7 +428,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - if doOptimisticOpen { + if allSegmentsDownloadComplete { allSnapshots.OptimisticalyReopenFolder() allBorSnapshots.OptimisticalyReopenFolder() } @@ -440,7 +441,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } - if doOptimisticOpen { + if allSegmentsDownloadComplete { _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` } diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 7b408f715f4..ee29513c96c 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -7,7 +7,6 @@ import ( "path/filepath" "time" - "github.com/erigontech/erigon-lib/chain/snapcfg" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/kv" kv2 "github.com/erigontech/erigon-lib/kv/mdbx" @@ -78,33 +77,23 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim return false, 0, nil } -func allFilesComplete(tx kv.Tx, preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string) { - for _, p := range preverifiedCfg.Preverified { - complete, _, _ := CheckFileComplete(tx, p.Name, dirs.Snap) - if !complete { - return false, p.Name - } - } - return true, "" -} - -func AllFilesComplete(preverifiedCfg *snapcfg.Cfg, dirs datadir.Dirs) (allFilesDownloadComplete bool, lastUncomplete string, err error) { +func AllSegmentsDownloadCompleteFlag(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { limiter := semaphore.NewWeighted(9_000) downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(kv.DownloaderDB) }).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) if err != nil { - return false, "", err + return false, err } defer downloaderDB.Close() if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error { - allFilesDownloadComplete, lastUncomplete = allFilesComplete(tx, preverifiedCfg, dirs) - return nil + allFilesDownloadComplete, err = ReadAllCompleteFlag(tx) + return err }); err != nil { - return false, "", err + return false, err } - return allFilesDownloadComplete, lastUncomplete, nil + return allFilesDownloadComplete, nil } var AllCompleteFlagKey = []byte("all_complete") @@ -112,7 +101,7 @@ var AllCompleteFlagKey = []byte("all_complete") func WriteAllCompleteFlag(tx kv.RwTx) error { return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1}) } -func ReadAllCompleteFlag(tx kv.RwTx) (bool, error) { +func ReadAllCompleteFlag(tx kv.Tx) (bool, error) { v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey) if err != nil { return false, err From c50a0c96320e242ded19c5addd634d5b6a273b2a Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 14:37:08 +0700 Subject: [PATCH 11/36] save --- cmd/rpcdaemon/cli/config.go | 22 ++++++---------------- erigon-lib/downloader/downloader.go | 3 +-- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index e6f40bdb79d..4b07c0291a3 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -37,13 +37,11 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/erigontech/erigon-lib/chain" - "github.com/erigontech/erigon-lib/chain/snapcfg" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/hexutility" "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/direct" - "github.com/erigontech/erigon-lib/downloader/downloadercfg" "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" @@ -407,20 +405,12 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db") } - allSegmentsDownloadComplete := false - { - snapcfg.LoadRemotePreverified() - if preverifiedCfg := downloadercfg.ReadPreverifiedToml(cfg.Dirs, cc.ChainName); preverifiedCfg != nil { - allSegmentsDownloadComplete, err = downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - if !allSegmentsDownloadComplete { - log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") - } - } else { - log.Warn("[rpc] download of segments not complete yet") - } + allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } + if !allSegmentsDownloadComplete { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") } // Configure sapshots diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 5da0b4bd0f2..c39bf51a021 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2295,8 +2295,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { if stats.Completed { logger.Warn("[dbg] completed", "completed", stats.Completed, "len", len(torrents)) - //d.save() - //d.writeCompletionMarker() + d.saveAllCompleteFlag() } if !stats.Completed { From ab8f414b48207ceb84d31148f5065ece14d570c8 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 14:59:12 +0700 Subject: [PATCH 12/36] save --- cmd/rpcdaemon/cli/config.go | 36 ++++++++++++++++++------------------ eth/backend.go | 34 +++++++++++++++------------------- 2 files changed, 33 insertions(+), 37 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 4b07c0291a3..24dcd2e706d 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -416,14 +416,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) - // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down - // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - if allSegmentsDownloadComplete { - allSnapshots.OptimisticalyReopenFolder() - allBorSnapshots.OptimisticalyReopenFolder() - } - allSnapshots.LogStat("remote") - allBorSnapshots.LogStat("bor:remote") blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader)) @@ -431,19 +423,28 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } + + // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down + // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection if allSegmentsDownloadComplete { + allSnapshots.OptimisticalyReopenFolder() + allBorSnapshots.OptimisticalyReopenFolder() + + allSnapshots.LogStat("remote") + allBorSnapshots.LogStat("bor:remote") _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` - } - db.View(context.Background(), func(tx kv.Tx) error { - aggTx := agg.BeginFilesRo() - defer aggTx.Close() - aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) { - _, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax) - return histBlockNumProgress, err + db.View(context.Background(), func(tx kv.Tx) error { + aggTx := agg.BeginFilesRo() + defer aggTx.Close() + aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) { + _, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax) + return histBlockNumProgress, err + }) + return nil }) - return nil - }) + } + onNewSnapshot = func() { go func() { // don't block events processing by network communication reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) @@ -462,7 +463,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger allBorSnapshots.LogStat("bor:reopen") } - //if err = agg.openList(reply.HistoryFiles, true); err != nil { if err = agg.OpenFolder(); err != nil { logger.Error("[snapshots] reopen", "err", err) } else { diff --git a/eth/backend.go b/eth/backend.go index d9d478cf0d6..50f9a68fbae 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -36,10 +36,10 @@ import ( "sync/atomic" "time" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/mdbx-go/mdbx" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" - "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -1433,37 +1433,33 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } allSnapshots := freezeblocks.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) - var allBorSnapshots *freezeblocks.BorRoSnapshots if isBor { allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) } - - g := &errgroup.Group{} - g.Go(func() error { - allSnapshots.OptimisticalyReopenFolder() - return nil - }) - g.Go(func() error { - if isBor { - allBorSnapshots.OptimisticalyReopenFolder() - } - return nil - }) - blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) + agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, logger) if err != nil { return nil, nil, nil, nil, nil, err } agg.SetProduceMod(snConfig.Snapshot.ProduceE3) - g.Go(func() error { - return agg.OpenFolder() - }) - if err = g.Wait(); err != nil { + allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(dirs) + if err != nil { return nil, nil, nil, nil, nil, err } + if !allSegmentsDownloadComplete { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") + } + + if allSegmentsDownloadComplete { + allSnapshots.OptimisticalyReopenFolder() + if isBor { + allBorSnapshots.OptimisticalyReopenFolder() + } + _ = agg.OpenFolder() + } blockWriter := blockio.NewBlockWriter() From 6db4f42e832aaa6b564a7194f771de6955dccd6b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 15:12:10 +0700 Subject: [PATCH 13/36] save --- erigon-lib/downloader/downloadercfg/downloadercfg.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index 7172c6698b6..a6110fe4c8a 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -237,7 +237,6 @@ func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") exists, err := dir.FileExist(preverifiedToml) if err != nil { - panic(err) return nil } if !exists { @@ -246,7 +245,6 @@ func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { // Read the preverified.toml and load the snapshots haveToml, err := os.ReadFile(preverifiedToml) if err != nil { - panic(err) return nil } snapcfg.SetToml(chainName, haveToml) From 17262181a0bdfca14635cc03c31c9af4da0c8519 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 15:12:24 +0700 Subject: [PATCH 14/36] save --- erigon-lib/downloader/downloadercfg/downloadercfg.go | 1 - 1 file changed, 1 deletion(-) diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index a6110fe4c8a..1aff69c55e3 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -257,7 +257,6 @@ func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (* } preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil { - panic(err) return nil, err } return snapcfg.KnownCfg(preverifiedToml), nil From 690ae7b08007149aa01f3f2fc7dbf601fb7718cf Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 15:31:27 +0700 Subject: [PATCH 15/36] save --- erigon-lib/downloader/downloader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index c39bf51a021..11699fc4bf4 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2294,7 +2294,6 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { d.lock.Unlock() if stats.Completed { - logger.Warn("[dbg] completed", "completed", stats.Completed, "len", len(torrents)) d.saveAllCompleteFlag() } From c1071e2fde78c3e11e923d763522b3d729299557 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 15:55:24 +0700 Subject: [PATCH 16/36] save --- erigon-lib/downloader/downloader.go | 2 +- erigon-lib/downloader/util.go | 2 +- eth/backend.go | 7 +++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 11699fc4bf4..3dec0022e2c 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -45,7 +45,6 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/types/infohash" "github.com/c2h5oh/datasize" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/tidwall/btree" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -58,6 +57,7 @@ import ( "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/diagnostics" "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" prototypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto" "github.com/erigontech/erigon-lib/kv" diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go index 5be5b7901aa..5dc141ddbca 100644 --- a/erigon-lib/downloader/util.go +++ b/erigon-lib/downloader/util.go @@ -33,7 +33,6 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "golang.org/x/sync/errgroup" "github.com/erigontech/erigon-lib/chain/snapcfg" @@ -42,6 +41,7 @@ import ( "github.com/erigontech/erigon-lib/common/dbg" dir2 "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" diff --git a/eth/backend.go b/eth/backend.go index 44dc5993f5a..6ef6356812d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -36,7 +36,6 @@ import ( "sync/atomic" "time" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/mdbx-go/mdbx" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" @@ -45,22 +44,22 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/protobuf/types/known/emptypb" - "github.com/erigontech/erigon-lib/common/dir" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/chain" "github.com/erigontech/erigon-lib/chain/networkname" "github.com/erigontech/erigon-lib/chain/snapcfg" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/dbg" + "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/common/disk" "github.com/erigontech/erigon-lib/common/mem" + "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/diagnostics" "github.com/erigontech/erigon-lib/direct" "github.com/erigontech/erigon-lib/downloader" "github.com/erigontech/erigon-lib/downloader/downloadercfg" "github.com/erigontech/erigon-lib/downloader/downloadergrpc" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" protodownloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" From 1f962b11e543d8f2b5a23a8057c119c01459d065 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 15:57:55 +0700 Subject: [PATCH 17/36] save --- cmd/rpcdaemon/cli/config.go | 15 ++++++--------- eth/backend.go | 6 ++---- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 7efb31fff15..6726ed43565 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -406,14 +406,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, errors.New("chain config not found in db. Need start erigon at least once on this db") } - allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - if !allSegmentsDownloadComplete { - log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") - } - // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) @@ -424,9 +416,12 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } - // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection + allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } if allSegmentsDownloadComplete { allSnapshots.OptimisticalyReopenFolder() allBorSnapshots.OptimisticalyReopenFolder() @@ -444,6 +439,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger }) return nil }) + } else { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") } onNewSnapshot = func() { diff --git a/eth/backend.go b/eth/backend.go index 6ef6356812d..dfefb12794e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1448,16 +1448,14 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf if err != nil { return nil, nil, nil, nil, nil, err } - if !allSegmentsDownloadComplete { - log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") - } - if allSegmentsDownloadComplete { allSnapshots.OptimisticalyReopenFolder() if isBor { allBorSnapshots.OptimisticalyReopenFolder() } _ = agg.OpenFolder() + } else { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") } blockWriter := blockio.NewBlockWriter() From e47c663f78400383870cc9053100e6ecc6879d58 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 16:04:33 +0700 Subject: [PATCH 18/36] save --- .../downloader/downloaderrawdb/accessors_downloader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index ee29513c96c..4994d08a852 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -8,6 +8,7 @@ import ( "time" "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/kv" kv2 "github.com/erigontech/erigon-lib/kv/mdbx" "github.com/erigontech/erigon-lib/log/v3" @@ -78,6 +79,10 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim } func AllSegmentsDownloadCompleteFlag(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { + if exists, err := dir.FileExist(filepath.Join(dirs.Downloader, "mdbx.dat")); err != nil || !exists { + return false, err + } + limiter := semaphore.NewWeighted(9_000) downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(kv.DownloaderDB) From 0678cee712d2630bfa08cd16ce62ae9aa49cd129 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 16:49:54 +0700 Subject: [PATCH 19/36] save --- cmd/rpcdaemon/cli/config.go | 2 +- erigon-lib/downloader/downloader.go | 2 +- .../downloaderrawdb/accessors_downloader.go | 52 ++++++++++++------ eth/backend.go | 2 +- migrations/download_complete.go | 54 +++++++++++++++++++ migrations/migrations.go | 1 + 6 files changed, 94 insertions(+), 19 deletions(-) create mode 100644 migrations/download_complete.go diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 6726ed43565..8482016ddd6 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -418,7 +418,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(cfg.Dirs) + allSegmentsDownloadComplete, err := downloaderrawdb.ReadSegmentsDownloadCompleteWithoutDB(cfg.Dirs) if err != nil { return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 3dec0022e2c..6abccb513f3 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2938,7 +2938,7 @@ func (d *Downloader) Completed() bool { } func (d *Downloader) saveAllCompleteFlag() { - if err := d.db.Update(d.ctx, downloaderrawdb.WriteAllCompleteFlag); err != nil { + if err := d.db.Update(d.ctx, downloaderrawdb.WriteSegmentsDownloadComplete); err != nil { d.logger.Debug("[snapshots] Can't update 'all complete' flag", "err", err) } } diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 4994d08a852..9a36bbd4032 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -78,41 +78,61 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim return false, 0, nil } -func AllSegmentsDownloadCompleteFlag(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { - if exists, err := dir.FileExist(filepath.Join(dirs.Downloader, "mdbx.dat")); err != nil || !exists { +var AllCompleteFlagKey = []byte("all_complete") + +func WriteSegmentsDownloadComplete(tx kv.RwTx) error { + return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1}) +} +func ReadSegmentsDownloadComplete(tx kv.Tx) (bool, error) { + v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey) + if err != nil { return false, err } + if len(v) == 0 { + return false, nil + } + return v[0] == 1, nil +} + +func openDB(dirs datadir.Dirs) (db kv.RwDB, exists bool, err error) { + if exists, err := dir.FileExist(filepath.Join(dirs.Downloader, "mdbx.dat")); err != nil || !exists { + return nil, false, err + } limiter := semaphore.NewWeighted(9_000) downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TablesCfgByLabel(kv.DownloaderDB) }).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) if err != nil { + return nil, false, err + } + return downloaderDB, true, nil +} + +func ReadSegmentsDownloadCompleteWithoutDB(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { + downloaderDB, exists, err := openDB(dirs) + if err != nil || !exists { return false, err } defer downloaderDB.Close() if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error { - allFilesDownloadComplete, err = ReadAllCompleteFlag(tx) + allFilesDownloadComplete, err = ReadSegmentsDownloadComplete(tx) return err }); err != nil { return false, err } return allFilesDownloadComplete, nil } - -var AllCompleteFlagKey = []byte("all_complete") - -func WriteAllCompleteFlag(tx kv.RwTx) error { - return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1}) -} -func ReadAllCompleteFlag(tx kv.Tx) (bool, error) { - v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey) - if err != nil { - return false, err +func WriteSegmentsDownloadCompleteWithoutDB(dirs datadir.Dirs) (err error) { + downloaderDB, exists, err := openDB(dirs) + if err != nil || !exists { + return err } - if len(v) == 0 { - return false, nil + defer downloaderDB.Close() + + if err := downloaderDB.Update(context.Background(), WriteSegmentsDownloadComplete); err != nil { + return err } - return v[0] == 1, nil + return nil } diff --git a/eth/backend.go b/eth/backend.go index dfefb12794e..f5982690d75 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1444,7 +1444,7 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } agg.SetProduceMod(snConfig.Snapshot.ProduceE3) - allSegmentsDownloadComplete, err := downloaderrawdb.AllSegmentsDownloadCompleteFlag(dirs) + allSegmentsDownloadComplete, err := downloaderrawdb.ReadSegmentsDownloadCompleteWithoutDB(dirs) if err != nil { return nil, nil, nil, nil, nil, err } diff --git a/migrations/download_complete.go b/migrations/download_complete.go new file mode 100644 index 00000000000..d5dff000da8 --- /dev/null +++ b/migrations/download_complete.go @@ -0,0 +1,54 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package migrations + +import ( + "context" + + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/eth/stagedsync/stages" +) + +var DownloadComplete = Migration{ + Name: "download_complete", + Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) + if err != nil { + return err + } + if snapshotsStageProgress > 0 { + if err := downloaderrawdb.WriteSegmentsDownloadCompleteWithoutDB(dirs); err != nil { + return err + } + } + + // This migration is no-op, but it forces the migration mechanism to apply it and thus write the DB schema version info + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() + }, +} diff --git a/migrations/migrations.go b/migrations/migrations.go index 211d77f860e..296bb091b6d 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -55,6 +55,7 @@ var migrations = map[kv.Label][]Migration{ ProhibitNewDownloadsLock, ProhibitNewDownloadsLock2, ClearBorTables, + DownloadComplete, }, kv.TxPoolDB: {}, kv.SentryDB: {}, From 9a699046ad8c9a04c08e60db5ba72a1d74336d5d Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 16:50:33 +0700 Subject: [PATCH 20/36] save --- migrations/download_complete.go | 4 ++-- migrations/migrations.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/migrations/download_complete.go b/migrations/download_complete.go index d5dff000da8..2401e99bc41 100644 --- a/migrations/download_complete.go +++ b/migrations/download_complete.go @@ -26,8 +26,8 @@ import ( "github.com/erigontech/erigon/eth/stagedsync/stages" ) -var DownloadComplete = Migration{ - Name: "download_complete", +var SegmentsDownloadComplete = Migration{ + Name: "segments_download_complete", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { tx, err := db.BeginRw(context.Background()) if err != nil { diff --git a/migrations/migrations.go b/migrations/migrations.go index 296bb091b6d..eec8d5af9cb 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -55,7 +55,7 @@ var migrations = map[kv.Label][]Migration{ ProhibitNewDownloadsLock, ProhibitNewDownloadsLock2, ClearBorTables, - DownloadComplete, + SegmentsDownloadComplete, }, kv.TxPoolDB: {}, kv.SentryDB: {}, From 80586ac969dbd7ab85063c8625172a65d89c54a2 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 17 Oct 2024 21:59:21 +0700 Subject: [PATCH 21/36] save --- .../freezeblocks/block_snapshots.go | 41 ++++++++++--------- .../freezeblocks/block_snapshots_test.go | 4 ++ 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index aabeb4d88e7..ae955dfb976 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -276,12 +276,15 @@ func (s *DirtySegment) isSubSetOf(j *DirtySegment) bool { } func (s *DirtySegment) reopenSeg(dir string) (err error) { - if s.refcount.Load() == 0 { - s.closeSeg() - s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) - if err != nil { - return fmt.Errorf("%w, fileName: %s", err, s.FileName()) - } + if s.refcount.Load() != 0 { + return + } + if s.Decompressor != nil { + return + } + s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, s.FileName()) } return nil } @@ -334,12 +337,7 @@ func (s *DirtySegment) openFiles() []string { } func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { - if len(s.Type().IdxFileNames(s.version, s.from, s.to)) == 0 { - return nil - } - err = s.reopenIdx(dir) - if err != nil { if !errors.Is(err, os.ErrNotExist) { if optimistic { @@ -354,23 +352,26 @@ func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) } func (s *DirtySegment) reopenIdx(dir string) (err error) { - if s.refcount.Load() > 0 { + if s.Decompressor == nil { return nil } - - s.closeIdx() - if s.Decompressor == nil { + if s.refcount.Load() > 0 { return nil } - for _, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { - index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) + for i, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { + if len(s.indexes) <= i { + s.indexes = append(s.indexes, nil) + } + if s.indexes[i] != nil { + continue + } + index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) } - - s.indexes = append(s.indexes, index) + s.indexes[i] = index } return nil @@ -852,6 +853,8 @@ func (s *RoSnapshots) Ranges() []Range { func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() } func (s *RoSnapshots) ReopenFolder() error { + defer func(t time.Time) { fmt.Printf("block_snapshots.go:861: %s\n", time.Since(t)) }(time.Now()) + defer s.recalcVisibleFiles() s.dirtySegmentsLock.Lock() diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 5350ed8032e..4763967c9d8 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -18,6 +18,7 @@ package freezeblocks import ( "context" + "fmt" "os" "path/filepath" "testing" @@ -217,6 +218,9 @@ func TestMergeSnapshots(t *testing.T) { s := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName}, dir, 0, logger) defer s.Close() require.NoError(s.ReopenFolder()) + fmt.Printf("[dbg] 1\n") + require.NoError(s.ReopenFolder()) + fmt.Printf("[dbg] 2\n") { merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger) merger.DisableFsync() From 678ee95b34166d9fd2f2cf974c231ee62d17cc92 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 09:48:06 +0700 Subject: [PATCH 22/36] save --- cmd/rpcdaemon/cli/config.go | 10 ++- erigon-lib/downloader/downloader.go | 10 --- .../downloaderrawdb/accessors_downloader.go | 65 ------------------- migrations/download_complete.go | 54 --------------- migrations/migrations.go | 1 - .../freezeblocks/block_snapshots.go | 10 ++- .../freezeblocks/block_snapshots_test.go | 4 -- .../freezeblocks/bor_snapshots.go | 4 +- 8 files changed, 13 insertions(+), 145 deletions(-) delete mode 100644 migrations/download_complete.go diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 8482016ddd6..d14e893c0db 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -30,6 +30,7 @@ import ( "strings" "time" + "github.com/erigontech/erigon/eth/stagedsync/stages" "github.com/spf13/cobra" "golang.org/x/sync/semaphore" "google.golang.org/grpc" @@ -42,7 +43,6 @@ import ( "github.com/erigontech/erigon-lib/common/hexutility" "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/direct" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" @@ -418,8 +418,12 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSegmentsDownloadComplete, err := downloaderrawdb.ReadSegmentsDownloadCompleteWithoutDB(cfg.Dirs) - if err != nil { + var allSegmentsDownloadComplete bool + if err := rwKv.View(ctx, func(tx kv.Tx) error { + snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) + allSegmentsDownloadComplete = snapshotsStageProgress > 0 + return err + }); err != nil { return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } if allSegmentsDownloadComplete { diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 6abccb513f3..9e6d3373cee 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2293,10 +2293,6 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { d.lock.Unlock() - if stats.Completed { - d.saveAllCompleteFlag() - } - if !stats.Completed { logger.Debug("[snapshots] downloading", "len", len(torrents), @@ -2937,12 +2933,6 @@ func (d *Downloader) Completed() bool { return d.stats.Completed } -func (d *Downloader) saveAllCompleteFlag() { - if err := d.db.Update(d.ctx, downloaderrawdb.WriteSegmentsDownloadComplete); err != nil { - d.logger.Debug("[snapshots] Can't update 'all complete' flag", "err", err) - } -} - // Store completed torrents in order to notify GrpcServer subscribers when they subscribe and there is already downloaded files func (d *Downloader) torrentCompleted(tName string, tHash metainfo.Hash) { d.lock.Lock() diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go index 9a36bbd4032..1e5c9ea0246 100644 --- a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -1,18 +1,12 @@ package downloaderrawdb import ( - "context" "encoding/json" "os" "path/filepath" "time" - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/kv" - kv2 "github.com/erigontech/erigon-lib/kv/mdbx" - "github.com/erigontech/erigon-lib/log/v3" - "golang.org/x/sync/semaphore" ) type TorrentInfo struct { @@ -77,62 +71,3 @@ func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *tim } return false, 0, nil } - -var AllCompleteFlagKey = []byte("all_complete") - -func WriteSegmentsDownloadComplete(tx kv.RwTx) error { - return tx.Put(kv.BittorrentInfo, AllCompleteFlagKey, []byte{1}) -} -func ReadSegmentsDownloadComplete(tx kv.Tx) (bool, error) { - v, err := tx.GetOne(kv.BittorrentInfo, AllCompleteFlagKey) - if err != nil { - return false, err - } - if len(v) == 0 { - return false, nil - } - return v[0] == 1, nil -} - -func openDB(dirs datadir.Dirs) (db kv.RwDB, exists bool, err error) { - if exists, err := dir.FileExist(filepath.Join(dirs.Downloader, "mdbx.dat")); err != nil || !exists { - return nil, false, err - } - - limiter := semaphore.NewWeighted(9_000) - downloaderDB, err := kv2.NewMDBX(log.Root()).Label(kv.DownloaderDB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TablesCfgByLabel(kv.DownloaderDB) - }).RoTxsLimiter(limiter).Path(dirs.Downloader).Accede().Open(context.Background()) - if err != nil { - return nil, false, err - } - return downloaderDB, true, nil -} - -func ReadSegmentsDownloadCompleteWithoutDB(dirs datadir.Dirs) (allFilesDownloadComplete bool, err error) { - downloaderDB, exists, err := openDB(dirs) - if err != nil || !exists { - return false, err - } - defer downloaderDB.Close() - - if err := downloaderDB.View(context.Background(), func(tx kv.Tx) error { - allFilesDownloadComplete, err = ReadSegmentsDownloadComplete(tx) - return err - }); err != nil { - return false, err - } - return allFilesDownloadComplete, nil -} -func WriteSegmentsDownloadCompleteWithoutDB(dirs datadir.Dirs) (err error) { - downloaderDB, exists, err := openDB(dirs) - if err != nil || !exists { - return err - } - defer downloaderDB.Close() - - if err := downloaderDB.Update(context.Background(), WriteSegmentsDownloadComplete); err != nil { - return err - } - return nil -} diff --git a/migrations/download_complete.go b/migrations/download_complete.go deleted file mode 100644 index 2401e99bc41..00000000000 --- a/migrations/download_complete.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package migrations - -import ( - "context" - - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon/eth/stagedsync/stages" -) - -var SegmentsDownloadComplete = Migration{ - Name: "segments_download_complete", - Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { - tx, err := db.BeginRw(context.Background()) - if err != nil { - return err - } - defer tx.Rollback() - - snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) - if err != nil { - return err - } - if snapshotsStageProgress > 0 { - if err := downloaderrawdb.WriteSegmentsDownloadCompleteWithoutDB(dirs); err != nil { - return err - } - } - - // This migration is no-op, but it forces the migration mechanism to apply it and thus write the DB schema version info - if err := BeforeCommit(tx, nil, true); err != nil { - return err - } - return tx.Commit() - }, -} diff --git a/migrations/migrations.go b/migrations/migrations.go index eec8d5af9cb..211d77f860e 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -55,7 +55,6 @@ var migrations = map[kv.Label][]Migration{ ProhibitNewDownloadsLock, ProhibitNewDownloadsLock2, ClearBorTables, - SegmentsDownloadComplete, }, kv.TxPoolDB: {}, kv.SentryDB: {}, diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index ae955dfb976..f49b561c5fd 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -853,14 +853,12 @@ func (s *RoSnapshots) Ranges() []Range { func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() } func (s *RoSnapshots) ReopenFolder() error { - defer func(t time.Time) { fmt.Printf("block_snapshots.go:861: %s\n", time.Since(t)) }(time.Now()) - defer s.recalcVisibleFiles() s.dirtySegmentsLock.Lock() defer s.dirtySegmentsLock.Unlock() - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), s.Types(), false) + files, _, err := typedSegments(s.dir, s.Types(), false) if err != nil { return err } @@ -883,7 +881,7 @@ func (s *RoSnapshots) ReopenSegments(types []snaptype.Type, allowGaps bool) erro s.dirtySegmentsLock.Lock() defer s.dirtySegmentsLock.Unlock() - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), types, allowGaps) + files, _, err := typedSegments(s.dir, types, allowGaps) if err != nil { return err @@ -1305,10 +1303,10 @@ func SegmentsCaplin(dir string, minBlock uint64) (res []snaptype.FileInfo, missi } func Segments(dir string, minBlock uint64) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { - return typedSegments(dir, minBlock, coresnaptype.BlockSnapshotTypes, true) + return typedSegments(dir, coresnaptype.BlockSnapshotTypes, true) } -func typedSegments(dir string, minBlock uint64, types []snaptype.Type, allowGaps bool) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { +func typedSegments(dir string, types []snaptype.Type, allowGaps bool) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { segmentsTypeCheck := func(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { return typeOfSegmentsMustExist(dir, in, types) } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 4763967c9d8..5350ed8032e 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -18,7 +18,6 @@ package freezeblocks import ( "context" - "fmt" "os" "path/filepath" "testing" @@ -218,9 +217,6 @@ func TestMergeSnapshots(t *testing.T) { s := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName}, dir, 0, logger) defer s.Close() require.NoError(s.ReopenFolder()) - fmt.Printf("[dbg] 1\n") - require.NoError(s.ReopenFolder()) - fmt.Printf("[dbg] 2\n") { merger := NewMerger(dir, 1, log.LvlInfo, nil, params.MainnetChainConfig, logger) merger.DisableFsync() diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index fde041e8d7f..5ba8e4d2957 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -136,7 +136,7 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, } { - files, _, err := typedSegments(br.borSnapshots().dir, br.borSnapshots().segmentsMin.Load(), borsnaptype.BorSnapshotTypes(), false) + files, _, err := typedSegments(br.borSnapshots().dir, borsnaptype.BorSnapshotTypes(), false) if err != nil { return blocksRetired, err } @@ -239,7 +239,7 @@ func removeBorOverlaps(dir string, active []snaptype.FileInfo, max uint64) { } func (s *BorRoSnapshots) ReopenFolder() error { - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), borsnaptype.BorSnapshotTypes(), false) + files, _, err := typedSegments(s.dir, borsnaptype.BorSnapshotTypes(), false) if err != nil { return err } From 29ba71d51e760c724da4d5e284d342a927c441af Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 09:52:57 +0700 Subject: [PATCH 23/36] save --- cmd/rpcdaemon/cli/config.go | 9 ++------- core/rawdb/accessors_chain.go | 4 +--- core/rawdb/accessors_metadata.go | 19 ++++++++++++++----- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index d14e893c0db..eea722b0c20 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -30,7 +30,6 @@ import ( "strings" "time" - "github.com/erigontech/erigon/eth/stagedsync/stages" "github.com/spf13/cobra" "golang.org/x/sync/semaphore" "google.golang.org/grpc" @@ -418,12 +417,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - var allSegmentsDownloadComplete bool - if err := rwKv.View(ctx, func(tx kv.Tx) error { - snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) - allSegmentsDownloadComplete = snapshotsStageProgress > 0 - return err - }); err != nil { + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(rwKv) + if err != nil { return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } if allSegmentsDownloadComplete { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index edebdfe5f4a..ea8deff567c 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -30,8 +30,6 @@ import ( "github.com/gballet/go-verkle" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/common/hexutility" @@ -39,7 +37,7 @@ import ( "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/dbutils" "github.com/erigontech/erigon-lib/kv/rawdbv3" - + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/ethdb/cbor" "github.com/erigontech/erigon/rlp" diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index d8e7ab6d65b..fe31e55e1a9 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -20,10 +20,12 @@ package rawdb import ( + "context" "encoding/json" "fmt" "github.com/erigontech/erigon/core/types" + "github.com/erigontech/erigon/eth/stagedsync/stages" "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon-lib/chain" @@ -81,11 +83,6 @@ func WriteChainConfig(db kv.Putter, hash libcommon.Hash, cfg *chain.Config) erro return nil } -// DeleteChainConfig retrieves the consensus settings based on the given genesis hash. -func DeleteChainConfig(db kv.Deleter, hash libcommon.Hash) error { - return db.Delete(kv.ConfigTable, hash[:]) -} - func WriteGenesisIfNotExist(db kv.RwTx, g *types.Genesis) error { has, err := db.Has(kv.ConfigTable, kv.GenesisKey) if err != nil { @@ -117,3 +114,15 @@ func ReadGenesis(db kv.Getter) (*types.Genesis, error) { } return &g, nil } + +func AllSegmentsDownloadComplete(tx kv.Getter) (allSegmentsDownloadComplete bool, err error) { + snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) + return snapshotsStageProgress > 0, err +} +func AllSegmentsDownloadCompleteFromDB(db kv.RoDB) (allSegmentsDownloadComplete bool, err error) { + err = db.View(context.Background(), func(tx kv.Tx) error { + allSegmentsDownloadComplete, err = AllSegmentsDownloadComplete(tx) + return err + }) + return allSegmentsDownloadComplete, err +} From 7c00ba50365f2ac7996b95afae0418ff3cac9a5a Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 09:53:30 +0700 Subject: [PATCH 24/36] save --- eth/backend.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index f5982690d75..17e7fb94373 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -59,7 +59,6 @@ import ( "github.com/erigontech/erigon-lib/downloader" "github.com/erigontech/erigon-lib/downloader/downloadercfg" "github.com/erigontech/erigon-lib/downloader/downloadergrpc" - "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" protodownloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" @@ -1444,7 +1443,7 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } agg.SetProduceMod(snConfig.Snapshot.ProduceE3) - allSegmentsDownloadComplete, err := downloaderrawdb.ReadSegmentsDownloadCompleteWithoutDB(dirs) + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db) if err != nil { return nil, nil, nil, nil, nil, err } From e7c06348538931df100476b79a81df8fcf5f5032 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 10:22:16 +0700 Subject: [PATCH 25/36] save --- .../downloader/downloadercfg/downloadercfg.go | 28 +++++++------------ eth/backend.go | 1 - eth/stagedsync/stage_snapshots.go | 10 +++---- .../freezeblocks/block_snapshots.go | 18 ++++++------ 4 files changed, 25 insertions(+), 32 deletions(-) diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index 1aff69c55e3..fb6e6e78d3a 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -233,29 +233,21 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up }, nil } -func ReadPreverifiedToml(dirs datadir.Dirs, chainName string) *snapcfg.Cfg { +func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) { preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") + exists, err := dir.FileExist(preverifiedToml) if err != nil { - return nil - } - if !exists { - return nil - } - // Read the preverified.toml and load the snapshots - haveToml, err := os.ReadFile(preverifiedToml) - if err != nil { - return nil + return nil, err } - snapcfg.SetToml(chainName, haveToml) - return snapcfg.KnownCfg(chainName) -} - -func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) { - if cfg := ReadPreverifiedToml(dirs, chainName); cfg != nil { - return cfg, nil + if exists { + // Read the preverified.toml and load the snapshots + haveToml, err := os.ReadFile(preverifiedToml) + if err != nil { + return nil, err + } + snapcfg.SetToml(chainName, haveToml) } - preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil { return nil, err } diff --git a/eth/backend.go b/eth/backend.go index 17e7fb94373..45a079262da 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1436,7 +1436,6 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) } blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) - agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, logger) if err != nil { return nil, nil, nil, nil, nil, err diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index df7a01f801e..5abf088b48e 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -284,11 +284,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R return err } - // It's ok to notify before tx.Commit(), because RPCDaemon does read list of files by gRPC (not by reading from db) - if cfg.notifier.Events != nil { - cfg.notifier.Events.OnNewSnapshot() - } - diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "E2 Indexing"}) if err := cfg.blockRetire.BuildMissedIndicesIfNeed(ctx, s.LogPrefix(), cfg.notifier.Events, &cfg.chainConfig); err != nil { return err @@ -317,6 +312,11 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R casted.ForceReopenAggCtx() // otherwise next stages will not see just-indexed-files } + // It's ok to notify before tx.Commit(), because RPCDaemon does read list of files by gRPC (not by reading from db) + if cfg.notifier.Events != nil { + cfg.notifier.Events.OnNewSnapshot() + } + frozenBlocks := cfg.blockReader.FrozenBlocks() if s.BlockNumber < frozenBlocks { // allow genesis if err := s.Update(tx, frozenBlocks); err != nil { diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index f49b561c5fd..3ff4abc3fb5 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -276,7 +276,7 @@ func (s *DirtySegment) isSubSetOf(j *DirtySegment) bool { } func (s *DirtySegment) reopenSeg(dir string) (err error) { - if s.refcount.Load() != 0 { + if s.refcount.Load() > 0 { return } if s.Decompressor != nil { @@ -352,26 +352,28 @@ func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) } func (s *DirtySegment) reopenIdx(dir string) (err error) { - if s.Decompressor == nil { + if s.refcount.Load() > 0 { return nil } - if s.refcount.Load() > 0 { + if s.Decompressor == nil { return nil } + for len(s.indexes) <= len(s.Type().Indexes()) { + s.indexes = append(s.indexes, nil) + } - for i, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { - if len(s.indexes) <= i { - s.indexes = append(s.indexes, nil) - } + for i, index := range s.Type().Indexes() { if s.indexes[i] != nil { continue } + fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) } - s.indexes[i] = index + + s.indexes = append(s.indexes, index) } return nil From a0ffe2a8fd08aaef87f0fcc4df8e2f85a190f1f6 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 10:31:45 +0700 Subject: [PATCH 26/36] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 3ff4abc3fb5..ff5b961c75c 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -368,12 +368,13 @@ func (s *DirtySegment) reopenIdx(dir string) (err error) { } fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) + log.Warn("[dbg] open ", "f", fileName) index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) } - s.indexes = append(s.indexes, index) + s.indexes[i] = index } return nil From 79f57b83913e4b3c752ac26faf1ccd0cc24ac688 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 10:31:56 +0700 Subject: [PATCH 27/36] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 1 - 1 file changed, 1 deletion(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index ff5b961c75c..b97231d3427 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -368,7 +368,6 @@ func (s *DirtySegment) reopenIdx(dir string) (err error) { } fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) - log.Warn("[dbg] open ", "f", fileName) index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) From d5f6c3fdd9dcd15954488e21947b3caaca609d13 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 10:52:03 +0700 Subject: [PATCH 28/36] save --- cmd/rpcdaemon/cli/config.go | 10 +++++++--- eth/stagedsync/stage_snapshots.go | 3 --- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index eea722b0c20..43d80203023 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -31,6 +31,7 @@ import ( "time" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc" grpcHealth "google.golang.org/grpc/health" @@ -442,12 +443,14 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") } + wg := errgroup.Group{} + wg.SetLimit(1) onNewSnapshot = func() { - go func() { // don't block events processing by network communication + wg.Go(func() error { // don't block events processing by network communication reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) if err != nil { logger.Warn("[snapshots] reopen", "err", err) - return + return nil } if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { logger.Error("[snapshots] reopen", "err", err) @@ -473,7 +476,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil }) } - }() + return nil + }) } onNewSnapshot() diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index 5abf088b48e..f067a9e5804 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -304,9 +304,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := cfg.agg.BuildMissedIndices(ctx, indexWorkers); err != nil { return err } - if cfg.notifier.Events != nil { - cfg.notifier.Events.OnNewSnapshot() - } if casted, ok := tx.(*temporal.Tx); ok { casted.ForceReopenAggCtx() // otherwise next stages will not see just-indexed-files From ad65988768737b766b76493ad0b8b1a804d27a2b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 10:54:18 +0700 Subject: [PATCH 29/36] save --- eth/stagedsync/stage_snapshots.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index f067a9e5804..6d0d0140c70 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -271,7 +271,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"}) // Download only the snapshots that are for the header chain. - if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix() /*headerChain=*/, cfg.dirs, true, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { + if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { return err } @@ -280,9 +280,12 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download snapshots"}) - if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix() /*headerChain=*/, cfg.dirs, false, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { + if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { return err } + if cfg.notifier.Events != nil { + cfg.notifier.Events.OnNewSnapshot() + } diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "E2 Indexing"}) if err := cfg.blockRetire.BuildMissedIndicesIfNeed(ctx, s.LogPrefix(), cfg.notifier.Events, &cfg.chainConfig); err != nil { From 66fed7b125586385527eba69e62416aa3dfcc9f3 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 11:14:39 +0700 Subject: [PATCH 30/36] save --- .../freezeblocks/block_snapshots.go | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index b97231d3427..70bfe3a1f0a 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -279,9 +279,7 @@ func (s *DirtySegment) reopenSeg(dir string) (err error) { if s.refcount.Load() > 0 { return } - if s.Decompressor != nil { - return - } + s.closeSeg() s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) if err != nil { return fmt.Errorf("%w, fileName: %s", err, s.FileName()) @@ -355,25 +353,37 @@ func (s *DirtySegment) reopenIdx(dir string) (err error) { if s.refcount.Load() > 0 { return nil } + + s.closeIdx() if s.Decompressor == nil { return nil } - for len(s.indexes) <= len(s.Type().Indexes()) { - s.indexes = append(s.indexes, nil) - } - - for i, index := range s.Type().Indexes() { - if s.indexes[i] != nil { - continue - } - - fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) + //for len(s.indexes) <= len(s.Type().Indexes()) { + // s.indexes = append(s.indexes, nil) + //} + // + //for i, index := range s.Type().Indexes() { + // if s.indexes[i] != nil { + // continue + // } + // + // fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) + // index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) + // if err != nil { + // return fmt.Errorf("%w, fileName: %s", err, fileName) + // } + // + // s.indexes[i] = index + //} + + for _, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) + if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) } - s.indexes[i] = index + s.indexes = append(s.indexes, index) } return nil From f3e05fce3720f1abb6722b6f38f009b0fbe9bdb2 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 11:31:07 +0700 Subject: [PATCH 31/36] save --- .../freezeblocks/block_snapshots.go | 33 ++++++++----------- .../freezeblocks/block_snapshots_test.go | 6 ++-- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 70bfe3a1f0a..896d3555dfc 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -276,10 +276,10 @@ func (s *DirtySegment) isSubSetOf(j *DirtySegment) bool { } func (s *DirtySegment) reopenSeg(dir string) (err error) { - if s.refcount.Load() > 0 { - return + if s.Decompressor != nil { + return nil } - s.closeSeg() + //s.closeSeg() s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) if err != nil { return fmt.Errorf("%w, fileName: %s", err, s.FileName()) @@ -350,22 +350,14 @@ func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) } func (s *DirtySegment) reopenIdx(dir string) (err error) { - if s.refcount.Load() > 0 { - return nil - } - - s.closeIdx() + //s.closeIdx() if s.Decompressor == nil { return nil } - //for len(s.indexes) <= len(s.Type().Indexes()) { - // s.indexes = append(s.indexes, nil) - //} - // + for len(s.indexes) <= len(s.Type().Indexes()) { + s.indexes = append(s.indexes, nil) + } //for i, index := range s.Type().Indexes() { - // if s.indexes[i] != nil { - // continue - // } // // fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) // index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) @@ -376,14 +368,17 @@ func (s *DirtySegment) reopenIdx(dir string) (err error) { // s.indexes[i] = index //} - for _, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { - index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) + for i, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { + if s.indexes[i] != nil { + continue + } + index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) } - s.indexes = append(s.indexes, index) + s.indexes[i] = index } return nil @@ -629,7 +624,7 @@ func (s *RoSnapshots) recalcVisibleFiles() { if seg.Decompressor == nil { continue } - if seg.indexes == nil { + if len(seg.indexes) != len(segtype.Type().Indexes()) { break } for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(seg) { diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 5350ed8032e..1faa7e5bb6c 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -26,19 +26,17 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/slices" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon/turbo/testlog" - "github.com/erigontech/erigon-lib/chain/networkname" "github.com/erigontech/erigon-lib/chain/snapcfg" "github.com/erigontech/erigon-lib/downloader/snaptype" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" "github.com/erigontech/erigon-lib/seg" - "github.com/erigontech/erigon/common/math" coresnaptype "github.com/erigontech/erigon/core/snaptype" "github.com/erigontech/erigon/eth/ethconfig" "github.com/erigontech/erigon/params" + "github.com/erigontech/erigon/turbo/testlog" ) func createTestSegmentFile(t *testing.T, from, to uint64, name snaptype.Enum, dir string, version snaptype.Version, logger log.Logger) { From 29259b46baffed231ccf508a8893ea01ef80a4cb Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 11:32:13 +0700 Subject: [PATCH 32/36] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 896d3555dfc..d2ebd86d1ba 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -624,6 +624,9 @@ func (s *RoSnapshots) recalcVisibleFiles() { if seg.Decompressor == nil { continue } + if len(seg.indexes) == 0 { + break + } if len(seg.indexes) != len(segtype.Type().Indexes()) { break } From 028e2d7905133261834c9dc19fd80784ba7aad29 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 11:58:32 +0700 Subject: [PATCH 33/36] save --- .../snapshotsync/freezeblocks/block_snapshots.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index d2ebd86d1ba..12b1fd0c2c0 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -279,7 +279,6 @@ func (s *DirtySegment) reopenSeg(dir string) (err error) { if s.Decompressor != nil { return nil } - //s.closeSeg() s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) if err != nil { return fmt.Errorf("%w, fileName: %s", err, s.FileName()) @@ -350,11 +349,10 @@ func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) } func (s *DirtySegment) reopenIdx(dir string) (err error) { - //s.closeIdx() if s.Decompressor == nil { return nil } - for len(s.indexes) <= len(s.Type().Indexes()) { + for len(s.indexes) < len(s.Type().Indexes()) { s.indexes = append(s.indexes, nil) } //for i, index := range s.Type().Indexes() { @@ -617,6 +615,7 @@ func (s *RoSnapshots) recalcVisibleFiles() { dirtySegments := value.DirtySegments newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) dirtySegments.Walk(func(segs []*DirtySegment) bool { + Loop: for _, seg := range segs { if seg.canDelete.Load() { continue @@ -624,12 +623,12 @@ func (s *RoSnapshots) recalcVisibleFiles() { if seg.Decompressor == nil { continue } - if len(seg.indexes) == 0 { - break - } - if len(seg.indexes) != len(segtype.Type().Indexes()) { - break + for _, idx := range seg.indexes { + if idx == nil { + continue Loop + } } + for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(seg) { newVisibleSegments[len(newVisibleSegments)-1].src = nil newVisibleSegments = newVisibleSegments[:len(newVisibleSegments)-1] From d66c8345998c5fd1efc7f161229b677318a8e1b5 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 12:13:37 +0700 Subject: [PATCH 34/36] save --- .../freezeblocks/block_snapshots.go | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 12b1fd0c2c0..9d6ac9486fd 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -249,6 +249,21 @@ func (s *DirtySegment) Version() snaptype.Version { return s.version } +func (s *DirtySegment) Indexed() bool { + if s.Decompressor == nil { + return false + } + if len(s.indexes) != len(s.Type().Indexes()) { + return false + } + for _, idx := range s.indexes { + if idx == nil { + return false + } + } + return true +} + func (s *DirtySegment) Index(index ...snaptype.Index) *recsplit.Index { if len(index) == 0 { index = []snaptype.Index{{}} @@ -615,20 +630,16 @@ func (s *RoSnapshots) recalcVisibleFiles() { dirtySegments := value.DirtySegments newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) dirtySegments.Walk(func(segs []*DirtySegment) bool { - Loop: for _, seg := range segs { if seg.canDelete.Load() { continue } - if seg.Decompressor == nil { + if !seg.Indexed() { continue } - for _, idx := range seg.indexes { - if idx == nil { - continue Loop - } - } + fmt.Printf("see: %s\n", seg.Decompressor.FileName()) + //protect from overlaps overlaps for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(seg) { newVisibleSegments[len(newVisibleSegments)-1].src = nil newVisibleSegments = newVisibleSegments[:len(newVisibleSegments)-1] @@ -643,6 +654,19 @@ func (s *RoSnapshots) recalcVisibleFiles() { return true }) + // protect from gaps + if len(newVisibleSegments) > 0 { + prevEnd := newVisibleSegments[0].from + for i, seg := range newVisibleSegments { + if seg.from != prevEnd { + newVisibleSegments = newVisibleSegments[:i] //remove tail if see gap + break + } + fmt.Printf("see1: %s\n", seg.src.Decompressor.FileName()) + prevEnd = seg.to + } + } + value.VisibleSegments = newVisibleSegments var to uint64 if len(newVisibleSegments) > 0 { From 21da3b5c61f54cd8c924de3ea6d26481ae0c0c75 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 12:15:27 +0700 Subject: [PATCH 35/36] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 9d6ac9486fd..f0963b3b438 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -637,7 +637,6 @@ func (s *RoSnapshots) recalcVisibleFiles() { if !seg.Indexed() { continue } - fmt.Printf("see: %s\n", seg.Decompressor.FileName()) //protect from overlaps overlaps for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(seg) { @@ -662,7 +661,6 @@ func (s *RoSnapshots) recalcVisibleFiles() { newVisibleSegments = newVisibleSegments[:i] //remove tail if see gap break } - fmt.Printf("see1: %s\n", seg.src.Decompressor.FileName()) prevEnd = seg.to } } @@ -676,6 +674,7 @@ func (s *RoSnapshots) recalcVisibleFiles() { return true }) + // all types must have same hight minMaxVisibleBlock := slices.Min(maxVisibleBlocks) s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { if minMaxVisibleBlock == 0 { From f079aa1bd238e606a9f6993ac06ae2ebd158f066 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 18 Oct 2024 14:58:12 +0700 Subject: [PATCH 36/36] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index f0963b3b438..93a6e7f581c 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -370,16 +370,6 @@ func (s *DirtySegment) reopenIdx(dir string) (err error) { for len(s.indexes) < len(s.Type().Indexes()) { s.indexes = append(s.indexes, nil) } - //for i, index := range s.Type().Indexes() { - // - // fileName := s.Type().IdxFileName(s.version, s.from, s.to, index) - // index, err := recsplit.OpenIndex(filepath.Join(dir, fileName)) - // if err != nil { - // return fmt.Errorf("%w, fileName: %s", err, fileName) - // } - // - // s.indexes[i] = index - //} for i, fileName := range s.Type().IdxFileNames(s.version, s.from, s.to) { if s.indexes[i] != nil {