diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt index 47e5a6c84..1f6ae90e5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt @@ -1,7 +1,9 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson @@ -13,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.finalization.FinalizationType import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import java.time.Duration import java.util.concurrent.ConcurrentHashMap @@ -23,12 +26,13 @@ class EthereumFinalizationDetector : FinalizationDetector { } val data: ConcurrentHashMap = ConcurrentHashMap() - + private val disableDetector: ConcurrentHashMap = ConcurrentHashMap() private val finalizationSink = Sinks.many().multicast().directBestEffort() override fun detectFinalization( upstream: Upstream, blockTime: Duration, + chain: Chain, ): Flux { return Flux.merge( finalizationSink.asFlux(), @@ -56,23 +60,38 @@ class EthereumFinalizationDetector : FinalizationDetector { ), ), ).flatMap { (type, req) -> - upstream - .getIngressReader() - .read(req) - .flatMap { - it.requireResult().map { result -> - val block = - Global.objectMapper - .readValue(result, BlockJson::class.java) as BlockJson? - if (block != null) { - FinalizationData(block.number, type) + if (!disableDetector.getOrDefault(type, false)) { + upstream + .getIngressReader() + .read(req) + .onErrorResume { + if (it.message != null && it.message!!.matches(Regex("(bad request|block not found|Unknown block|tag not supported on pre-merge network)"))) { + log.warn("Can't retrieve tagged block, finalization detector for upstream ${upstream.getId()} $chain tag $type disabled") + disableDetector[type] = true } else { - throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data") + throw it + } + Mono.empty() + } + .flatMap { + it.requireResult().map { result -> + val block = Global.objectMapper + .readValue( + result, + BlockJson::class.java, + ) as BlockJson? + if (block != null) { + FinalizationData(block.number, type) + } else { + throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data") + } } } - } + } else { + Flux.empty() + } }.onErrorResume { - log.error("Error during retrieving — $it") + log.error("Error in FinalizationDetector for upstream ${upstream.getId()} $chain — $it") Flux.empty() } }.filter { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/finalization/FinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/finalization/FinalizationDetector.kt index b4aa6ff26..6563256be 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/finalization/FinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/finalization/FinalizationDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.finalization +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.Upstream import reactor.core.publisher.Flux import java.time.Duration @@ -8,6 +9,7 @@ interface FinalizationDetector { fun detectFinalization( upstream: Upstream, blockTime: Duration, + chain: Chain, ): Flux fun getFinalizations(): Collection @@ -19,6 +21,7 @@ class NoopFinalizationDetector : FinalizationDetector { override fun detectFinalization( upstream: Upstream, blockTime: Duration, + chain: Chain, ): Flux { return Flux.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 2184b50bc..326db67a2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -308,7 +308,7 @@ open class GenericUpstream( private fun detectFinalization() { finalizationDetectorSubscription = - finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime).subscribe { + finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, chain).subscribe { sendUpstreamStateEvent(UPDATED) } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt index c1df16722..f109645a5 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest @@ -33,9 +34,13 @@ class EthereumFinalizationDetectorTest { val upstream = mock { on { getIngressReader() } doReturn reader } + + val chain = mock { + on { toString() } doReturn "TestChain" + } val detector = EthereumFinalizationDetector() - StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofMillis(200)) } + StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofMillis(200), chain) } .expectSubscription() .thenAwait(Duration.ofSeconds(0)) .expectNext(FinalizationData(1L, FinalizationType.SAFE_BLOCK))