Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect the sync limit of blocks insertion when inserting in MDBX. #10341

Merged
merged 14 commits into from
May 15, 2024
36 changes: 0 additions & 36 deletions cl/persistence/db_config/db_config.go

This file was deleted.

24 changes: 0 additions & 24 deletions cl/persistence/db_config/db_config_test.go

This file was deleted.

14 changes: 13 additions & 1 deletion cl/phase1/execution_client/block_collector/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ type blockCollector struct {
size uint64
logger log.Logger
engine execution_client.ExecutionEngine
syncBackLoop uint64

mu sync.Mutex
}

// NewBlockCollector creates a new block collector
func NewBlockCollector(logger log.Logger, engine execution_client.ExecutionEngine, beaconChainCfg *clparams.BeaconChainConfig, tmpdir string) BlockCollector {
func NewBlockCollector(logger log.Logger, engine execution_client.ExecutionEngine, beaconChainCfg *clparams.BeaconChainConfig, syncBackLoopAmount uint64, tmpdir string) BlockCollector {
return &blockCollector{
collector: etl.NewCollector(etlPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger),
tmpdir: tmpdir,
Expand Down Expand Up @@ -83,6 +84,8 @@ func (b *blockCollector) Flush(ctx context.Context) error {
defer tmpTx.Rollback()
blocksBatch := []*types.Block{}

inserted := uint64(0)

if err := b.collector.Load(tmpTx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
if len(v) == 0 {
return nil
Expand Down Expand Up @@ -115,7 +118,16 @@ func (b *blockCollector) Flush(ctx context.Context) error {
if err := b.engine.InsertBlocks(ctx, blocksBatch, true); err != nil {
b.logger.Warn("failed to insert blocks", "err", err)
}
inserted += uint64(len(blocksBatch))
b.logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64())
// If we have inserted enough blocks, update fork choice (Optimization for E35)
lastBlockHash := blocksBatch[len(blocksBatch)-1].Hash()
if inserted >= b.syncBackLoop {
if _, err := b.engine.ForkChoiceUpdate(ctx, lastBlockHash, lastBlockHash, nil); err != nil {
b.logger.Warn("failed to update fork choice", "err", err)
}
inserted = 0
}
blocksBatch = []*types.Block{}
}
return next(k, nil, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package block_collector_test

import (
"context"
"math"
"testing"

"github.com/ledgerwatch/erigon/cl/antiquary/tests"
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestBlockCollectorAccumulateAndFlush(t *testing.T) {
}
return nil
})
bc := block_collector.NewBlockCollector(log.Root(), engine, &clparams.MainnetBeaconConfig, ".")
bc := block_collector.NewBlockCollector(log.Root(), engine, &clparams.MainnetBeaconConfig, math.MaxUint64, ".")
for _, block := range blocks {
err := bc.AddBlock(block.Block)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
Expand All @@ -50,7 +49,6 @@ type Cfg struct {
forkChoice *forkchoice.ForkChoiceStore
indiciesDB kv.RwDB
tmpdir string
dbConfig db_config.DatabaseConfiguration
blockReader freezeblocks.BeaconSnapshotReader
antiquary *antiquary.Antiquary
syncedData *synced_data.SyncedDataManager
Expand Down Expand Up @@ -85,7 +83,7 @@ func ClStagesCfg(
sn *freezeblocks.CaplinSnapshots,
blockReader freezeblocks.BeaconSnapshotReader,
tmpdir string,
dbConfig db_config.DatabaseConfiguration,
syncBackLoopLimit uint64,
backfilling bool,
blobBackfilling bool,
syncedData *synced_data.SyncedDataManager,
Expand All @@ -104,14 +102,13 @@ func ClStagesCfg(
forkChoice: forkChoice,
tmpdir: tmpdir,
indiciesDB: indiciesDB,
dbConfig: dbConfig,
sn: sn,
blockReader: blockReader,
backfilling: backfilling,
syncedData: syncedData,
emitter: emitters,
blobStore: blobStore,
blockCollector: block_collector.NewBlockCollector(log.Root(), executionClient, beaconCfg, tmpdir),
blockCollector: block_collector.NewBlockCollector(log.Root(), executionClient, beaconCfg, syncBackLoopLimit, tmpdir),
blobBackfilling: blobBackfilling,
attestationDataProducer: attestationDataProducer,
}
Expand Down
19 changes: 9 additions & 10 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format"
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
Expand Down Expand Up @@ -133,7 +132,7 @@ func (c *Chain) Run(ctx *Context) error {
return err
}
ethClock := eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), beaconConfig)
db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,7 +184,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error {
ethClock := eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), beaconConfig)

dirs := datadir.New(c.Datadir)
db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,7 +305,7 @@ func (c *DumpSnapshots) Run(ctx *Context) error {
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -346,7 +345,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -426,7 +425,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error {
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -474,7 +473,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error {
return err
}
dirs := datadir.New(r.Datadir)
db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -822,7 +821,7 @@ func (b *BlobArchiveStoreCheck) Run(ctx *Context) error {

dirs := datadir.New(b.Datadir)

db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -892,7 +891,7 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error {
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -933,7 +932,7 @@ func (c *CheckBlobsSnapshots) Run(ctx *Context) error {
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
if err != nil {
return err
}
Expand Down
13 changes: 1 addition & 12 deletions cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/blob_storage"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader"
Expand All @@ -60,7 +59,6 @@ import (
)

func OpenCaplinDatabase(ctx context.Context,
databaseConfig db_config.DatabaseConfiguration,
beaconConfig *clparams.BeaconChainConfig,
ethClock eth_clock.EthereumClock,
dbPath string,
Expand Down Expand Up @@ -89,10 +87,6 @@ func OpenCaplinDatabase(ctx context.Context,
}
defer tx.Rollback()

if err := db_config.WriteConfigurationIfNotExist(ctx, tx, databaseConfig); err != nil {
return nil, nil, err
}

if err := tx.Commit(); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -234,11 +228,6 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin
}
defer tx.Rollback()

dbConfig, err := db_config.ReadConfiguration(ctx, tx)
if err != nil {
return err
}

if err := state_accessors.InitializeStaticTables(tx, state); err != nil {
return err
}
Expand Down Expand Up @@ -310,7 +299,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin
log.Info("Beacon API started", "addr", config.BeaconRouter.Address)
}

stageCfg := stages.ClStagesCfg(beaconRpc, antiq, ethClock, beaconConfig, state, engine, gossipManager, forkChoice, indexDB, csn, rcsn, dirs.Tmp, dbConfig, backfilling, blobBackfilling, syncedDataManager, emitters, blobStorage, attestationProducer)
stageCfg := stages.ClStagesCfg(beaconRpc, antiq, ethClock, beaconConfig, state, engine, gossipManager, forkChoice, indexDB, csn, rcsn, dirs.Tmp, uint64(config.LoopBlockLimit), backfilling, blobBackfilling, syncedDataManager, emitters, blobStorage, attestationProducer)
sync := stages.ConsensusClStages(ctx, stageCfg)

logger.Info("[Caplin] starting clstages loop")
Expand Down
3 changes: 1 addition & 2 deletions cmd/caplin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/ledgerwatch/erigon-lib/common/disk"
"github.com/ledgerwatch/erigon-lib/common/mem"
"github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/phase1/core"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
execution_client2 "github.com/ledgerwatch/erigon/cl/phase1/execution_client"
Expand Down Expand Up @@ -131,7 +130,7 @@ func runCaplinNode(cliCtx *cli.Context) error {
executionEngine = cc
}

indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, cfg.BeaconCfg, ethClock, cfg.Dirs.CaplinIndexing, cfg.Dirs.CaplinBlobs, executionEngine, false, 100_000)
indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, cfg.BeaconCfg, ethClock, cfg.Dirs.CaplinIndexing, cfg.Dirs.CaplinBlobs, executionEngine, false, 100_000)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ import (
libtypes "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon-lib/wrap"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters"
clcore "github.com/ledgerwatch/erigon/cl/phase1/core"
executionclient "github.com/ledgerwatch/erigon/cl/phase1/execution_client"
Expand Down Expand Up @@ -939,7 +938,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
pruneBlobDistance = math.MaxUint64
}

indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, beaconCfg, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, executionEngine, false, pruneBlobDistance)
indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconCfg, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, executionEngine, false, pruneBlobDistance)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func ExecV3(ctx context.Context,
}

blockNum = doms.BlockNum()
initialBlockNum := blockNum
outputTxNum.Store(doms.TxNum())

var err error
Expand Down Expand Up @@ -872,8 +873,11 @@ Loop:
t1 = time.Since(tt)

tt = time.Now()
// If we execute more than 100 blocks, it is safe to assume that we are not on the tip of the chain.
// In this case, we can prune the state to save memory.
pruneBlockMargin := uint64(100)

if !useExternalTx {
if blockNum-initialBlockNum > pruneBlockMargin {
if _, err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneSmallBatches(ctx, 10*time.Minute, applyTx); err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs

func computeBlocksToPrune(cfg SnapshotsCfg) (blocksToPrune uint64, historyToPrune uint64) {
frozenBlocks := cfg.blockReader.Snapshots().SegmentsMax()
fmt.Println("O", cfg.prune.Blocks.PruneTo(frozenBlocks), cfg.prune.History.PruneTo(frozenBlocks))
return frozenBlocks - cfg.prune.Blocks.PruneTo(frozenBlocks), frozenBlocks - cfg.prune.History.PruneTo(frozenBlocks)
}

Expand Down
9 changes: 9 additions & 0 deletions turbo/engineapi/engine_block_downloader/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k
if err != nil {
return err
}
inserted := uint64(0)

log.Info("Beginning downloaded blocks insertion")
// Start by seeking headers
Expand All @@ -245,6 +246,14 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return err
}
inserted += uint64(len(blocksBatch))
if inserted >= uint64(e.syncCfg.LoopBlockLimit) {
lastHash := blocksBatch[len(blocksBatch)-1].Hash()
if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil {
return err
}
inserted = 0
}
blocksBatch = blocksBatch[:0]
}
header := new(types.Header)
Expand Down
4 changes: 0 additions & 4 deletions turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,6 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks [
return err
}

// limit the number of retries
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

for response.Result == execution.ExecutionStatus_Busy {
const retryDelay = 100 * time.Millisecond
select {
Expand Down
Loading