Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 committed Oct 7, 2024
1 parent b80b6e3 commit c5ce6df
Showing 1 changed file with 124 additions and 127 deletions.
251 changes: 124 additions & 127 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ package antiquary

import (
"context"
"math"
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -165,9 +162,9 @@ func (a *Antiquary) Loop() error {
if a.states {
go a.loopStates(a.ctx)
}
if a.blobs {
go a.loopBlobs(a.ctx)
}
// if a.blobs {
// go a.loopBlobs(a.ctx)
// }

// write the indicies
if err := beacon_indicies.WriteLastBeaconSnapshot(tx, frozenSlots); err != nil {
Expand All @@ -178,49 +175,49 @@ func (a *Antiquary) Loop() error {
return err
}
// Check for snapshots retirement every 3 minutes
retirementTicker := time.NewTicker(3 * time.Minute)
defer retirementTicker.Stop()
for {
select {
case <-retirementTicker.C:
if !a.backfilled.Load() {
continue
}
var (
from uint64
to uint64
)
if err := a.mainDB.View(a.ctx, func(roTx kv.Tx) error {
// read the last beacon snapshots
from, err = beacon_indicies.ReadLastBeaconSnapshot(roTx)
if err != nil {
return err
}
from += 1
// read the finalized head
to, err = beacon_indicies.ReadHighestFinalized(roTx)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
// Sanity checks just to be safe.
if from >= to {
continue
}
to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
if to-from < snaptype.Erigon2MergeLimit {
continue
}
if err := a.antiquate(from, to); err != nil {
return err
}
case <-a.ctx.Done():
}
}
// retirementTicker := time.NewTicker(3 * time.Minute)
// defer retirementTicker.Stop()
// for {
// select {
// case <-retirementTicker.C:
// if !a.backfilled.Load() {
// continue
// }
// var (
// from uint64
// to uint64
// )
// if err := a.mainDB.View(a.ctx, func(roTx kv.Tx) error {
// // read the last beacon snapshots
// from, err = beacon_indicies.ReadLastBeaconSnapshot(roTx)
// if err != nil {
// return err
// }
// from += 1
// // read the finalized head
// to, err = beacon_indicies.ReadHighestFinalized(roTx)
// if err != nil {
// return err
// }
// return nil
// }); err != nil {
// return err
// }
// // Sanity checks just to be safe.
// if from >= to {
// continue
// }
// to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
// to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
// if to-from < snaptype.Erigon2MergeLimit {
// continue
// }
// if err := a.antiquate(from, to); err != nil {
// return err
// }
// case <-a.ctx.Done():
// }
// }
}

Check failure on line 221 in cl/antiquary/antiquary.go

View workflow job for this annotation

GitHub Actions / tests-windows (windows-2022)

missing return

// Antiquate will antiquate a specific block range (aka. retire snapshots), this should be ran in the background.
Expand Down Expand Up @@ -276,84 +273,84 @@ func (a *Antiquary) NotifyBlobBackfilled() {
a.blobBackfilled.Store(true)
}

func (a *Antiquary) loopBlobs(ctx context.Context) {
if a.cfg.DenebForkEpoch == math.MaxUint64 {
return
}
blobAntiquationTicker := time.NewTicker(10 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-blobAntiquationTicker.C:
if !a.blobBackfilled.Load() {
continue
}
if err := a.antiquateBlobs(); err != nil {
log.Error("[Antiquary]: Failed to antiquate blobs", "err", err)
}
}
}
}
// func (a *Antiquary) loopBlobs(ctx context.Context) {
// if a.cfg.DenebForkEpoch == math.MaxUint64 {
// return
// }
// blobAntiquationTicker := time.NewTicker(10 * time.Second)
// for {
// select {
// case <-ctx.Done():
// return
// case <-blobAntiquationTicker.C:
// if !a.blobBackfilled.Load() {
// continue
// }
// if err := a.antiquateBlobs(); err != nil {
// log.Error("[Antiquary]: Failed to antiquate blobs", "err", err)
// }
// }
// }
// }

func (a *Antiquary) antiquateBlobs() error {
roTx, err := a.mainDB.BeginRo(a.ctx)
if err != nil {
return err
}
defer roTx.Rollback()
// perform blob antiquation if it is time to.
currentBlobsProgress := a.sn.FrozenBlobs()
// We should NEVER get ahead of the block snapshots.
if currentBlobsProgress >= a.sn.BlocksAvailable() {
return nil
}
minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress)
// read the finalized head
to, err := beacon_indicies.ReadHighestFinalized(roTx)
if err != nil {
return err
}
if to <= currentBlobsProgress || to-currentBlobsProgress < snaptype.Erigon2MergeLimit {
return nil
}
roTx.Rollback()
a.logger.Info("[Antiquary]: Antiquating blobs", "from", currentBlobsProgress, "to", to)
// now, we need to retire the blobs
if err := freezeblocks.DumpBlobsSidecar(a.ctx, a.blobStorage, a.mainDB, currentBlobsProgress, to, a.sn.Salt, a.dirs, 1, log.LvlDebug, a.logger); err != nil {
return err
}
to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
a.logger.Info("[Antiquary]: Finished Antiquating blobs", "from", currentBlobsProgress, "to", to)
if err := a.sn.ReopenFolder(); err != nil {
return err
}
// func (a *Antiquary) antiquateBlobs() error {
// roTx, err := a.mainDB.BeginRo(a.ctx)
// if err != nil {
// return err
// }
// defer roTx.Rollback()
// // perform blob antiquation if it is time to.
// currentBlobsProgress := a.sn.FrozenBlobs()
// // We should NEVER get ahead of the block snapshots.
// if currentBlobsProgress >= a.sn.BlocksAvailable() {
// return nil
// }
// minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
// currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress)
// // read the finalized head
// to, err := beacon_indicies.ReadHighestFinalized(roTx)
// if err != nil {
// return err
// }
// if to <= currentBlobsProgress || to-currentBlobsProgress < snaptype.Erigon2MergeLimit {
// return nil
// }
// roTx.Rollback()
// a.logger.Info("[Antiquary]: Antiquating blobs", "from", currentBlobsProgress, "to", to)
// // now, we need to retire the blobs
// if err := freezeblocks.DumpBlobsSidecar(a.ctx, a.blobStorage, a.mainDB, currentBlobsProgress, to, a.sn.Salt, a.dirs, 1, log.LvlDebug, a.logger); err != nil {
// return err
// }
// to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
// a.logger.Info("[Antiquary]: Finished Antiquating blobs", "from", currentBlobsProgress, "to", to)
// if err := a.sn.ReopenFolder(); err != nil {
// return err
// }

paths := a.sn.SegFilePaths(currentBlobsProgress, to)
downloadItems := make([]*proto_downloader.AddItem, len(paths))
for i, path := range paths {
downloadItems[i] = &proto_downloader.AddItem{
Path: path,
}
}
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
log.Warn("[Antiquary]: Failed to add items to bittorent", "err", err)
}
// paths := a.sn.SegFilePaths(currentBlobsProgress, to)
// downloadItems := make([]*proto_downloader.AddItem, len(paths))
// for i, path := range paths {
// downloadItems[i] = &proto_downloader.AddItem{
// Path: path,
// }
// }
// // Notify bittorent to seed the new snapshots
// if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
// log.Warn("[Antiquary]: Failed to add items to bittorent", "err", err)
// }

roTx, err = a.mainDB.BeginRo(a.ctx)
if err != nil {
return err
}
defer roTx.Rollback()
// now prune blobs from the database
for i := currentBlobsProgress; i < to; i++ {
blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(roTx, i)
if err != nil {
return err
}
a.blobStorage.RemoveBlobSidecars(a.ctx, i, blockRoot)
}
return nil
}
// roTx, err = a.mainDB.BeginRo(a.ctx)
// if err != nil {
// return err
// }
// defer roTx.Rollback()
// // now prune blobs from the database
// for i := currentBlobsProgress; i < to; i++ {
// blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(roTx, i)
// if err != nil {
// return err
// }
// a.blobStorage.RemoveBlobSidecars(a.ctx, i, blockRoot)
// }
// return nil
// }

0 comments on commit c5ce6df

Please sign in to comment.