Skip to content

Commit

Permalink
Disable futher finalized/safe requests for detectors if received "tag…
Browse files Browse the repository at this point in the history
… not supported" error
  • Loading branch information
msizov authored Jul 17, 2024
1 parent 2e2a538 commit f1c669f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
Expand All @@ -13,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -23,12 +26,13 @@ class EthereumFinalizationDetector : FinalizationDetector {
}

val data: ConcurrentHashMap<FinalizationType, FinalizationData> = ConcurrentHashMap()

private val disableDetector: ConcurrentHashMap<FinalizationType, Boolean> = ConcurrentHashMap()
private val finalizationSink = Sinks.many().multicast().directBestEffort<FinalizationData>()

override fun detectFinalization(
upstream: Upstream,
blockTime: Duration,
chain: Chain,
): Flux<FinalizationData> {
return Flux.merge(
finalizationSink.asFlux(),
Expand Down Expand Up @@ -56,23 +60,38 @@ class EthereumFinalizationDetector : FinalizationDetector {
),
),
).flatMap { (type, req) ->
upstream
.getIngressReader()
.read(req)
.flatMap {
it.requireResult().map { result ->
val block =
Global.objectMapper
.readValue(result, BlockJson::class.java) as BlockJson<TransactionRefJson>?
if (block != null) {
FinalizationData(block.number, type)
if (!disableDetector.getOrDefault(type, false)) {
upstream
.getIngressReader()
.read(req)
.onErrorResume {
if (it.message != null && it.message!!.matches(Regex("(bad request|block not found|Unknown block|tag not supported on pre-merge network)"))) {
log.warn("Can't retrieve tagged block, finalization detector for upstream ${upstream.getId()} $chain tag $type disabled")
disableDetector[type] = true
} else {
throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data")
throw it
}
Mono.empty<ChainResponse>()
}
.flatMap {
it.requireResult().map { result ->
val block = Global.objectMapper
.readValue(
result,
BlockJson::class.java,
) as BlockJson<TransactionRefJson>?
if (block != null) {
FinalizationData(block.number, type)
} else {
throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data")
}
}
}
}
} else {
Flux.empty()
}
}.onErrorResume {
log.error("Error during retrieving$it")
log.error("Error in FinalizationDetector for upstream ${upstream.getId()} $chain$it")
Flux.empty()
}
}.filter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.finalization

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.upstream.Upstream
import reactor.core.publisher.Flux
import java.time.Duration
Expand All @@ -8,6 +9,7 @@ interface FinalizationDetector {
fun detectFinalization(
upstream: Upstream,
blockTime: Duration,
chain: Chain,
): Flux<FinalizationData>

fun getFinalizations(): Collection<FinalizationData>
Expand All @@ -19,6 +21,7 @@ class NoopFinalizationDetector : FinalizationDetector {
override fun detectFinalization(
upstream: Upstream,
blockTime: Duration,
chain: Chain,
): Flux<FinalizationData> {
return Flux.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ open class GenericUpstream(

private fun detectFinalization() {
finalizationDetectorSubscription =
finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime).subscribe {
finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, chain).subscribe {
sendUpstreamStateEvent(UPDATED)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ChainRequest
Expand Down Expand Up @@ -33,9 +34,13 @@ class EthereumFinalizationDetectorTest {
val upstream = mock<Upstream> {
on { getIngressReader() } doReturn reader
}

val chain = mock<Chain> {
on { toString() } doReturn "TestChain"
}
val detector = EthereumFinalizationDetector()

StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofMillis(200)) }
StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofMillis(200), chain) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(0))
.expectNext(FinalizationData(1L, FinalizationType.SAFE_BLOCK))
Expand Down

0 comments on commit f1c669f

Please sign in to comment.