From f8537e09dc0e1b0050627f3f584724c00bbe820d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Mon, 3 Jun 2024 17:09:17 +0400 Subject: [PATCH] Tx lower bound --- .../ethereum/EthereumLowerBoundService.kt | 1 + .../ethereum/EthereumLowerBoundTxDetector.kt | 42 +++++ .../detector/RecursiveLowerBound.kt | 168 ++++++++++++++---- 3 files changed, 180 insertions(+), 31 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt index 1211a5b0e..80b122c2d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt @@ -13,6 +13,7 @@ class EthereumLowerBoundService( return listOf( EthereumLowerBoundStateDetector(upstream), EthereumLowerBoundBlockDetector(upstream), + EthereumLowerBoundTxDetector(upstream), ) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt new file mode 100644 index 000000000..8ebd23963 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -0,0 +1,42 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound +import io.emeraldpay.dshackle.upstream.lowerbound.toHex +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Flux + +class EthereumLowerBoundTxDetector( + private val upstream: Upstream, +) : LowerBoundDetector() { + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.TX, setOf("No tx data"), lowerBounds) + + override fun period(): Long { + return 3 + } + + override fun internalDetectLowerBound(): Flux { + return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(20) { block -> + upstream.getIngressReader() + .read( + ChainRequest( + "eth_getBlockTransactionCountByNumber", + ListParams(block.toHex()), + ), + ) + .doOnNext { + if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) { + throw IllegalStateException("No tx data") + } + } + } + } + + override fun types(): Set { + return setOf(LowerBoundType.TX) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt index 5622b0786..4f88dcf9a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt @@ -11,6 +11,7 @@ import reactor.kotlin.core.publisher.toFlux import reactor.util.retry.Retry import reactor.util.retry.RetryBackoffSpec import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger class RecursiveLowerBound( private val upstream: Upstream, @@ -21,18 +22,7 @@ class RecursiveLowerBound( private val log = LoggerFactory.getLogger(this::class.java) fun recursiveDetectLowerBound(hasData: (Long) -> Mono): Flux { - return Mono.just(upstream.getHead()) - .flatMap { - val currentHeight = it.getCurrentHeight() - if (currentHeight == null) { - Mono.empty() - } else if (!lowerBounds.contains(type)) { - Mono.just(LowerBoundBinarySearch(0, currentHeight)) - } else { - // next calculations will be carried out only within the last range - Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight)) - } - } + return initialRange() .expand { data -> if (data.found) { Mono.empty() @@ -41,35 +31,149 @@ class RecursiveLowerBound( if (data.left > data.right) { val current = if (data.current == 0L) 1 else data.current - Mono.just(LowerBoundBinarySearch(current, true)) + Mono.just(LowerBoundBinarySearchData(current, true)) } else { hasData(middle) - .retryWhen(retrySpec(nonRetryableErrors)) + .retryWhen(retrySpec(middle, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundBinarySearchData(data.left, middle - 1, middle) } + .onErrorReturn( + LowerBoundBinarySearchData( + middle + 1, + data.right, + data.current, + ), + ) + } + } + } + .filter { it.found } + .next() + .map { + LowerBoundData(it.current, type) + }.toFlux() + } + + fun recursiveDetectLowerBoundWithOffset(maxLimit: Int, hasData: (Long) -> Mono): Flux { + val visitedBlocks = HashSet() + return Mono.justOrEmpty(lowerBounds[type]?.lowerBound) + .flatMapMany { + // at first, we try to check the current bound to prevent huge calculations + hasData(it!!) + .retryWhen(retrySpec(it, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundData(lowerBounds[type]!!.lowerBound, type) } + .onErrorResume { Mono.empty() } + }.switchIfEmpty( + initialRange() + .expand { data -> + if (data.found) { + Mono.empty() + } else { + val middle = middleBlock(data) + + if (data.left > data.right) { + val current = if (data.current == 0L) 1 else data.current + Mono.just(LowerBoundBinarySearchData(current, true)) + } else { + hasData(middle) + .retryWhen(retrySpec(middle, nonRetryableErrors)) + .flatMap(ChainResponse::requireResult) + .map { LowerBoundBinarySearchData(data.left, middle - 1, middle) } + .onErrorResume { + if (middle < 0) { + Mono.just(LowerBoundBinarySearchData(middle + 1, data.right, data.current)) + } else { + shiftLeftAndSearch(data, middle, visitedBlocks, maxLimit, hasData) + } + } + } + } + } + .filter { it.found } + .next() + .map { + LowerBoundData(it.current, type) + }.toFlux(), + ) + } + + private fun shiftLeftAndSearch( + currentData: LowerBoundBinarySearchData, + currentMiddle: Long, + visitedBlocks: HashSet, + maxLimit: Int, + hasData: (Long) -> Mono, + ): Mono { + val count = AtomicInteger(0) + return Mono.just(LowerBoundBinarySearchData(currentMiddle - 1, false)) + .expand { currentBlock -> + if (visitedBlocks.contains(currentBlock.current)) { + // if this block has been already seen there is no need to check it again + count.set(-1) + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current)) + } else { + if (currentBlock.current < 0) { + // to avoid negative numbers + count.set(-1) + Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current)) + } else { + hasData(currentBlock.current) + .retryWhen(retrySpec(currentBlock.current, nonRetryableErrors)) .flatMap(ChainResponse::requireResult) - .map { true } - .onErrorReturn(false) .map { - if (it) { - LowerBoundBinarySearch(data.left, middle - 1, middle) + // we found data at once and return it + count.set(-1) + LowerBoundBinarySearchData( + currentData.left, + currentBlock.current - 1, + currentBlock.current, + ) + } + .onErrorResume { + // otherwise we go the left until we reach the specified limit + count.incrementAndGet() + if (count.get() in 1..maxLimit) { + visitedBlocks.add(currentBlock.current) + Mono.just( + LowerBoundBinarySearchData( + currentBlock.current - 1, + false, + ), + ) } else { - LowerBoundBinarySearch( - middle + 1, - data.right, - data.current, + Mono.just( + LowerBoundBinarySearchData( + currentMiddle + 1, + currentData.right, + currentData.current, + ), ) } } } } } - .filter { it.found } + .filter { count.get() > maxLimit || count.get() == -1 } .next() - .map { - LowerBoundData(it.current, type) - }.toFlux() } - private fun retrySpec(nonRetryableErrors: Set): RetryBackoffSpec { + private fun initialRange(): Mono { + return Mono.just(upstream.getHead()) + .flatMap { + val currentHeight = it.getCurrentHeight() + if (currentHeight == null) { + Mono.empty() + } else if (!lowerBounds.contains(type)) { + Mono.just(LowerBoundBinarySearchData(0, currentHeight)) + } else { + // next calculations will be carried out only within the last range + Mono.just(LowerBoundBinarySearchData(lowerBounds[type]!!.lowerBound, currentHeight)) + } + } + } + + private fun retrySpec(block: Long, nonRetryableErrors: Set): RetryBackoffSpec { return Retry.backoff( Long.MAX_VALUE, Duration.ofSeconds(1), @@ -80,18 +184,20 @@ class RecursiveLowerBound( } .doAfterRetry { log.debug( - "Error in calculation of lower block of upstream {}, retry attempt - {}, message - {}", + "Error in calculation of lower block {} of upstream {}, type - {}, retry attempt - {}, message - {}", + block, upstream.getId(), + type, it.totalRetries(), it.failure().message, ) } } - private fun middleBlock(lowerBoundBinarySearch: LowerBoundBinarySearch): Long = - lowerBoundBinarySearch.left + (lowerBoundBinarySearch.right - lowerBoundBinarySearch.left) / 2 + private fun middleBlock(lowerBoundBinarySearchData: LowerBoundBinarySearchData): Long = + lowerBoundBinarySearchData.left + (lowerBoundBinarySearchData.right - lowerBoundBinarySearchData.left) / 2 - private data class LowerBoundBinarySearch( + private data class LowerBoundBinarySearchData( val left: Long, val right: Long, val current: Long,