From 4b50a074c1c6f2095504ad1314195db29827c835 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 6 Oct 2023 12:50:39 -0600 Subject: [PATCH 01/28] arbitrum/handler_p2p : initial --- arbitrum/handler_p2p.go | 152 ++++++++++++++++++++++++++++++++ arbitrum/sync_test.go | 189 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+) create mode 100644 arbitrum/handler_p2p.go create mode 100644 arbitrum/sync_test.go diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go new file mode 100644 index 0000000000..22af972bcd --- /dev/null +++ b/arbitrum/handler_p2p.go @@ -0,0 +1,152 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package arbitrum + +import ( + "fmt" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +type protocolHandler struct { + chain *core.BlockChain + eventMux *event.TypeMux + downloader *downloader.Downloader + done atomic.Bool +} + +func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain) *protocolHandler { + evMux := new(event.TypeMux) + p := &protocolHandler{ + chain: bc, + eventMux: evMux, + } + peerDrop := func(id string) { + log.Info("dropping peer", "id", id) + } + success := func() { + p.done.Store(true) + log.Info("DOWNLOADER DONE") + } + p.downloader = downloader.New(db, evMux, bc, nil, peerDrop, success) + return p +} + +func (h *protocolHandler) MakeProtocols(dnsdisc enode.Iterator) []p2p.Protocol { + protos := eth.MakeProtocols((*ethHandler)(h), h.chain.Config().ChainID.Uint64(), dnsdisc) + protos = append(protos, snap.MakeProtocols((*snapHandler)(h), dnsdisc)...) + return protos +} + +// ethHandler implements the eth.Backend interface to handle the various network +// packets that are sent as replies or broadcasts. +type ethHandler protocolHandler + +func (h *ethHandler) Chain() *core.BlockChain { return h.chain } + +type dummyTxPool struct{} + +func (d *dummyTxPool) Get(hash common.Hash) *types.Transaction { + return nil +} + +func (h *ethHandler) TxPool() eth.TxPool { return &dummyTxPool{} } + +// RunPeer is invoked when a peer joins on the `eth` protocol. 
+func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { + if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { + peer.Log().Error("Failed to register peer in eth syncer", "err", err) + return err + } + log.Info("eth peer") + return hand(peer) +} + +// PeerInfo retrieves all known `eth` information about a peer. +func (h *ethHandler) PeerInfo(id enode.ID) interface{} { + return nil +} + +// AcceptTxs retrieves whether transaction processing is enabled on the node +// or if inbound transactions should simply be dropped. +func (h *ethHandler) AcceptTxs() bool { + return false +} + +// Handle is invoked from a peer's message handler when it receives a new remote +// message that the handler couldn't consume and serve itself. +func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { + // Consume any broadcasts and announces, forwarding the rest to the downloader + switch packet := packet.(type) { + case *eth.NewBlockHashesPacket: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + case *eth.NewBlockPacket: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + case *eth.NewPooledTransactionHashesPacket66: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + case *eth.NewPooledTransactionHashesPacket68: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + case *eth.TransactionsPacket: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + case *eth.PooledTransactionsPacket: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + + default: + return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) + } +} + +type snapHandler protocolHandler + +func (h *snapHandler) Chain() *core.BlockChain { return h.chain } + +// RunPeer is invoked when a peer joins on the `snap` protocol. +func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { + if err := h.downloader.SnapSyncer.Register(peer); err != nil { + peer.Log().Error("Failed to register peer in snap syncer", "err", err) + return err + } + log.Info("snap peer") + return hand(peer) +} + +// PeerInfo retrieves all known `snap` information about a peer. +func (h *snapHandler) PeerInfo(id enode.ID) interface{} { + return nil +} + +// Handle is invoked from a peer's message handler when it receives a new remote +// message that the handler couldn't consume and serve itself. 
+func (h *snapHandler) Handle(peer *snap.Peer, packet snap.Packet) error { + return h.downloader.DeliverSnapPacket(peer, packet) +} diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go new file mode 100644 index 0000000000..4a3edc077d --- /dev/null +++ b/arbitrum/sync_test.go @@ -0,0 +1,189 @@ +package arbitrum + +import ( + "math/big" + "net" + "os" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/params" +) + +type dummyIterator struct { + lock sync.Mutex + nodes []*enode.Node //first one is never used +} + +func (i *dummyIterator) Next() bool { // moves to next node + i.lock.Lock() + defer i.lock.Unlock() + + if len(i.nodes) == 0 { + log.Info("dummy iterator: done") + return false + } + i.nodes = i.nodes[1:] + return len(i.nodes) > 0 +} + +func (i *dummyIterator) Node() *enode.Node { // returns current node + i.lock.Lock() + defer i.lock.Unlock() + if len(i.nodes) == 0 { + return nil + } + if i.nodes[0] != nil { + log.Info("dummy iterator: emit", "id", i.nodes[0].ID(), "ip", i.nodes[0].IP(), "tcp", i.nodes[0].TCP(), "udp", i.nodes[0].UDP()) + } + return i.nodes[0] +} + +func (i *dummyIterator) Close() { // ends the iterator + i.nodes = nil +} + +func TestSimpleSync(t *testing.T) { + const numBlocks = 100 + const oldBlock = 20 + + glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) + glogger.Verbosity(log.Lvl(log.LvlTrace)) + log.Root().SetHandler(glogger) + + // key for source node p2p + sourceKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + + // key for dest node p2p + destKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + + // key for onchain user + testUser, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey) + + sourceStackConf := node.DefaultConfig + sourceStackConf.DataDir = "" + sourceStackConf.P2P.NoDiscovery = true + sourceStackConf.P2P.ListenAddr = "127.0.0.1:0" + sourceStackConf.P2P.PrivateKey = sourceKey + + destStackConf := sourceStackConf + destStackConf.P2P.PrivateKey = destKey + + sourceStack, err := node.New(&sourceStackConf) + if err != nil { + t.Fatal(err) + } + + // create and populate chain + sourceDb := rawdb.NewMemoryDatabase() + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{testUserAddress: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)}}, + } + sourceChain, _ := core.NewBlockChain(sourceDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) + signer := types.MakeSigner(sourceChain.Config(), big.NewInt(1), 0) + _, bs, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), numBlocks, func(i int, gen *core.BlockGen) { + tx, err := types.SignNewTx(testUser, signer, &types.LegacyTx{ + Nonce: gen.TxNonce(testUserAddress), + GasPrice: gen.BaseFee(), + Gas: uint64(1000001), + }) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + }) + if _, err := sourceChain.InsertChain(bs); 
err != nil { + t.Fatal(err) + } + + // source node + sourceHandler := NewProtocolHandler(sourceDb, sourceChain) + sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{})) + sourceStack.Start() + + // figure out port of the source node and create dummy iter that points to it + sourceListenAddr := sourceStack.Server().Config.ListenAddr + splitAddr := strings.Split(sourceListenAddr, ":") + sourcePort, err := strconv.Atoi(splitAddr[len(splitAddr)-1]) + if err != nil { + t.Fatal(err) + } + sourceEnode := enode.NewV4(&sourceKey.PublicKey, net.IPv4(127, 0, 0, 1), sourcePort, 0) + iter := &dummyIterator{ + nodes: []*enode.Node{nil, sourceEnode}, + } + + // dest node + destDb := rawdb.NewMemoryDatabase() + destChain, _ := core.NewBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) + destStack, err := node.New(&destStackConf) + if err != nil { + t.Fatal(err) + } + destHandler := NewProtocolHandler(destDb, destChain) + destStack.RegisterProtocols(destHandler.MakeProtocols(iter)) + destStack.Start() + + // start sync + log.Info("dest listener", "address", destStack.Server().Config.ListenAddr) + log.Info("initial source", "head", sourceChain.CurrentBlock()) + log.Info("initial dest", "head", destChain.CurrentBlock()) + err = destHandler.downloader.BeaconSync(downloader.SnapSync, sourceChain.CurrentBlock(), sourceChain.CurrentBlock()) + if err != nil { + t.Fatal(err) + } + <-time.After(time.Second * 5) + + // check sync + if sourceChain.CurrentBlock().Hash() != destChain.CurrentBlock().Hash() { + log.Info("final source", "head", sourceChain.CurrentBlock()) + log.Info("final dest", "head", destChain.CurrentBlock()) + t.Fatal("dest chain not synced to source") + } + + oldDest := destChain.GetHeaderByNumber(oldBlock) + if oldDest == nil { + t.Fatal("old dest block nil") + } + oldSource := sourceChain.GetHeaderByNumber(oldBlock) + if oldSource == nil { + t.Fatal("old source block nil") + } + if oldDest.Hash() != oldSource.Hash() { + log.Info("final source", "old", oldSource) + log.Info("final dest", "old", oldDest) + t.Fatal("dest and source differ") + } + _, err = sourceChain.StateAt(oldSource.Root) + if err != nil { + t.Fatal("source chain does not have state for old block") + } + _, err = destChain.StateAt(oldDest.Root) + if err == nil { + t.Fatal("dest chain does have state for old block, but should have been snap-synced") + } +} From b4e53cbff1518921ec0455795c8fdfcf1aadf571 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 6 Oct 2023 16:19:28 -0600 Subject: [PATCH 02/28] add pivotsync --- arbitrum/sync_test.go | 7 +++---- eth/downloader/beaconsync.go | 9 +++++++++ eth/downloader/downloader.go | 39 +++++++++++++++++++++++++++++------- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 4a3edc077d..1b5f22189c 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -16,7 +16,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" @@ -57,8 +56,8 @@ func (i *dummyIterator) Close() { // ends the iterator } func TestSimpleSync(t *testing.T) { - const numBlocks = 100 - const oldBlock = 20 + const numBlocks = 200 + const oldBlock = 198 glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) 
glogger.Verbosity(log.Lvl(log.LvlTrace)) @@ -152,7 +151,7 @@ func TestSimpleSync(t *testing.T) { log.Info("dest listener", "address", destStack.Server().Config.ListenAddr) log.Info("initial source", "head", sourceChain.CurrentBlock()) log.Info("initial dest", "head", destChain.CurrentBlock()) - err = destHandler.downloader.BeaconSync(downloader.SnapSync, sourceChain.CurrentBlock(), sourceChain.CurrentBlock()) + err = destHandler.downloader.PivotSync(sourceChain.CurrentBlock()) if err != nil { t.Fatal(err) } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index df8af68bc7..ebfe522b2e 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -164,6 +164,15 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error { return d.beaconSync(mode, head, nil, false) } +// PivotSync sets an explicit pivot and syncs from there. Pivot state will be read from peers. +func (d *Downloader) PivotSync(pivot *types.Header) error { + d.pivotLock.Lock() + d.pivotHeader = pivot + d.pivotExplicit = true + d.pivotLock.Unlock() + return d.beaconSync(SnapSync, pivot, nil, true) +} + // beaconSync is the post-merge version of the chain synchronization, where the // chain is not downloaded from genesis onward, rather from trusted head announces // backwards. diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9a805396c4..a9251d06c1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -131,8 +131,9 @@ type Downloader struct { skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode) // State sync - pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root - pivotLock sync.RWMutex // Lock protecting pivot header reads from updates + pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root + pivotExplicit bool // arbitrum: pivot is set explicitly only + pivotLock sync.RWMutex // Lock protecting pivot header reads from updates SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now stateSyncStart chan *stateSync @@ -476,7 +477,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - var latest, pivot, final *types.Header + var latest, pivot, explicitPivot, final *types.Header + d.pivotLock.Lock() + if d.pivotExplicit { + explicitPivot = d.pivotHeader + } + d.pivotLock.Unlock() if !beaconMode { // In legacy mode, use the master peer to retrieve the headers from latest, pivot, err = d.fetchHead(p) @@ -489,7 +495,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if err != nil { return err } - if latest.Number.Uint64() > uint64(fsMinFullBlocks) { + if latest.Number.Uint64() > uint64(fsMinFullBlocks) && explicitPivot == nil { number := latest.Number.Uint64() - uint64(fsMinFullBlocks) // Retrieve the pivot header from the skeleton chain segment but @@ -521,6 +527,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if mode == SnapSync && pivot == nil { pivot = d.blockchain.CurrentBlock() } + if explicitPivot != nil { + if explicitPivot.Number.Cmp(latest.Number) < 0 { + return fmt.Errorf("skeleton behind explicit pivot. 
cannot sync") + } + pivot = explicitPivot + } height := latest.Number.Uint64() var origin uint64 @@ -546,7 +558,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // Ensure our origin point is below any snap sync pivot point if mode == SnapSync { - if height <= uint64(fsMinFullBlocks) { + if explicitPivot != nil { + rawdb.WriteLastPivotNumber(d.stateDB, pivot.Nonce.Uint64()) + } else if height <= uint64(fsMinFullBlocks) { origin = 0 } else { pivotNumber := pivot.Number.Uint64() @@ -635,7 +649,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } if mode == SnapSync { d.pivotLock.Lock() - d.pivotHeader = pivot + if !d.pivotExplicit { + d.pivotHeader = pivot + } d.pivotLock.Unlock() fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) @@ -1034,8 +1050,13 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e case pivoting: d.pivotLock.RLock() pivot := d.pivotHeader.Number.Uint64() + pivotExplicit := d.pivotExplicit d.pivotLock.RUnlock() + if pivotExplicit { + pivoting = false + continue + } p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks)) headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep @@ -1080,6 +1101,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e if d.pivotHeader != nil { pivot = d.pivotHeader.Number.Uint64() } + if d.pivotExplicit { + pivoting = false + } d.pivotLock.RUnlock() if pivoting { @@ -1637,6 +1661,7 @@ func (d *Downloader) processSnapSyncContent() error { // notifications from the header downloader d.pivotLock.RLock() pivot := d.pivotHeader + pivotExplicit := d.pivotExplicit d.pivotLock.RUnlock() if oldPivot == nil { @@ -1659,7 +1684,7 @@ func (d *Downloader) processSnapSyncContent() error { // Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those // need to be taken into account, otherwise we're detecting the pivot move // late and will drop peers due to unavailable state!!! 
- if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) { + if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) && !pivotExplicit { log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay)) pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted From e88a582a840330f1022bbcef65c4aa7aa2ec9a50 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 10 Oct 2023 16:58:56 -0600 Subject: [PATCH 03/28] snap handler: dont use blockchain directly --- eth/handler_snap.go | 24 +++++++++++++++++-- eth/protocols/snap/handler.go | 44 +++++++++++++++++++---------------- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/eth/handler_snap.go b/eth/handler_snap.go index 767416ffd6..7db98e266a 100644 --- a/eth/handler_snap.go +++ b/eth/handler_snap.go @@ -17,16 +17,36 @@ package eth import ( - "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/trie" ) // snapHandler implements the snap.Backend interface to handle the various network // packets that are sent as replies or broadcasts. type snapHandler handler -func (h *snapHandler) Chain() *core.BlockChain { return h.chain } +func (h *snapHandler) ContractCodeWithPrefix(codeHash common.Hash) ([]byte, error) { + return (*handler)(h).chain.ContractCodeWithPrefix(codeHash) +} + +func (h *snapHandler) TrieDB() *trie.Database { + return (*handler)(h).chain.StateCache().TrieDB() +} + +func (h *snapHandler) Snapshot(root common.Hash) snapshot.Snapshot { + return (*handler)(h).chain.Snapshots().Snapshot(root) +} + +func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { + return (*handler)(h).chain.Snapshots().AccountIterator(root, account) +} + +func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) { + return (*handler)(h).chain.Snapshots().StorageIterator(root, account, origin) +} // RunPeer is invoked when a peer joins on the `snap` protocol. func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index ec0c8afd12..e25fc9a5e2 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -22,7 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" @@ -64,7 +64,11 @@ type Handler func(peer *Peer) error // callback methods to invoke on remote deliveries. type Backend interface { // Chain retrieves the blockchain object to serve data. - Chain() *core.BlockChain + ContractCodeWithPrefix(codeHash common.Hash) ([]byte, error) + TrieDB() *trie.Database + Snapshot(root common.Hash) snapshot.Snapshot + AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) + StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) // RunPeer is invoked when a peer joins on the `eth` protocol. 
The handler // should do any peer maintenance work, handshakes and validations. If all @@ -103,7 +107,7 @@ func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { }) }, NodeInfo: func() interface{} { - return nodeInfo(backend.Chain()) + return nodeInfo() }, PeerInfo: func(id enode.ID) interface{} { return backend.PeerInfo(id) @@ -159,7 +163,7 @@ func HandleMessage(backend Backend, peer *Peer) error { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Service the request, potentially returning nothing in case of errors - accounts, proofs := ServiceGetAccountRangeQuery(backend.Chain(), &req) + accounts, proofs := ServiceGetAccountRangeQuery(backend, &req) // Send back anything accumulated (or empty in case of errors) return p2p.Send(peer.rw, AccountRangeMsg, &AccountRangePacket{ @@ -191,7 +195,7 @@ func HandleMessage(backend Backend, peer *Peer) error { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Service the request, potentially returning nothing in case of errors - slots, proofs := ServiceGetStorageRangesQuery(backend.Chain(), &req) + slots, proofs := ServiceGetStorageRangesQuery(backend, &req) // Send back anything accumulated (or empty in case of errors) return p2p.Send(peer.rw, StorageRangesMsg, &StorageRangesPacket{ @@ -225,7 +229,7 @@ func HandleMessage(backend Backend, peer *Peer) error { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Service the request, potentially returning nothing in case of errors - codes := ServiceGetByteCodesQuery(backend.Chain(), &req) + codes := ServiceGetByteCodesQuery(backend, &req) // Send back anything accumulated (or empty in case of errors) return p2p.Send(peer.rw, ByteCodesMsg, &ByteCodesPacket{ @@ -250,7 +254,7 @@ func HandleMessage(backend Backend, peer *Peer) error { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Service the request, potentially returning nothing in case of errors - nodes, err := ServiceGetTrieNodesQuery(backend.Chain(), &req, start) + nodes, err := ServiceGetTrieNodesQuery(backend, &req, start) if err != nil { return err } @@ -277,16 +281,16 @@ func HandleMessage(backend Backend, peer *Peer) error { // ServiceGetAccountRangeQuery assembles the response to an account range query. // It is exposed to allow external packages to test protocol behavior. 
-func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePacket) ([]*AccountData, [][]byte) { +func ServiceGetAccountRangeQuery(backend Backend, req *GetAccountRangePacket) ([]*AccountData, [][]byte) { if req.Bytes > softResponseLimit { req.Bytes = softResponseLimit } // Retrieve the requested state and bail out if non existent - tr, err := trie.New(trie.StateTrieID(req.Root), chain.StateCache().TrieDB()) + tr, err := trie.New(trie.StateTrieID(req.Root), backend.TrieDB()) if err != nil { return nil, nil } - it, err := chain.Snapshots().AccountIterator(req.Root, req.Origin) + it, err := backend.AccountIterator(req.Root, req.Origin) if err != nil { return nil, nil } @@ -337,7 +341,7 @@ func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePac return accounts, proofs } -func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesPacket) ([][]*StorageData, [][]byte) { +func ServiceGetStorageRangesQuery(backend Backend, req *GetStorageRangesPacket) ([][]*StorageData, [][]byte) { if req.Bytes > softResponseLimit { req.Bytes = softResponseLimit } @@ -370,7 +374,7 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP limit, req.Limit = common.BytesToHash(req.Limit), nil } // Retrieve the requested state and bail out if non existent - it, err := chain.Snapshots().StorageIterator(req.Root, account, origin) + it, err := backend.StorageIterator(req.Root, account, origin) if err != nil { return nil, nil } @@ -412,7 +416,7 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP if origin != (common.Hash{}) || (abort && len(storage) > 0) { // Request started at a non-zero hash or was capped prematurely, add // the endpoint Merkle proofs - accTrie, err := trie.NewStateTrie(trie.StateTrieID(req.Root), chain.StateCache().TrieDB()) + accTrie, err := trie.NewStateTrie(trie.StateTrieID(req.Root), backend.TrieDB()) if err != nil { return nil, nil } @@ -421,7 +425,7 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP return nil, nil } id := trie.StorageTrieID(req.Root, account, acc.Root) - stTrie, err := trie.NewStateTrie(id, chain.StateCache().TrieDB()) + stTrie, err := trie.NewStateTrie(id, backend.TrieDB()) if err != nil { return nil, nil } @@ -450,7 +454,7 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP // ServiceGetByteCodesQuery assembles the response to a byte codes query. // It is exposed to allow external packages to test protocol behavior. -func ServiceGetByteCodesQuery(chain *core.BlockChain, req *GetByteCodesPacket) [][]byte { +func ServiceGetByteCodesQuery(backend Backend, req *GetByteCodesPacket) [][]byte { if req.Bytes > softResponseLimit { req.Bytes = softResponseLimit } @@ -467,7 +471,7 @@ func ServiceGetByteCodesQuery(chain *core.BlockChain, req *GetByteCodesPacket) [ // Peers should not request the empty code, but if they do, at // least sent them back a correct response without db lookups codes = append(codes, []byte{}) - } else if blob, err := chain.ContractCodeWithPrefix(hash); err == nil { + } else if blob, err := backend.ContractCodeWithPrefix(hash); err == nil { codes = append(codes, blob) bytes += uint64(len(blob)) } @@ -480,12 +484,12 @@ func ServiceGetByteCodesQuery(chain *core.BlockChain, req *GetByteCodesPacket) [ // ServiceGetTrieNodesQuery assembles the response to a trie nodes query. // It is exposed to allow external packages to test protocol behavior. 
-func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, start time.Time) ([][]byte, error) { +func ServiceGetTrieNodesQuery(backend Backend, req *GetTrieNodesPacket, start time.Time) ([][]byte, error) { if req.Bytes > softResponseLimit { req.Bytes = softResponseLimit } // Make sure we have the state associated with the request - triedb := chain.StateCache().TrieDB() + triedb := backend.TrieDB() accTrie, err := trie.NewStateTrie(trie.StateTrieID(req.Root), triedb) if err != nil { @@ -493,7 +497,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s return nil, nil } // The 'snap' might be nil, in which case we cannot serve storage slots. - snap := chain.Snapshots().Snapshot(req.Root) + snap := backend.Snapshot(req.Root) // Retrieve trie nodes until the packet size limit is reached var ( nodes [][]byte @@ -570,6 +574,6 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s type NodeInfo struct{} // nodeInfo retrieves some `snap` protocol metadata about the running host node. -func nodeInfo(chain *core.BlockChain) *NodeInfo { +func nodeInfo() *NodeInfo { return &NodeInfo{} } From aac8749827370b9bf4a5e3a63c89bd1908a01301 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 10 Oct 2023 17:00:06 -0600 Subject: [PATCH 04/28] update arb syncer --- arbitrum/handler_p2p.go | 90 ++++++++++++++++++++++++++++++++++-- eth/downloader/beaconsync.go | 8 +++- eth/downloader/downloader.go | 2 +- 3 files changed, 94 insertions(+), 6 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 22af972bcd..ce4111a91b 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" @@ -31,13 +32,17 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) type protocolHandler struct { chain *core.BlockChain eventMux *event.TypeMux downloader *downloader.Downloader - done atomic.Bool + db ethdb.Database + + done atomic.Bool } func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain) *protocolHandler { @@ -45,6 +50,7 @@ func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain) *protocolHandler p := &protocolHandler{ chain: bc, eventMux: evMux, + db: db, } peerDrop := func(id string) { log.Info("dropping peer", "id", id) @@ -120,7 +126,6 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.PooledTransactionsPacket: return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) - default: return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) } @@ -128,7 +133,86 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { type snapHandler protocolHandler -func (h *snapHandler) Chain() *core.BlockChain { return h.chain } +func (h *snapHandler) ContractCodeWithPrefix(codeHash common.Hash) ([]byte, error) { + return h.chain.ContractCodeWithPrefix(codeHash) +} + +func (h *snapHandler) TrieDB() *trie.Database { + return h.chain.StateCache().TrieDB() +} + +func (h *snapHandler) Snapshot(root common.Hash) snapshot.Snapshot { + return nil +} + +type trieIteratorWrapper struct { + 
val *trie.Iterator +} + +func (i trieIteratorWrapper) Next() bool { return i.val.Next() } +func (i trieIteratorWrapper) Error() error { return i.val.Err } +func (i trieIteratorWrapper) Hash() common.Hash { return common.BytesToHash(i.val.Key) } +func (i trieIteratorWrapper) Release() {} + +type trieAccountIterator struct { + trieIteratorWrapper +} + +func (i trieAccountIterator) Account() []byte { return i.val.Value } + +func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { + triedb := trie.NewDatabase(h.db) + t, err := trie.NewStateTrie(trie.StateTrieID(root), triedb) + if err != nil { + log.Error("Failed to open trie", "root", root, "err", err) + return nil, err + } + accIter := t.NodeIterator(account[:]) + return trieAccountIterator{trieIteratorWrapper{trie.NewIterator((accIter))}}, nil +} + +type trieStoreageIterator struct { + trieIteratorWrapper +} + +func (i trieStoreageIterator) Slot() []byte { return i.val.Value } + +type nilStoreageIterator struct{} + +func (i nilStoreageIterator) Next() bool { return false } +func (i nilStoreageIterator) Error() error { return nil } +func (i nilStoreageIterator) Hash() common.Hash { return types.EmptyRootHash } +func (i nilStoreageIterator) Release() {} +func (i nilStoreageIterator) Slot() []byte { return nil } + +func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) { + triedb := trie.NewDatabase(h.db) + t, err := trie.NewStateTrie(trie.StateTrieID(root), triedb) + if err != nil { + log.Error("Failed to open trie", "root", root, "err", err) + return nil, err + } + val, _, err := t.GetNode(account[:]) + if err != nil { + log.Error("Failed to find account in trie", "root", root, "account", account, "err", err) + return nil, err + } + var acc types.StateAccount + if err := rlp.DecodeBytes(val, &acc); err != nil { + log.Error("Invalid account encountered during traversal", "err", err) + return nil, err + } + if acc.Root == types.EmptyRootHash { + return nilStoreageIterator{}, nil + } + id := trie.StorageTrieID(root, account, acc.Root) + storageTrie, err := trie.NewStateTrie(id, triedb) + if err != nil { + log.Error("Failed to open storage trie", "root", acc.Root, "err", err) + return nil, err + } + return trieStoreageIterator{trieIteratorWrapper{trie.NewIterator(storageTrie.NodeIterator(origin[:]))}}, nil +} // RunPeer is invoked when a peer joins on the `snap` protocol. func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index ebfe522b2e..af55f14bb4 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -17,6 +17,7 @@ package downloader import ( + "errors" "fmt" "sync" "time" @@ -165,12 +166,15 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error { } // PivotSync sets an explicit pivot and syncs from there. Pivot state will be read from peers. 
-func (d *Downloader) PivotSync(pivot *types.Header) error { +func (d *Downloader) PivotSync(head *types.Header, pivot *types.Header) error { + if head.Number.Cmp(pivot.Number) < 0 { + return errors.New("pivot must be behind head") + } d.pivotLock.Lock() d.pivotHeader = pivot d.pivotExplicit = true d.pivotLock.Unlock() - return d.beaconSync(SnapSync, pivot, nil, true) + return d.beaconSync(SnapSync, head, nil, true) } // beaconSync is the post-merge version of the chain synchronization, where the diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a9251d06c1..8557703a7c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -528,7 +528,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * pivot = d.blockchain.CurrentBlock() } if explicitPivot != nil { - if explicitPivot.Number.Cmp(latest.Number) < 0 { + if explicitPivot.Number.Cmp(latest.Number) > 0 { return fmt.Errorf("skeleton behind explicit pivot. cannot sync") } pivot = explicitPivot From 414ac55d96bb546fbd3e9c9d99b17cbaa90e6448 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 10 Oct 2023 17:00:15 -0600 Subject: [PATCH 05/28] update sync_test --- arbitrum/sync_test.go | 116 ++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 45 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 1b5f22189c..ee2cf72795 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" @@ -55,9 +54,28 @@ func (i *dummyIterator) Close() { // ends the iterator i.nodes = nil } +func testHasBlock(t *testing.T, chain *core.BlockChain, block *types.Block, shouldHaveState bool) { + t.Helper() + hasHeader := chain.GetHeaderByNumber(block.NumberU64()) + if hasHeader == nil { + t.Fatal("block not found") + } + if hasHeader.Hash() != block.Hash() { + t.Fatal("wrong block in blockchain") + } + _, err := chain.StateAt(hasHeader.Root) + if err != nil && shouldHaveState { + t.Fatal("should have state, but doesn't") + } + if err == nil && !shouldHaveState { + t.Fatal("should not have state, but does") + } +} + func TestSimpleSync(t *testing.T) { - const numBlocks = 200 - const oldBlock = 198 + const pivotBlockNum = 50 + const syncBlockNum = 70 + const extraBlocks = 200 glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) glogger.Verbosity(log.Lvl(log.LvlTrace)) @@ -75,36 +93,36 @@ func TestSimpleSync(t *testing.T) { t.Fatal("generate key err:", err) } - // key for onchain user - testUser, err := crypto.GenerateKey() - if err != nil { - t.Fatal("generate key err:", err) - } - testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey) - + // source node sourceStackConf := node.DefaultConfig - sourceStackConf.DataDir = "" + sourceStackConf.DataDir = t.TempDir() sourceStackConf.P2P.NoDiscovery = true sourceStackConf.P2P.ListenAddr = "127.0.0.1:0" sourceStackConf.P2P.PrivateKey = sourceKey - destStackConf := sourceStackConf - destStackConf.P2P.PrivateKey = destKey - sourceStack, err := node.New(&sourceStackConf) if err != nil { t.Fatal(err) } + sourceDb, err := sourceStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false) + if err != nil { + t.Fatal(err) + } // create and populate chain - sourceDb 
:= rawdb.NewMemoryDatabase() + testUser, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey) gspec := &core.Genesis{ Config: params.TestChainConfig, Alloc: core.GenesisAlloc{testUserAddress: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)}}, } sourceChain, _ := core.NewBlockChain(sourceDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) signer := types.MakeSigner(sourceChain.Config(), big.NewInt(1), 0) - _, bs, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), numBlocks, func(i int, gen *core.BlockGen) { + + _, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) { tx, err := types.SignNewTx(testUser, signer, &types.LegacyTx{ Nonce: gen.TxNonce(testUserAddress), GasPrice: gen.BaseFee(), @@ -115,10 +133,26 @@ func TestSimpleSync(t *testing.T) { } gen.AddTx(tx) }) - if _, err := sourceChain.InsertChain(bs); err != nil { + + if _, err := sourceChain.InsertChain(blocks[:pivotBlockNum]); err != nil { + t.Fatal(err) + } + + pivotBlock := blocks[pivotBlockNum-1] + syncBlock := blocks[syncBlockNum-1] + + testHasBlock(t, sourceChain, pivotBlock, true) + sourceChain.TrieDB().Commit(pivotBlock.Root(), true) + + if _, err := sourceChain.InsertChain(blocks[pivotBlockNum:]); err != nil { t.Fatal(err) } + // should have state of pivot but nothing around + testHasBlock(t, sourceChain, blocks[pivotBlockNum-2], false) + testHasBlock(t, sourceChain, blocks[pivotBlockNum-1], true) + testHasBlock(t, sourceChain, blocks[pivotBlockNum], false) + // source node sourceHandler := NewProtocolHandler(sourceDb, sourceChain) sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{})) @@ -137,12 +171,19 @@ func TestSimpleSync(t *testing.T) { } // dest node - destDb := rawdb.NewMemoryDatabase() - destChain, _ := core.NewBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) + destStackConf := sourceStackConf + destStackConf.DataDir = t.TempDir() + destStackConf.P2P.PrivateKey = destKey destStack, err := node.New(&destStackConf) if err != nil { t.Fatal(err) } + + destDb, err := destStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false) + if err != nil { + t.Fatal(err) + } + destChain, _ := core.NewBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) destHandler := NewProtocolHandler(destDb, destChain) destStack.RegisterProtocols(destHandler.MakeProtocols(iter)) destStack.Start() @@ -151,38 +192,23 @@ func TestSimpleSync(t *testing.T) { log.Info("dest listener", "address", destStack.Server().Config.ListenAddr) log.Info("initial source", "head", sourceChain.CurrentBlock()) log.Info("initial dest", "head", destChain.CurrentBlock()) - err = destHandler.downloader.PivotSync(sourceChain.CurrentBlock()) + log.Info("pivot", "head", pivotBlock.Header()) + err = destHandler.downloader.PivotSync(syncBlock.Header(), pivotBlock.Header()) if err != nil { t.Fatal(err) } <-time.After(time.Second * 5) + log.Info("final source", "head", sourceChain.CurrentBlock()) + log.Info("final dest", "head", destChain.CurrentBlock()) + log.Info("sync block", "header", syncBlock.Header()) + // check sync - if sourceChain.CurrentBlock().Hash() != destChain.CurrentBlock().Hash() { - log.Info("final source", "head", sourceChain.CurrentBlock()) - log.Info("final dest", "head", destChain.CurrentBlock()) - t.Fatal("dest chain not synced to source") + if 
destChain.CurrentBlock().Number.Cmp(syncBlock.Number()) != 0 { + t.Fatal("did not sync to sync block") } - oldDest := destChain.GetHeaderByNumber(oldBlock) - if oldDest == nil { - t.Fatal("old dest block nil") - } - oldSource := sourceChain.GetHeaderByNumber(oldBlock) - if oldSource == nil { - t.Fatal("old source block nil") - } - if oldDest.Hash() != oldSource.Hash() { - log.Info("final source", "old", oldSource) - log.Info("final dest", "old", oldDest) - t.Fatal("dest and source differ") - } - _, err = sourceChain.StateAt(oldSource.Root) - if err != nil { - t.Fatal("source chain does not have state for old block") - } - _, err = destChain.StateAt(oldDest.Root) - if err == nil { - t.Fatal("dest chain does have state for old block, but should have been snap-synced") - } + testHasBlock(t, destChain, syncBlock, true) + testHasBlock(t, destChain, pivotBlock, true) + testHasBlock(t, destChain, blocks[pivotBlockNum-2], false) } From d9423b1a5dc4c3a54ad8c688155cf3d9be3a2c21 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 16 Oct 2023 19:35:24 -0600 Subject: [PATCH 06/28] arbitrum handler_p2p: fix iteration --- arbitrum/handler_p2p.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index ce4111a91b..3d612a71e4 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -146,19 +145,19 @@ func (h *snapHandler) Snapshot(root common.Hash) snapshot.Snapshot { } type trieIteratorWrapper struct { - val *trie.Iterator + iter *trie.Iterator } -func (i trieIteratorWrapper) Next() bool { return i.val.Next() } -func (i trieIteratorWrapper) Error() error { return i.val.Err } -func (i trieIteratorWrapper) Hash() common.Hash { return common.BytesToHash(i.val.Key) } +func (i trieIteratorWrapper) Next() bool { return i.iter.Next() } +func (i trieIteratorWrapper) Error() error { return i.iter.Err } +func (i trieIteratorWrapper) Hash() common.Hash { return common.BytesToHash(i.iter.Key) } func (i trieIteratorWrapper) Release() {} type trieAccountIterator struct { trieIteratorWrapper } -func (i trieAccountIterator) Account() []byte { return i.val.Value } +func (i trieAccountIterator) Account() []byte { return i.iter.Value } func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { triedb := trie.NewDatabase(h.db) @@ -168,14 +167,16 @@ func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.Accou return nil, err } accIter := t.NodeIterator(account[:]) - return trieAccountIterator{trieIteratorWrapper{trie.NewIterator((accIter))}}, nil + return trieAccountIterator{trieIteratorWrapper{ + iter: trie.NewIterator((accIter)), + }}, nil } type trieStoreageIterator struct { trieIteratorWrapper } -func (i trieStoreageIterator) Slot() []byte { return i.val.Value } +func (i trieStoreageIterator) Slot() []byte { return i.iter.Value } type nilStoreageIterator struct{} @@ -192,16 +193,11 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh log.Error("Failed to open trie", "root", root, "err", err) return nil, err } - val, _, err := t.GetNode(account[:]) + acc, err := t.GetAccountByHash(account) if err != nil { log.Error("Failed to find account in trie", "root", root, "account", account, "err", err) 
return nil, err } - var acc types.StateAccount - if err := rlp.DecodeBytes(val, &acc); err != nil { - log.Error("Invalid account encountered during traversal", "err", err) - return nil, err - } if acc.Root == types.EmptyRootHash { return nilStoreageIterator{}, nil } @@ -211,7 +207,9 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh log.Error("Failed to open storage trie", "root", acc.Root, "err", err) return nil, err } - return trieStoreageIterator{trieIteratorWrapper{trie.NewIterator(storageTrie.NodeIterator(origin[:]))}}, nil + return trieStoreageIterator{trieIteratorWrapper{ + iter: trie.NewIterator(storageTrie.NodeIterator(origin[:])), + }}, nil } // RunPeer is invoked when a peer joins on the `snap` protocol. From 67f54b0c39258a31ccf0886d5e3b462166d2fb4f Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 16 Oct 2023 19:35:56 -0600 Subject: [PATCH 07/28] sync_test: add storage to blocks --- arbitrum/sync_test.go | 88 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 4 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index ee2cf72795..8fcd990a49 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -1,6 +1,7 @@ package arbitrum import ( + "encoding/hex" "math/big" "net" "os" @@ -10,6 +11,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -110,21 +112,60 @@ func TestSimpleSync(t *testing.T) { } // create and populate chain + + // pragma solidity ^0.8.20; + // + // contract Temmp { + // uint256[0x10000] private store; + // + // fallback(bytes calldata data) external payable returns (bytes memory) { + // uint16 index = uint16(uint256(bytes32(data[0:32]))); + // store[index] += 1; + // return ""; + // } + // } + + contractCodeHex := "608060405234801561001057600080fd5b50610218806100206000396000f3fe608060405260003660606000838360009060209261001f9392919061008a565b9061002a91906100e7565b60001c9050600160008261ffff1662010000811061004b5761004a610146565b5b01600082825461005b91906101ae565b9250508190555060405180602001604052806000815250915050915050805190602001f35b600080fd5b600080fd5b6000808585111561009e5761009d610080565b5b838611156100af576100ae610085565b5b6001850283019150848603905094509492505050565b600082905092915050565b6000819050919050565b600082821b905092915050565b60006100f383836100c5565b826100fe81356100d0565b9250602082101561013e576101397fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff836020036008026100da565b831692505b505092915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052603260045260246000fd5b6000819050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101b982610175565b91506101c483610175565b92508282019050808211156101dc576101db61017f565b5b9291505056fea26469706673582212202777d6cb94519b9aa7026cf6dad162739731e124c6379b15c343ff1c6e84a5f264736f6c63430008150033" + contractCode, err := hex.DecodeString(contractCodeHex) + if err != nil { + t.Fatal("decode contract error:", err) + } testUser, err := crypto.GenerateKey() if err != nil { t.Fatal("generate key err:", err) } testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey) + + testUser2, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + testUser2Address := crypto.PubkeyToAddress(testUser2.PublicKey) + gspec := &core.Genesis{ Config: 
params.TestChainConfig, - Alloc: core.GenesisAlloc{testUserAddress: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)}}, + Alloc: core.GenesisAlloc{ + testUserAddress: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)}, + testUser2Address: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)}, + }, } sourceChain, _ := core.NewBlockChain(sourceDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) signer := types.MakeSigner(sourceChain.Config(), big.NewInt(1), 0) - _, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) { - tx, err := types.SignNewTx(testUser, signer, &types.LegacyTx{ - Nonce: gen.TxNonce(testUserAddress), + firstAddress := common.Address{} + _, blocks, allReceipts := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) { + creationNonce := gen.TxNonce(testUser2Address) + tx, err := types.SignTx(types.NewContractCreation(creationNonce, new(big.Int), 1000000, gen.BaseFee(), contractCode), signer, testUser2) + if err != nil { + t.Fatalf("failed to create contract: %v", err) + } + gen.AddTx(tx) + + contractAddress := crypto.CreateAddress(testUser2Address, creationNonce) + + nonce := gen.TxNonce(testUserAddress) + tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{ + Nonce: nonce, GasPrice: gen.BaseFee(), Gas: uint64(1000001), }) @@ -132,8 +173,47 @@ func TestSimpleSync(t *testing.T) { t.Fatalf("failed to create tx: %v", err) } gen.AddTx(tx) + + iterHash := common.BigToHash(big.NewInt(int64(i))) + tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{ + To: &contractAddress, + Nonce: nonce + 1, + GasPrice: gen.BaseFee(), + Gas: uint64(1000001), + Data: iterHash[:], + }) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + + if firstAddress == (common.Address{}) { + firstAddress = contractAddress + } + + tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{ + To: &firstAddress, + Nonce: nonce + 2, + GasPrice: gen.BaseFee(), + Gas: uint64(1000001), + Data: iterHash[:], + }) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) }) + for _, receipts := range allReceipts { + if len(receipts) < 3 { + t.Fatal("missing receipts") + } + for _, receipt := range receipts { + if receipt.Status == 0 { + t.Fatal("failed transaction") + } + } + } if _, err := sourceChain.InsertChain(blocks[:pivotBlockNum]); err != nil { t.Fatal(err) } From e0b50228969770605cf18ea25f216dbc93658a76 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 26 Oct 2023 20:04:57 -0600 Subject: [PATCH 08/28] arb protocol: initial --- arbitrum/handler_p2p.go | 12 ++++++ eth/protocols/arb/enr.go | 14 ++++++ eth/protocols/arb/handler.go | 73 ++++++++++++++++++++++++++++++++ eth/protocols/arb/protocol.go | 80 +++++++++++++++++++++++++++++++++++ 4 files changed, 179 insertions(+) create mode 100644 eth/protocols/arb/enr.go create mode 100644 eth/protocols/arb/handler.go create mode 100644 eth/protocols/arb/protocol.go diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 3d612a71e4..096df8d3cd 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/arb" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" 
"github.com/ethereum/go-ethereum/ethdb" @@ -65,9 +66,20 @@ func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain) *protocolHandler func (h *protocolHandler) MakeProtocols(dnsdisc enode.Iterator) []p2p.Protocol { protos := eth.MakeProtocols((*ethHandler)(h), h.chain.Config().ChainID.Uint64(), dnsdisc) protos = append(protos, snap.MakeProtocols((*snapHandler)(h), dnsdisc)...) + protos = append(protos, arb.MakeProtocols((*arbHandler)(h), dnsdisc)...) return protos } +type arbHandler protocolHandler + +func (h *arbHandler) PeerInfo(id enode.ID) interface{} { + return nil +} + +func (h *arbHandler) LastConfirmed(id enode.ID, confirmed *types.Header) error { + return nil +} + // ethHandler implements the eth.Backend interface to handle the various network // packets that are sent as replies or broadcasts. type ethHandler protocolHandler diff --git a/eth/protocols/arb/enr.go b/eth/protocols/arb/enr.go new file mode 100644 index 0000000000..1da4cdce69 --- /dev/null +++ b/eth/protocols/arb/enr.go @@ -0,0 +1,14 @@ +package arb + +import "github.com/ethereum/go-ethereum/rlp" + +// enrEntry is the ENR entry which advertises `snap` protocol on the discovery. +type enrEntry struct { + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} + +// ENRKey implements enr.Entry. +func (e enrEntry) ENRKey() string { + return "arb" +} diff --git a/eth/protocols/arb/handler.go b/eth/protocols/arb/handler.go new file mode 100644 index 0000000000..4913f41d87 --- /dev/null +++ b/eth/protocols/arb/handler.go @@ -0,0 +1,73 @@ +package arb + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" +) + +type Peer struct { + p2pPeer *p2p.Peer + rw p2p.MsgReadWriter +} + +func NewPeer(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) *Peer { + return &Peer{ + p2pPeer: p2pPeer, + rw: rw, + } +} + +func (p *Peer) Run(backend Backend) error { + return Handle(backend, p) +} + +// Handle is the callback invoked to manage the life cycle of a `snap` peer. +// When this function terminates, the peer is disconnected. +func Handle(backend Backend, peer *Peer) error { + for { + if err := HandleMessage(backend, peer); err != nil { + log.Debug("Message handling failed in `arb`", "err", err) + return err + } + } +} + +// HandleMessage is invoked whenever an inbound message is received from a +// remote peer on the `snap` protocol. The remote connection is torn down upon +// returning any error. 
+func HandleMessage(backend Backend, peer *Peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := peer.rw.ReadMsg() + if err != nil { + return err + } + // if msg.Size > maxMessageSize { + // return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) + // } + defer msg.Discard() + start := time.Now() + // Track the amount of time it takes to serve the request and run the handler + if metrics.Enabled { + h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, ARB1, msg.Code) + defer func(start time.Time) { + sampler := func() metrics.Sample { + return metrics.NewBoundedHistogramSample() + } + metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) + }(start) + } + if msg.Code == GetLastConfirmedMsg { + response := LastConfirmedMsgPacket{} + return p2p.Send(peer.rw, LastConfirmedMsg, &response) + } + if msg.Code == LastConfirmedMsg { + incoming := LastConfirmedMsgPacket{} + msg.Decode(&incoming) + return backend.LastConfirmed(peer.p2pPeer.ID(), &incoming.header) + } + return fmt.Errorf("Invalid message: %v", msg.Code) +} diff --git a/eth/protocols/arb/protocol.go b/eth/protocols/arb/protocol.go new file mode 100644 index 0000000000..4566f1bbbc --- /dev/null +++ b/eth/protocols/arb/protocol.go @@ -0,0 +1,80 @@ +package arb + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" +) + +// Constants to match up protocol versions and messages +const ( + ARB1 = 1 +) + +// ProtocolName is the official short name of the `snap` protocol used during +// devp2p capability negotiation. +const ProtocolName = "arb" + +// ProtocolVersions are the supported versions of the `snap` protocol (first +// is primary). +var ProtocolVersions = []uint{ARB1} + +// protocolLengths are the number of implemented message corresponding to +// different protocol versions. +var protocolLengths = map[uint]uint64{ARB1: 8} + +const ( + GetLastConfirmedMsg = 0x00 + LastConfirmedMsg = 0x01 + GetLastCheckpointMsg = 0x02 + LastCheckpointMsg = 0x03 + QueryCheckpointSupportedMsg = 0x04 + CheckpointSupportedMsg = 0x05 + ProtocolLenArb1 = 6 +) + +type LastConfirmedMsgPacket struct { + header types.Header + node uint64 +} + +// NodeInfo represents a short summary of the `arb` sub-protocol metadata +// known about the host peer. +type NodeInfo struct{} + +// nodeInfo retrieves some `arb` protocol metadata about the running host node. 
+func nodeInfo() *NodeInfo { + return &NodeInfo{} +} + +type Backend interface { + PeerInfo(id enode.ID) interface{} + LastConfirmed(id enode.ID, confirmed *types.Header) error +} + +func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { + protocols := make([]p2p.Protocol, len(ProtocolVersions)) + for i, version := range ProtocolVersions { + version := version // Closure + + protocols[i] = p2p.Protocol{ + Name: ProtocolName, + Version: version, + Length: protocolLengths[version], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := NewPeer(p, rw) + return peer.Run(backend) + }, + NodeInfo: func() interface{} { + return nodeInfo() + }, + PeerInfo: func(id enode.ID) interface{} { + return backend.PeerInfo(id) + }, + Attributes: []enr.Entry{&enrEntry{}}, + DialCandidates: dnsdisc, + } + } + return protocols +} From 16518aae87c5364737be31a7db45af738735fda0 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 3 Nov 2023 17:39:56 -0600 Subject: [PATCH 09/28] downloader: handle nil pivot --- eth/downloader/beaconsync.go | 4 +-- eth/downloader/downloader.go | 52 +++++++++++++++++++++++++----------- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index af55f14bb4..188fefc83f 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -167,7 +167,7 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error { // PivotSync sets an explicit pivot and syncs from there. Pivot state will be read from peers. func (d *Downloader) PivotSync(head *types.Header, pivot *types.Header) error { - if head.Number.Cmp(pivot.Number) < 0 { + if pivot != nil && head.Number.Cmp(pivot.Number) < 0 { return errors.New("pivot must be behind head") } d.pivotLock.Lock() @@ -311,7 +311,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { // If the pivot became stale (older than 2*64-8 (bit of wiggle room)), // move it ahead to HEAD-64 d.pivotLock.Lock() - if d.pivotHeader != nil { + if d.pivotHeader != nil && d.pivotExplicit == false { if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { // Retrieve the next pivot header, either from skeleton chain // or the filled chain diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 8557703a7c..461f5dab0d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -477,13 +477,30 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - var latest, pivot, explicitPivot, final *types.Header + var latest, pivot, final *types.Header + var pivotExplicit bool d.pivotLock.Lock() if d.pivotExplicit { - explicitPivot = d.pivotHeader + pivotExplicit = true + pivot = d.pivotHeader } d.pivotLock.Unlock() - if !beaconMode { + if pivotExplicit { + latest, _, _, err = d.skeleton.Bounds() + if err != nil { + return err + } + if pivot != nil { + localPivot := d.skeleton.Header(pivot.Number.Uint64()) + if localPivot == nil { + return fmt.Errorf("pivot not in skeleton chain") + } + if localPivot.Hash() != pivot.Hash() { + return fmt.Errorf("pivot disagrees with skeleton") + } + final = localPivot + } + } else if !beaconMode { // In legacy mode, use the master peer to retrieve the headers from latest, pivot, err = d.fetchHead(p) if err != nil { @@ -495,7 +512,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if err 
!= nil { return err } - if latest.Number.Uint64() > uint64(fsMinFullBlocks) && explicitPivot == nil { + if latest.Number.Uint64() > uint64(fsMinFullBlocks) { number := latest.Number.Uint64() - uint64(fsMinFullBlocks) // Retrieve the pivot header from the skeleton chain segment but @@ -524,15 +541,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // threshold (i.e. new chain). In that case we won't really snap sync // anyway, but still need a valid pivot block to avoid some code hitting // nil panics on access. - if mode == SnapSync && pivot == nil { + if mode == SnapSync && pivot == nil && !pivotExplicit { pivot = d.blockchain.CurrentBlock() } - if explicitPivot != nil { - if explicitPivot.Number.Cmp(latest.Number) > 0 { - return fmt.Errorf("skeleton behind explicit pivot. cannot sync") - } - pivot = explicitPivot - } height := latest.Number.Uint64() var origin uint64 @@ -558,8 +569,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // Ensure our origin point is below any snap sync pivot point if mode == SnapSync { - if explicitPivot != nil { - rawdb.WriteLastPivotNumber(d.stateDB, pivot.Nonce.Uint64()) + if pivotExplicit { + if pivot != nil { + rawdb.WriteLastPivotNumber(d.stateDB, pivot.Nonce.Uint64()) + } } else if height <= uint64(fsMinFullBlocks) { origin = 0 } else { @@ -573,7 +586,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } d.committed.Store(true) - if mode == SnapSync && pivot.Number.Uint64() != 0 { + if mode == SnapSync && pivot != nil && pivot.Number.Uint64() != 0 { d.committed.Store(false) } if mode == SnapSync { @@ -653,8 +666,15 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * d.pivotHeader = pivot } d.pivotLock.Unlock() - - fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) + if pivot != nil { + fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) + } else { + // no pivot yet - cannot complete this sync + fetchers = append(fetchers, func() error { + <-d.cancelCh + return errCanceled + }) + } } else if mode == FullSync { fetchers = append(fetchers, func() error { return d.processFullSyncContent(ttd, beaconMode) }) } From 9f516dea529da20e7661d314357d61c44eb333ab Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Fri, 3 Nov 2023 17:41:14 -0600 Subject: [PATCH 10/28] arb protocol updates --- arbitrum/handler_p2p.go | 143 ++++++++++++++++++++++++++++++++-- arbitrum/sync_test.go | 32 ++++++-- eth/protocols/arb/handler.go | 84 +++++++++++++++++--- eth/protocols/arb/protocol.go | 47 ++++++++--- 4 files changed, 271 insertions(+), 35 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 096df8d3cd..658d4ea152 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -18,6 +18,7 @@ package arbitrum import ( "fmt" + "sync" "sync/atomic" "github.com/ethereum/go-ethereum/common" @@ -36,27 +37,53 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +type SyncHelper interface { + LastConfirmed() (*types.Header, uint64, error) + LastCheckpoint() (*types.Header, error) + CheckpointSupported(*types.Header) (bool, error) + ValidateConfirmed(*types.Header, uint64) (bool, error) +} + +type Peer struct { + arb *arb.Peer + eth *eth.Peer + snap *snap.Peer +} + +func NewPeer() *Peer { + return &Peer{} +} + type protocolHandler struct { chain *core.BlockChain eventMux *event.TypeMux downloader *downloader.Downloader db ethdb.Database + helper SyncHelper + 
+ peersLock sync.RWMutex + peers map[string]*Peer - done atomic.Bool + confirmed *types.Header + checkpoint *types.Header + syncing atomic.Bool } -func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain) *protocolHandler { +func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain, helper SyncHelper, syncing bool) *protocolHandler { evMux := new(event.TypeMux) p := &protocolHandler{ chain: bc, eventMux: evMux, db: db, + helper: helper, + peers: make(map[string]*Peer), } + p.syncing.Store(syncing) peerDrop := func(id string) { log.Info("dropping peer", "id", id) } success := func() { - p.done.Store(true) + p.syncing.Store(false) log.Info("DOWNLOADER DONE") } p.downloader = downloader.New(db, evMux, bc, nil, peerDrop, success) @@ -70,14 +97,108 @@ func (h *protocolHandler) MakeProtocols(dnsdisc enode.Iterator) []p2p.Protocol { return protos } +func (h *protocolHandler) getCreatePeer(id string) *Peer { + h.peersLock.Lock() + defer h.peersLock.Unlock() + peer := h.peers[id] + if peer != nil { + return peer + } + peer = NewPeer() + h.peers[id] = peer + return peer +} + +func (h *protocolHandler) getPeer(id string) *Peer { + h.peersLock.RLock() + defer h.peersLock.RUnlock() + return h.peers[id] +} + type arbHandler protocolHandler func (h *arbHandler) PeerInfo(id enode.ID) interface{} { return nil } -func (h *arbHandler) LastConfirmed(id enode.ID, confirmed *types.Header) error { - return nil +func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, node uint64) { + // TODO: validate confirmed + validated := false + valid := false + if h.confirmed != nil { + if confirmed.Number.Cmp(h.confirmed.Number) == 0 { + validated = true + valid = h.confirmed.Hash() == confirmed.Hash() + } + } + if !validated { + var err error + valid, err = h.helper.ValidateConfirmed(confirmed, node) + if err != nil { + log.Error("error in validate confirmed", "id", peer.ID(), "err", err) + return + } + } + hPeer := (*protocolHandler)(h).getPeer(peer.ID()) + if hPeer == nil { + log.Warn("hPeer not found on HandleLastConfirmed") + return + } + if !valid { + //TODO: remove peer + return + } + peer.RequestCheckpoint(nil) + h.confirmed = confirmed + log.Info("lastconfirmed", "confirmed", h.confirmed, "checkpoint", "h.checkpoint") + h.downloader.PivotSync(h.confirmed, h.checkpoint) +} + +func (h *arbHandler) HandleCheckpoint(peer *arb.Peer, checkpoint *types.Header, supported bool) { + log.Error("got checkpoint", "from", peer.ID(), "checkpoint", checkpoint, "supported", supported) + if !supported { + return + } + if h.checkpoint != nil && h.checkpoint.Number.Uint64() > checkpoint.Number.Uint64() { + return + } + // TODO: confirm + // TODO: advance? 
+ hPeer := (*protocolHandler)(h).getPeer(peer.ID()) + if hPeer == nil { + log.Warn("hPeer not found on HandleLastConfirmed") + return + } + h.checkpoint = checkpoint + h.downloader.PivotSync(h.confirmed, h.checkpoint) +} + +func (h *arbHandler) LastConfirmed() (*types.Header, uint64, error) { + return h.helper.LastConfirmed() +} + +func (h *arbHandler) LastCheckpoint() (*types.Header, error) { + return h.helper.LastCheckpoint() +} + +func (h *arbHandler) CheckpointSupported(checkpoint *types.Header) (bool, error) { + return h.helper.CheckpointSupported(checkpoint) +} + +func (h *arbHandler) RunPeer(peer *arb.Peer, handler arb.Handler) error { + //id := h.peers[] + hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + if hPeer.arb != nil { + return fmt.Errorf("peer id already known") + } + hPeer.arb = peer + if h.syncing.Load() { + err := peer.RequestLastConfirmed() + if err != nil { + return err + } + } + return handler(peer) } // ethHandler implements the eth.Backend interface to handle the various network @@ -96,11 +217,15 @@ func (h *ethHandler) TxPool() eth.TxPool { return &dummyTxPool{} } // RunPeer is invoked when a peer joins on the `eth` protocol. func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { + hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + if hPeer.eth != nil { + return fmt.Errorf("peer id already known") + } + hPeer.eth = peer if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } - log.Info("eth peer") return hand(peer) } @@ -226,11 +351,15 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh // RunPeer is invoked when a peer joins on the `snap` protocol. func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { + hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + if hPeer.snap != nil { + return fmt.Errorf("peer id already known") + } + hPeer.snap = peer if err := h.downloader.SnapSyncer.Register(peer); err != nil { peer.Log().Error("Failed to register peer in snap syncer", "err", err) return err } - log.Info("snap peer") return hand(peer) } diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 8fcd990a49..c5b0f2238d 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -56,6 +56,27 @@ func (i *dummyIterator) Close() { // ends the iterator i.nodes = nil } +type dummySyncHelper struct { + confirmed *types.Header + checkpoint *types.Header +} + +func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, error) { + return d.confirmed, 0, nil +} + +func (d *dummySyncHelper) LastCheckpoint() (*types.Header, error) { + return d.checkpoint, nil +} + +func (d *dummySyncHelper) CheckpointSupported(*types.Header) (bool, error) { + return true, nil +} + +func (d *dummySyncHelper) ValidateConfirmed(*types.Header, uint64) (bool, error) { + return true, nil +} + func testHasBlock(t *testing.T, chain *core.BlockChain, block *types.Block, shouldHaveState bool) { t.Helper() hasHeader := chain.GetHeaderByNumber(block.NumberU64()) @@ -234,7 +255,7 @@ func TestSimpleSync(t *testing.T) { testHasBlock(t, sourceChain, blocks[pivotBlockNum], false) // source node - sourceHandler := NewProtocolHandler(sourceDb, sourceChain) + sourceHandler := NewProtocolHandler(sourceDb, sourceChain, &dummySyncHelper{syncBlock.Header(), pivotBlock.Header()}, false) sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{})) sourceStack.Start() @@ -264,19 +285,16 @@ func 
TestSimpleSync(t *testing.T) { t.Fatal(err) } destChain, _ := core.NewBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) - destHandler := NewProtocolHandler(destDb, destChain) + destHandler := NewProtocolHandler(destDb, destChain, &dummySyncHelper{}, true) destStack.RegisterProtocols(destHandler.MakeProtocols(iter)) - destStack.Start() // start sync log.Info("dest listener", "address", destStack.Server().Config.ListenAddr) log.Info("initial source", "head", sourceChain.CurrentBlock()) log.Info("initial dest", "head", destChain.CurrentBlock()) log.Info("pivot", "head", pivotBlock.Header()) - err = destHandler.downloader.PivotSync(syncBlock.Header(), pivotBlock.Header()) - if err != nil { - t.Fatal(err) - } + destStack.Start() + <-time.After(time.Second * 5) log.Info("final source", "head", sourceChain.CurrentBlock()) diff --git a/eth/protocols/arb/handler.go b/eth/protocols/arb/handler.go index 4913f41d87..8a2f99b8e4 100644 --- a/eth/protocols/arb/handler.go +++ b/eth/protocols/arb/handler.go @@ -4,9 +4,11 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" ) type Peer struct { @@ -21,8 +23,25 @@ func NewPeer(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) *Peer { } } -func (p *Peer) Run(backend Backend) error { - return Handle(backend, p) +func (p *Peer) RequestCheckpoint(header *types.Header) error { + if header == nil { + return p2p.Send(p.rw, GetLastCheckpointMsg, struct{}{}) + } + return p2p.Send(p.rw, CheckpointQueryMsg, &CheckpointQueryPacket{ + Header: header, + }) +} + +func (p *Peer) RequestLastConfirmed() error { + return p2p.Send(p.rw, GetLastConfirmedMsg, struct{}{}) +} + +func (p *Peer) ID() string { + return p.p2pPeer.ID().String() +} + +func (p *Peer) Node() *enode.Node { + return p.p2pPeer.Node() } // Handle is the callback invoked to manage the life cycle of a `snap` peer. 
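Note: the two request helpers added above are fire-and-forget sends; the peer's replies come back through HandleMessage and are routed to the Backend callbacks (HandleLastConfirmed, HandleCheckpoint) introduced later in this patch. The following is a minimal, illustrative sketch of how a backend might drive that exchange; the syncDriver type and its method names are hypothetical and are not part of this patch.

package example

import (
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/protocols/arb"
	"github.com/ethereum/go-ethereum/log"
)

// syncDriver is a hypothetical backend fragment showing the intended call order.
type syncDriver struct{}

// On peer join: ask the peer for its last confirmed header. The answer arrives
// asynchronously as a LastConfirmedMsg and is dispatched to HandleLastConfirmed.
func (d *syncDriver) onPeerJoin(p *arb.Peer) error {
	return p.RequestLastConfirmed()
}

// Once the confirmed header has been validated: ask for a checkpoint. Passing
// nil requests the peer's latest checkpoint; passing a header instead asks
// whether the peer still has state for that specific block.
func (d *syncDriver) onConfirmedValidated(p *arb.Peer, confirmed *types.Header) {
	if err := p.RequestCheckpoint(nil); err != nil {
		log.Warn("arb: checkpoint request failed", "peer", p.ID(), "err", err)
	}
}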
@@ -60,14 +79,61 @@ func HandleMessage(backend Backend, peer *Peer) error { metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) }(start) } - if msg.Code == GetLastConfirmedMsg { - response := LastConfirmedMsgPacket{} + switch { + case msg.Code == GetLastConfirmedMsg: + confirmed, node, err := backend.LastConfirmed() + if err != nil || confirmed == nil { + return err + } + response := LastConfirmedMsgPacket{ + Header: confirmed, + Node: node, + } return p2p.Send(peer.rw, LastConfirmedMsg, &response) - } - if msg.Code == LastConfirmedMsg { - incoming := LastConfirmedMsgPacket{} - msg.Decode(&incoming) - return backend.LastConfirmed(peer.p2pPeer.ID(), &incoming.header) + case msg.Code == LastConfirmedMsg: + var incoming LastConfirmedMsgPacket + err := msg.Decode(&incoming) + if err != nil { + return err + } + if incoming.Header == nil { + return nil + } + backend.HandleLastConfirmed(peer, incoming.Header, incoming.Node) + return nil + case msg.Code == GetLastCheckpointMsg: + checkpoint, err := backend.LastCheckpoint() + if err != nil { + return err + } + response := CheckpointMsgPacket{ + Header: checkpoint, + HasState: true, + } + return p2p.Send(peer.rw, CheckpointMsg, &response) + case msg.Code == CheckpointQueryMsg: + incoming := CheckpointQueryPacket{} + err := msg.Decode(&incoming) + if err != nil { + return err + } + hasState, err := backend.CheckpointSupported(incoming.Header) + if err != nil { + return err + } + response := CheckpointMsgPacket{ + Header: incoming.Header, + HasState: hasState, + } + return p2p.Send(peer.rw, CheckpointMsg, &response) + case msg.Code == CheckpointMsg: + incoming := CheckpointMsgPacket{} + err := msg.Decode(&incoming) + if err != nil { + return err + } + backend.HandleCheckpoint(peer, incoming.Header, incoming.HasState) + return nil } return fmt.Errorf("Invalid message: %v", msg.Code) } diff --git a/eth/protocols/arb/protocol.go b/eth/protocols/arb/protocol.go index 4566f1bbbc..62a377f104 100644 --- a/eth/protocols/arb/protocol.go +++ b/eth/protocols/arb/protocol.go @@ -22,21 +22,29 @@ var ProtocolVersions = []uint{ARB1} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ARB1: 8} +var protocolLengths = map[uint]uint64{ARB1: ProtocolLenArb1} const ( - GetLastConfirmedMsg = 0x00 - LastConfirmedMsg = 0x01 - GetLastCheckpointMsg = 0x02 - LastCheckpointMsg = 0x03 - QueryCheckpointSupportedMsg = 0x04 - CheckpointSupportedMsg = 0x05 - ProtocolLenArb1 = 6 + GetLastConfirmedMsg = 0x00 + LastConfirmedMsg = 0x01 + GetLastCheckpointMsg = 0x02 + CheckpointQueryMsg = 0x03 + CheckpointMsg = 0x04 + ProtocolLenArb1 = 5 ) type LastConfirmedMsgPacket struct { - header types.Header - node uint64 + Header *types.Header + Node uint64 +} + +type CheckpointMsgPacket struct { + Header *types.Header + HasState bool +} + +type CheckpointQueryPacket struct { + Header *types.Header } // NodeInfo represents a short summary of the `arb` sub-protocol metadata @@ -48,9 +56,22 @@ func nodeInfo() *NodeInfo { return &NodeInfo{} } +type Handler func(peer *Peer) error + +// Backend defines the data retrieval methods to serve remote requests and the +// callback methods to invoke on remote deliveries. 
type Backend interface { PeerInfo(id enode.ID) interface{} - LastConfirmed(id enode.ID, confirmed *types.Header) error + HandleLastConfirmed(peer *Peer, confirmed *types.Header, node uint64) + HandleCheckpoint(peer *Peer, header *types.Header, supported bool) + LastConfirmed() (*types.Header, uint64, error) + LastCheckpoint() (*types.Header, error) + CheckpointSupported(*types.Header) (bool, error) + // RunPeer is invoked when a peer joins on the `eth` protocol. The handler + // should do any peer maintenance work, handshakes and validations. If all + // is passed, control should be given back to the `handler` to process the + // inbound messages going forward. + RunPeer(peer *Peer, handler Handler) error } func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { @@ -64,7 +85,9 @@ func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := NewPeer(p, rw) - return peer.Run(backend) + return backend.RunPeer(peer, func(peer *Peer) error { + return Handle(backend, peer) + }) }, NodeInfo: func() interface{} { return nodeInfo() From 018e4700072b1f7a956f411a3dd814633ae9edea Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 7 Nov 2023 15:28:24 -0700 Subject: [PATCH 11/28] sync_test: add bad node --- arbitrum/sync_test.go | 81 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 14 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index c5b0f2238d..d3ef1f5a64 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -66,6 +66,9 @@ func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, error) { } func (d *dummySyncHelper) LastCheckpoint() (*types.Header, error) { + if d.confirmed == nil { + return nil, nil + } return d.checkpoint, nil } @@ -73,8 +76,17 @@ func (d *dummySyncHelper) CheckpointSupported(*types.Header) (bool, error) { return true, nil } -func (d *dummySyncHelper) ValidateConfirmed(*types.Header, uint64) (bool, error) { - return true, nil +func (d *dummySyncHelper) ValidateConfirmed(header *types.Header, node uint64) (bool, error) { + if d.confirmed == nil { + return true, nil + } + if header == nil { + return false, nil + } + if d.confirmed.Hash() == header.Hash() { + return true, nil + } + return false, nil } func testHasBlock(t *testing.T, chain *core.BlockChain, block *types.Block, shouldHaveState bool) { @@ -95,6 +107,11 @@ func testHasBlock(t *testing.T, chain *core.BlockChain, block *types.Block, shou } } +func portFromAddress(address string) (int, error) { + splitAddr := strings.Split(address, ":") + return strconv.Atoi(splitAddr[len(splitAddr)-1]) +} + func TestSimpleSync(t *testing.T) { const pivotBlockNum = 50 const syncBlockNum = 70 @@ -116,6 +133,12 @@ func TestSimpleSync(t *testing.T) { t.Fatal("generate key err:", err) } + // key for bad node p2p + badNodeKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal("generate key err:", err) + } + // source node sourceStackConf := node.DefaultConfig sourceStackConf.DataDir = t.TempDir() @@ -134,6 +157,7 @@ func TestSimpleSync(t *testing.T) { // create and populate chain + // code for contractcodehex below: // pragma solidity ^0.8.20; // // contract Temmp { @@ -235,16 +259,12 @@ func TestSimpleSync(t *testing.T) { } } } + pivotBlock := blocks[pivotBlockNum-1] + syncBlock := blocks[syncBlockNum-1] if _, err := sourceChain.InsertChain(blocks[:pivotBlockNum]); err != nil { t.Fatal(err) } - - pivotBlock := 
blocks[pivotBlockNum-1] - syncBlock := blocks[syncBlockNum-1] - - testHasBlock(t, sourceChain, pivotBlock, true) - sourceChain.TrieDB().Commit(pivotBlock.Root(), true) - + sourceChain.TrieDB().Commit(blocks[pivotBlockNum-1].Root(), true) if _, err := sourceChain.InsertChain(blocks[pivotBlockNum:]); err != nil { t.Fatal(err) } @@ -259,16 +279,49 @@ func TestSimpleSync(t *testing.T) { sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{})) sourceStack.Start() + // bad node (on wrong blockchain) + _, badBlocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) { + creationNonce := gen.TxNonce(testUser2Address) + tx, err := types.SignTx(types.NewContractCreation(creationNonce, new(big.Int), 1000000, gen.BaseFee(), contractCode), signer, testUser2) + if err != nil { + t.Fatalf("failed to create contract: %v", err) + } + gen.AddTx(tx) + }) + badStackConf := sourceStackConf + badStackConf.DataDir = t.TempDir() + badStackConf.P2P.PrivateKey = badNodeKey + badStack, err := node.New(&badStackConf) + + badDb, err := badStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false) + if err != nil { + t.Fatal(err) + } + badChain, _ := core.NewBlockChain(badDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) + if _, err := badChain.InsertChain(badBlocks[:pivotBlockNum]); err != nil { + t.Fatal(err) + } + badChain.TrieDB().Commit(badBlocks[pivotBlockNum-1].Root(), true) + if _, err := badChain.InsertChain(badBlocks[pivotBlockNum:]); err != nil { + t.Fatal(err) + } + badHandler := NewProtocolHandler(badDb, badChain, &dummySyncHelper{badBlocks[syncBlockNum-1].Header(), badBlocks[pivotBlockNum-1].Header()}, false) + badStack.RegisterProtocols(badHandler.MakeProtocols(&dummyIterator{})) + badStack.Start() + // figure out port of the source node and create dummy iter that points to it - sourceListenAddr := sourceStack.Server().Config.ListenAddr - splitAddr := strings.Split(sourceListenAddr, ":") - sourcePort, err := strconv.Atoi(splitAddr[len(splitAddr)-1]) + sourcePort, err := portFromAddress(sourceStack.Server().Config.ListenAddr) + if err != nil { + t.Fatal(err) + } + badNodePort, err := portFromAddress(badStack.Server().Config.ListenAddr) if err != nil { t.Fatal(err) } + badEnode := enode.NewV4(&badNodeKey.PublicKey, net.IPv4(127, 0, 0, 1), badNodePort, 0) sourceEnode := enode.NewV4(&sourceKey.PublicKey, net.IPv4(127, 0, 0, 1), sourcePort, 0) iter := &dummyIterator{ - nodes: []*enode.Node{nil, sourceEnode}, + nodes: []*enode.Node{nil, badEnode, sourceEnode}, } // dest node @@ -285,7 +338,7 @@ func TestSimpleSync(t *testing.T) { t.Fatal(err) } destChain, _ := core.NewBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil) - destHandler := NewProtocolHandler(destDb, destChain, &dummySyncHelper{}, true) + destHandler := NewProtocolHandler(destDb, destChain, &dummySyncHelper{syncBlock.Header(), nil}, true) destStack.RegisterProtocols(destHandler.MakeProtocols(iter)) // start sync From c8f538cfced4e184e483d78600a6354e5c867871 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 8 Nov 2023 11:30:14 -0700 Subject: [PATCH 12/28] eth/snap: support nit enode.Iterator --- eth/protocols/snap/handler.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index e25fc9a5e2..d552e15819 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -88,10 +88,12 @@ type 
Backend interface { // MakeProtocols constructs the P2P protocol definitions for `snap`. func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { // Filter the discovery iterator for nodes advertising snap support. - dnsdisc = enode.Filter(dnsdisc, func(n *enode.Node) bool { - var snap enrEntry - return n.Load(&snap) == nil - }) + if dnsdisc != nil { + dnsdisc = enode.Filter(dnsdisc, func(n *enode.Node) bool { + var snap enrEntry + return n.Load(&snap) == nil + }) + } protocols := make([]p2p.Protocol, len(ProtocolVersions)) for i, version := range ProtocolVersions { From 72e435d3da91637dba10feabbc8558c1a511c0e9 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 8 Nov 2023 11:30:55 -0700 Subject: [PATCH 13/28] arb p2p: support drop peer --- arbitrum/handler_p2p.go | 74 ++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 15 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 658d4ea152..7c7d331f7e 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -45,9 +45,10 @@ type SyncHelper interface { } type Peer struct { - arb *arb.Peer - eth *eth.Peer - snap *snap.Peer + mutex sync.Mutex + arb *arb.Peer + eth *eth.Peer + snap *snap.Peer } func NewPeer() *Peer { @@ -79,20 +80,17 @@ func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain, helper SyncHelpe peers: make(map[string]*Peer), } p.syncing.Store(syncing) - peerDrop := func(id string) { - log.Info("dropping peer", "id", id) - } success := func() { p.syncing.Store(false) log.Info("DOWNLOADER DONE") } - p.downloader = downloader.New(db, evMux, bc, nil, peerDrop, success) + p.downloader = downloader.New(db, evMux, bc, nil, p.peerDrop, success) return p } func (h *protocolHandler) MakeProtocols(dnsdisc enode.Iterator) []p2p.Protocol { - protos := eth.MakeProtocols((*ethHandler)(h), h.chain.Config().ChainID.Uint64(), dnsdisc) - protos = append(protos, snap.MakeProtocols((*snapHandler)(h), dnsdisc)...) + protos := eth.MakeProtocols((*ethHandler)(h), h.chain.Config().ChainID.Uint64(), nil) + protos = append(protos, snap.MakeProtocols((*snapHandler)(h), nil)...) protos = append(protos, arb.MakeProtocols((*arbHandler)(h), dnsdisc)...) 
return protos } @@ -109,12 +107,47 @@ func (h *protocolHandler) getCreatePeer(id string) *Peer { return peer } +func (h *protocolHandler) getRemovePeer(id string) *Peer { + h.peersLock.Lock() + defer h.peersLock.Unlock() + peer := h.peers[id] + if peer != nil { + h.peers[id] = nil + } + return peer +} + func (h *protocolHandler) getPeer(id string) *Peer { h.peersLock.RLock() defer h.peersLock.RUnlock() return h.peers[id] } +func (h *protocolHandler) peerDrop(id string) { + log.Info("dropping peer", "id", id) + hPeer := h.getRemovePeer(id) + if hPeer == nil { + return + } + hPeer.mutex.Lock() + defer hPeer.mutex.Unlock() + hPeer.arb = nil + if hPeer.eth != nil { + hPeer.eth.Disconnect(p2p.DiscSelf) + err := h.downloader.UnregisterPeer(id) + if err != nil { + log.Warn("failed deregistering peer from downloader", "err", err) + } + hPeer.eth = nil + } + if hPeer.snap != nil { + err := h.downloader.SnapSyncer.Unregister(id) + if err != nil { + log.Warn("failed deregistering peer from downloader", "err", err) + } + } +} + type arbHandler protocolHandler func (h *arbHandler) PeerInfo(id enode.ID) interface{} { @@ -139,15 +172,15 @@ func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header return } } + if !valid { + (*protocolHandler)(h).peerDrop(peer.ID()) + return + } hPeer := (*protocolHandler)(h).getPeer(peer.ID()) if hPeer == nil { log.Warn("hPeer not found on HandleLastConfirmed") return } - if !valid { - //TODO: remove peer - return - } peer.RequestCheckpoint(nil) h.confirmed = confirmed log.Info("lastconfirmed", "confirmed", h.confirmed, "checkpoint", "h.checkpoint") @@ -188,10 +221,13 @@ func (h *arbHandler) CheckpointSupported(checkpoint *types.Header) (bool, error) func (h *arbHandler) RunPeer(peer *arb.Peer, handler arb.Handler) error { //id := h.peers[] hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + hPeer.mutex.Lock() if hPeer.arb != nil { + hPeer.mutex.Unlock() return fmt.Errorf("peer id already known") } hPeer.arb = peer + hPeer.mutex.Unlock() if h.syncing.Load() { err := peer.RequestLastConfirmed() if err != nil { @@ -218,11 +254,15 @@ func (h *ethHandler) TxPool() eth.TxPool { return &dummyTxPool{} } // RunPeer is invoked when a peer joins on the `eth` protocol. func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + hPeer.mutex.Lock() if hPeer.eth != nil { + hPeer.mutex.Unlock() return fmt.Errorf("peer id already known") } hPeer.eth = peer - if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { + err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer) + hPeer.mutex.Unlock() + if err != nil { peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } @@ -352,11 +392,15 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh // RunPeer is invoked when a peer joins on the `snap` protocol. 
func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { hPeer := (*protocolHandler)(h).getCreatePeer(peer.ID()) + hPeer.mutex.Lock() if hPeer.snap != nil { + hPeer.mutex.Unlock() return fmt.Errorf("peer id already known") } hPeer.snap = peer - if err := h.downloader.SnapSyncer.Register(peer); err != nil { + err := h.downloader.SnapSyncer.Register(peer) + hPeer.mutex.Unlock() + if err != nil { peer.Log().Error("Failed to register peer in snap syncer", "err", err) return err } From f0d79911de28e1422126f7db9e8679ef11659c17 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 8 Nov 2023 16:46:10 -0700 Subject: [PATCH 14/28] sync_test: remove empty line --- arbitrum/sync_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index d3ef1f5a64..46d0a6e33e 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -169,7 +169,6 @@ func TestSimpleSync(t *testing.T) { // return ""; // } // } - contractCodeHex := "608060405234801561001057600080fd5b50610218806100206000396000f3fe608060405260003660606000838360009060209261001f9392919061008a565b9061002a91906100e7565b60001c9050600160008261ffff1662010000811061004b5761004a610146565b5b01600082825461005b91906101ae565b9250508190555060405180602001604052806000815250915050915050805190602001f35b600080fd5b600080fd5b6000808585111561009e5761009d610080565b5b838611156100af576100ae610085565b5b6001850283019150848603905094509492505050565b600082905092915050565b6000819050919050565b600082821b905092915050565b60006100f383836100c5565b826100fe81356100d0565b9250602082101561013e576101397fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff836020036008026100da565b831692505b505092915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052603260045260246000fd5b6000819050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101b982610175565b91506101c483610175565b92508282019050808211156101dc576101db61017f565b5b9291505056fea26469706673582212202777d6cb94519b9aa7026cf6dad162739731e124c6379b15c343ff1c6e84a5f264736f6c63430008150033" contractCode, err := hex.DecodeString(contractCodeHex) if err != nil { From 41d5ed382c7d2eb609deea039534fe3c3d9d2aa3 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 9 Nov 2023 13:19:10 -0700 Subject: [PATCH 15/28] snap backend for tests --- eth/downloader/downloader_test.go | 42 +++++++++++++++++++++++++++--- tests/fuzzers/snap/fuzz_handler.go | 23 +++++++++++++++- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 3fc2f7142a..d8c6f05019 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -31,12 +31,14 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -153,6 +155,38 @@ type downloadTesterPeer struct { withholdHeaders map[common.Hash]struct{} } +type snapBackend struct { + chain 
*core.BlockChain +} + +func (d *snapBackend) ContractCodeWithPrefix(codeHash common.Hash) ([]byte, error) { + return d.chain.ContractCodeWithPrefix(codeHash) +} + +func (d *snapBackend) TrieDB() *trie.Database { + return d.chain.TrieDB() +} + +func (d *snapBackend) Snapshot(root common.Hash) snapshot.Snapshot { + return d.chain.Snapshots().Snapshot(root) +} + +func (d *snapBackend) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { + return d.chain.Snapshots().AccountIterator(root, account) +} + +func (d *snapBackend) StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) { + return d.chain.Snapshots().StorageIterator(root, account, origin) +} + +func (d *snapBackend) RunPeer(*snap.Peer, snap.Handler) error { return nil } +func (d *snapBackend) PeerInfo(enode.ID) interface{} { return "Foo" } +func (d *snapBackend) Handle(*snap.Peer, snap.Packet) error { return nil } + +func (dlp *downloadTesterPeer) SnapBackend() *snapBackend { + return &snapBackend{dlp.chain} +} + // Head constructs a function to retrieve a peer's current head hash // and total difficulty. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { @@ -344,7 +378,7 @@ func (dlp *downloadTesterPeer) RequestAccountRange(id uint64, root, origin, limi Limit: limit, Bytes: bytes, } - slimaccs, proofs := snap.ServiceGetAccountRangeQuery(dlp.chain, req) + slimaccs, proofs := snap.ServiceGetAccountRangeQuery(dlp.SnapBackend(), req) // We need to convert to non-slim format, delegate to the packet code res := &snap.AccountRangePacket{ @@ -371,7 +405,7 @@ func (dlp *downloadTesterPeer) RequestStorageRanges(id uint64, root common.Hash, Limit: limit, Bytes: bytes, } - storage, proofs := snap.ServiceGetStorageRangesQuery(dlp.chain, req) + storage, proofs := snap.ServiceGetStorageRangesQuery(dlp.SnapBackend(), req) // We need to convert to demultiplex, delegate to the packet code res := &snap.StorageRangesPacket{ @@ -392,7 +426,7 @@ func (dlp *downloadTesterPeer) RequestByteCodes(id uint64, hashes []common.Hash, Hashes: hashes, Bytes: bytes, } - codes := snap.ServiceGetByteCodesQuery(dlp.chain, req) + codes := snap.ServiceGetByteCodesQuery(dlp.SnapBackend(), req) go dlp.dl.downloader.SnapSyncer.OnByteCodes(dlp, id, codes) return nil } @@ -406,7 +440,7 @@ func (dlp *downloadTesterPeer) RequestTrieNodes(id uint64, root common.Hash, pat Paths: paths, Bytes: bytes, } - nodes, _ := snap.ServiceGetTrieNodesQuery(dlp.chain, req, time.Now()) + nodes, _ := snap.ServiceGetTrieNodesQuery(dlp.SnapBackend(), req, time.Now()) go dlp.dl.downloader.SnapSyncer.OnTrieNodes(dlp, id, nodes) return nil } diff --git a/tests/fuzzers/snap/fuzz_handler.go b/tests/fuzzers/snap/fuzz_handler.go index 060c67188b..4ddb9e26de 100644 --- a/tests/fuzzers/snap/fuzz_handler.go +++ b/tests/fuzzers/snap/fuzz_handler.go @@ -27,12 +27,14 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" fuzz "github.com/google/gofuzz" ) @@ -87,7 +89,26 @@ type dummyBackend struct { chain *core.BlockChain } -func (d *dummyBackend) Chain() *core.BlockChain { return d.chain } +func (d 
*dummyBackend) ContractCodeWithPrefix(codeHash common.Hash) ([]byte, error) { + return d.chain.ContractCodeWithPrefix(codeHash) +} + +func (d *dummyBackend) TrieDB() *trie.Database { + return d.chain.TrieDB() +} + +func (d *dummyBackend) Snapshot(root common.Hash) snapshot.Snapshot { + return d.chain.Snapshots().Snapshot(root) +} + +func (d *dummyBackend) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { + return d.chain.Snapshots().AccountIterator(root, account) +} + +func (d *dummyBackend) StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) { + return d.chain.Snapshots().StorageIterator(root, account, origin) +} + func (d *dummyBackend) RunPeer(*snap.Peer, snap.Handler) error { return nil } func (d *dummyBackend) PeerInfo(enode.ID) interface{} { return "Foo" } func (d *dummyBackend) Handle(*snap.Peer, snap.Packet) error { return nil } From 41dc3640b3d0c96731f88d2066198d4e1a9ac185 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 9 Nov 2023 13:19:46 -0700 Subject: [PATCH 16/28] lint fixes --- arbitrum/sync_test.go | 5 ++++- eth/downloader/beaconsync.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 46d0a6e33e..8880c5a36c 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -118,7 +118,7 @@ func TestSimpleSync(t *testing.T) { const extraBlocks = 200 glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) - glogger.Verbosity(log.Lvl(log.LvlTrace)) + glogger.Verbosity(log.LvlTrace) log.Root().SetHandler(glogger) // key for source node p2p @@ -291,6 +291,9 @@ func TestSimpleSync(t *testing.T) { badStackConf.DataDir = t.TempDir() badStackConf.P2P.PrivateKey = badNodeKey badStack, err := node.New(&badStackConf) + if err != nil { + t.Fatal(err) + } badDb, err := badStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false) if err != nil { diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 188fefc83f..ca8b5bd39c 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -311,7 +311,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { // If the pivot became stale (older than 2*64-8 (bit of wiggle room)), // move it ahead to HEAD-64 d.pivotLock.Lock() - if d.pivotHeader != nil && d.pivotExplicit == false { + if d.pivotHeader != nil && !d.pivotExplicit { if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { // Retrieve the next pivot header, either from skeleton chain // or the filled chain From 4999bac657f06678c37a17bd5301d52860eac4a2 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 15 Nov 2023 08:58:25 -0700 Subject: [PATCH 17/28] eth downloader: export backfiller and get as param --- arbitrum/handler_p2p.go | 11 +++++++---- eth/downloader/beaconsync.go | 4 ++-- eth/downloader/downloader.go | 4 ++-- eth/downloader/downloader_test.go | 3 ++- eth/downloader/skeleton.go | 12 ++++++------ eth/downloader/skeleton_test.go | 2 +- eth/handler.go | 27 +++++++++++++++------------ 7 files changed, 35 insertions(+), 28 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 7c7d331f7e..28c7e4b268 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -80,11 +80,14 @@ func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain, helper SyncHelpe peers: make(map[string]*Peer), } p.syncing.Store(syncing) - success := func() { - p.syncing.Store(false) - log.Info("DOWNLOADER 
DONE") + backfillerCreator := func(dl *downloader.Downloader) downloader.Backfiller { + success := func() { + p.syncing.Store(false) + log.Info("DOWNLOADER DONE") + } + return downloader.NewBeaconBackfiller(dl, success) } - p.downloader = downloader.New(db, evMux, bc, nil, p.peerDrop, success) + p.downloader = downloader.New(db, evMux, bc, nil, p.peerDrop, backfillerCreator) return p } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index ca8b5bd39c..c763a77259 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -42,8 +42,8 @@ type beaconBackfiller struct { lock sync.Mutex // Mutex protecting the sync lock } -// newBeaconBackfiller is a helper method to create the backfiller. -func newBeaconBackfiller(dl *Downloader, success func()) backfiller { +// NewBeaconBackfiller is a helper method to create the backfiller. +func NewBeaconBackfiller(dl *Downloader, success func()) Backfiller { return &beaconBackfiller{ downloader: dl, success: success, diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 461f5dab0d..65f40483d6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -217,7 +217,7 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { +func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, backFillerCreator func(*Downloader) Backfiller) *Downloader { if lightchain == nil { lightchain = chain } @@ -236,7 +236,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), } // Create the post-merge skeleton syncer and start the process - dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, backFillerCreator(dl)) go dl.stateFetcher() return dl diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index d8c6f05019..318a0cd764 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -83,7 +83,8 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success) + backfillerCreator := func(dl *Downloader) Backfiller { return NewBeaconBackfiller(dl, success) } + tester.downloader = New(db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, backfillerCreator) return tester } diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 12eb5700f8..dc2be0683f 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -146,7 +146,7 @@ type headerResponse struct { // backfiller is a callback interface through which the skeleton sync can tell // the downloader that it should suspend or resume backfilling on specific head // events (e.g. suspend on forks or gaps, resume on successful linkups). -type backfiller interface { +type Backfiller interface { // suspend requests the backfiller to abort any running full or snap sync // based on the skeleton chain as it might be invalid. 
The backfiller should // gracefully handle multiple consecutive suspends without a resume, even @@ -192,7 +192,7 @@ type backfiller interface { // for now. type skeleton struct { db ethdb.Database // Database backing the skeleton - filler backfiller // Chain syncer suspended/resumed by head events + filler Backfiller // Chain syncer suspended/resumed by head events peers *peerSet // Set of peers we can sync from idles map[string]*peerConnection // Set of idle peers in the current sync cycle @@ -219,7 +219,7 @@ type skeleton struct { // newSkeleton creates a new sync skeleton that tracks a potentially dangling // header chain until it's linked into an existing set of blocks. -func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler Backfiller) *skeleton { sk := &skeleton{ db: db, filler: filler, @@ -1164,14 +1164,14 @@ func (s *skeleton) cleanStales(filled *types.Header) error { // Bounds retrieves the current head and tail tracked by the skeleton syncer // and optionally the last known finalized header if any was announced and if -// it is still in the sync range. This method is used by the backfiller, whose +// it is still in the sync range. This method is used by the Backfiller, whose // life cycle is controlled by the skeleton syncer. // // Note, the method will not use the internal state of the skeleton, but will // rather blindly pull stuff from the database. This is fine, because the back- // filler will only run when the skeleton chain is fully downloaded and stable. // There might be new heads appended, but those are atomic from the perspective -// of this method. Any head reorg will first tear down the backfiller and only +// of this method. Any head reorg will first tear down the Backfiller and only // then make the modification. func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *types.Header, err error) { // Read the current sync progress from disk and figure out the current head. @@ -1204,7 +1204,7 @@ func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *type } // Header retrieves a specific header tracked by the skeleton syncer. This method -// is meant to be used by the backfiller, whose life cycle is controlled by the +// is meant to be used by the Backfiller, whose life cycle is controlled by the // skeleton syncer. // // Note, outside the permitted runtimes, this method might return nil results and diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 6a76d78ac8..2908175361 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -46,7 +46,7 @@ type hookedBackfiller struct { // newHookedBackfiller creates a hooked backfiller with all callbacks disabled, // essentially acting as a noop. -func newHookedBackfiller() backfiller { +func newHookedBackfiller() Backfiller { return new(hookedBackfiller) } diff --git a/eth/handler.go b/eth/handler.go index f0b043166e..afb6f4ae41 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -161,21 +161,24 @@ func newHandler(config *handlerConfig) (*handler, error) { h.snapSync.Store(true) } } - // If sync succeeds, pass a callback to potentially disable snap sync mode - // and enable transaction propagation. 
- success := func() { - // If we were running snap sync and it finished, disable doing another - // round on next sync cycle - if h.snapSync.Load() { - log.Info("Snap sync complete, auto disabling") - h.snapSync.Store(false) + backfillerCreator := func(dl *downloader.Downloader) downloader.Backfiller { + // If sync succeeds, pass a callback to potentially disable snap sync mode + // and enable transaction propagation. + success := func() { + // If we were running snap sync and it finished, disable doing another + // round on next sync cycle + if h.snapSync.Load() { + log.Info("Snap sync complete, auto disabling") + h.snapSync.Store(false) + } + // If we've successfully finished a sync cycle, accept transactions from + // the network + h.acceptTxs.Store(true) } - // If we've successfully finished a sync cycle, accept transactions from - // the network - h.acceptTxs.Store(true) + return downloader.NewBeaconBackfiller(dl, success) } // Construct the downloader (long sync) - h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success) + h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, backfillerCreator) if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil { if h.chain.Config().TerminalTotalDifficultyPassed { log.Info("Chain post-merge, sync via beacon client") From dcea7bcd9b455a77c1e642368425c43fbc14c3db Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 21 Nov 2023 14:31:55 -0700 Subject: [PATCH 18/28] make functions in backfillerinterface public --- eth/downloader/beaconsync.go | 8 ++++---- eth/downloader/skeleton.go | 10 +++++----- eth/downloader/skeleton_test.go | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index c763a77259..56a2aad0dd 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -52,7 +52,7 @@ func NewBeaconBackfiller(dl *Downloader, success func()) Backfiller { // suspend cancels any background downloader threads and returns the last header // that has been successfully backfilled. -func (b *beaconBackfiller) suspend() *types.Header { +func (b *beaconBackfiller) Suspend() *types.Header { // If no filling is running, don't waste cycles b.lock.Lock() filling := b.filling @@ -80,7 +80,7 @@ func (b *beaconBackfiller) suspend() *types.Header { } // resume starts the downloader threads for backfilling state and chain data. -func (b *beaconBackfiller) resume() { +func (b *beaconBackfiller) Resume() { b.lock.Lock() if b.filling { // If a previous filling cycle is still running, just ignore this start @@ -134,8 +134,8 @@ func (b *beaconBackfiller) setMode(mode SyncMode) { return } log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String()) - b.suspend() - b.resume() + b.Suspend() + b.Resume() } // SetBadBlockCallback sets the callback to run when a bad block is hit by the diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index dc2be0683f..ef8b30641c 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -154,13 +154,13 @@ type Backfiller interface { // // The method should return the last block header that has been successfully // backfilled, or nil if the backfiller was not resumed. - suspend() *types.Header + Suspend() *types.Header // resume requests the backfiller to start running fill or snap sync based on // the skeleton chain as it has successfully been linked. 
Appending new heads // to the end of the chain will not result in suspend/resume cycles. // leaking too much sync logic out to the filler. - resume() + Resume() } // skeleton represents a header chain synchronized after the merge where blocks @@ -364,10 +364,10 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) && rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead) if linked { - s.filler.resume() + s.filler.Resume() } defer func() { - if filled := s.filler.suspend(); filled != nil { + if filled := s.filler.Suspend(); filled != nil { // If something was filled, try to delete stale sync helpers. If // unsuccessful, warn the user, but not much else we can do (it's // a programming error, just let users report an issue and don't @@ -456,7 +456,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // is still running, it will pick it up. If it already terminated, // a new cycle needs to be spun up. if linked { - s.filler.resume() + s.filler.Resume() } case req := <-requestFails: diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 2908175361..3be5392800 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -54,7 +54,7 @@ func newHookedBackfiller() Backfiller { // based on the skeleton chain as it might be invalid. The backfiller should // gracefully handle multiple consecutive suspends without a resume, even // on initial startup. -func (hf *hookedBackfiller) suspend() *types.Header { +func (hf *hookedBackfiller) Suspend() *types.Header { if hf.suspendHook != nil { return hf.suspendHook() } @@ -64,7 +64,7 @@ func (hf *hookedBackfiller) suspend() *types.Header { // resume requests the backfiller to start running fill or snap sync based on // the skeleton chain as it has successfully been linked. Appending new heads // to the end of the chain will not result in suspend/resume cycles. -func (hf *hookedBackfiller) resume() { +func (hf *hookedBackfiller) Resume() { if hf.resumeHook != nil { hf.resumeHook() } From 5dd7b4a7ba4a9d69d6763ff316553e1e86723171 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 22 Nov 2023 19:23:41 -0700 Subject: [PATCH 19/28] downloader/backfiller: move SetMode to public interface --- eth/downloader/beaconsync.go | 12 ++++++++++-- eth/downloader/skeleton.go | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 56a2aad0dd..ab2ae85950 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -120,7 +120,7 @@ func (b *beaconBackfiller) Resume() { // setMode updates the sync mode from the current one to the requested one. If // there's an active sync in progress, it will be cancelled and restarted. -func (b *beaconBackfiller) setMode(mode SyncMode) { +func (b *beaconBackfiller) SetMode(mode SyncMode) { // Update the old sync mode and track if it was changed b.lock.Lock() updated := b.syncMode != mode @@ -190,7 +190,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types. // // Super crazy dangerous type cast. Should be fine (TM), we're only using a // different backfiller implementation for skeleton tests. 
- d.skeleton.filler.(*beaconBackfiller).setMode(mode) + d.skeleton.filler.SetMode(mode) // Signal the skeleton sync to switch to a new head, however it wants if err := d.skeleton.Sync(head, final, force); err != nil { @@ -280,6 +280,14 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { return start, nil } +func (d *Downloader) SkeletonHead() (*types.Header, error) { + head, _, _, err := d.skeleton.Bounds() + if err != nil { + return nil, err + } + return head, nil +} + // fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling // until sync errors or is finished. func (d *Downloader) fetchBeaconHeaders(from uint64) error { diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index ef8b30641c..550e732c54 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -161,6 +161,8 @@ type Backfiller interface { // to the end of the chain will not result in suspend/resume cycles. // leaking too much sync logic out to the filler. Resume() + + SetMode(mode SyncMode) } // skeleton represents a header chain synchronized after the merge where blocks From a4af6d45f0d4e4fc981d77af94fe5f6e93a07a1f Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 22 Nov 2023 19:26:23 -0700 Subject: [PATCH 20/28] arbitrum p2p: verify checkpoint block checkpoint is verified via skeleton --- arbitrum/handler_p2p.go | 176 +++++++++++++++++++++++++++++++++++----- 1 file changed, 155 insertions(+), 21 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 28c7e4b268..b789593e7b 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" @@ -65,9 +66,15 @@ type protocolHandler struct { peersLock sync.RWMutex peers map[string]*Peer - confirmed *types.Header - checkpoint *types.Header - syncing atomic.Bool + beaconBackFiller downloader.Backfiller + + confirmed *types.Header + checkpoint *types.Header + syncedBlockNum uint64 // blocks that were synced by skeleton-downloader + syncedCond *sync.Cond + headersLock sync.RWMutex + + syncing atomic.Bool } func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain, helper SyncHelper, syncing bool) *protocolHandler { @@ -79,13 +86,15 @@ func NewProtocolHandler(db ethdb.Database, bc *core.BlockChain, helper SyncHelpe helper: helper, peers: make(map[string]*Peer), } + p.syncedCond = sync.NewCond(&p.headersLock) p.syncing.Store(syncing) backfillerCreator := func(dl *downloader.Downloader) downloader.Backfiller { success := func() { p.syncing.Store(false) log.Info("DOWNLOADER DONE") } - return downloader.NewBeaconBackfiller(dl, success) + p.beaconBackFiller = downloader.NewBeaconBackfiller(dl, success) + return (*filler)(p) } p.downloader = downloader.New(db, evMux, bc, nil, p.peerDrop, backfillerCreator) return p @@ -110,6 +119,18 @@ func (h *protocolHandler) getCreatePeer(id string) *Peer { return peer } +func (h *protocolHandler) waitBlockSync(num uint64) error { + h.headersLock.Lock() + defer h.headersLock.Unlock() + for { + if h.syncedBlockNum >= num { + break + } + h.syncedCond.Wait() + } + return nil +} + func (h *protocolHandler) getRemovePeer(id string) *Peer { h.peersLock.Lock() defer h.peersLock.Unlock() @@ -151,6 +172,99 @@ func (h *protocolHandler) peerDrop(id string) { 
 	}
 }
 
+func (h *protocolHandler) getHeaders() (*types.Header, *types.Header) {
+	h.peersLock.RLock()
+	defer h.peersLock.RUnlock()
+	return h.checkpoint, h.confirmed
+}
+
+func (h *protocolHandler) advanceCheckpoint(checkpoint *types.Header) {
+	h.peersLock.Lock()
+	defer h.peersLock.Unlock()
+	if h.checkpoint != nil {
+		compare := h.checkpoint.Number.Cmp(checkpoint.Number)
+		if compare > 0 {
+			return
+		}
+		if compare == 0 {
+			if h.checkpoint.Hash() != checkpoint.Hash() {
+				log.Error("arbitrum_p2p: hash for checkpoint changed", "number", checkpoint.Number, "old", h.checkpoint.Hash(), "new", checkpoint.Hash())
+			} else {
+				return
+			}
+		}
+	}
+	if h.confirmed == nil || checkpoint.Number.Cmp(h.confirmed.Number) > 0 {
+		confirmedNum := common.Big0
+		if h.confirmed != nil {
+			confirmedNum = h.confirmed.Number
+		}
+		log.Error("arbitrum_p2p: trying to move checkpoint ahead of confirmed", "number", checkpoint.Number, "confirmed", confirmedNum)
+		return
+	}
+	h.checkpoint = checkpoint
+	log.Info("arbitrum_p2p: checkpoint", "number", checkpoint.Number, "hash", checkpoint.Hash())
+	h.downloader.PivotSync(h.confirmed, h.checkpoint)
+}
+
+func (h *protocolHandler) advanceConfirmed(confirmed *types.Header) {
+	h.peersLock.Lock()
+	defer h.peersLock.Unlock()
+	if h.confirmed != nil {
+		compare := h.confirmed.Number.Cmp(confirmed.Number)
+		if compare > 0 {
+			return
+		}
+		if compare == 0 {
+			if h.confirmed.Hash() != confirmed.Hash() {
+				log.Error("arbitrum_p2p: hash for confirmed changed", "number", confirmed.Number, "old", h.confirmed.Hash(), "new", confirmed.Hash())
+			} else {
+				return
+			}
+		}
+	}
+	h.confirmed = confirmed
+	log.Info("arbitrum_p2p: confirmed", "number", confirmed.Number, "hash", confirmed.Hash())
+	h.downloader.PivotSync(h.confirmed, h.checkpoint)
+}
+
+type filler protocolHandler
+
+func (h *filler) Suspend() *types.Header {
+	h.headersLock.Lock()
+	defer h.headersLock.Unlock()
+	if h.syncedBlockNum > 0 && h.syncing.Load() {
+		log.Warn("arbitrum_p2p: suspend while syncing", "head", h.syncedBlockNum)
+	}
+	return h.beaconBackFiller.Suspend()
+}
+
+func (h *filler) Resume() {
+	defer h.beaconBackFiller.Resume()
+	head, err := h.downloader.SkeletonHead()
+	if err != nil || head == nil {
+		log.Error("arbitrum_p2p: error from SkeletonHead", "err", err)
+		return
+	}
+	if !head.Number.IsUint64() {
+		log.Error("arbitrum_p2p: syncedBlockNum bad number", "num", head.Number)
+		return
+	}
+	h.headersLock.Lock()
+	if h.confirmed.Number.Cmp(head.Number) < 0 {
+		// confirmed only moves forward and is used as head for sync; something bad already happened
+		log.Error("arbitrum_p2p: skeleton head ahead of confirmed", "skeleton", head.Number, "confirmed", h.confirmed.Number)
+	}
+	h.syncedBlockNum = head.Number.Uint64()
+	h.syncedCond.Broadcast()
+	h.headersLock.Unlock()
+	log.Trace("arbitrum_p2p: resume", "skeletonhead", h.syncedBlockNum)
+}
+
+func (h *filler) SetMode(mode downloader.SyncMode) {
+	h.beaconBackFiller.SetMode(mode)
+}
+
 type arbHandler protocolHandler
 
 func (h *arbHandler) PeerInfo(id enode.ID) interface{} {
@@ -158,13 +272,14 @@ func (h *arbHandler) PeerInfo(id enode.ID) interface{} {
 }
 
 func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, node uint64) {
-	// TODO: validate confirmed
+	protoHandler := (*protocolHandler)(h)
 	validated := false
 	valid := false
-	if h.confirmed != nil {
-		if confirmed.Number.Cmp(h.confirmed.Number) == 0 {
+	current, _ := protoHandler.getHeaders()
+	if current != nil {
+		if confirmed.Number.Cmp(current.Number) == 0 {
 			validated = true
-			valid = h.confirmed.Hash() == confirmed.Hash()
+			valid = current.Hash() == confirmed.Hash()
 		}
 	}
 	if !validated {
@@ -176,37 +291,56 @@ func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header
 		}
 	}
 	if !valid {
-		(*protocolHandler)(h).peerDrop(peer.ID())
+		protoHandler.peerDrop(peer.ID())
 		return
 	}
-	hPeer := (*protocolHandler)(h).getPeer(peer.ID())
+	hPeer := protoHandler.getPeer(peer.ID())
 	if hPeer == nil {
 		log.Warn("hPeer not found on HandleLastConfirmed")
 		return
 	}
 	peer.RequestCheckpoint(nil)
-	h.confirmed = confirmed
-	log.Info("lastconfirmed", "confirmed", h.confirmed, "checkpoint", "h.checkpoint")
-	h.downloader.PivotSync(h.confirmed, h.checkpoint)
+	protoHandler.advanceConfirmed(confirmed)
 }
 
 func (h *arbHandler) HandleCheckpoint(peer *arb.Peer, checkpoint *types.Header, supported bool) {
+	protoHandler := (*protocolHandler)(h)
 	log.Error("got checkpoint", "from", peer.ID(), "checkpoint", checkpoint, "supported", supported)
 	if !supported {
 		return
 	}
-	if h.checkpoint != nil && h.checkpoint.Number.Uint64() > checkpoint.Number.Uint64() {
+	if !h.syncing.Load() {
 		return
 	}
-	// TODO: confirm
-	// TODO: advance?
- hPeer := (*protocolHandler)(h).getPeer(peer.ID()) - if hPeer == nil { - log.Warn("hPeer not found on HandleLastConfirmed") + if !checkpoint.Number.IsUint64() { + log.Warn("got bad header from peer - number not uint64", "peer", peer.ID()) + protoHandler.peerDrop(peer.ID()) return } - h.checkpoint = checkpoint - h.downloader.PivotSync(h.confirmed, h.checkpoint) + number := checkpoint.Number.Uint64() + log.Info("handler_p2p: handle checkpoint - before", "peer", peer.ID()) + protoHandler.waitBlockSync(number) + log.Info("handler_p2p: handle checkpoint - after", "peer", peer.ID()) + if !h.syncing.Load() { + return + } + canonical := rawdb.ReadCanonicalHash(h.db, number) + if canonical == (common.Hash{}) { + skeleton := rawdb.ReadSkeletonHeader(h.db, number) + if skeleton == nil { + log.Error("arbitrum handler_p2p: canonical not found", "number", number, "peer", peer.ID()) + } + canonical = skeleton.Hash() + } + if canonical == (common.Hash{}) { + log.Error("arbitrum handler_p2p: did not find a canonical hash", "number", number, "peer", peer.ID()) + } + if canonical != checkpoint.Hash() { + log.Warn("got bad header from peer - bad hash", "peer", peer.ID(), "number", number, "expected", canonical, "peer", checkpoint.Hash()) + protoHandler.peerDrop(peer.ID()) + return + } + protoHandler.advanceCheckpoint(checkpoint) } func (h *arbHandler) LastConfirmed() (*types.Header, uint64, error) { From c49e6ae982bb5d2c2bcc28863eb167de42ec2091 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 22 Nov 2023 19:26:50 -0700 Subject: [PATCH 21/28] sync_test: bad node publishes correct validated --- arbitrum/sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 8880c5a36c..194577fafb 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -307,7 +307,7 @@ func TestSimpleSync(t *testing.T) { if _, err := badChain.InsertChain(badBlocks[pivotBlockNum:]); err != nil { t.Fatal(err) } - badHandler := NewProtocolHandler(badDb, badChain, &dummySyncHelper{badBlocks[syncBlockNum-1].Header(), badBlocks[pivotBlockNum-1].Header()}, false) + badHandler := NewProtocolHandler(badDb, badChain, &dummySyncHelper{blocks[syncBlockNum-1].Header(), badBlocks[pivotBlockNum-1].Header()}, false) badStack.RegisterProtocols(badHandler.MakeProtocols(&dummyIterator{})) badStack.Start() From e2d158322ee24e57f13faabbf5fbca39c06325b6 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Wed, 22 Nov 2023 19:35:34 -0700 Subject: [PATCH 22/28] eth/skeleton_test: fix --- eth/downloader/skeleton_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 3be5392800..52ab540ad9 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -70,6 +70,8 @@ func (hf *hookedBackfiller) Resume() { } } +func (hf *hookedBackfiller) SetMode(SyncMode) {} + // skeletonTestPeer is a mock peer that can only serve header requests from a // pre-perated header chain (which may be arbitrarily wrong for testing). 
// From 15e55cf0dfacd7da5a026633f0cd6c48dffbc14e Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 27 Nov 2023 12:28:38 -0700 Subject: [PATCH 23/28] arb handler_p2p: update for geth 1.12.2 --- arbitrum/handler_p2p.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index b789593e7b..cca9296b06 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/arb" @@ -382,7 +383,7 @@ func (h *ethHandler) Chain() *core.BlockChain { return h.chain } type dummyTxPool struct{} -func (d *dummyTxPool) Get(hash common.Hash) *types.Transaction { +func (d *dummyTxPool) Get(hash common.Hash) *txpool.Transaction { return nil } @@ -480,7 +481,11 @@ func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.Accou log.Error("Failed to open trie", "root", root, "err", err) return nil, err } - accIter := t.NodeIterator(account[:]) + accIter, err := t.NodeIterator(account[:]) + if err != nil { + log.Error("Failed to open nodeIterator for trie", "root", root, "err", err) + return nil, err + } return trieAccountIterator{trieIteratorWrapper{ iter: trie.NewIterator((accIter)), }}, nil @@ -521,8 +526,13 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh log.Error("Failed to open storage trie", "root", acc.Root, "err", err) return nil, err } + nodeIter, err := storageTrie.NodeIterator(origin[:]) + if err != nil { + log.Error("Failed node iterator to open storage trie", "root", acc.Root, "err", err) + return nil, err + } return trieStoreageIterator{trieIteratorWrapper{ - iter: trie.NewIterator(storageTrie.NodeIterator(origin[:])), + iter: trie.NewIterator(nodeIter), }}, nil } From bbc51307c86b1e3731337c9edd132a0d408243fb Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Mon, 27 Nov 2023 12:28:59 -0700 Subject: [PATCH 24/28] arb sync_test: update for geth 1.12.2 --- arbitrum/sync_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 194577fafb..62168c00fc 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -142,7 +142,8 @@ func TestSimpleSync(t *testing.T) { // source node sourceStackConf := node.DefaultConfig sourceStackConf.DataDir = t.TempDir() - sourceStackConf.P2P.NoDiscovery = true + sourceStackConf.P2P.DiscoveryV4 = false + sourceStackConf.P2P.DiscoveryV5 = false sourceStackConf.P2P.ListenAddr = "127.0.0.1:0" sourceStackConf.P2P.PrivateKey = sourceKey @@ -276,7 +277,9 @@ func TestSimpleSync(t *testing.T) { // source node sourceHandler := NewProtocolHandler(sourceDb, sourceChain, &dummySyncHelper{syncBlock.Header(), pivotBlock.Header()}, false) sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{})) - sourceStack.Start() + if err := sourceStack.Start(); err != nil { + t.Fatal(err) + } // bad node (on wrong blockchain) _, badBlocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) { @@ -309,7 +312,9 @@ func TestSimpleSync(t *testing.T) { } badHandler := NewProtocolHandler(badDb, badChain, 
&dummySyncHelper{blocks[syncBlockNum-1].Header(), badBlocks[pivotBlockNum-1].Header()}, false) badStack.RegisterProtocols(badHandler.MakeProtocols(&dummyIterator{})) - badStack.Start() + if err := badStack.Start(); err != nil { + t.Fatal(err) + } // figure out port of the source node and create dummy iter that points to it sourcePort, err := portFromAddress(sourceStack.Server().Config.ListenAddr) @@ -348,7 +353,9 @@ func TestSimpleSync(t *testing.T) { log.Info("initial source", "head", sourceChain.CurrentBlock()) log.Info("initial dest", "head", destChain.CurrentBlock()) log.Info("pivot", "head", pivotBlock.Header()) - destStack.Start() + if err := destStack.Start(); err != nil { + t.Fatal(err) + } <-time.After(time.Second * 5) From 5e224f97772c3bfedceba29a1d29061e0f3d2146 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 16 May 2024 12:55:18 +0200 Subject: [PATCH 25/28] fix skeleton merge --- eth/downloader/skeleton.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index de1b396e5d..0d7d68ddd7 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -384,17 +384,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { done := make(chan struct{}) go func() { defer close(done) -<<<<<<< HEAD - if filled := s.filler.Suspend(); filled != nil { - // If something was filled, try to delete stale sync helpers. If - // unsuccessful, warn the user, but not much else we can do (it's - // a programming error, just let users report an issue and don't - // choke in the meantime). - if err := s.cleanStales(filled); err != nil { - log.Error("Failed to clean stale beacon headers", "err", err) - } -======= - filled := s.filler.suspend() + filled := s.filler.Suspend() if filled == nil { log.Error("Latest filled block is not available") return @@ -405,7 +395,6 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // choke in the meantime). 
if err := s.cleanStales(filled); err != nil { log.Error("Failed to clean stale beacon headers", "err", err) ->>>>>>> master } }() // Wait for the suspend to finish, consuming head events in the meantime From c7e362f31aba980efaaa98061214e0161358f315 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 16 May 2024 13:36:19 +0000 Subject: [PATCH 26/28] use triedb config when creating trieIteratorWrapper, close triedb on release --- arbitrum/handler_p2p.go | 23 +++++++++++------------ core/blockchain.go | 9 +++++++++ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index cca9296b06..44e12b8dd4 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" - "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/arb" @@ -383,7 +382,7 @@ func (h *ethHandler) Chain() *core.BlockChain { return h.chain } type dummyTxPool struct{} -func (d *dummyTxPool) Get(hash common.Hash) *txpool.Transaction { +func (d *dummyTxPool) Get(hash common.Hash) *types.Transaction { return nil } @@ -429,10 +428,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) - case *eth.NewPooledTransactionHashesPacket66: - return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) - - case *eth.NewPooledTransactionHashesPacket68: + case *eth.NewPooledTransactionHashesPacket: return fmt.Errorf("unexpected eth packet type for nitro: %T", packet) case *eth.TransactionsPacket: @@ -460,13 +456,14 @@ func (h *snapHandler) Snapshot(root common.Hash) snapshot.Snapshot { } type trieIteratorWrapper struct { - iter *trie.Iterator + iter *trie.Iterator + triedb *trie.Database } func (i trieIteratorWrapper) Next() bool { return i.iter.Next() } func (i trieIteratorWrapper) Error() error { return i.iter.Err } func (i trieIteratorWrapper) Hash() common.Hash { return common.BytesToHash(i.iter.Key) } -func (i trieIteratorWrapper) Release() {} +func (i trieIteratorWrapper) Release() { i.triedb.Close() } type trieAccountIterator struct { trieIteratorWrapper @@ -475,7 +472,7 @@ type trieAccountIterator struct { func (i trieAccountIterator) Account() []byte { return i.iter.Value } func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.AccountIterator, error) { - triedb := trie.NewDatabase(h.db) + triedb := trie.NewDatabase(h.db, h.chain.CacheConfig().TriedbConfig()) t, err := trie.NewStateTrie(trie.StateTrieID(root), triedb) if err != nil { log.Error("Failed to open trie", "root", root, "err", err) @@ -487,7 +484,8 @@ func (h *snapHandler) AccountIterator(root, account common.Hash) (snapshot.Accou return nil, err } return trieAccountIterator{trieIteratorWrapper{ - iter: trie.NewIterator((accIter)), + iter: trie.NewIterator((accIter)), + triedb: triedb, }}, nil } @@ -506,7 +504,7 @@ func (i nilStoreageIterator) Release() {} func (i nilStoreageIterator) Slot() []byte { return nil } func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapshot.StorageIterator, error) { - triedb := trie.NewDatabase(h.db) + triedb := trie.NewDatabase(h.db, h.chain.CacheConfig().TriedbConfig()) t, err := trie.NewStateTrie(trie.StateTrieID(root), 
triedb) if err != nil { log.Error("Failed to open trie", "root", root, "err", err) @@ -532,7 +530,8 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh return nil, err } return trieStoreageIterator{trieIteratorWrapper{ - iter: trie.NewIterator(nodeIter), + iter: trie.NewIterator(nodeIter), + triedb: triedb, }}, nil } diff --git a/core/blockchain.go b/core/blockchain.go index 98b5cbc55c..35b5394676 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -176,6 +176,11 @@ func (c *CacheConfig) triedbConfig() *trie.Config { return config } +// arbitrum: expose triedbConfig +func (c *CacheConfig) TriedbConfig() *trie.Config { + return c.triedbConfig() +} + // defaultCacheConfig are the default caching values if none are specified by the // user (also used during testing). var defaultCacheConfig = &CacheConfig{ @@ -2579,3 +2584,7 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { func (bc *BlockChain) GetTrieFlushInterval() time.Duration { return time.Duration(bc.flushInterval.Load()) } + +func (bc *BlockChain) CacheConfig() *CacheConfig { + return bc.cacheConfig +} From 44bfcdf31f006cbbf1e742e55b7027021675f38f Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 16 May 2024 15:05:52 +0000 Subject: [PATCH 27/28] use new log api in sync_test --- arbitrum/sync_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 62168c00fc..15838b3b76 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -117,9 +117,7 @@ func TestSimpleSync(t *testing.T) { const syncBlockNum = 70 const extraBlocks = 200 - glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) - glogger.Verbosity(log.LvlTrace) - log.Root().SetHandler(glogger) + log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, false))) // key for source node p2p sourceKey, err := crypto.GenerateKey() From b78a746f7adbc55fda727f5f8876589a5311f820 Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Thu, 6 Jun 2024 17:30:49 -0600 Subject: [PATCH 28/28] handler_p2p: address comments --- arbitrum/handler_p2p.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 44e12b8dd4..9a6146b35e 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -157,7 +157,7 @@ func (h *protocolHandler) peerDrop(id string) { defer hPeer.mutex.Unlock() hPeer.arb = nil if hPeer.eth != nil { - hPeer.eth.Disconnect(p2p.DiscSelf) + hPeer.eth.Disconnect(p2p.DiscUselessPeer) err := h.downloader.UnregisterPeer(id) if err != nil { log.Warn("failed deregistering peer from downloader", "err", err) @@ -329,12 +329,10 @@ func (h *arbHandler) HandleCheckpoint(peer *arb.Peer, checkpoint *types.Header, skeleton := rawdb.ReadSkeletonHeader(h.db, number) if skeleton == nil { log.Error("arbitrum handler_p2p: canonical not found", "number", number, "peer", peer.ID()) + return } canonical = skeleton.Hash() } - if canonical == (common.Hash{}) { - log.Error("arbitrum handler_p2p: did not find a canonical hash", "number", number, "peer", peer.ID()) - } if canonical != checkpoint.Hash() { log.Warn("got bad header from peer - bad hash", "peer", peer.ID(), "number", number, "expected", canonical, "peer", checkpoint.Hash()) protoHandler.peerDrop(peer.ID()) @@ -526,7 +524,7 @@ func (h *snapHandler) StorageIterator(root, account, origin common.Hash) (snapsh } nodeIter, err := storageTrie.NodeIterator(origin[:]) if 
err != nil { - log.Error("Failed node iterator to open storage trie", "root", acc.Root, "err", err) + log.Error("Failed creating node iterator to open storage trie", "root", acc.Root, "err", err) return nil, err } return trieStoreageIterator{trieIteratorWrapper{