Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BeginTemporalRo #13046

Merged
merged 34 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
uses: golangci/golangci-lint-action@v6
with:
version: v1.62.2
args: --help
skip-cache: true

- name: Lint
if: runner.os == 'Linux'
Expand Down
14 changes: 7 additions & 7 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewTestSimulatedBackendWithConfig(t *testing.T, alloc types.GenesisAlloc, c
t.Cleanup(b.Close)
return b
}
func (b *SimulatedBackend) DB() kv.RwDB { return b.m.DB }
func (b *SimulatedBackend) DB() kv.TemporalRwDB { return b.m.DB }
func (b *SimulatedBackend) Agg() *state2.Aggregator { return b.m.HistoryV3Components() }
func (b *SimulatedBackend) HistoryV3() bool { return b.m.HistoryV3 }
func (b *SimulatedBackend) Engine() consensus.Engine { return b.m.Engine }
Expand Down Expand Up @@ -190,7 +190,7 @@ func (b *SimulatedBackend) emptyPendingBlock() {
}

// stateByBlockNumber retrieves a state by a given blocknumber.
func (b *SimulatedBackend) stateByBlockNumber(db kv.Tx, blockNumber *big.Int) *state.IntraBlockState {
func (b *SimulatedBackend) stateByBlockNumber(db kv.TemporalTx, blockNumber *big.Int) *state.IntraBlockState {
if blockNumber == nil || blockNumber.Cmp(b.pendingBlock.Number()) == 0 {
return state.New(b.m.NewHistoryStateReader(b.pendingBlock.NumberU64()+1, db))
}
Expand All @@ -201,7 +201,7 @@ func (b *SimulatedBackend) stateByBlockNumber(db kv.Tx, blockNumber *big.Int) *s
func (b *SimulatedBackend) CodeAt(ctx context.Context, contract libcommon.Address, blockNumber *big.Int) ([]byte, error) {
b.mu.Lock()
defer b.mu.Unlock()
tx, err := b.m.DB.BeginRo(context.Background())
tx, err := b.m.DB.BeginTemporalRo(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -214,7 +214,7 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract libcommon.Addres
func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract libcommon.Address, blockNumber *big.Int) (*uint256.Int, error) {
b.mu.Lock()
defer b.mu.Unlock()
tx, err := b.m.DB.BeginRo(context.Background())
tx, err := b.m.DB.BeginTemporalRo(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -227,7 +227,7 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract libcommon.Add
func (b *SimulatedBackend) NonceAt(ctx context.Context, contract libcommon.Address, blockNumber *big.Int) (uint64, error) {
b.mu.Lock()
defer b.mu.Unlock()
tx, err := b.m.DB.BeginRo(context.Background())
tx, err := b.m.DB.BeginTemporalRo(context.Background())
if err != nil {
return 0, err
}
Expand All @@ -241,7 +241,7 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract libcommon.Addre
func (b *SimulatedBackend) StorageAt(ctx context.Context, contract libcommon.Address, key libcommon.Hash, blockNumber *big.Int) ([]byte, error) {
b.mu.Lock()
defer b.mu.Unlock()
tx, err := b.m.DB.BeginRo(context.Background())
tx, err := b.m.DB.BeginTemporalRo(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -258,7 +258,7 @@ func (b *SimulatedBackend) TransactionReceipt(ctx context.Context, txHash libcom
b.mu.Lock()
defer b.mu.Unlock()

tx, err := b.m.DB.BeginRo(context.Background())
tx, err := b.m.DB.BeginTemporalRo(context.Background())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestNewSimulatedBackend(t *testing.T) {
if sim.m.ChainConfig != params.TestChainConfig {
t.Errorf("expected sim blockchain config to equal params.TestChainConfig, got %v", sim.m.ChainConfig)
}
tx, err1 := sim.DB().BeginRo(context.Background())
tx, err1 := sim.DB().BeginTemporalRo(context.Background())
if err1 != nil {
t.Errorf("TestNewSimulatedBackend create tx: %v", err1)
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/devnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import (

"github.com/urfave/cli/v2"

"github.com/erigontech/erigon-lib/log/v3"

"github.com/erigontech/erigon-lib/chain/networkname"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cmd/devnet/accounts"
_ "github.com/erigontech/erigon/cmd/devnet/accounts/steps"
_ "github.com/erigontech/erigon/cmd/devnet/admin"
Expand Down
2 changes: 1 addition & 1 deletion cmd/devnet/services/polygon/proofgenerator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (rg *requestGenerator) GetTransactionReceipt(ctx context.Context, hash libc
chain: rg.chain,
}

tx, err := rg.sentry.DB.BeginRo(context.Background())
tx, err := rg.sentry.DB.BeginTemporalRo(context.Background())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/evm/internal/t8ntool/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func Main(ctx *cli.Context) error {
defer db.Close()
defer agg.Close()

tx, err := db.BeginRw(context.Background())
tx, err := db.BeginTemporalRw(context.Background())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/evm/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func runCmd(ctx *cli.Context) error {
if err != nil {
return err
}
tx, err := tdb.BeginRw(context.Background())
tx, err := tdb.BeginTemporalRw(context.Background())
if err != nil {
return err
}
Expand Down
53 changes: 23 additions & 30 deletions cmd/integration/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ import (
"path/filepath"
"strings"

"github.com/erigontech/erigon/migrations"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"

"github.com/erigontech/erigon-lib/kv/temporal"
"github.com/erigontech/erigon-lib/log/v3"

"github.com/erigontech/erigon-lib/kv"
kv2 "github.com/erigontech/erigon-lib/kv/mdbx"

"github.com/erigontech/erigon-lib/kv/temporal"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cmd/utils"
"github.com/erigontech/erigon/migrations"
"github.com/erigontech/erigon/turbo/debug"
"github.com/erigontech/erigon/turbo/logging"
)
Expand Down Expand Up @@ -88,34 +86,29 @@ func dbCfg(label kv.Label, path string) kv2.MdbxOpts {
return opts
}

func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (kv.RwDB, error) {
db := opts.MustOpen()
if opts.GetLabel() == kv.ChainDB {
if applyMigrations {
migrator := migrations.NewMigrator(opts.GetLabel())
has, err := migrator.HasPendingMigrations(db)
if err != nil {
return nil, err
}
if has {
logger.Info("Re-Opening DB in exclusive mode to apply DB migrations")
db.Close()
db = opts.Exclusive(true).MustOpen()
if err := migrator.Apply(db, datadirCli, "", logger); err != nil {
return nil, err
}
db.Close()
db = opts.MustOpen()
}
}

_, _, agg, _, _, _ := allSnapshots(context.Background(), db, logger)
tdb, err := temporal.New(db, agg)
func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (tdb kv.TemporalRwDB, err error) {
if opts.GetLabel() != kv.ChainDB {
panic(opts.GetLabel())
}
rawDB := opts.MustOpen()
if applyMigrations {
migrator := migrations.NewMigrator(opts.GetLabel())
has, err := migrator.HasPendingMigrations(rawDB)
if err != nil {
return nil, err
}
db = tdb
if has {
logger.Info("Re-Opening DB in exclusive mode to apply DB migrations")
rawDB.Close()
rawDB = opts.Exclusive(true).MustOpen()
if err := migrator.Apply(rawDB, datadirCli, "", logger); err != nil {
return nil, err
}
rawDB.Close()
rawDB = opts.MustOpen()
}
}

return db, nil
_, _, agg, _, _, _ := allSnapshots(context.Background(), rawDB, logger)
return temporal.New(rawDB, agg)
}
22 changes: 11 additions & 11 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func init() {
rootCmd.AddCommand(cmdSetPrune)
}

func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageSnapshots(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
Expand Down Expand Up @@ -688,7 +688,7 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
})
}

func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)
if err := datadir.ApplyMigrations(dirs); err != nil {
return err
Expand Down Expand Up @@ -780,7 +780,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
})
}

func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, logger log.Logger) error {
func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []string, logger log.Logger) error {
engine, _, sync, _, miningState := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)

Expand Down Expand Up @@ -855,7 +855,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
return nil
}

func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
Expand Down Expand Up @@ -893,7 +893,7 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return nil
}

func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
engine, _, stageSync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
heimdallClient := engine.(*bor.Bor).HeimdallClient
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
Expand Down Expand Up @@ -924,7 +924,7 @@ func stagePolygonSync(db kv.RwDB, ctx context.Context, logger log.Logger) error
})
}

func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
tmpdir := datadir.New(datadirCli).Tmp
chainConfig := fromdb.ChainConfig(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return tx.Commit()
}

func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)
if err := datadir.ApplyMigrations(dirs); err != nil {
return err
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return nil
}

func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)
if err := datadir.ApplyMigrations(dirs); err != nil {
return err
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error
return nil
}

func stagePatriciaTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stagePatriciaTrie(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
_ = pm
sn, _, agg, _, _, _ := allSnapshots(ctx, db, logger)
Expand All @@ -1223,7 +1223,7 @@ func stagePatriciaTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error
return nil
}

func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
Expand Down Expand Up @@ -1405,7 +1405,7 @@ func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio

const blockBufferSize = 128

func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, logger log.Logger) (consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.MiningConfig, logger log.Logger) (consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)

vmConfig := &vm.Config{}
Expand Down
4 changes: 2 additions & 2 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func init() {
rootCmd.AddCommand(loopExecCmd)
}

func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.Context, logger1 log.Logger) error {
func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx context.Context, logger1 log.Logger) error {
dirs := datadir.New(datadirCli)
if err := datadir.ApplyMigrations(dirs); err != nil {
return err
Expand Down Expand Up @@ -384,7 +384,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
}
}

func loopExec(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
Expand Down
24 changes: 11 additions & 13 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func EmbeddedServices(ctx context.Context,
// RemoteServices - use when RPCDaemon run as independent process. Still it can use --datadir flag to enable
// `cfg.WithDatadir` (mode when it on 1 machine with Erigon)
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
db kv.TemporalRoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, bridgeReader BridgeReader, heimdallReader HeimdallReader, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
Expand Down Expand Up @@ -381,19 +381,17 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Accede mode preventing db-creation:
// at first start RpcDaemon may start earlier than Erigon
// Accede mode will check db existence (may wait with retries). It's ok to fail in this case - some supervisor will restart us.
var rwKv kv.RwDB
logger.Warn("Opening chain db", "path", cfg.Dirs.Chaindata)
limiter := semaphore.NewWeighted(roTxLimit)
rwKv, err = kv2.New(kv.ChainDB, logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede(true).Open(ctx)
rawDB, err := kv2.New(kv.ChainDB, logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede(true).Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
if compatErr := checkDbCompatibility(ctx, rawDB); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, compatErr
}
db = rwKv

if err := db.View(context.Background(), func(tx kv.Tx) error {
if err := rawDB.View(context.Background(), func(tx kv.Tx) error {
genesisHash, err := rawdb.ReadCanonicalHash(tx, 0)
if err != nil {
return err
Expand All @@ -418,14 +416,14 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, cfg.Dirs.DataDir, roTxLimit), allBorSnapshots)
bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(cfg.Dirs.DataDir, logger, true, roTxLimit), allBorSnapshots, cc.Bor)
} else {
bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), allBorSnapshots, cc.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), allBorSnapshots)
bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(rawDB), allBorSnapshots, cc.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(rawDB), allBorSnapshots)
}

blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, heimdallStore, bridgeStore)
txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))

agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.DefaultStepSize, db, logger)
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.DefaultStepSize, rawDB, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err)
}
Expand All @@ -434,7 +432,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

//TODO - its probably better to use: <-blockReader.Ready() here - but it depends how
//this is called at a process level
allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(rwKv)
allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(rawDB)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
Expand All @@ -446,7 +444,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:remote")
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

db.View(context.Background(), func(tx kv.Tx) error {
rawDB.View(context.Background(), func(tx kv.Tx) error {
aggTx := agg.BeginFilesRo()
defer aggTx.Close()
aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
Expand Down Expand Up @@ -482,7 +480,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if err = agg.OpenFolder(); err != nil {
logger.Error("[snapshots] reopen", "err", err)
} else {
db.View(context.Background(), func(tx kv.Tx) error {
rawDB.View(context.Background(), func(tx kv.Tx) error {
ac := agg.BeginFilesRo()
defer ac.Close()
ac.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
Expand All @@ -497,7 +495,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
onNewSnapshot()

db, err = temporal.New(rwKv, agg)
db, err = temporal.New(rawDB, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
Expand Down
Loading
Loading