diff --git a/.changelog/unreleased/improvements/2467-decrease-n-of-requested-blocks.md b/.changelog/unreleased/improvements/2467-decrease-n-of-requested-blocks.md new file mode 100644 index 00000000000..3b5ea17ce5a --- /dev/null +++ b/.changelog/unreleased/improvements/2467-decrease-n-of-requested-blocks.md @@ -0,0 +1,3 @@ +- `[blocksync]` make the max number of downloaded blocks dynamic. + Previously it was a const 600. Now it's `peersCount * maxPendingRequestsPerPeer (20)` + [\#2467](https://github.com/cometbft/cometbft/pull/2467) diff --git a/blocksync/pool.go b/blocksync/pool.go index 919586693d9..bf8ceab97c6 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -29,8 +29,6 @@ eg, L = latency = 0.1s const ( requestIntervalMS = 2 - maxTotalRequesters = 600 - maxPendingRequests = maxTotalRequesters maxPendingRequestsPerPeer = 20 requestRetrySeconds = 30 @@ -41,9 +39,6 @@ const ( // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, // sending data across atlantic ~ 7.5 KB/s. minRecvRate = 7680 - - // Maximum difference between current and new block's height. - maxDiffBetweenCurrentAndReceivedBlockHeight = 100 ) var peerTimeout = 15 * time.Second // not const so we can override with tests @@ -108,24 +103,27 @@ func (pool *BlockPool) OnStart() error { func (pool *BlockPool) makeRequestersRoutine() { for { if !pool.IsRunning() { - break + return } - _, numPending, lenRequesters := pool.GetStatus() + pool.mtx.Lock() + var ( + maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer + + nextHeight = pool.height + int64(len(pool.requesters)) + maxPeerHeightReached = nextHeight > pool.maxPeerHeight + ) + pool.mtx.Unlock() + switch { - case numPending >= maxPendingRequests: - // sleep for a bit. + case maxRequestersCreated: // If we have enough requesters, wait for them to finish. time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers pool.removeTimedoutPeers() - case lenRequesters >= maxTotalRequesters: - // sleep for a bit. + case maxPeerHeightReached: // If we're caught up, wait for a bit so reactor could finish or a higher height is reported. time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers - pool.removeTimedoutPeers() default: // request for more blocks. - pool.makeNextRequester() + pool.makeNextRequester(nextHeight) } } } @@ -277,7 +275,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *ty if diff < 0 { diff *= -1 } - if diff > maxDiffBetweenCurrentAndReceivedBlockHeight { + const maxDiff = 100 // maximum difference between current and received block height + if diff > maxDiff { pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) } return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height) @@ -391,30 +390,20 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { return nil } -func (pool *BlockPool) makeNextRequester() { +func (pool *BlockPool) makeNextRequester(nextHeight int64) { pool.mtx.Lock() defer pool.mtx.Unlock() - nextHeight := pool.height + pool.requestersLen() - if nextHeight > pool.maxPeerHeight { - return - } - request := newBPRequester(pool, nextHeight) pool.requesters[nextHeight] = request atomic.AddInt32(&pool.numPending, 1) - err := request.Start() - if err != nil { + if err := request.Start(); err != nil { request.Logger.Error("Error starting request", "err", err) } } -func (pool *BlockPool) requestersLen() int64 { - return int64(len(pool.requesters)) -} - func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) { if !pool.IsRunning() { return @@ -437,7 +426,7 @@ func (pool *BlockPool) debug() string { defer pool.mtx.Unlock() str := "" - nextHeight := pool.height + pool.requestersLen() + nextHeight := pool.height + int64(len(pool.requesters)) for h := pool.height; h < nextHeight; h++ { if pool.requesters[h] == nil { str += fmt.Sprintf("H(%v):X ", h) diff --git a/blocksync/pool_test.go b/blocksync/pool_test.go index c5bfab46b5a..275f2fd1fa5 100644 --- a/blocksync/pool_test.go +++ b/blocksync/pool_test.go @@ -81,10 +81,12 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { } func TestBlockPoolBasic(t *testing.T) { - start := int64(42) - peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError, 1000) - requestsCh := make(chan BlockRequest, 1000) + var ( + start = int64(42) + peers = makePeers(10, start, 1000) + errorsCh = make(chan peerError) + requestsCh = make(chan BlockRequest) + ) pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) @@ -141,10 +143,13 @@ func TestBlockPoolBasic(t *testing.T) { } func TestBlockPoolTimeout(t *testing.T) { - start := int64(42) - peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError, 1000) - requestsCh := make(chan BlockRequest, 1000) + var ( + start = int64(42) + peers = makePeers(10, start, 1000) + errorsCh = make(chan peerError) + requestsCh = make(chan BlockRequest) + ) + pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 302ddaadf6c..497d87b67c8 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -85,7 +85,10 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS panic(fmt.Sprintf("state (%v) and store (%v) height mismatch, stores were left in an inconsistent state", state.LastBlockHeight, storeHeight)) } - requestsCh := make(chan BlockRequest, maxTotalRequesters) + + // It's okay to block since sendRequest is called from a separate goroutine + // (bpRequester#requestRoutine; 1 per each peer). + requestsCh := make(chan BlockRequest) const capacity = 1000 // must be bigger than peers count errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock