diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt index 8ebd23963..36c7d8b69 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -10,6 +10,8 @@ import io.emeraldpay.dshackle.upstream.lowerbound.toHex import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import reactor.core.publisher.Flux +const val MAX_OFFSET = 20 + class EthereumLowerBoundTxDetector( private val upstream: Upstream, ) : LowerBoundDetector() { @@ -20,7 +22,7 @@ class EthereumLowerBoundTxDetector( } override fun internalDetectLowerBound(): Flux { - return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(20) { block -> + return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block -> upstream.getIngressReader() .read( ChainRequest( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt index 4f88dcf9a..62103bc39 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt @@ -108,54 +108,40 @@ class RecursiveLowerBound( val count = AtomicInteger(0) return Mono.just(LowerBoundBinarySearchData(currentMiddle - 1, false)) .expand { currentBlock -> - if (visitedBlocks.contains(currentBlock.current)) { - // if this block has been already seen there is no need to check it again - count.set(-1) - Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current)) + if (currentBlock.found) { + // to avoid extra handling + Mono.empty() } else { - if (currentBlock.current < 0) { - // to avoid negative numbers - count.set(-1) - Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current)) + if (visitedBlocks.contains(currentBlock.current) || currentBlock.current < 0) { + // if this block has been already seen there is no need to check it again + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true)) } else { hasData(currentBlock.current) .retryWhen(retrySpec(currentBlock.current, nonRetryableErrors)) .flatMap(ChainResponse::requireResult) .map { // we found data at once and return it - count.set(-1) - LowerBoundBinarySearchData( - currentData.left, - currentBlock.current - 1, - currentBlock.current, - ) + LowerBoundBinarySearchData(currentData.left, currentBlock.current - 1, currentBlock.current, true) } .onErrorResume { // otherwise we go the left until we reach the specified limit count.incrementAndGet() if (count.get() in 1..maxLimit) { visitedBlocks.add(currentBlock.current) - Mono.just( - LowerBoundBinarySearchData( - currentBlock.current - 1, - false, - ), - ) + Mono.just(LowerBoundBinarySearchData(currentBlock.current - 1, false)) } else { - Mono.just( - LowerBoundBinarySearchData( - currentMiddle + 1, - currentData.right, - currentData.current, - ), - ) + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true)) } } } } } - .filter { count.get() > maxLimit || count.get() == -1 } + .filter { it.found } .next() + .map { + // in terms of the whole calculation we haven't found the bound + LowerBoundBinarySearchData(it.left, it.right, it.current) + } } private fun initialRange(): Mono { diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt index 6a4bd722e..d9d9541c6 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService +import io.emeraldpay.dshackle.upstream.ethereum.MAX_OFFSET import io.emeraldpay.dshackle.upstream.ethereum.ZERO_ADDRESS import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService @@ -45,6 +46,9 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) } doReturn Mono.just(ChainResponse(ByteArray(0), null)) + on { + read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex()))) + } doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null)) } else { on { read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex()))) @@ -52,6 +56,11 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) } doReturn Mono.error(RuntimeException("No block data")) + for (block in it downTo it - MAX_OFFSET - 1) { + on { + read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex()))) + } doReturn Mono.error(RuntimeException("No tx data")) + } } } } @@ -67,6 +76,7 @@ class RecursiveLowerBoundServiceTest { .expectNoEvent(Duration.ofSeconds(15)) .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE } .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK } + .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX } .thenCancel() .verify(Duration.ofSeconds(3)) @@ -76,6 +86,7 @@ class RecursiveLowerBoundServiceTest { listOf( LowerBoundData(17964844L, LowerBoundType.STATE), LowerBoundData(17964844L, LowerBoundType.BLOCK), + LowerBoundData(17964844L, LowerBoundType.TX), ), ) }