From 6cf6978f0235e9616435da8a6d5da1b2f80e6b2f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 12 Mar 2024 12:22:45 +0800 Subject: [PATCH] feat(blocksync): sort peers by download rate & multiple requests for closer blocks (backport #2475) (#2576) This is an automatic backport of pull request #2475 done by [Mergify](https://mergify.com). Cherry-pick of f8366fc4290e7ab5109c8943dfadf9d75c6ca2f0 has failed: ``` On branch mergify/bp/v0.38.x/pr-2475 Your branch is up to date with 'origin/v0.38.x'. You are currently cherry-picking commit f8366fc42. (fix conflicts and run "git cherry-pick --continue") (use "git cherry-pick --skip" to skip this patch) (use "git cherry-pick --abort" to cancel the cherry-pick operation) Changes to be committed: new file: .changelog/unreleased/improvements/2475-blocksync-2nd-request.md new file: .changelog/unreleased/improvements/2475-blocksync-no-block-response.md new file: .changelog/unreleased/improvements/2475-blocksync-sort-peers.md modified: blocksync/reactor.go Unmerged paths: (use "git add ..." to mark resolution) both modified: blocksync/pool.go ``` To fix up this pull request, you can check it out locally. See documentation: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/checking-out-pull-requests-locally ---
Mergify commands and options
More conditions and actions can be found in the [documentation](https://docs.mergify.com/). You can also trigger Mergify actions by commenting on this pull request: - `@Mergifyio refresh` will re-evaluate the rules - `@Mergifyio rebase` will rebase this PR on its base branch - `@Mergifyio update` will merge the base branch into this PR - `@Mergifyio backport ` will backport this PR on `` branch Additionally, on Mergify [dashboard](https://dashboard.mergify.com) you can: - look at your merge queues - generate the Mergify configuration with the config editor. Finally, you can contact us on https://mergify.com
--------- Co-authored-by: Anton Kaliaev --- .../2475-blocksync-2nd-request.md | 3 + .../2475-blocksync-no-block-response.md | 3 + .../improvements/2475-blocksync-sort-peers.md | 2 + blocksync/pool.go | 411 +++++++++++++----- blocksync/reactor.go | 7 +- 5 files changed, 307 insertions(+), 119 deletions(-) create mode 100644 .changelog/unreleased/improvements/2475-blocksync-2nd-request.md create mode 100644 .changelog/unreleased/improvements/2475-blocksync-no-block-response.md create mode 100644 .changelog/unreleased/improvements/2475-blocksync-sort-peers.md diff --git a/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md new file mode 100644 index 00000000000..67614a8e35f --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request a block from peer B if we are approaching pool's height + (less than 50 blocks) and the current peer A is slow in sending us the + block [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md new file mode 100644 index 00000000000..d01b3679866 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request the block N from peer B immediately after getting + `NoBlockResponse` from peer A + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md new file mode 100644 index 00000000000..5c544401ba6 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md @@ -0,0 +1,2 @@ +- `[blocksync]` Sort peers by download rate (the fastest peer is picked first) + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/blocksync/pool.go b/blocksync/pool.go index bf8ceab97c6..519be6f2741 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "sort" "sync/atomic" "time" @@ -36,9 +37,20 @@ const ( // enough. If a peer is not sending us data at at least that rate, we // consider them to have timedout and we disconnect. // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate = 7680 + // Based on the experiments with [Osmosis](https://osmosis.zone/), the + // minimum rate could be as high as 500 KB/s. However, we're setting it to + // 128 KB/s for now to be conservative. + minRecvRate = 128 * 1024 // 128 KB/s + + // peerConnWait is the time that must have elapsed since the pool routine + // was created before we start making requests. This is to give the peer + // routine time to connect to peers. + peerConnWait = 3 * time.Second + + // If we're within minBlocksForSingleRequest blocks of the pool's height, we + // send 2 parallel requests to 2 peers for the same block. If we're further + // away, we send a single request. + minBlocksForSingleRequest = 50 ) var peerTimeout = 15 * time.Second // not const so we can override with tests @@ -57,7 +69,8 @@ var peerTimeout = 15 * time.Second // not const so we can override with tests // BlockPool keeps track of the block sync peers, block requests and block responses. type BlockPool struct { service.BaseService - startTime time.Time + startTime time.Time + startHeight int64 mtx cmtsync.Mutex // block requests @@ -65,7 +78,8 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 // the biggest reported height + sortedPeers []*bpPeer // sorted by curRate, highest first + maxPeerHeight int64 // the biggest reported height // atomic numPending int32 // number of requests pending assignment or block response @@ -80,9 +94,10 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), - requesters: make(map[int64]*bpRequester), - height: start, - numPending: 0, + requesters: make(map[int64]*bpRequester), + height: start, + startHeight: start, + numPending: 0, requestsCh: requestsCh, errorsCh: errorsCh, @@ -94,8 +109,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. func (pool *BlockPool) OnStart() error { - go pool.makeRequestersRoutine() pool.startTime = time.Now() + go pool.makeRequestersRoutine() return nil } @@ -106,6 +121,14 @@ func (pool *BlockPool) makeRequestersRoutine() { return } + // Check if we are within peerConnWait seconds of start time + // This gives us some time to connect to peers before starting a wave of requests + if time.Since(pool.startTime) < peerConnWait { + // Calculate the duration to sleep until peerConnWait seconds have passed since pool.startTime + sleepDuration := peerConnWait - time.Since(pool.startTime) + time.Sleep(sleepDuration) + } + pool.mtx.Lock() var ( maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer @@ -124,6 +147,8 @@ func (pool *BlockPool) makeRequestersRoutine() { default: // request for more blocks. pool.makeNextRequester(nextHeight) + // Sleep for a bit to make the requests more ordered. + time.Sleep(requestIntervalMS * time.Millisecond) } } } @@ -145,11 +170,16 @@ func (pool *BlockPool) removeTimedoutPeers() { "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true } + + peer.curRate = curRate } + if peer.didTimeout { pool.removePeer(peer.id) } } + + pool.sortPeers() } // GetStatus returns pool's height, numPending requests and the number of @@ -205,45 +235,61 @@ func (pool *BlockPool) PeekTwoBlocks() (first, second *types.Block, firstExtComm return } -// PopRequest pops the first block at pool.height. -// It must have been validated by the second Commit from PeekTwoBlocks. -// TODO(thane): (?) and its corresponding ExtendedCommit. +// PopRequest removes the requester at pool.height and increments pool.height. func (pool *BlockPool) PopRequest() { pool.mtx.Lock() defer pool.mtx.Unlock() - if r := pool.requesters[pool.height]; r != nil { - /* The block can disappear at any time, due to removePeer(). - if r := pool.requesters[pool.height]; r == nil || r.block == nil { - PanicSanity("PopRequest() requires a valid block") - } - */ - if err := r.Stop(); err != nil { - pool.Logger.Error("Error stopping requester", "err", err) - } - delete(pool.requesters, pool.height) - pool.height++ - } else { + r := pool.requesters[pool.height] + if r == nil { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } + + if err := r.Stop(); err != nil { + pool.Logger.Error("Error stopping requester", "err", err) + } + delete(pool.requesters, pool.height) + pool.height++ + + // Notify the next minBlocksForSingleRequest requesters about new height, so + // they can potentially request a block from the second peer. + for i := int64(0); i < minBlocksForSingleRequest && i < int64(len(pool.requesters)); i++ { + pool.requesters[pool.height+i].newHeight(pool.height) + } } -// RedoRequest invalidates the block at pool.height, -// Remove the peer and redo request from others. +// RemovePeerAndRedoAllPeerRequests retries the request at the given height and +// all the requests made to the same peer. The peer is removed from the pool. // Returns the ID of the removed peer. -func (pool *BlockPool) RedoRequest(height int64) p2p.ID { +func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID { pool.mtx.Lock() defer pool.mtx.Unlock() request := pool.requesters[height] - peerID := request.getPeerID() - if peerID != p2p.ID("") { - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(peerID) - } + peerID := request.gotBlockFromPeerID() + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) return peerID } +// RedoRequestFrom retries the request at the given height. It does not remove the +// peer. +func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if requester, ok := pool.requesters[height]; ok { // If we requested this block + if requester.didRequestFrom(peerID) { // From this specific peer + requester.redo(peerID) + } + } +} + +// Deprecated: use RemovePeerAndRedoAllPeerRequests instead. +func (pool *BlockPool) RedoRequest(height int64) p2p.ID { + return pool.RemovePeerAndRedoAllPeerRequests(height) +} + // AddBlock validates that the block comes from the peer it was expected from // and calls the requester to store it. // @@ -258,45 +304,50 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *ty defer pool.mtx.Unlock() if extCommit != nil && block.Height != extCommit.Height { - return fmt.Errorf("heights don't match, not adding block (block height: %d, commit height: %d)", block.Height, extCommit.Height) + err := fmt.Errorf("block height %d != extCommit height %d", block.Height, extCommit.Height) + // Peer sent us an invalid block => remove it. + pool.sendError(err, peerID) + return err } requester := pool.requesters[block.Height] if requester == nil { - pool.Logger.Info( - "peer sent us a block we didn't expect", - "peer", - peerID, - "curHeight", - pool.height, - "blockHeight", - block.Height) - diff := pool.height - block.Height - if diff < 0 { - diff *= -1 - } - const maxDiff = 100 // maximum difference between current and received block height - if diff > maxDiff { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + // Because we're issuing 2nd requests for closer blocks, it's possible to + // receive a block we've already processed from a second peer. Hence, we + // can't punish it. But if the peer sent us a block we clearly didn't + // request, we disconnect. + if block.Height > pool.height || block.Height < pool.startHeight { + err := fmt.Errorf("peer sent us block #%d we didn't expect (current height: %d, start height: %d)", + block.Height, pool.height, pool.startHeight) + pool.sendError(err, peerID) + return err } - return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height) + + return fmt.Errorf("got an already committed block #%d (possibly from the slow peer %s)", block.Height, peerID) } - if requester.setBlock(block, extCommit, peerID) { - atomic.AddInt32(&pool.numPending, -1) - peer := pool.peers[peerID] - if peer != nil { - peer.decrPending(blockSize) - } - } else { - err := errors.New("requester is different or block already exists") + if !requester.setBlock(block, extCommit, peerID) { + err := fmt.Errorf("requested block #%d from %v, not %s", block.Height, requester.requestedFrom(), peerID) pool.sendError(err, peerID) - return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) + return err + } + + atomic.AddInt32(&pool.numPending, -1) + peer := pool.peers[peerID] + if peer != nil { + peer.decrPending(blockSize) } return nil } +// Height returns the pool's height. +func (pool *BlockPool) Height() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.height +} + // MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { pool.mtx.Lock() @@ -317,6 +368,9 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { peer = newBPPeer(pool, peerID, base, height) peer.setLogger(pool.Logger.With("peer", peerID)) pool.peers[peerID] = peer + // no need to sort because curRate is 0 at start. + // just add to the beginning so it's picked first by pickIncrAvailablePeer. + pool.sortedPeers = append([]*bpPeer{peer}, pool.sortedPeers...) } if height > pool.maxPeerHeight { @@ -335,7 +389,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { - if requester.getPeerID() == peerID { + if requester.didRequestFrom(peerID) { requester.redo(peerID) } } @@ -347,6 +401,12 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { } delete(pool.peers, peerID) + for i, p := range pool.sortedPeers { + if p.id == peerID { + pool.sortedPeers = append(pool.sortedPeers[:i], pool.sortedPeers[i+1:]...) + break + } + } // Find a new peer with the biggest height and update maxPeerHeight if the // peer's height was the biggest. @@ -369,11 +429,14 @@ func (pool *BlockPool) updateMaxPeerHeight() { // Pick an available peer with the given height available. // If no peers are available, returns nil. -func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { +func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() - for _, peer := range pool.peers { + for _, peer := range pool.sortedPeers { + if peer.id == excludePeerID { + continue + } if peer.didTimeout { pool.removePeer(peer.id) continue @@ -387,9 +450,19 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { peer.incrPending() return peer } + return nil } +// Sort peers by curRate, highest first. +// +// CONTRACT: pool.mtx must be locked. +func (pool *BlockPool) sortPeers() { + sort.Slice(pool.sortedPeers, func(i, j int) bool { + return pool.sortedPeers[i].curRate > pool.sortedPeers[j].curRate + }) +} + func (pool *BlockPool) makeNextRequester(nextHeight int64) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -443,6 +516,7 @@ func (pool *BlockPool) debug() string { type bpPeer struct { didTimeout bool + curRate int64 numPending int32 height int64 base int64 @@ -515,28 +589,42 @@ func (peer *bpPeer) onTimeout() { //------------------------------------- +// bpRequester requests a block from a peer. +// +// If the height is within minBlocksForSingleRequest blocks of the pool's +// height, it will send an additional request to another peer. This is to avoid +// a situation where blocksync is stuck because of a single slow peer. Note +// that it's okay to send a single request when the requested height is far +// from the pool's height. If the peer is slow, it will timeout and be replaced +// with another peer. type bpRequester struct { service.BaseService - pool *BlockPool - height int64 - gotBlockCh chan struct{} - redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat - mtx cmtsync.Mutex - peerID p2p.ID - block *types.Block - extCommit *types.ExtendedCommit + pool *BlockPool + height int64 + gotBlockCh chan struct{} + redoCh chan p2p.ID // redo may got multiple messages, add peerId to identify repeat + newHeightCh chan int64 + + mtx cmtsync.Mutex + peerID p2p.ID + secondPeerID p2p.ID // alternative peer to request from (if close to pool's height) + gotBlockFrom p2p.ID + block *types.Block + extCommit *types.ExtendedCommit } func newBPRequester(pool *BlockPool, height int64) *bpRequester { bpr := &bpRequester{ - pool: pool, - height: height, - gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan p2p.ID, 1), - - peerID: "", - block: nil, + pool: pool, + height: height, + gotBlockCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), + newHeightCh: make(chan int64, 1), + + peerID: "", + secondPeerID: "", + block: nil, } bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr) return bpr @@ -547,15 +635,21 @@ func (bpr *bpRequester) OnStart() error { return nil } -// Returns true if the peer matches and block doesn't already exist. +// Returns true if the peer(s) match and block doesn't already exist. func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID p2p.ID) bool { bpr.mtx.Lock() - if bpr.block != nil || bpr.peerID != peerID { + if bpr.peerID != peerID && bpr.secondPeerID != peerID { bpr.mtx.Unlock() return false } + if bpr.block != nil { + bpr.mtx.Unlock() + return true // getting a block from both peers is not an error + } + bpr.block = block bpr.extCommit = extCommit + bpr.gotBlockFrom = peerID bpr.mtx.Unlock() select { @@ -577,24 +671,55 @@ func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit { return bpr.extCommit } -func (bpr *bpRequester) getPeerID() p2p.ID { +// Returns the IDs of peers we've requested a block from. +func (bpr *bpRequester) requestedFrom() []p2p.ID { bpr.mtx.Lock() defer bpr.mtx.Unlock() - return bpr.peerID + peerIDs := make([]p2p.ID, 0, 2) + if bpr.peerID != "" { + peerIDs = append(peerIDs, bpr.peerID) + } + if bpr.secondPeerID != "" { + peerIDs = append(peerIDs, bpr.secondPeerID) + } + return peerIDs } -// This is called from the requestRoutine, upon redo(). -func (bpr *bpRequester) reset() { +// Returns true if we've requested a block from the given peer. +func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool { bpr.mtx.Lock() defer bpr.mtx.Unlock() + return bpr.peerID == peerID || bpr.secondPeerID == peerID +} - if bpr.block != nil { +// Returns the ID of the peer who sent us the block. +func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + return bpr.gotBlockFrom +} + +// Removes the block (IF we got it from the given peer) and resets the peer. +func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + + // Only remove the block if we got it from that peer. + if bpr.gotBlockFrom == peerID { + bpr.block = nil + bpr.extCommit = nil + bpr.gotBlockFrom = "" + removedBlock = true atomic.AddInt32(&bpr.pool.numPending, 1) } - bpr.peerID = "" - bpr.block = nil - bpr.extCommit = nil + if bpr.peerID == peerID { + bpr.peerID = "" + } else { + bpr.secondPeerID = "" + } + + return removedBlock } // Tells bpRequester to pick another peer and try again. @@ -607,34 +732,75 @@ func (bpr *bpRequester) redo(peerID p2p.ID) { } } +func (bpr *bpRequester) pickPeerAndSendRequest() { + bpr.mtx.Lock() + secondPeerID := bpr.secondPeerID + bpr.mtx.Unlock() + + var peer *bpPeer +PICK_PEER_LOOP: + for { + if !bpr.IsRunning() || !bpr.pool.IsRunning() { + return + } + peer = bpr.pool.pickIncrAvailablePeer(bpr.height, secondPeerID) + if peer == nil { + bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + bpr.mtx.Lock() + bpr.peerID = peer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, peer.id) +} + +// Picks a second peer and sends a request to it. If the second peer is already +// set, does nothing. +func (bpr *bpRequester) pickSecondPeerAndSendRequest() { + bpr.mtx.Lock() + if bpr.secondPeerID != "" { + bpr.mtx.Unlock() + return + } + peerID := bpr.peerID + bpr.mtx.Unlock() + + secondPeer := bpr.pool.pickIncrAvailablePeer(bpr.height, peerID) + if secondPeer != nil { + bpr.mtx.Lock() + bpr.secondPeerID = secondPeer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, secondPeer.id) + } +} + +// Informs the requester of a new pool's height. +func (bpr *bpRequester) newHeight(height int64) { + select { + case bpr.newHeightCh <- height: + default: + } +} + // Responsible for making more requests as necessary // Returns only when a block is found (e.g. AddBlock() is called) func (bpr *bpRequester) requestRoutine() { + gotBlock := false + OUTER_LOOP: for { - // Pick a peer to send request to. - var peer *bpPeer - PICK_PEER_LOOP: - for { - if !bpr.IsRunning() || !bpr.pool.IsRunning() { - return - } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { - bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) - time.Sleep(requestIntervalMS * time.Millisecond) - continue PICK_PEER_LOOP - } - break PICK_PEER_LOOP + bpr.pickPeerAndSendRequest() + + poolHeight := bpr.pool.Height() + if bpr.height-poolHeight < minBlocksForSingleRequest { + bpr.pickSecondPeerAndSendRequest() } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() - to := time.NewTimer(requestRetrySeconds * time.Second) - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: for { select { case <-bpr.pool.Quit(): @@ -644,21 +810,34 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-to.C: - bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) - // Simulate a redo - bpr.reset() - continue OUTER_LOOP + case <-time.After(requestRetrySeconds * time.Second): + if !gotBlock { + bpr.Logger.Debug("Retrying block request(s) after timeout", "height", bpr.height, "peer", bpr.peerID, "secondPeerID", bpr.secondPeerID) + bpr.reset(bpr.peerID) + bpr.reset(bpr.secondPeerID) + continue OUTER_LOOP + } case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() + if bpr.didRequestFrom(peerID) { + removedBlock := bpr.reset(peerID) + if removedBlock { + gotBlock = false + } + } + // If both peers returned NoBlockResponse or bad block, reschedule both + // requests. If not, wait for the other peer. + if len(bpr.requestedFrom()) == 0 { continue OUTER_LOOP } - continue WAIT_LOOP + case newHeight := <-bpr.newHeightCh: + if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest { + // The operation is a noop if the second peer is already set. The cost is checking a mutex. + bpr.pickSecondPeerAndSendRequest() + } case <-bpr.gotBlockCh: + gotBlock = true // We got a block! // Continue the for-loop and wait til Quit. - continue WAIT_LOOP } } } diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 497d87b67c8..5acd6069314 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -272,7 +272,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { } if err := bcR.pool.AddBlock(e.Src.ID(), bi, extCommit, msg.Block.Size()); err != nil { - bcR.Logger.Error("failed to add block", "err", err) + bcR.Logger.Error("failed to add block", "peer", e.Src, "err", err) } case *bcproto.StatusRequest: // Send peer our state. @@ -288,6 +288,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) + bcR.pool.RedoRequestFrom(msg.Height, e.Src.ID()) default: bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -496,14 +497,14 @@ FOR_LOOP: } if err != nil { bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) + peerID := bcR.pool.RemovePeerAndRedoAllPeerRequests(first.Height) peer := bcR.Switch.Peers().Get(peerID) if peer != nil { // NOTE: we've already removed the peer's request, but we // still need to clean up the rest. bcR.Switch.StopPeerForError(peer, ErrReactorValidation{Err: err}) } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := bcR.pool.RemovePeerAndRedoAllPeerRequests(second.Height) peer2 := bcR.Switch.Peers().Get(peerID2) if peer2 != nil && peer2 != peer { // NOTE: we've already removed the peer's request, but we