diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt index ee2b51fe8..727b13153 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt @@ -30,5 +30,6 @@ class Defaults { val grpcServerKeepAliveTimeout: Long = 5 val grpcServerPermitKeepAliveTime: Long = 15 val grpcServerMaxConnectionIdle: Long = 3600 + val multistreamUnavailableMethodDisableDuration: Long = 20 // minutes } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt index 0accf7c96..1d44baade 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt @@ -166,8 +166,8 @@ data class UpstreamsConfig( ) data class MethodGroups( - val enabled: Set, - val disabled: Set, + var enabled: Set, + var disabled: Set, ) data class Method( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 45b1892f3..9d65c1518 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -102,7 +102,20 @@ open class NativeCall( CallResult.fail(id, 0, err, null), ) } - .doOnNext { callRes -> completeSpan(callRes, requestCount) } + .doOnNext { + callRes -> + if (callRes.error?.message?.contains(Regex("method ([A-Za-z0-9_]+) does not exist/is not available")) == true) { + if (it is ValidCallContext<*>) { + if (it.payload is ParsedCallDetails) { + log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling") + val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods) + cm.disableMethodTemporarily(it.payload.method) + it.upstream.updateMethods(cm) + } + } + } + completeSpan(callRes, requestCount) + } .doOnCancel { tracer.currentSpan()?.tag(SPAN_STATUS_MESSAGE, SPAN_REQUEST_CANCELLED)?.end() } @@ -315,7 +328,6 @@ open class NativeCall( "" } val availableMethods = upstream.getMethods() - if (!availableMethods.isAvailable(method)) { val errorMessage = "The method $method is not available" return Mono.just( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index 26f9ce28c..f92d86985 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -65,25 +65,24 @@ open class GenericUpstreamCreator( chainConfig, ) ?: return UpstreamCreationData.default() - val methods = buildMethods(config, chain) - val hashUrl = connection.let { if (it.connectorMode == GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url } val hash = getHash(nodeId, hashUrl!!, hashes) + val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b) } val upstream = GenericUpstream( - config.id!!, + config, chain, hash, options, - config.role, - methods, QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), chainConfig, connectorFactory, cs::validator, cs::upstreamSettingsDetector, + cs::upstreamRpcModulesDetector, + buildMethodsFun, cs::lowerBoundService, ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt index 1dcfe51a5..c4e98cdcf 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt @@ -70,6 +70,13 @@ abstract class UpstreamCreator( protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods { return if (config.methods != null || config.methodGroups != null) { + if (!config.methodGroups!!.disabled.contains("filter")) { + if (config.methodGroups == null) { + config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) + } else { + config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus("filter") + } + } ManagedCallMethods( delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)), enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 6def8615b..16e3e8b5f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -37,7 +37,7 @@ abstract class DefaultUpstream( defaultAvail: UpstreamAvailability, private val options: ChainOptions.Options, private val role: UpstreamsConfig.UpstreamRole, - private val targets: CallMethods?, + private var targets: CallMethods?, private val node: QuorumForLabels.QuorumItem?, private val chainConfig: ChainsConfig.ChainConfig, private val chain: Chain, @@ -147,6 +147,11 @@ abstract class DefaultUpstream( return targets ?: throw IllegalStateException("Methods are not set") } + override fun updateMethods(m: CallMethods) { + targets = m + sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED) + } + override fun nodeId(): Byte = hash open fun getQuorumByLabel(): QuorumForLabels { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 7f04ea32d..5742c7ce7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -16,12 +16,16 @@ */ package io.emeraldpay.dshackle.upstream +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults.Companion.multistreamUnavailableMethodDisableDuration import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.quorum.CallQuorum import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent @@ -68,7 +72,7 @@ abstract class Multistream( private var cacheSubscription: Disposable? = null @Volatile - private var callMethods: CallMethods? = null + private var callMethods: DisabledCallMethods? = null private var callMethodsFactory: Factory = Factory { return@Factory callMethods ?: throw FunctorException("Not initialized yet") } @@ -227,7 +231,16 @@ abstract class Multistream( val upstreams = getAll() val availableUpstreams = upstreams.filter { it.isAvailable() } availableUpstreams.map { it.getMethods() }.let { - callMethods = AggregatedCallMethods(it) + if (callMethods == null) { + callMethods = DisabledCallMethods(this, multistreamUnavailableMethodDisableDuration, AggregatedCallMethods(it)) + } else { + callMethods = DisabledCallMethods( + this, + multistreamUnavailableMethodDisableDuration, + AggregatedCallMethods(it), + callMethods!!.disabledMethods, + ) + } } capabilities = if (upstreams.isEmpty()) { emptySet() @@ -320,6 +333,10 @@ abstract class Multistream( return callMethods ?: throw IllegalStateException("Methods are not initialized yet") } + override fun updateMethods(m: CallMethods) { + onUpstreamsUpdated() + } + fun getMethodsFactory(): Factory { return callMethodsFactory } @@ -552,6 +569,55 @@ abstract class Multistream( // -------------------------------------------------------------------------------------------------------- + class DisabledCallMethods(private val multistream: Multistream, private val defaultDisableTimeout: Long, private val callMethods: CallMethods) : CallMethods { + var disabledMethods: Cache = Caffeine.newBuilder() + .removalListener { key: String?, _: Boolean?, cause -> + if (cause.wasEvicted() && key != null) { + multistream.log.info("${multistream.getId()} restoring method $key") + multistream.onUpstreamsUpdated() + } + } + .expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout)) + .build() + + constructor( + multistream: Multistream, + defaultDisableTimeout: Long, + callMethods: CallMethods, + disabledMethodsCopy: Cache, + ) : this(multistream, defaultDisableTimeout, callMethods) { + disabledMethods = disabledMethodsCopy + } + + override fun createQuorumFor(method: String): CallQuorum { + return callMethods.createQuorumFor(method) + } + + override fun isCallable(method: String): Boolean { + return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null + } + + override fun getSupportedMethods(): Set { + return callMethods.getSupportedMethods() - disabledMethods.asMap().keys + } + + override fun isHardcoded(method: String): Boolean { + return callMethods.isHardcoded(method) + } + + override fun executeHardcoded(method: String): ByteArray { + return callMethods.executeHardcoded(method) + } + + override fun getGroupMethods(groupName: String): Set { + return callMethods.getGroupMethods(groupName) + } + + fun disableMethodTemporarily(method: String) { + disabledMethods.put(method, true) + } + } + class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability) class FilterBestAvailability : java.util.function.Function { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index b00daadfb..d1ff4841e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -43,6 +43,7 @@ interface Upstream : Lifecycle { fun getLag(): Long? fun getLabels(): Collection fun getMethods(): CallMethods + fun updateMethods(m: CallMethods) fun getId(): String fun getCapabilities(): Set fun isGrpc(): Boolean diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt new file mode 100644 index 000000000..447468d99 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt @@ -0,0 +1,41 @@ +package io.emeraldpay.dshackle.upstream + +import com.fasterxml.jackson.core.type.TypeReference +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono + +typealias UpstreamRpcModulesDetectorBuilder = (Upstream) -> UpstreamRpcModulesDetector? + +abstract class UpstreamRpcModulesDetector( + private val upstream: Upstream, +) { + protected val log: Logger = LoggerFactory.getLogger(this::class.java) + + open fun detectRpcModules(): Mono> { + return upstream.getIngressReader() + .read(rpcModulesRequest()) + .flatMap(ChainResponse::requireResult) + .map(::parseRpcModules) + .onErrorResume { + log.warn("Can't detect rpc_modules of upstream ${upstream.getId()}, reason - {}", it.message) + Mono.just(HashMap()) + } + } + + protected abstract fun rpcModulesRequest(): ChainRequest + + protected abstract fun parseRpcModules(data: ByteArray): HashMap +} + +class BasicEthUpstreamRpcModulesDetector( + upstream: Upstream, +) : UpstreamRpcModulesDetector(upstream) { + override fun rpcModulesRequest(): ChainRequest = ChainRequest("rpc_modules", ListParams()) + + override fun parseRpcModules(data: ByteArray): HashMap { + return Global.objectMapper.readValue(data, object : TypeReference>() {}) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 1724bb6d4..400280ce3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -6,6 +6,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription @@ -14,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -91,6 +93,10 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return EthereumUpstreamValidator(chain, upstream, options, config) } + override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector { + return BasicEthUpstreamRpcModulesDetector(upstream) + } + override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return EthereumLowerBoundService(chain, upstream) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index eeb8227b2..da7c68550 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -15,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector @@ -42,6 +43,10 @@ abstract class AbstractChainSpecific : ChainSpecific { return null } + override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? { + return null + } + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { return NoIngressSubscription() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index bf4e83c5d..6d05f9cef 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific @@ -64,6 +65,8 @@ interface ChainSpecific { fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector? + fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? + fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription fun callSelector(caches: Caches): CallSelector? diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 33756867d..ad849f993 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -16,6 +16,8 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector +import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder @@ -48,6 +50,24 @@ open class GenericUpstream( lowerBoundServiceBuilder: LowerBoundServiceBuilder, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain), Lifecycle { + constructor( + config: UpstreamsConfig.Upstream<*>, + chain: Chain, + hash: Byte, + options: ChainOptions.Options, + node: QuorumForLabels.QuorumItem?, + chainConfig: ChainsConfig.ChainConfig, + connectorFactory: ConnectorFactory, + validatorBuilder: UpstreamValidatorBuilder, + upstreamSettingsDetectorBuilder: UpstreamSettingsDetectorBuilder, + upstreamRpcModulesDetectorBuilder: UpstreamRpcModulesDetectorBuilder, + buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods, + lowerBoundServiceBuilder: LowerBoundServiceBuilder, + ) : this(config.id!!, chain, hash, options, config.role, buildMethods(config, chain), node, chainConfig, connectorFactory, validatorBuilder, upstreamSettingsDetectorBuilder, lowerBoundServiceBuilder) { + rpcModulesDetector = upstreamRpcModulesDetectorBuilder(this) + detectRpcModules(config, buildMethods) + } + private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) private var validatorSubscription: Disposable? = null private var validationSettingsSubscription: Disposable? = null @@ -57,6 +77,7 @@ open class GenericUpstream( protected val connector: GenericConnector = connectorFactory.create(this, chain) private var livenessSubscription: Disposable? = null private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this) + private var rpcModulesDetector: UpstreamRpcModulesDetector? = null private val lowerBoundService = lowerBoundServiceBuilder(chain, this) @@ -190,6 +211,28 @@ open class GenericUpstream( } } + private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { + rpcModulesDetector?.detectRpcModules() + + val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block() ?: HashMap() + log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") + if (rpcDetector.size != 0) { + var changed = false + for ((group, _) in rpcDetector) { + if (group == "trace" || group == "debug" || group == "filter") { + if (config.methodGroups == null) { + config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) + } + if (!config.methodGroups!!.disabled.contains(group) && !config.methodGroups!!.enabled.contains(group)) { + config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus(group) + changed = true + } + } + } + if (changed) updateMethods(buildMethods(config, chain)) + } + } + private fun upstreamStart() { if (getOptions().disableValidation) { log.warn("Disable validation for upstream ${this.getId()}")