diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt index 572902ab1..de515a5de 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt @@ -57,6 +57,8 @@ class EthereumLowerBoundBlockDetector( } } } + }.flatMap { + Flux.just(it, lowerBoundFrom(it, LowerBoundType.LOGS)) } } @@ -65,6 +67,6 @@ class EthereumLowerBoundBlockDetector( } override fun types(): Set { - return setOf(LowerBoundType.BLOCK) + return setOf(LowerBoundType.BLOCK, LowerBoundType.LOGS) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt deleted file mode 100644 index 5349c7ce6..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt +++ /dev/null @@ -1,55 +0,0 @@ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.Defaults -import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType -import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound -import io.emeraldpay.dshackle.upstream.lowerbound.toHex -import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import reactor.core.publisher.Flux - -class EthereumLowerBoundLogsDetector( - private val upstream: Upstream, -) : LowerBoundDetector(upstream.getChain()) { - - companion object { - const val MAX_OFFSET = 20 - private const val NO_LOGS_DATA = "No logs data" - } - - private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.LOGS, setOf(NO_LOGS_DATA), lowerBounds) - - override fun period(): Long { - return 3 - } - - override fun internalDetectLowerBound(): Flux { - return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block -> - upstream.getIngressReader() - .read( - ChainRequest( - "eth_getLogs", - ListParams( - mapOf( - "fromBlock" to block.toHex(), - "toBlock" to block.toHex(), - ), - ), - ), - ) - .timeout(Defaults.internalCallsTimeout) - .doOnNext { - if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResult().contentEquals("[]".toByteArray()))) { - throw IllegalStateException(NO_LOGS_DATA) - } - } - } - } - - override fun types(): Set { - return setOf(LowerBoundType.LOGS) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt index b6671387f..80b122c2d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundService.kt @@ -14,7 +14,6 @@ class EthereumLowerBoundService( EthereumLowerBoundStateDetector(upstream), EthereumLowerBoundBlockDetector(upstream), EthereumLowerBoundTxDetector(upstream), - EthereumLowerBoundLogsDetector(upstream), ) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt index 30870231e..90bd949ab 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt @@ -70,18 +70,10 @@ class EthereumLowerBoundStateDetector( } } }.flatMap { - Flux.just(it, lowerBoundFromState(it, LowerBoundType.TRACE)) + Flux.just(it, lowerBoundFrom(it, LowerBoundType.TRACE)) } } - private fun lowerBoundFromState(stateLowerBoundData: LowerBoundData, newType: LowerBoundType): LowerBoundData { - val currentBound = lowerBounds.getLastBound(newType) - if (currentBound == null || stateLowerBoundData.lowerBound >= currentBound.lowerBound) { - return stateLowerBoundData.copy(type = newType) - } - return LowerBoundData(currentBound.lowerBound, newType) - } - override fun types(): Set { return setOf(LowerBoundType.STATE, LowerBoundType.TRACE) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt index 3409b6eba..72738e7dc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt @@ -50,6 +50,14 @@ abstract class LowerBoundDetector( protected abstract fun internalDetectLowerBound(): Flux + protected fun lowerBoundFrom(lowerBoundFrom: LowerBoundData, newType: LowerBoundType): LowerBoundData { + val currentBound = lowerBounds.getLastBound(newType) + if (currentBound == null || lowerBoundFrom.lowerBound >= currentBound.lowerBound) { + return lowerBoundFrom.copy(type = newType) + } + return LowerBoundData(currentBound.lowerBound, newType) + } + abstract fun types(): Set fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt index 23bb85707..144ad911c 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt @@ -55,9 +55,6 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getTransactionByHash", ListParams("0x99e52a94cfdf83a5bdadcd2e25c71574a5a24fa4df56a33f9f8b5cb6fa0ac657"))) } doReturn Mono.just(ChainResponse(ByteArray(0), null)) - on { - read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to it.toHex(), "toBlock" to it.toHex())))) - } doReturn Mono.just(ChainResponse("[\"0x12\"]".toByteArray(), null)) } else { on { read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex()))) @@ -66,9 +63,6 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false))) } doReturn Mono.just(ChainResponse(Global.nullValue, null)) - on { - read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to block.toHex(), "toBlock" to block.toHex())))) - } doReturn Mono.error(RuntimeException("No logs data")) } } } @@ -88,8 +82,8 @@ class RecursiveLowerBoundServiceTest { .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE } .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TRACE } .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK } - .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX } .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.LOGS } + .expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX } .thenCancel() .verify(Duration.ofSeconds(3)) @@ -100,8 +94,8 @@ class RecursiveLowerBoundServiceTest { LowerBoundData(17964844L, LowerBoundType.STATE), LowerBoundData(17964844L, LowerBoundType.TRACE), LowerBoundData(17964844L, LowerBoundType.BLOCK), - LowerBoundData(17964844L, LowerBoundType.TX), LowerBoundData(17964844L, LowerBoundType.LOGS), + LowerBoundData(17964844L, LowerBoundType.TX), ), ) }