From 4516809ff9fdc8770160725277cedd958911a4c9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Mon, 7 Oct 2024 17:42:47 +0200 Subject: [PATCH] save --- cl/antiquary/antiquary.go | 251 +++++++++++++++++++------------------- 1 file changed, 124 insertions(+), 127 deletions(-) diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index 91f363c55c1..9642800d15c 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -2,12 +2,10 @@ package antiquary import ( "context" - "math" "sync/atomic" "time" "github.com/ledgerwatch/erigon-lib/common/datadir" - "github.com/ledgerwatch/erigon-lib/downloader/snaptype" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/clparams" @@ -15,7 +13,6 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence/blob_storage" state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" "github.com/ledgerwatch/erigon/cl/phase1/core/state" - "github.com/ledgerwatch/erigon/cl/utils" "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" "github.com/ledgerwatch/log/v3" ) @@ -165,9 +162,9 @@ func (a *Antiquary) Loop() error { if a.states { go a.loopStates(a.ctx) } - if a.blobs { - go a.loopBlobs(a.ctx) - } + // if a.blobs { + // go a.loopBlobs(a.ctx) + // } // write the indicies if err := beacon_indicies.WriteLastBeaconSnapshot(tx, frozenSlots); err != nil { @@ -178,49 +175,49 @@ func (a *Antiquary) Loop() error { return err } // Check for snapshots retirement every 3 minutes - retirementTicker := time.NewTicker(3 * time.Minute) - defer retirementTicker.Stop() - for { - select { - case <-retirementTicker.C: - if !a.backfilled.Load() { - continue - } - var ( - from uint64 - to uint64 - ) - if err := a.mainDB.View(a.ctx, func(roTx kv.Tx) error { - // read the last beacon snapshots - from, err = beacon_indicies.ReadLastBeaconSnapshot(roTx) - if err != nil { - return err - } - from += 1 - // read the finalized head - to, err = beacon_indicies.ReadHighestFinalized(roTx) - if err != nil { - return err - } - return nil - }); err != nil { - return err - } - // Sanity checks just to be safe. - if from >= to { - continue - } - to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head - to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit - if to-from < snaptype.Erigon2MergeLimit { - continue - } - if err := a.antiquate(from, to); err != nil { - return err - } - case <-a.ctx.Done(): - } - } + // retirementTicker := time.NewTicker(3 * time.Minute) + // defer retirementTicker.Stop() + // for { + // select { + // case <-retirementTicker.C: + // if !a.backfilled.Load() { + // continue + // } + // var ( + // from uint64 + // to uint64 + // ) + // if err := a.mainDB.View(a.ctx, func(roTx kv.Tx) error { + // // read the last beacon snapshots + // from, err = beacon_indicies.ReadLastBeaconSnapshot(roTx) + // if err != nil { + // return err + // } + // from += 1 + // // read the finalized head + // to, err = beacon_indicies.ReadHighestFinalized(roTx) + // if err != nil { + // return err + // } + // return nil + // }); err != nil { + // return err + // } + // // Sanity checks just to be safe. + // if from >= to { + // continue + // } + // to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head + // to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit + // if to-from < snaptype.Erigon2MergeLimit { + // continue + // } + // if err := a.antiquate(from, to); err != nil { + // return err + // } + // case <-a.ctx.Done(): + // } + // } } // Antiquate will antiquate a specific block range (aka. retire snapshots), this should be ran in the background. @@ -276,84 +273,84 @@ func (a *Antiquary) NotifyBlobBackfilled() { a.blobBackfilled.Store(true) } -func (a *Antiquary) loopBlobs(ctx context.Context) { - if a.cfg.DenebForkEpoch == math.MaxUint64 { - return - } - blobAntiquationTicker := time.NewTicker(10 * time.Second) - for { - select { - case <-ctx.Done(): - return - case <-blobAntiquationTicker.C: - if !a.blobBackfilled.Load() { - continue - } - if err := a.antiquateBlobs(); err != nil { - log.Error("[Antiquary]: Failed to antiquate blobs", "err", err) - } - } - } -} +// func (a *Antiquary) loopBlobs(ctx context.Context) { +// if a.cfg.DenebForkEpoch == math.MaxUint64 { +// return +// } +// blobAntiquationTicker := time.NewTicker(10 * time.Second) +// for { +// select { +// case <-ctx.Done(): +// return +// case <-blobAntiquationTicker.C: +// if !a.blobBackfilled.Load() { +// continue +// } +// if err := a.antiquateBlobs(); err != nil { +// log.Error("[Antiquary]: Failed to antiquate blobs", "err", err) +// } +// } +// } +// } -func (a *Antiquary) antiquateBlobs() error { - roTx, err := a.mainDB.BeginRo(a.ctx) - if err != nil { - return err - } - defer roTx.Rollback() - // perform blob antiquation if it is time to. - currentBlobsProgress := a.sn.FrozenBlobs() - // We should NEVER get ahead of the block snapshots. - if currentBlobsProgress >= a.sn.BlocksAvailable() { - return nil - } - minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit - currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress) - // read the finalized head - to, err := beacon_indicies.ReadHighestFinalized(roTx) - if err != nil { - return err - } - if to <= currentBlobsProgress || to-currentBlobsProgress < snaptype.Erigon2MergeLimit { - return nil - } - roTx.Rollback() - a.logger.Info("[Antiquary]: Antiquating blobs", "from", currentBlobsProgress, "to", to) - // now, we need to retire the blobs - if err := freezeblocks.DumpBlobsSidecar(a.ctx, a.blobStorage, a.mainDB, currentBlobsProgress, to, a.sn.Salt, a.dirs, 1, log.LvlDebug, a.logger); err != nil { - return err - } - to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit - a.logger.Info("[Antiquary]: Finished Antiquating blobs", "from", currentBlobsProgress, "to", to) - if err := a.sn.ReopenFolder(); err != nil { - return err - } +// func (a *Antiquary) antiquateBlobs() error { +// roTx, err := a.mainDB.BeginRo(a.ctx) +// if err != nil { +// return err +// } +// defer roTx.Rollback() +// // perform blob antiquation if it is time to. +// currentBlobsProgress := a.sn.FrozenBlobs() +// // We should NEVER get ahead of the block snapshots. +// if currentBlobsProgress >= a.sn.BlocksAvailable() { +// return nil +// } +// minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit +// currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress) +// // read the finalized head +// to, err := beacon_indicies.ReadHighestFinalized(roTx) +// if err != nil { +// return err +// } +// if to <= currentBlobsProgress || to-currentBlobsProgress < snaptype.Erigon2MergeLimit { +// return nil +// } +// roTx.Rollback() +// a.logger.Info("[Antiquary]: Antiquating blobs", "from", currentBlobsProgress, "to", to) +// // now, we need to retire the blobs +// if err := freezeblocks.DumpBlobsSidecar(a.ctx, a.blobStorage, a.mainDB, currentBlobsProgress, to, a.sn.Salt, a.dirs, 1, log.LvlDebug, a.logger); err != nil { +// return err +// } +// to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit +// a.logger.Info("[Antiquary]: Finished Antiquating blobs", "from", currentBlobsProgress, "to", to) +// if err := a.sn.ReopenFolder(); err != nil { +// return err +// } - paths := a.sn.SegFilePaths(currentBlobsProgress, to) - downloadItems := make([]*proto_downloader.AddItem, len(paths)) - for i, path := range paths { - downloadItems[i] = &proto_downloader.AddItem{ - Path: path, - } - } - // Notify bittorent to seed the new snapshots - if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { - log.Warn("[Antiquary]: Failed to add items to bittorent", "err", err) - } +// paths := a.sn.SegFilePaths(currentBlobsProgress, to) +// downloadItems := make([]*proto_downloader.AddItem, len(paths)) +// for i, path := range paths { +// downloadItems[i] = &proto_downloader.AddItem{ +// Path: path, +// } +// } +// // Notify bittorent to seed the new snapshots +// if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil { +// log.Warn("[Antiquary]: Failed to add items to bittorent", "err", err) +// } - roTx, err = a.mainDB.BeginRo(a.ctx) - if err != nil { - return err - } - defer roTx.Rollback() - // now prune blobs from the database - for i := currentBlobsProgress; i < to; i++ { - blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(roTx, i) - if err != nil { - return err - } - a.blobStorage.RemoveBlobSidecars(a.ctx, i, blockRoot) - } - return nil -} +// roTx, err = a.mainDB.BeginRo(a.ctx) +// if err != nil { +// return err +// } +// defer roTx.Rollback() +// // now prune blobs from the database +// for i := currentBlobsProgress; i < to; i++ { +// blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(roTx, i) +// if err != nil { +// return err +// } +// a.blobStorage.RemoveBlobSidecars(a.ctx, i, blockRoot) +// } +// return nil +// }