From 5fe044bba674e00caaa01417b9f758b8e5a3e977 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 11 Sep 2024 15:58:47 +0300 Subject: [PATCH] Autodetect methods (#562) * Autodetect methods * add polkadot magic method detection * fix lint * change config.methods writing * fix log * change Flux to Mono.zip * change block to subscribe * split chain method detectors * eth method detector use rpc_modules * rm UpstreamRpcModulesDetector.kt * add tests * getAllMethods for eth * prefer enable/disable methods from config * update tests * whiter list & full is not available error check * prefer config method group * mv notAvailableRegexps to class variable --- .../configure/GenericUpstreamCreator.kt | 2 +- .../upstream/UpstreamRpcMethodsDetector.kt | 58 +++++++ .../upstream/UpstreamRpcModulesDetector.kt | 41 ----- .../upstream/calls/DefaultEthereumMethods.kt | 8 + .../BasicEthUpstreamRpcMethodsDetector.kt | 52 +++++++ .../ethereum/EthereumChainSpecific.kt | 11 +- .../upstream/generic/AbstractChainSpecific.kt | 10 +- .../upstream/generic/ChainSpecific.kt | 8 +- .../upstream/generic/GenericUpstream.kt | 118 ++++++++------ ...BasicPolkadotUpstreamRpcMethodsDetector.kt | 35 +++++ .../polkadot/PolkadotChainSpecific.kt | 8 +- .../BasicEthUpstreamRpcMethodsDetectorTest.kt | 145 ++++++++++++++++++ ...cPolkadotUpstreamRpcMethodsDetectorTest.kt | 69 +++++++++ 13 files changed, 468 insertions(+), 97 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcMethodsDetector.kt delete mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetector.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetector.kt create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetectorTest.kt create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetectorTest.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index 06c62ff50..82c9496fb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -84,7 +84,7 @@ open class GenericUpstreamCreator( connectorFactory, cs::validator, cs::upstreamSettingsDetector, - cs::upstreamRpcModulesDetector, + cs::upstreamRpcMethodsDetector, buildMethodsFun, cs::lowerBoundService, cs::finalizationDetectorBuilder, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcMethodsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcMethodsDetector.kt new file mode 100644 index 000000000..fbde8ad2f --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcMethodsDetector.kt @@ -0,0 +1,58 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.upstream.rpcclient.CallParams +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono + +typealias UpstreamRpcMethodsDetectorBuilder = (Upstream, UpstreamsConfig.Upstream<*>?) -> UpstreamRpcMethodsDetector? + +abstract class UpstreamRpcMethodsDetector( + private val upstream: Upstream, + private val config: UpstreamsConfig.Upstream<*>? = null, +) { + protected val log: Logger = LoggerFactory.getLogger(this::class.java) + + private val notAvailableRegexps = + listOf( + "method ([A-Za-z0-9_]+) does not exist/is not available", + "([A-Za-z0-9_]+) found but the containing module is disabled", + "Method not found", + "The method ([A-Za-z0-9_]+) is not available", + ).map { s -> s.toRegex() } + + open fun detectRpcMethods(): Mono> = detectByMagicMethod().switchIfEmpty(detectByMethod()) + + protected fun detectByMethod(): Mono> = + Mono.zip( + rpcMethods().map { + Mono + .just(it) + .flatMap { (method, param) -> + upstream + .getIngressReader() + .read(ChainRequest(method, param)) + .flatMap(ChainResponse::requireResult) + .map { method to true } + .onErrorResume { err -> + val notAvailableError = + notAvailableRegexps.any { s -> s.containsMatchIn(err.message ?: "") } + if (notAvailableError) { + Mono.just(method to false) + } else { + Mono.empty() + } + } + } + }, + ) { + it + .map { p -> p as Pair } + .associate { (method, enabled) -> method to enabled } + } + + protected abstract fun detectByMagicMethod(): Mono> + + protected abstract fun rpcMethods(): Set> +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt deleted file mode 100644 index 447468d99..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt +++ /dev/null @@ -1,41 +0,0 @@ -package io.emeraldpay.dshackle.upstream - -import com.fasterxml.jackson.core.type.TypeReference -import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import reactor.core.publisher.Mono - -typealias UpstreamRpcModulesDetectorBuilder = (Upstream) -> UpstreamRpcModulesDetector? - -abstract class UpstreamRpcModulesDetector( - private val upstream: Upstream, -) { - protected val log: Logger = LoggerFactory.getLogger(this::class.java) - - open fun detectRpcModules(): Mono> { - return upstream.getIngressReader() - .read(rpcModulesRequest()) - .flatMap(ChainResponse::requireResult) - .map(::parseRpcModules) - .onErrorResume { - log.warn("Can't detect rpc_modules of upstream ${upstream.getId()}, reason - {}", it.message) - Mono.just(HashMap()) - } - } - - protected abstract fun rpcModulesRequest(): ChainRequest - - protected abstract fun parseRpcModules(data: ByteArray): HashMap -} - -class BasicEthUpstreamRpcModulesDetector( - upstream: Upstream, -) : UpstreamRpcModulesDetector(upstream) { - override fun rpcModulesRequest(): ChainRequest = ChainRequest("rpc_modules", ListParams()) - - override fun parseRpcModules(data: ByteArray): HashMap { - return Global.objectMapper.readValue(data, object : TypeReference>() {}) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultEthereumMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultEthereumMethods.kt index befbf5476..ba0656f50 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultEthereumMethods.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultEthereumMethods.kt @@ -809,4 +809,12 @@ class DefaultEthereumMethods( override fun getSupportedMethods(): Set { return allowedMethods.plus(hardcodedMethods).toSortedSet() } + + fun getAllMethods(): Set = + getSupportedMethods() + .plus(getGroupMethods("filter")) + .plus(getGroupMethods("trace")) + .plus(getGroupMethods("debug")) + .plus(getChainSpecificMethods(chain)) + .toSet() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetector.kt new file mode 100644 index 000000000..a2b03e4af --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetector.kt @@ -0,0 +1,52 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import com.fasterxml.jackson.core.type.TypeReference +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector +import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods +import io.emeraldpay.dshackle.upstream.rpcclient.CallParams +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Mono + +class BasicEthUpstreamRpcMethodsDetector( + private val upstream: Upstream, + private val config: UpstreamsConfig.Upstream<*>, +) : UpstreamRpcMethodsDetector(upstream) { + override fun detectByMagicMethod(): Mono> = + upstream + .getIngressReader() + .read(ChainRequest("rpc_modules", ListParams())) + .flatMap(ChainResponse::requireResult) + .map(::parseRpcModules) + // force check all methods from rpcMethods + .zipWith(detectByMethod()) { a, b -> + a.plus(b) + }.onErrorResume { + log.warn("Can't detect rpc_modules of upstream ${upstream.getId()}, reason - {}", it.message) + Mono.empty() + } + + override fun rpcMethods(): Set> = + setOf( + "eth_getBlockReceipts" to ListParams("latest"), + ) + + private fun parseRpcModules(data: ByteArray): Map { + val modules = Global.objectMapper.readValue(data, object : TypeReference>() {}) + return DefaultEthereumMethods(upstream.getChain()) + .getAllMethods() + .associateWith { method -> + if (config.methodGroups?.enabled?.any { group -> method.startsWith(group) } == true) { + return@associateWith true + } + if (config.methodGroups?.disabled?.any { group -> method.startsWith(group) } == true) { + return@associateWith false + } + modules.any { (module, _) -> method.startsWith(module) } + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index a4a134d47..fd547c812 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -4,10 +4,10 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader -import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription @@ -19,7 +19,7 @@ import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -180,9 +180,10 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return ChainIdValidator(upstream, chain, reader) } - override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector { - return BasicEthUpstreamRpcModulesDetector(upstream) - } + override fun upstreamRpcMethodsDetector( + upstream: Upstream, + config: UpstreamsConfig.Upstream<*>?, + ): UpstreamRpcMethodsDetector? = config?.let { BasicEthUpstreamRpcMethodsDetector(upstream, it) } override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return EthereumLowerBoundService(chain, upstream) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index 315ade7fd..ee5e922ed 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions.Options @@ -20,7 +21,7 @@ import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult @@ -68,9 +69,10 @@ abstract class AbstractChainSpecific : ChainSpecific { return null } - override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? { - return null - } + override fun upstreamRpcMethodsDetector( + upstream: Upstream, + config: UpstreamsConfig.Upstream<*>?, + ): UpstreamRpcMethodsDetector? = null override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { return NoIngressSubscription() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index db1844756..2b5c86ed1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -12,6 +12,7 @@ import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions @@ -25,7 +26,7 @@ import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult @@ -86,7 +87,10 @@ interface ChainSpecific { fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader?): SingleValidator? - fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? + fun upstreamRpcMethodsDetector( + upstream: Upstream, + config: UpstreamsConfig.Upstream<*>?, + ): UpstreamRpcMethodsDetector? fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 29f4bf0a0..0969b1924 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -1,7 +1,6 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels @@ -18,8 +17,8 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetectorBuilder +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder @@ -62,8 +61,8 @@ open class GenericUpstream( lowerBoundServiceBuilder: LowerBoundServiceBuilder, finalizationDetectorBuilder: FinalizationDetectorBuilder, versionRules: Supplier, -) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain), Lifecycle { - +) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain), + Lifecycle { constructor( config: UpstreamsConfig.Upstream<*>, chain: Chain, @@ -74,14 +73,29 @@ open class GenericUpstream( connectorFactory: ConnectorFactory, validatorBuilder: UpstreamValidatorBuilder, upstreamSettingsDetectorBuilder: UpstreamSettingsDetectorBuilder, - upstreamRpcModulesDetectorBuilder: UpstreamRpcModulesDetectorBuilder, + upstreamRpcMethodsDetectorBuilder: UpstreamRpcMethodsDetectorBuilder, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods, lowerBoundServiceBuilder: LowerBoundServiceBuilder, finalizationDetectorBuilder: FinalizationDetectorBuilder, versionRules: Supplier, - ) : this(config.id!!, chain, hash, options, config.role, buildMethods(config, chain), node, chainConfig, connectorFactory, validatorBuilder, upstreamSettingsDetectorBuilder, lowerBoundServiceBuilder, finalizationDetectorBuilder, versionRules) { - rpcModulesDetector = upstreamRpcModulesDetectorBuilder(this) - detectRpcModules(config, buildMethods) + ) : this( + config.id!!, + chain, + hash, + options, + config.role, + buildMethods(config, chain), + node, + chainConfig, + connectorFactory, + validatorBuilder, + upstreamSettingsDetectorBuilder, + lowerBoundServiceBuilder, + finalizationDetectorBuilder, + versionRules, + ) { + rpcMethodsDetector = upstreamRpcMethodsDetectorBuilder(this, config) + detectRpcMethods(config, buildMethods) } private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig, versionRules) @@ -93,7 +107,7 @@ open class GenericUpstream( protected val connector: GenericConnector = connectorFactory.create(this, chain) private var livenessSubscription: Disposable? = null private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this) - private var rpcModulesDetector: UpstreamRpcModulesDetector? = null + private var rpcMethodsDetector: UpstreamRpcMethodsDetector? = null private val lowerBoundService = lowerBoundServiceBuilder(chain, this) @@ -168,10 +182,12 @@ open class GenericUpstream( connector.stop() return } + UPSTREAM_SETTINGS_ERROR -> { log.warn("Non fatal upstream settings error, continue validation...") connector.getHead().stop() } + UPSTREAM_VALID -> { isUpstreamValid.set(true) upstreamStart() @@ -247,31 +263,44 @@ open class GenericUpstream( }.subscribe() } - private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { + private fun detectRpcMethods( + config: UpstreamsConfig.Upstream<*>, + buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods, + ) { try { - val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block(Defaults.internalCallsTimeout) - ?: HashMap() - log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") - if (rpcDetector.size != 0) { - var changed = false - for ((group, _) in rpcDetector) { - if (group == "trace" || group == "debug" || group == "filter") { - if (config.methodGroups == null) { - config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) - } else { - val disabled = config.methodGroups!!.disabled - val enabled = config.methodGroups!!.enabled - if (!disabled.contains(group) && !enabled.contains(group)) { - config.methodGroups!!.enabled = enabled.plus(group) - changed = true - } - } - } + rpcMethodsDetector?.detectRpcMethods()?.subscribe { rpcDetector -> + log.info("Upstream rpc method detector for ${getId()} returned $rpcDetector ") + if (rpcDetector.isEmpty()) { + return@subscribe + } + if (config.methods == null) { + config.methods = UpstreamsConfig.Methods(mutableSetOf(), mutableSetOf()) } - if (changed) updateMethods(buildMethods(config, getChain())) + val enableMethods = + rpcDetector + .filter { (_, enabled) -> enabled } + .keys + .map { UpstreamsConfig.Method(it) } + .toSet() + val disableMethods = + rpcDetector + .filter { (_, enabled) -> !enabled } + .keys + .map { UpstreamsConfig.Method(it) } + .toSet() + config.methods = + UpstreamsConfig.Methods( + enableMethods + .minus(disableMethods) + .plus(config.methods!!.enabled), + disableMethods + .minus(enableMethods) + .plus(config.methods!!.disabled), + ) + updateMethods(buildMethods(config, getChain())) } - } catch (e: RuntimeException) { - log.error("Couldn't detect rpc modules of upstream {} due to error {}", getId(), e.message) + } catch (e: Exception) { + log.error("Couldn't detect methods of upstream ${getId()} due to error {}", e.message) } } @@ -285,17 +314,20 @@ open class GenericUpstream( validatorSubscription = validator?.start() ?.subscribe(this::setStatus) } - livenessSubscription = connector.headLivenessEvents().subscribe({ - val hasSub = it == HeadLivenessState.OK - hasLiveSubscriptionHead.set(hasSub) - if (it == HeadLivenessState.FATAL_ERROR) { - headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } - } else { - sendUpstreamStateEvent(UPDATED) - } - }, { - log.debug("Error while checking live subscription for ${getId()}", it) - },) + livenessSubscription = connector.headLivenessEvents().subscribe( + { + val hasSub = it == HeadLivenessState.OK + hasLiveSubscriptionHead.set(hasSub) + if (it == HeadLivenessState.FATAL_ERROR) { + headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } else { + sendUpstreamStateEvent(UPDATED) + } + }, + { + log.debug("Error while checking live subscription for ${getId()}", it) + }, + ) detectSettings() detectLowerBlock() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetector.kt new file mode 100644 index 000000000..6d0dfcf20 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetector.kt @@ -0,0 +1,35 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import com.fasterxml.jackson.core.type.TypeReference +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector +import io.emeraldpay.dshackle.upstream.rpcclient.CallParams +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Mono + +class BasicPolkadotUpstreamRpcMethodsDetector( + private val upstream: Upstream, +) : UpstreamRpcMethodsDetector(upstream) { + override fun detectByMagicMethod(): Mono> = + upstream + .getIngressReader() + .read(ChainRequest("rpc_methods", ListParams())) + .flatMap(ChainResponse::requireResult) + .map { + Global.objectMapper + .readValue(it, object : TypeReference>>() {}) + .getOrDefault("methods", emptyList()) + .associateWith { true } + }.onErrorResume { + log.warn( + "Can't detect rpc method rpc_methods of upstream ${upstream.getId()}, reason - {}", + it.message, + ) + Mono.empty() + } + + override fun rpcMethods(): Set> = emptySet() +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 25fdfd65b..6325ae181 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonProperty import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options @@ -20,6 +21,7 @@ import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.UpstreamRpcMethodsDetector import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods @@ -37,7 +39,6 @@ import java.math.BigInteger import java.time.Instant object PolkadotChainSpecific : AbstractPollChainSpecific() { - private val log = LoggerFactory.getLogger(PolkadotChainSpecific::class.java) override fun parseBlock(data: ByteArray, upstreamId: String, api: ChainReader): Mono { val response = Global.objectMapper.readValue(data, PolkadotBlockResponse::class.java) @@ -150,6 +151,11 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { return GenericIngressSubscription(ws, DefaultPolkadotMethods.subs.map { it.first }) } + + override fun upstreamRpcMethodsDetector( + upstream: Upstream, + config: UpstreamsConfig.Upstream<*>?, + ): UpstreamRpcMethodsDetector = BasicPolkadotUpstreamRpcMethodsDetector(upstream) } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetectorTest.kt new file mode 100644 index 000000000..d760e8b49 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/BasicEthUpstreamRpcMethodsDetectorTest.kt @@ -0,0 +1,145 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.ChainCallError +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.core.publisher.Mono + +class BasicEthUpstreamRpcMethodsDetectorTest { + @Test + fun `rpc_modules without web3 and eth_getBlockReceipts`() { + val reader = + mock { + on { + read(ChainRequest("rpc_modules", ListParams())) + } doReturn + Mono.just( + ChainResponse( + """{"net": "1.0","debug": "1.0","txpool": "1.0","drpc": "1.0","erigon": "1.0","eth": "1.0","trace": "1.0"}""" + .toByteArray(), + null, + ), + ) + on { + read(ChainRequest("eth_getBlockReceipts", ListParams("latest"))) + } doReturn + Mono.just( + ChainResponse( + """[{"blockHash": "0xd12897f54acaa79f4824aa4f8e1d0f045b5568f5b942073555e9977202c5c474","blockNumber": "0x13c1108"}]""" + .toByteArray(), + null, + ), + ) + } + + val upstream = + mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + } + val config = mock> { } + val detector = BasicEthUpstreamRpcMethodsDetector(upstream, config) + Assertions.assertThat(detector.detectRpcMethods().block()).apply { + isNotNull() + size().isGreaterThanOrEqualTo(2) + containsEntry("web3_clientVersion", false) + containsValues(true) + } + } + + @Test + fun `rpc_modules disabled and eth_getBlockReceipts`() { + val reader = + mock { + on { + read(ChainRequest("rpc_modules", ListParams())) + } doReturn + Mono.just( + ChainResponse( + null, + ChainCallError(32601, "the method rpc_modules does not exist/is not available"), + ), + ) + on { + read(ChainRequest("eth_getBlockReceipts", ListParams("latest"))) + } doReturn + Mono.just( + ChainResponse( + """[{"blockHash": "0xd12897f54acaa79f4824aa4f8e1d0f045b5568f5b942073555e9977202c5c474","blockNumber": "0x13c1108"}]""" + .toByteArray(), + null, + ), + ) + } + + val upstream = + mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + } + val config = mock> { } + val detector = BasicEthUpstreamRpcMethodsDetector(upstream, config) + Assertions.assertThat(detector.detectRpcMethods().block()).apply { + isNotNull() + hasSize(1) + containsEntry("eth_getBlockReceipts", true) + } + } + + @Test + fun `prefer local config methods group`() { + val reader = + mock { + on { + read(ChainRequest("rpc_modules", ListParams())) + } doReturn + Mono.just( + ChainResponse( + """{"net": "1.0","debug": "1.0","txpool": "1.0","drpc": "1.0","erigon": "1.0","eth": "1.0","trace": "1.0"}""" + .toByteArray(), + null, + ), + ) + on { + read(ChainRequest("eth_getBlockReceipts", ListParams("latest"))) + } doReturn + Mono.just( + ChainResponse( + """[{"blockHash": "0xd12897f54acaa79f4824aa4f8e1d0f045b5568f5b942073555e9977202c5c474","blockNumber": "0x13c1108"}]""" + .toByteArray(), + null, + ), + ) + } + + val upstream = + mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + } + val config = + mock> { + on { methodGroups } doReturn + UpstreamsConfig.MethodGroups( + emptySet(), + setOf("eth"), + ) + } + val detector = BasicEthUpstreamRpcMethodsDetector(upstream, config) + Assertions.assertThat(detector.detectRpcMethods().block()).apply { + isNotNull() + containsEntry("eth_getBlockByNumber", false) + containsEntry("eth_getBlockReceipts", true) + containsEntry("debug_traceBlock", true) + } + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetectorTest.kt new file mode 100644 index 000000000..187990dd6 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/BasicPolkadotUpstreamRpcMethodsDetectorTest.kt @@ -0,0 +1,69 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.ChainCallError +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.core.publisher.Mono + +class BasicPolkadotUpstreamRpcMethodsDetectorTest { + @Test + fun `rpc_methods enabled`() { + val reader = + mock { + on { + read(ChainRequest("rpc_methods", ListParams())) + } doReturn + Mono.just( + ChainResponse( + """ {"methods": ["account_nextIndex","archive_unstable_body","archive_unstable_call"]}""" + .toByteArray(), + null, + ), + ) + } + + val upstream = + mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.POLKADOT__MAINNET + } + val detector = BasicPolkadotUpstreamRpcMethodsDetector(upstream) + Assertions.assertThat(detector.detectRpcMethods().block()).apply { + isNotNull() + hasSize(3) + containsKeys("account_nextIndex", "archive_unstable_body", "archive_unstable_call") + } + } + + @Test + fun `rpc_methods disabled`() { + val reader = + mock { + on { + read(ChainRequest("rpc_methods", ListParams())) + } doReturn + Mono.just( + ChainResponse( + null, + ChainCallError(32601, "the method rpc_methods does not exist/is not available"), + ), + ) + } + + val upstream = + mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.POLKADOT__MAINNET + } + val detector = BasicPolkadotUpstreamRpcMethodsDetector(upstream) + Assertions.assertThat(detector.detectRpcMethods().block()).isNull() + } +}