diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt index a2f001a49..03c574929 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt @@ -30,12 +30,17 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID import io.emeraldpay.dshackle.upstream.ethereum.connectors.ConnectorFactory import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnector import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector import org.springframework.context.ApplicationEventPublisher import org.springframework.context.Lifecycle import reactor.core.Disposable +import reactor.core.publisher.Flux +import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean open class EthereumLikeRpcUpstream( @@ -58,6 +63,7 @@ open class EthereumLikeRpcUpstream( private var validatorSubscription: Disposable? = null private var livenessSubscription: Disposable? = null + private var validationSettingsSubscription: Disposable? = null override fun getCapabilities(): Set { return if (hasLiveSubscriptionHead.get()) { @@ -76,11 +82,53 @@ open class EthereumLikeRpcUpstream( override fun start() { log.info("Configured for ${chain.chainName}") connector.start() - if (!getOptions().disableUpstreamValidation && !validator.validateUpstreamSettings()) { - connector.stop() - log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") - return + val validSettingsResult = validator.validateUpstreamSettingsOnStartup() + when (validSettingsResult) { + UPSTREAM_FATAL_SETTINGS_ERROR -> { + connector.stop() + log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") + return + } + UPSTREAM_SETTINGS_ERROR -> { + validateUpstreamSettings() + } + else -> { + upstreamStart() + labelsDetector.detectLabels() + .toStream() + .forEach { updateLabels(it) } + } + } + } + + private fun validateUpstreamSettings() { + validationSettingsSubscription = Flux.interval( + Duration.ofSeconds(10), + Duration.ofSeconds(20), + ).flatMap { + validator.validateUpstreamSettings() + }.subscribe { + when (it) { + UPSTREAM_FATAL_SETTINGS_ERROR -> { + connector.stop() + log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") + disposeValidationSettingsSubscription() + } + UPSTREAM_VALID -> { + upstreamStart() + labelsDetector.detectLabels() + .subscribe { label -> updateLabels(label) } + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.ADDED)) + disposeValidationSettingsSubscription() + } + else -> { + log.warn("Continue validation of upstream ${getId()}") + } + } } + } + + private fun upstreamStart() { if (getOptions().disableValidation) { log.warn("Disable validation for upstream ${this.getId()}") this.setLag(0) @@ -96,14 +144,13 @@ open class EthereumLikeRpcUpstream( }, { log.debug("Error while checking live subscription for ${getId()}", it) },) - labelsDetector.detectLabels() - .toStream() - .forEach { - log.info("Detected label ${it.first} with value ${it.second} for upstream ${getId()}") - node?.labels?.let { labels -> - labels[it.first] = it.second - } - } + } + + private fun updateLabels(label: Pair) { + log.info("Detected label ${label.first} with value ${label.second} for upstream ${getId()}") + node?.labels?.let { labels -> + labels[label.first] = label.second + } } override fun getIngressSubscription(): EthereumIngressSubscription { @@ -128,11 +175,12 @@ open class EthereumLikeRpcUpstream( validatorSubscription = null livenessSubscription?.dispose() livenessSubscription = null + disposeValidationSettingsSubscription() connector.stop() } override fun isRunning(): Boolean { - return connector.isRunning() + return connector.isRunning() && validationSettingsSubscription == null } override fun getIngressReader(): JsonRpcReader { @@ -150,4 +198,9 @@ open class EthereumLikeRpcUpstream( } return this as T } + + private fun disposeValidationSettingsSubscription() { + validationSettingsSubscription?.dispose() + validationSettingsSubscription = null + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index c4feea0be..b41f9322d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -23,6 +23,9 @@ import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.etherjar.domain.Address @@ -137,19 +140,26 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( } } - fun validateUpstreamSettings(): Boolean { + fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult { + return validateUpstreamSettings().block() ?: UPSTREAM_FATAL_SETTINGS_ERROR + } + + fun validateUpstreamSettings(): Mono { + if (options.disableUpstreamValidation) { + return Mono.just(UPSTREAM_VALID) + } return Mono.zip( validateChain(), validateCallLimit(), validateOldBlocks(), ).map { - it.t1 && it.t2 && it.t3 - }.block() ?: false + listOf(it.t1, it.t2, it.t3).sorted().last() + } } - private fun validateChain(): Mono { + private fun validateChain(): Mono { if (!options.validateChain) { - return Mono.just(true) + return Mono.just(UPSTREAM_VALID) } return Mono.zip( chainId(), @@ -167,17 +177,21 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( ) } - isChainValid + if (isChainValid) { + UPSTREAM_VALID + } else { + UPSTREAM_FATAL_SETTINGS_ERROR + } } .onErrorResume { log.error("Error during chain validation", it) - Mono.just(false) + Mono.just(UPSTREAM_SETTINGS_ERROR) } } - private fun validateCallLimit(): Mono { + private fun validateCallLimit(): Mono { if (!options.validateCallLimit || callLimitContract == null) { - return Mono.just(true) + return Mono.just(UPSTREAM_VALID) } return upstream.getIngressReader() .read( @@ -195,7 +209,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( ), ) .flatMap(JsonRpcResponse::requireResult) - .map { true } + .map { UPSTREAM_VALID } .onErrorResume { if (it.message != null && it.message!!.contains("rpc.returndata.limit")) { log.warn( @@ -203,7 +217,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "You need to set up your return limit to at least 1_100_000. " + "Erigon config example: https://github.com/ledgerwatch/erigon/blob/d014da4dc039ea97caf04ed29feb2af92b7b129d/cmd/utils/flags.go#L369", ) - Mono.just(false) + Mono.just(UPSTREAM_FATAL_SETTINGS_ERROR) } else { Mono.error(it) } @@ -219,10 +233,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "message ${ctx.exception().message}", ) } - .onErrorReturn(false) + .onErrorReturn(UPSTREAM_SETTINGS_ERROR) } - private fun validateOldBlocks(): Mono { + private fun validateOldBlocks(): Mono { return EthereumArchiveBlockNumberReader(upstream.getIngressReader()) .readArchiveBlock() .flatMap { @@ -243,11 +257,11 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "Node ${upstream.getId()} probably is synced incorrectly, it is not possible to get old blocks", ) } - true + UPSTREAM_VALID } .onErrorResume { log.warn("Error during old blocks validation", it) - Mono.just(true) + Mono.just(UPSTREAM_VALID) } } @@ -276,4 +290,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( .doOnError { log.error("Error during execution 'net_version' - ${it.message} for ${upstream.getId()}") } .flatMap(JsonRpcResponse::requireStringResult) } + + enum class ValidateUpstreamSettingsResult { + UPSTREAM_VALID, + UPSTREAM_SETTINGS_ERROR, + UPSTREAM_FATAL_SETTINGS_ERROR, + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy index 362e8ae6f..aa847c096 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy @@ -37,6 +37,9 @@ import java.time.Duration import static io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET import static io.emeraldpay.dshackle.Chain.OPTIMISM__MAINNET import static io.emeraldpay.dshackle.upstream.UpstreamAvailability.* +import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR +import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR +import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID import static java.util.Collections.emptyList class EthereumUpstreamValidatorSpec extends Specification { @@ -45,18 +48,18 @@ class EthereumUpstreamValidatorSpec extends Specification { setup: def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, Stub(EthereumLikeUpstream), ChainOptions.PartialOptions.getDefaults().buildOptions()) expect: - validator.resolve(Tuples.of(sync, peers, call)) == exp + validator.resolve(Tuples.of(sync, peers)) == exp where: - exp | sync | peers | call - OK | OK | OK | OK - IMMATURE | OK | IMMATURE | OK - UNAVAILABLE | OK | UNAVAILABLE | OK - SYNCING | SYNCING | OK | OK - SYNCING | SYNCING | IMMATURE | OK - UNAVAILABLE | SYNCING | UNAVAILABLE | OK - UNAVAILABLE | UNAVAILABLE | OK | OK - UNAVAILABLE | UNAVAILABLE | IMMATURE | OK - UNAVAILABLE | UNAVAILABLE | UNAVAILABLE | OK + exp | sync | peers + OK | OK | OK + IMMATURE | OK | IMMATURE + UNAVAILABLE | OK | UNAVAILABLE + SYNCING | SYNCING | OK + SYNCING | SYNCING | IMMATURE + UNAVAILABLE | SYNCING | UNAVAILABLE + UNAVAILABLE | UNAVAILABLE | OK + UNAVAILABLE | UNAVAILABLE | IMMATURE + UNAVAILABLE | UNAVAILABLE | UNAVAILABLE } def "Doesnt check eth_syncing when disabled"() { @@ -280,9 +283,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options) when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - act + act == UPSTREAM_VALID } def "Upstream is valid if not error from call limit check"() { @@ -304,9 +307,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - act + act == UPSTREAM_VALID } def "Upstream is not valid if error returned on call limit check"() { @@ -328,9 +331,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - !act + act == UPSTREAM_SETTINGS_ERROR } def "Upstream is valid if chain settings are valid"() { @@ -350,9 +353,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - act + act == UPSTREAM_VALID } def "Upstream is not valid - specified optimism but got ethereum"() { @@ -372,9 +375,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(OPTIMISM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - !act + act == UPSTREAM_FATAL_SETTINGS_ERROR } def "Upstream is valid if all setting are valid"() { @@ -396,9 +399,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - act + act == UPSTREAM_VALID } def "Upstream is not valid if there are errors"() { @@ -420,9 +423,9 @@ class EthereumUpstreamValidatorSpec extends Specification { def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") when: - def act = validator.validateUpstreamSettings() + def act = validator.validateUpstreamSettingsOnStartup() then: - !act + act == UPSTREAM_SETTINGS_ERROR }