diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt index 1211a5b0e..80b122c2d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt @@ -13,6 +13,7 @@ class EthereumLowerBoundService( return listOf( EthereumLowerBoundStateDetector(upstream), EthereumLowerBoundBlockDetector(upstream), + EthereumLowerBoundTxDetector(upstream), ) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt new file mode 100644 index 000000000..36c7d8b69 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -0,0 +1,44 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound +import io.emeraldpay.dshackle.upstream.lowerbound.toHex +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Flux + +const val MAX_OFFSET = 20 + +class EthereumLowerBoundTxDetector( + private val upstream: Upstream, +) : LowerBoundDetector() { + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.TX, setOf("No tx data"), lowerBounds) + + override fun period(): Long { + return 3 + } + + override fun internalDetectLowerBound(): Flux { + return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block -> + upstream.getIngressReader() + .read( + ChainRequest( + "eth_getBlockTransactionCountByNumber", + ListParams(block.toHex()), + ), + ) + .doOnNext { + if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) { + throw IllegalStateException("No tx data") + } + } + } + } + + override fun types(): Set { + return setOf(LowerBoundType.TX) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt index 5622b0786..62103bc39 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt @@ -11,6 +11,7 @@ import reactor.kotlin.core.publisher.toFlux import reactor.util.retry.Retry import reactor.util.retry.RetryBackoffSpec import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger class RecursiveLowerBound( private val upstream: Upstream, @@ -21,18 +22,7 @@ class RecursiveLowerBound( private val log = LoggerFactory.getLogger(this::class.java) fun recursiveDetectLowerBound(hasData: (Long) -> Mono): Flux { - return Mono.just(upstream.getHead()) - .flatMap { - val currentHeight = it.getCurrentHeight() - if (currentHeight == null) { - Mono.empty() - } else if (!lowerBounds.contains(type)) { - Mono.just(LowerBoundBinarySearch(0, currentHeight)) - } else { - // next calculations will be carried out only within the last range - Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight)) - } - } + return initialRange() .expand { data -> if (data.found) { Mono.empty() @@ -41,22 +31,106 @@ class RecursiveLowerBound( if (data.left > data.right) { val current = if (data.current == 0L) 1 else data.current - Mono.just(LowerBoundBinarySearch(current, true)) + Mono.just(LowerBoundBinarySearchData(current, true)) } else { hasData(middle) - .retryWhen(retrySpec(nonRetryableErrors)) + .retryWhen(retrySpec(middle, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundBinarySearchData(data.left, middle - 1, middle) } + .onErrorReturn( + LowerBoundBinarySearchData( + middle + 1, + data.right, + data.current, + ), + ) + } + } + } + .filter { it.found } + .next() + .map { + LowerBoundData(it.current, type) + }.toFlux() + } + + fun recursiveDetectLowerBoundWithOffset(maxLimit: Int, hasData: (Long) -> Mono): Flux { + val visitedBlocks = HashSet() + return Mono.justOrEmpty(lowerBounds[type]?.lowerBound) + .flatMapMany { + // at first, we try to check the current bound to prevent huge calculations + hasData(it!!) + .retryWhen(retrySpec(it, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundData(lowerBounds[type]!!.lowerBound, type) } + .onErrorResume { Mono.empty() } + }.switchIfEmpty( + initialRange() + .expand { data -> + if (data.found) { + Mono.empty() + } else { + val middle = middleBlock(data) + + if (data.left > data.right) { + val current = if (data.current == 0L) 1 else data.current + Mono.just(LowerBoundBinarySearchData(current, true)) + } else { + hasData(middle) + .retryWhen(retrySpec(middle, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundBinarySearchData(data.left, middle - 1, middle) } + .onErrorResume { + if (middle < 0) { + Mono.just(LowerBoundBinarySearchData(middle + 1, data.right, data.current)) + } else { + shiftLeftAndSearch(data, middle, visitedBlocks, maxLimit, hasData) + } + } + } + } + } + .filter { it.found } + .next() + .map { + LowerBoundData(it.current, type) + }.toFlux(), + ) + } + + private fun shiftLeftAndSearch( + currentData: LowerBoundBinarySearchData, + currentMiddle: Long, + visitedBlocks: HashSet, + maxLimit: Int, + hasData: (Long) -> Mono, + ): Mono { + val count = AtomicInteger(0) + return Mono.just(LowerBoundBinarySearchData(currentMiddle - 1, false)) + .expand { currentBlock -> + if (currentBlock.found) { + // to avoid extra handling + Mono.empty() + } else { + if (visitedBlocks.contains(currentBlock.current) || currentBlock.current < 0) { + // if this block has been already seen there is no need to check it again + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true)) + } else { + hasData(currentBlock.current) + .retryWhen(retrySpec(currentBlock.current, nonRetryableErrors)) .flatMap(ChainResponse::requireResult) - .map { true } - .onErrorReturn(false) .map { - if (it) { - LowerBoundBinarySearch(data.left, middle - 1, middle) + // we found data at once and return it + LowerBoundBinarySearchData(currentData.left, currentBlock.current - 1, currentBlock.current, true) + } + .onErrorResume { + // otherwise we go the left until we reach the specified limit + count.incrementAndGet() + if (count.get() in 1..maxLimit) { + visitedBlocks.add(currentBlock.current) + Mono.just(LowerBoundBinarySearchData(currentBlock.current - 1, false)) } else { - LowerBoundBinarySearch( - middle + 1, - data.right, - data.current, - ) + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true)) } } } @@ -65,11 +139,27 @@ class RecursiveLowerBound( .filter { it.found } .next() .map { - LowerBoundData(it.current, type) - }.toFlux() + // in terms of the whole calculation we haven't found the bound + LowerBoundBinarySearchData(it.left, it.right, it.current) + } + } + + private fun initialRange(): Mono { + return Mono.just(upstream.getHead()) + .flatMap { + val currentHeight = it.getCurrentHeight() + if (currentHeight == null) { + Mono.empty() + } else if (!lowerBounds.contains(type)) { + Mono.just(LowerBoundBinarySearchData(0, currentHeight)) + } else { + // next calculations will be carried out only within the last range + Mono.just(LowerBoundBinarySearchData(lowerBounds[type]!!.lowerBound, currentHeight)) + } + } } - private fun retrySpec(nonRetryableErrors: Set): RetryBackoffSpec { + private fun retrySpec(block: Long, nonRetryableErrors: Set): RetryBackoffSpec { return Retry.backoff( Long.MAX_VALUE, Duration.ofSeconds(1), @@ -80,18 +170,20 @@ class RecursiveLowerBound( } .doAfterRetry { log.debug( - "Error in calculation of lower block of upstream {}, retry attempt - {}, message - {}", + "Error in calculation of lower block {} of upstream {}, type - {}, retry attempt - {}, message - {}", + block, upstream.getId(), + type, it.totalRetries(), it.failure().message, ) } } - private fun middleBlock(lowerBoundBinarySearch: LowerBoundBinarySearch): Long = - lowerBoundBinarySearch.left + (lowerBoundBinarySearch.right - lowerBoundBinarySearch.left) / 2 + private fun middleBlock(lowerBoundBinarySearchData: LowerBoundBinarySearchData): Long = + lowerBoundBinarySearchData.left + (lowerBoundBinarySearchData.right - lowerBoundBinarySearchData.left) / 2 - private data class LowerBoundBinarySearch( + private data class LowerBoundBinarySearchData( val left: Long, val right: Long, val current: Long, diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt index 6a4bd722e..d9d9541c6 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService +import io.emeraldpay.dshackle.upstream.ethereum.MAX_OFFSET import io.emeraldpay.dshackle.upstream.ethereum.ZERO_ADDRESS import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService @@ -45,6 +46,9 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) } doReturn Mono.just(ChainResponse(ByteArray(0), null)) + on { + read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex()))) + } doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null)) } else { on { read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex()))) @@ -52,6 +56,11 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) } doReturn Mono.error(RuntimeException("No block data")) + for (block in it downTo it - MAX_OFFSET - 1) { + on { + read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex()))) + } doReturn Mono.error(RuntimeException("No tx data")) + } } } } @@ -67,6 +76,7 @@ class RecursiveLowerBoundServiceTest { .expectNoEvent(Duration.ofSeconds(15)) .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE } .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK } + .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX } .thenCancel() .verify(Duration.ofSeconds(3)) @@ -76,6 +86,7 @@ class RecursiveLowerBoundServiceTest { listOf( LowerBoundData(17964844L, LowerBoundType.STATE), LowerBoundData(17964844L, LowerBoundType.BLOCK), + LowerBoundData(17964844L, LowerBoundType.TX), ), ) }