From b600d5add8c7d5b42eb7be27a76506c7258e262e Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 19:16:24 +0200 Subject: [PATCH 01/12] save --- cl/persistence/db_config/db_config.go | 36 ------------------- cl/persistence/db_config/db_config_test.go | 24 ------------- .../block_collector/block_collector.go | 14 +++++++- cl/phase1/stages/clstages.go | 7 ++-- cmd/caplin/caplin1/run.go | 13 +------ eth/backend.go | 3 +- 6 files changed, 17 insertions(+), 80 deletions(-) delete mode 100644 cl/persistence/db_config/db_config.go delete mode 100644 cl/persistence/db_config/db_config_test.go diff --git a/cl/persistence/db_config/db_config.go b/cl/persistence/db_config/db_config.go deleted file mode 100644 index 8845fa4a20a..00000000000 --- a/cl/persistence/db_config/db_config.go +++ /dev/null @@ -1,36 +0,0 @@ -package db_config - -import ( - "bytes" - "context" - "math" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/ethdb/cbor" -) - -type DatabaseConfiguration struct{ PruneDepth uint64 } - -var DefaultDatabaseConfiguration = DatabaseConfiguration{PruneDepth: math.MaxUint64} - -func WriteConfigurationIfNotExist(ctx context.Context, tx kv.RwTx, cfg DatabaseConfiguration) error { - var b bytes.Buffer - if err := cbor.Encoder(&b).Encode(cfg); err != nil { - return err - } - - return tx.Put(kv.DatabaseInfo, []byte("config"), b.Bytes()) -} - -func ReadConfiguration(ctx context.Context, tx kv.Tx) (DatabaseConfiguration, error) { - var cfg DatabaseConfiguration - - cfgEncoded, err := tx.GetOne(kv.DatabaseInfo, []byte("config")) - if err != nil { - return cfg, err - } - if err := cbor.Decoder(bytes.NewReader(cfgEncoded)).Decode(&cfg); err != nil { - return cfg, err - } - return cfg, err -} diff --git a/cl/persistence/db_config/db_config_test.go b/cl/persistence/db_config/db_config_test.go deleted file mode 100644 index 0133f4862a5..00000000000 --- a/cl/persistence/db_config/db_config_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package db_config - -import ( - "context" - "testing" - - "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/stretchr/testify/require" - _ "modernc.org/sqlite" -) - -func TestDBConfig(t *testing.T) { - db := memdb.NewTestDB(t) - defer db.Close() - tx, err := db.BeginRw(context.Background()) - defer tx.Rollback() - require.NoError(t, err) - - c := DatabaseConfiguration{PruneDepth: 69} - require.NoError(t, WriteConfigurationIfNotExist(context.Background(), tx, c)) - cfg, err := ReadConfiguration(context.Background(), tx) - require.NoError(t, err) - require.Equal(t, cfg, c) -} diff --git a/cl/phase1/execution_client/block_collector/block_collector.go b/cl/phase1/execution_client/block_collector/block_collector.go index 79eea139781..6609271683c 100644 --- a/cl/phase1/execution_client/block_collector/block_collector.go +++ b/cl/phase1/execution_client/block_collector/block_collector.go @@ -35,12 +35,13 @@ type blockCollector struct { size uint64 logger log.Logger engine execution_client.ExecutionEngine + syncBackLoop uint64 mu sync.Mutex } // NewBlockCollector creates a new block collector -func NewBlockCollector(logger log.Logger, engine execution_client.ExecutionEngine, beaconChainCfg *clparams.BeaconChainConfig, tmpdir string) BlockCollector { +func NewBlockCollector(logger log.Logger, engine execution_client.ExecutionEngine, beaconChainCfg *clparams.BeaconChainConfig, syncBackLoopAmount uint64, tmpdir string) BlockCollector { return &blockCollector{ collector: etl.NewCollector(etlPrefix, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger), tmpdir: tmpdir, @@ -83,6 +84,8 @@ func (b *blockCollector) Flush(ctx context.Context) error { defer tmpTx.Rollback() blocksBatch := []*types.Block{} + inserted := uint64(0) + if err := b.collector.Load(tmpTx, kv.Headers, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { if len(v) == 0 { return nil @@ -115,8 +118,17 @@ func (b *blockCollector) Flush(ctx context.Context) error { if err := b.engine.InsertBlocks(ctx, blocksBatch, true); err != nil { b.logger.Warn("failed to insert blocks", "err", err) } + inserted += uint64(len(blocksBatch)) b.logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64()) blocksBatch = []*types.Block{} + // If we have inserted enough blocks, update fork choice (Optimation for E35) + lastBlockHash := blocksBatch[len(blocksBatch)-1].Hash() + if inserted >= b.syncBackLoop { + if _, err := b.engine.ForkChoiceUpdate(ctx, lastBlockHash, lastBlockHash, nil); err != nil { + b.logger.Warn("failed to update fork choice", "err", err) + } + inserted = 0 + } } return next(k, nil, nil) }, etl.TransformArgs{}); err != nil { diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 2118800f4d1..4dd3882c640 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -23,7 +23,6 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/persistence/blob_storage" - "github.com/ledgerwatch/erigon/cl/persistence/db_config" state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" @@ -50,7 +49,6 @@ type Cfg struct { forkChoice *forkchoice.ForkChoiceStore indiciesDB kv.RwDB tmpdir string - dbConfig db_config.DatabaseConfiguration blockReader freezeblocks.BeaconSnapshotReader antiquary *antiquary.Antiquary syncedData *synced_data.SyncedDataManager @@ -85,7 +83,7 @@ func ClStagesCfg( sn *freezeblocks.CaplinSnapshots, blockReader freezeblocks.BeaconSnapshotReader, tmpdir string, - dbConfig db_config.DatabaseConfiguration, + syncBackLoopLimit uint64, backfilling bool, blobBackfilling bool, syncedData *synced_data.SyncedDataManager, @@ -104,14 +102,13 @@ func ClStagesCfg( forkChoice: forkChoice, tmpdir: tmpdir, indiciesDB: indiciesDB, - dbConfig: dbConfig, sn: sn, blockReader: blockReader, backfilling: backfilling, syncedData: syncedData, emitter: emitters, blobStore: blobStore, - blockCollector: block_collector.NewBlockCollector(log.Root(), executionClient, beaconCfg, tmpdir), + blockCollector: block_collector.NewBlockCollector(log.Root(), executionClient, beaconCfg, syncBackLoopLimit, tmpdir), blobBackfilling: blobBackfilling, attestationDataProducer: attestationDataProducer, } diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index e135a47a0a5..987a10addeb 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -38,7 +38,6 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/persistence/blob_storage" - "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" "github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader" @@ -60,7 +59,6 @@ import ( ) func OpenCaplinDatabase(ctx context.Context, - databaseConfig db_config.DatabaseConfiguration, beaconConfig *clparams.BeaconChainConfig, ethClock eth_clock.EthereumClock, dbPath string, @@ -89,10 +87,6 @@ func OpenCaplinDatabase(ctx context.Context, } defer tx.Rollback() - if err := db_config.WriteConfigurationIfNotExist(ctx, tx, databaseConfig); err != nil { - return nil, nil, err - } - if err := tx.Commit(); err != nil { return nil, nil, err } @@ -234,11 +228,6 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin } defer tx.Rollback() - dbConfig, err := db_config.ReadConfiguration(ctx, tx) - if err != nil { - return err - } - if err := state_accessors.InitializeStaticTables(tx, state); err != nil { return err } @@ -310,7 +299,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin log.Info("Beacon API started", "addr", config.BeaconRouter.Address) } - stageCfg := stages.ClStagesCfg(beaconRpc, antiq, ethClock, beaconConfig, state, engine, gossipManager, forkChoice, indexDB, csn, rcsn, dirs.Tmp, dbConfig, backfilling, blobBackfilling, syncedDataManager, emitters, blobStorage, attestationProducer) + stageCfg := stages.ClStagesCfg(beaconRpc, antiq, ethClock, beaconConfig, state, engine, gossipManager, forkChoice, indexDB, csn, rcsn, dirs.Tmp, uint64(config.LoopBlockLimit), backfilling, blobBackfilling, syncedDataManager, emitters, blobStorage, attestationProducer) sync := stages.ConsensusClStages(ctx, stageCfg) logger.Info("[Caplin] starting clstages loop") diff --git a/eth/backend.go b/eth/backend.go index 256db206c3e..5b371913e87 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -78,7 +78,6 @@ import ( libtypes "github.com/ledgerwatch/erigon-lib/types" "github.com/ledgerwatch/erigon-lib/wrap" "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters" clcore "github.com/ledgerwatch/erigon/cl/phase1/core" executionclient "github.com/ledgerwatch/erigon/cl/phase1/execution_client" @@ -939,7 +938,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger pruneBlobDistance = math.MaxUint64 } - indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, beaconCfg, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, executionEngine, false, pruneBlobDistance) + indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconCfg, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, executionEngine, false, pruneBlobDistance) if err != nil { return nil, err } From 252d103f00e9e38c3e91a7204c55005294663417 Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 20:47:47 +0200 Subject: [PATCH 02/12] svaE --- .../engine_block_downloader/block_downloader.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index 510c5cf4fa4..685850aa038 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -234,6 +234,7 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k if err != nil { return err } + inserted := uint64(0) log.Info("Beginning downloaded blocks insertion") // Start by seeking headers @@ -245,6 +246,15 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil { return err } + inserted += uint64(len(blocksBatch)) + if inserted >= uint64(e.syncCfg.LoopBlockLimit) { + lastHash := blocksBatch[len(blocksBatch)-1].Hash() + if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil { + return err + } + e.chainRW.Ready(ctx) + return nil + } blocksBatch = blocksBatch[:0] } header := new(types.Header) From 81b8b9541ac864c3f2e291e3f3289cb5068452f9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 20:53:27 +0200 Subject: [PATCH 03/12] svaE --- cmd/capcli/cli.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index f088b60279f..fbbb54422e3 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -33,7 +33,6 @@ import ( "github.com/ledgerwatch/erigon-lib/downloader/snaptype" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" - "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters" state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" @@ -133,7 +132,7 @@ func (c *Chain) Run(ctx *Context) error { return err } ethClock := eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), beaconConfig) - db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -185,7 +184,7 @@ func (c *ChainEndpoint) Run(ctx *Context) error { ethClock := eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), beaconConfig) dirs := datadir.New(c.Datadir) - db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, ethClock, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -306,7 +305,7 @@ func (c *DumpSnapshots) Run(ctx *Context) error { dirs := datadir.New(c.Datadir) log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) - db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -346,7 +345,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { dirs := datadir.New(c.Datadir) log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) - db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -426,7 +425,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error { dirs := datadir.New(c.Datadir) log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) - db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -474,7 +473,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error { return err } dirs := datadir.New(r.Datadir) - db, _, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -822,7 +821,7 @@ func (b *BlobArchiveStoreCheck) Run(ctx *Context) error { dirs := datadir.New(b.Datadir) - db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -892,7 +891,7 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error { dirs := datadir.New(c.Datadir) log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) - db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } @@ -933,7 +932,7 @@ func (c *CheckBlobsSnapshots) Run(ctx *Context) error { dirs := datadir.New(c.Datadir) log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) - db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) + db, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0) if err != nil { return err } From 40c1aaa993f903b4452512ae7d56c343648ce6ac Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 21:58:17 +0200 Subject: [PATCH 04/12] svaE --- turbo/engineapi/engine_block_downloader/block_downloader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index 685850aa038..5718477485c 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -252,6 +252,7 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil { return err } + inserted = 0 e.chainRW.Ready(ctx) return nil } From 1a76b70a3087ac01b99d2270cfc7d48a1a8422c9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 22:00:01 +0200 Subject: [PATCH 05/12] svaE --- cl/phase1/execution_client/block_collector/block_collector.go | 2 +- .../execution_client/block_collector/block_collector_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cl/phase1/execution_client/block_collector/block_collector.go b/cl/phase1/execution_client/block_collector/block_collector.go index 6609271683c..9e42e00f252 100644 --- a/cl/phase1/execution_client/block_collector/block_collector.go +++ b/cl/phase1/execution_client/block_collector/block_collector.go @@ -120,7 +120,6 @@ func (b *blockCollector) Flush(ctx context.Context) error { } inserted += uint64(len(blocksBatch)) b.logger.Info("[Caplin] Inserted blocks", "progress", blocksBatch[len(blocksBatch)-1].NumberU64()) - blocksBatch = []*types.Block{} // If we have inserted enough blocks, update fork choice (Optimation for E35) lastBlockHash := blocksBatch[len(blocksBatch)-1].Hash() if inserted >= b.syncBackLoop { @@ -129,6 +128,7 @@ func (b *blockCollector) Flush(ctx context.Context) error { } inserted = 0 } + blocksBatch = []*types.Block{} } return next(k, nil, nil) }, etl.TransformArgs{}); err != nil { diff --git a/cl/phase1/execution_client/block_collector/block_collector_test.go b/cl/phase1/execution_client/block_collector/block_collector_test.go index dc42cfd38ba..3e36726be12 100644 --- a/cl/phase1/execution_client/block_collector/block_collector_test.go +++ b/cl/phase1/execution_client/block_collector/block_collector_test.go @@ -2,6 +2,7 @@ package block_collector_test import ( "context" + "math" "testing" "github.com/ledgerwatch/erigon/cl/antiquary/tests" @@ -32,7 +33,7 @@ func TestBlockCollectorAccumulateAndFlush(t *testing.T) { } return nil }) - bc := block_collector.NewBlockCollector(log.Root(), engine, &clparams.MainnetBeaconConfig, ".") + bc := block_collector.NewBlockCollector(log.Root(), engine, &clparams.MainnetBeaconConfig, math.MaxUint64, ".") for _, block := range blocks { err := bc.AddBlock(block.Block) if err != nil { From b1ea49f78c39ca80c9184ac80c21714fa62491f9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Tue, 14 May 2024 23:39:00 +0200 Subject: [PATCH 06/12] save --- turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go index 14b8b0db66f..20913e9af99 100644 --- a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go +++ b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go @@ -280,10 +280,6 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks [ return err } - // limit the number of retries - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - for response.Result == execution.ExecutionStatus_Busy { const retryDelay = 100 * time.Millisecond select { From 37224e8b7da1c12e5d2844b0df4ca3172db63c73 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 00:01:46 +0200 Subject: [PATCH 07/12] save --- turbo/engineapi/engine_block_downloader/block_downloader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index 5718477485c..750fcb8cb58 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -253,7 +253,6 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k return err } inserted = 0 - e.chainRW.Ready(ctx) return nil } blocksBatch = blocksBatch[:0] From 7a5d74aba24810a34d9b0e2e5b8713f74535d429 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 00:09:22 +0200 Subject: [PATCH 08/12] save --- eth/stagedsync/stage_snapshots.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index d489c977ab2..f0774186b7d 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -423,7 +423,6 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs func computeBlocksToPrune(cfg SnapshotsCfg) (blocksToPrune uint64, historyToPrune uint64) { frozenBlocks := cfg.blockReader.Snapshots().SegmentsMax() - fmt.Println("O", cfg.prune.Blocks.PruneTo(frozenBlocks), cfg.prune.History.PruneTo(frozenBlocks)) return frozenBlocks - cfg.prune.Blocks.PruneTo(frozenBlocks), frozenBlocks - cfg.prune.History.PruneTo(frozenBlocks) } From 8ea4003cb89e9b19c3aa592f96eaa1fd3b0db0b6 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 00:19:30 +0200 Subject: [PATCH 09/12] save --- cmd/caplin/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/caplin/main.go b/cmd/caplin/main.go index 77ac2efe9e4..33899789d29 100644 --- a/cmd/caplin/main.go +++ b/cmd/caplin/main.go @@ -20,7 +20,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common/disk" "github.com/ledgerwatch/erigon-lib/common/mem" "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" - "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/core/state" execution_client2 "github.com/ledgerwatch/erigon/cl/phase1/execution_client" @@ -131,7 +130,7 @@ func runCaplinNode(cliCtx *cli.Context) error { executionEngine = cc } - indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, cfg.BeaconCfg, ethClock, cfg.Dirs.CaplinIndexing, cfg.Dirs.CaplinBlobs, executionEngine, false, 100_000) + indiciesDB, blobStorage, err := caplin1.OpenCaplinDatabase(ctx, cfg.BeaconCfg, ethClock, cfg.Dirs.CaplinIndexing, cfg.Dirs.CaplinBlobs, executionEngine, false, 100_000) if err != nil { return err } From 0da8305c56613a25619d8a8112f5e6d129c6edfa Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 00:41:35 +0200 Subject: [PATCH 10/12] save --- turbo/engineapi/engine_block_downloader/block_downloader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index 750fcb8cb58..2b16a904c64 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -253,7 +253,6 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx k return err } inserted = 0 - return nil } blocksBatch = blocksBatch[:0] } From 0d9bd9e82a5e670f8043210989b9e3b9d0252cf4 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 02:12:56 +0200 Subject: [PATCH 11/12] save --- eth/stagedsync/exec3.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index ce44320cc66..ca06e245118 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -288,6 +288,7 @@ func ExecV3(ctx context.Context, } blockNum = doms.BlockNum() + initialBlockNum := blockNum outputTxNum.Store(doms.TxNum()) var err error @@ -873,7 +874,9 @@ Loop: tt = time.Now() - if !useExternalTx { + pruneBlockMargin := uint64(100) + + if blockNum-initialBlockNum > pruneBlockMargin { if _, err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneSmallBatches(ctx, 10*time.Minute, applyTx); err != nil { return err } From 4bfa33eb76ad6fffe27bfd61333419cb45f476f2 Mon Sep 17 00:00:00 2001 From: Giulio Date: Wed, 15 May 2024 02:13:36 +0200 Subject: [PATCH 12/12] save --- eth/stagedsync/exec3.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index ca06e245118..6852bb1b84c 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -873,7 +873,8 @@ Loop: t1 = time.Since(tt) tt = time.Now() - + // If execute more than 100 blocks then, it is safe to assume that we are not on the tip of the chain. + // In this case, we can prune the state to save memory. pruneBlockMargin := uint64(100) if blockNum-initialBlockNum > pruneBlockMargin {