Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check chain settings if the next height is very different #548

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion foundation/src/main/resources/public
Submodule public updated 1 files
+1 −1 chains.yaml
25 changes: 24 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.micrometer.core.instrument.Metrics
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
Expand Down Expand Up @@ -95,6 +96,20 @@ abstract class AbstractHead @JvmOverloads constructor(
log.warn("Received signal $upstreamId $it, continue emit heads")
}
}
.flatMap {
if (isSuspiciousBlock(it)) {
log.warn(
"Got a suspicious head of upstream {} with a height {} " +
"that is very different from the previous head with a height {}, validating this upstream",
upstreamId,
it.height,
getCurrentHeight(),
)
checkSuspiciousBlock(it)
} else {
Mono.just(it)
}
}
.subscribeOn(headScheduler)
.subscribe { block ->
val valid = runCatching {
Expand Down Expand Up @@ -165,7 +180,7 @@ abstract class AbstractHead @JvmOverloads constructor(
metrics.forEach { Metrics.globalRegistry.remove(it) }
}

open fun onNoHeadUpdates() {
protected open fun onNoHeadUpdates() {
// NOOP
}

Expand Down Expand Up @@ -196,4 +211,12 @@ abstract class AbstractHead @JvmOverloads constructor(
)
}
}

protected open fun isSuspiciousBlock(block: BlockContainer): Boolean {
return false
}

protected open fun checkSuspiciousBlock(block: BlockContainer): Mono<BlockContainer> {
return Mono.just(block)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
reader: ChainReader?,
): SingleValidator<ValidateUpstreamSettingsResult>? {
if (upstream.getOptions().disableUpstreamValidation) {
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
Expand Down Expand Up @@ -69,7 +72,6 @@ class GenericWsHead(
private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<HeadLivenessState>()

private var subscriptionId = AtomicReference("")

Expand Down Expand Up @@ -130,7 +132,7 @@ class GenericWsHead(
subscribed = false
Mono.empty()
}
else -> {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
log.error("Chain settings check hasn't been passed via ws connection, upstream {} will be removed", upstreamId)
headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
Mono.empty()
Expand All @@ -146,7 +148,9 @@ class GenericWsHead(
headResubSubscription = null
}

override fun headLiveness(): Flux<HeadLivenessState> = headLivenessSink.asFlux()
override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}

private fun unsubscribe(): Mono<BlockContainer> {
subscribed = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ abstract class AbstractChainSpecific : ChainSpecific {
override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
reader: ChainReader?,
): SingleValidator<ValidateUpstreamSettingsResult>? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ interface ChainSpecific {

fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector?

fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader): SingleValidator<ValidateUpstreamSettingsResult>?
fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader?): SingleValidator<ValidateUpstreamSettingsResult>?

fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.AbstractHead
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.switchIfEmpty
import kotlin.math.abs

open class GenericHead(
protected val upstreamId: String,
Expand All @@ -32,6 +39,7 @@ open class GenericHead(
private val headScheduler: Scheduler,
private val chainSpecific: ChainSpecific,
) : Head, AbstractHead(forkChoice, headScheduler, blockValidator, 60_000, upstreamId) {
protected val headLivenessSink: Sinks.Many<HeadLivenessState> = Sinks.many().multicast().directBestEffort()

fun getLatestBlock(api: ChainReader): Mono<BlockContainer> {
return chainSpecific.getLatestBlock(api, upstreamId)
Expand All @@ -42,4 +50,44 @@ open class GenericHead(
Mono.empty()
}
}

override fun isSuspiciousBlock(block: BlockContainer): Boolean {
return getCurrentHeight()
?.let {
abs(block.height - it) > 10000
} ?: false
}

override fun checkSuspiciousBlock(block: BlockContainer): Mono<BlockContainer> {
return Mono.justOrEmpty(chainIdValidator())
.flatMap {
it!!.validate(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR)
}
.switchIfEmpty {
Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID)
}
.flatMap { validationResult ->
when (validationResult) {
ValidateUpstreamSettingsResult.UPSTREAM_VALID -> {
log.info("Block {} of upstream {} has been received from the same chain, validation is passed", block.height, upstreamId)
Mono.just(block)
}
ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -> {
log.warn("Block {} of upstream {} is filtered and can not be emitted due to upstream settings error check", block.height, upstreamId)
Mono.empty()
}
ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> {
log.error("Block {} of upstream {} can not be emitted due to chain inconsistency, upstream will be removed", block.height, upstreamId)
headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
Mono.empty()
}
}
}
}

override fun headLiveness(): Flux<HeadLivenessState> = headLivenessSink.asFlux()

protected open fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.Disposable
import reactor.core.publisher.Flux
Expand All @@ -30,10 +33,12 @@ class GenericRpcHead(
forkChoice: ForkChoice,
upstreamId: String,
blockValidator: BlockValidator,
upstream: DefaultUpstream,
private val headScheduler: Scheduler,
private val chainSpecific: ChainSpecific,
private val interval: Duration = Duration.ofSeconds(10),
) : GenericHead(upstreamId, forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle {
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, null)

private var refreshSubscription: Disposable? = null
private var isSyncing = false
Expand Down Expand Up @@ -63,4 +68,8 @@ class GenericRpcHead(
refreshSubscription?.dispose()
refreshSubscription = null
}

override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class GenericRpcConnector(
forkChoice,
id,
blockValidator,
upstream,
headScheduler,
chainSpecific,
expectedBlockTime.coerceAtLeast(Duration.ofSeconds(1)),
Expand Down Expand Up @@ -112,6 +113,7 @@ class GenericRpcConnector(
AlwaysForkChoice(),
id,
blockValidator,
upstream,
headScheduler,
chainSpecific,
Duration.ofSeconds(30),
Expand Down
Loading
Loading