From 136e4f653a480e9a8d7be04a642e7fbd8bbd6641 Mon Sep 17 00:00:00 2001 From: msizov Date: Fri, 7 Jun 2024 17:42:23 +0700 Subject: [PATCH] Revert " Autodetect module groups, temporarily ban unavailable methods" This reverts commit b08c04f405a670d01ab65b8b6fa06dd177b19140. --- .../kotlin/io/emeraldpay/dshackle/Defaults.kt | 1 - .../dshackle/config/UpstreamsConfig.kt | 4 +- .../io/emeraldpay/dshackle/rpc/NativeCall.kt | 16 +---- .../configure/GenericUpstreamCreator.kt | 9 +-- .../startup/configure/UpstreamCreator.kt | 7 -- .../dshackle/upstream/DefaultUpstream.kt | 7 +- .../dshackle/upstream/Multistream.kt | 70 +------------------ .../emeraldpay/dshackle/upstream/Upstream.kt | 1 - .../upstream/UpstreamRpcModulesDetector.kt | 41 ----------- .../ethereum/EthereumChainSpecific.kt | 6 -- .../upstream/generic/AbstractChainSpecific.kt | 5 -- .../upstream/generic/ChainSpecific.kt | 3 - .../upstream/generic/GenericUpstream.kt | 43 ------------ 13 files changed, 12 insertions(+), 201 deletions(-) delete mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt index 727b13153..ee2b51fe8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt @@ -30,6 +30,5 @@ class Defaults { val grpcServerKeepAliveTimeout: Long = 5 val grpcServerPermitKeepAliveTime: Long = 15 val grpcServerMaxConnectionIdle: Long = 3600 - val multistreamUnavailableMethodDisableDuration: Long = 20 // minutes } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt index 1d44baade..0accf7c96 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt @@ -166,8 +166,8 @@ data class UpstreamsConfig( ) data class MethodGroups( - var enabled: Set, - var disabled: Set, + val enabled: Set, + val disabled: Set, ) data class Method( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 9d65c1518..45b1892f3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -102,20 +102,7 @@ open class NativeCall( CallResult.fail(id, 0, err, null), ) } - .doOnNext { - callRes -> - if (callRes.error?.message?.contains(Regex("method ([A-Za-z0-9_]+) does not exist/is not available")) == true) { - if (it is ValidCallContext<*>) { - if (it.payload is ParsedCallDetails) { - log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling") - val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods) - cm.disableMethodTemporarily(it.payload.method) - it.upstream.updateMethods(cm) - } - } - } - completeSpan(callRes, requestCount) - } + .doOnNext { callRes -> completeSpan(callRes, requestCount) } .doOnCancel { tracer.currentSpan()?.tag(SPAN_STATUS_MESSAGE, SPAN_REQUEST_CANCELLED)?.end() } @@ -328,6 +315,7 @@ open class NativeCall( "" } val availableMethods = upstream.getMethods() + if (!availableMethods.isAvailable(method)) { val errorMessage = "The method $method is not available" return Mono.just( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index f92d86985..26f9ce28c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -65,24 +65,25 @@ open class GenericUpstreamCreator( chainConfig, ) ?: return UpstreamCreationData.default() + val methods = buildMethods(config, chain) + val hashUrl = connection.let { if (it.connectorMode == GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url } val hash = getHash(nodeId, hashUrl!!, hashes) - val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b) } val upstream = GenericUpstream( - config, + config.id!!, chain, hash, options, + config.role, + methods, QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), chainConfig, connectorFactory, cs::validator, cs::upstreamSettingsDetector, - cs::upstreamRpcModulesDetector, - buildMethodsFun, cs::lowerBoundService, ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt index c4e98cdcf..1dcfe51a5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt @@ -70,13 +70,6 @@ abstract class UpstreamCreator( protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods { return if (config.methods != null || config.methodGroups != null) { - if (!config.methodGroups!!.disabled.contains("filter")) { - if (config.methodGroups == null) { - config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) - } else { - config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus("filter") - } - } ManagedCallMethods( delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)), enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 16e3e8b5f..6def8615b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -37,7 +37,7 @@ abstract class DefaultUpstream( defaultAvail: UpstreamAvailability, private val options: ChainOptions.Options, private val role: UpstreamsConfig.UpstreamRole, - private var targets: CallMethods?, + private val targets: CallMethods?, private val node: QuorumForLabels.QuorumItem?, private val chainConfig: ChainsConfig.ChainConfig, private val chain: Chain, @@ -147,11 +147,6 @@ abstract class DefaultUpstream( return targets ?: throw IllegalStateException("Methods are not set") } - override fun updateMethods(m: CallMethods) { - targets = m - sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED) - } - override fun nodeId(): Byte = hash open fun getQuorumByLabel(): QuorumForLabels { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 5742c7ce7..7f04ea32d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -16,16 +16,12 @@ */ package io.emeraldpay.dshackle.upstream -import com.github.benmanes.caffeine.cache.Cache -import com.github.benmanes.caffeine.cache.Caffeine import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults.Companion.multistreamUnavailableMethodDisableDuration import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.quorum.CallQuorum import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent @@ -72,7 +68,7 @@ abstract class Multistream( private var cacheSubscription: Disposable? = null @Volatile - private var callMethods: DisabledCallMethods? = null + private var callMethods: CallMethods? = null private var callMethodsFactory: Factory = Factory { return@Factory callMethods ?: throw FunctorException("Not initialized yet") } @@ -231,16 +227,7 @@ abstract class Multistream( val upstreams = getAll() val availableUpstreams = upstreams.filter { it.isAvailable() } availableUpstreams.map { it.getMethods() }.let { - if (callMethods == null) { - callMethods = DisabledCallMethods(this, multistreamUnavailableMethodDisableDuration, AggregatedCallMethods(it)) - } else { - callMethods = DisabledCallMethods( - this, - multistreamUnavailableMethodDisableDuration, - AggregatedCallMethods(it), - callMethods!!.disabledMethods, - ) - } + callMethods = AggregatedCallMethods(it) } capabilities = if (upstreams.isEmpty()) { emptySet() @@ -333,10 +320,6 @@ abstract class Multistream( return callMethods ?: throw IllegalStateException("Methods are not initialized yet") } - override fun updateMethods(m: CallMethods) { - onUpstreamsUpdated() - } - fun getMethodsFactory(): Factory { return callMethodsFactory } @@ -569,55 +552,6 @@ abstract class Multistream( // -------------------------------------------------------------------------------------------------------- - class DisabledCallMethods(private val multistream: Multistream, private val defaultDisableTimeout: Long, private val callMethods: CallMethods) : CallMethods { - var disabledMethods: Cache = Caffeine.newBuilder() - .removalListener { key: String?, _: Boolean?, cause -> - if (cause.wasEvicted() && key != null) { - multistream.log.info("${multistream.getId()} restoring method $key") - multistream.onUpstreamsUpdated() - } - } - .expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout)) - .build() - - constructor( - multistream: Multistream, - defaultDisableTimeout: Long, - callMethods: CallMethods, - disabledMethodsCopy: Cache, - ) : this(multistream, defaultDisableTimeout, callMethods) { - disabledMethods = disabledMethodsCopy - } - - override fun createQuorumFor(method: String): CallQuorum { - return callMethods.createQuorumFor(method) - } - - override fun isCallable(method: String): Boolean { - return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null - } - - override fun getSupportedMethods(): Set { - return callMethods.getSupportedMethods() - disabledMethods.asMap().keys - } - - override fun isHardcoded(method: String): Boolean { - return callMethods.isHardcoded(method) - } - - override fun executeHardcoded(method: String): ByteArray { - return callMethods.executeHardcoded(method) - } - - override fun getGroupMethods(groupName: String): Set { - return callMethods.getGroupMethods(groupName) - } - - fun disableMethodTemporarily(method: String) { - disabledMethods.put(method, true) - } - } - class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability) class FilterBestAvailability : java.util.function.Function { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index d1ff4841e..b00daadfb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -43,7 +43,6 @@ interface Upstream : Lifecycle { fun getLag(): Long? fun getLabels(): Collection fun getMethods(): CallMethods - fun updateMethods(m: CallMethods) fun getId(): String fun getCapabilities(): Set fun isGrpc(): Boolean diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt deleted file mode 100644 index 447468d99..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamRpcModulesDetector.kt +++ /dev/null @@ -1,41 +0,0 @@ -package io.emeraldpay.dshackle.upstream - -import com.fasterxml.jackson.core.type.TypeReference -import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import reactor.core.publisher.Mono - -typealias UpstreamRpcModulesDetectorBuilder = (Upstream) -> UpstreamRpcModulesDetector? - -abstract class UpstreamRpcModulesDetector( - private val upstream: Upstream, -) { - protected val log: Logger = LoggerFactory.getLogger(this::class.java) - - open fun detectRpcModules(): Mono> { - return upstream.getIngressReader() - .read(rpcModulesRequest()) - .flatMap(ChainResponse::requireResult) - .map(::parseRpcModules) - .onErrorResume { - log.warn("Can't detect rpc_modules of upstream ${upstream.getId()}, reason - {}", it.message) - Mono.just(HashMap()) - } - } - - protected abstract fun rpcModulesRequest(): ChainRequest - - protected abstract fun parseRpcModules(data: ByteArray): HashMap -} - -class BasicEthUpstreamRpcModulesDetector( - upstream: Upstream, -) : UpstreamRpcModulesDetector(upstream) { - override fun rpcModulesRequest(): ChainRequest = ChainRequest("rpc_modules", ListParams()) - - override fun parseRpcModules(data: ByteArray): HashMap { - return Global.objectMapper.readValue(data, object : TypeReference>() {}) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 400280ce3..1724bb6d4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -6,7 +6,6 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader -import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription @@ -15,7 +14,6 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -93,10 +91,6 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return EthereumUpstreamValidator(chain, upstream, options, config) } - override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector { - return BasicEthUpstreamRpcModulesDetector(upstream) - } - override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return EthereumLowerBoundService(chain, upstream) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index da7c68550..eeb8227b2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -15,7 +15,6 @@ import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector @@ -43,10 +42,6 @@ abstract class AbstractChainSpecific : ChainSpecific { return null } - override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? { - return null - } - override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { return NoIngressSubscription() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 6d05f9cef..bf4e83c5d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -23,7 +23,6 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific @@ -65,8 +64,6 @@ interface ChainSpecific { fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector? - fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? - fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription fun callSelector(caches: Caches): CallSelector? diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index ad849f993..33756867d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -16,8 +16,6 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector -import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder @@ -50,24 +48,6 @@ open class GenericUpstream( lowerBoundServiceBuilder: LowerBoundServiceBuilder, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain), Lifecycle { - constructor( - config: UpstreamsConfig.Upstream<*>, - chain: Chain, - hash: Byte, - options: ChainOptions.Options, - node: QuorumForLabels.QuorumItem?, - chainConfig: ChainsConfig.ChainConfig, - connectorFactory: ConnectorFactory, - validatorBuilder: UpstreamValidatorBuilder, - upstreamSettingsDetectorBuilder: UpstreamSettingsDetectorBuilder, - upstreamRpcModulesDetectorBuilder: UpstreamRpcModulesDetectorBuilder, - buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods, - lowerBoundServiceBuilder: LowerBoundServiceBuilder, - ) : this(config.id!!, chain, hash, options, config.role, buildMethods(config, chain), node, chainConfig, connectorFactory, validatorBuilder, upstreamSettingsDetectorBuilder, lowerBoundServiceBuilder) { - rpcModulesDetector = upstreamRpcModulesDetectorBuilder(this) - detectRpcModules(config, buildMethods) - } - private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) private var validatorSubscription: Disposable? = null private var validationSettingsSubscription: Disposable? = null @@ -77,7 +57,6 @@ open class GenericUpstream( protected val connector: GenericConnector = connectorFactory.create(this, chain) private var livenessSubscription: Disposable? = null private val settingsDetector = upstreamSettingsDetectorBuilder(chain, this) - private var rpcModulesDetector: UpstreamRpcModulesDetector? = null private val lowerBoundService = lowerBoundServiceBuilder(chain, this) @@ -211,28 +190,6 @@ open class GenericUpstream( } } - private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { - rpcModulesDetector?.detectRpcModules() - - val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block() ?: HashMap() - log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") - if (rpcDetector.size != 0) { - var changed = false - for ((group, _) in rpcDetector) { - if (group == "trace" || group == "debug" || group == "filter") { - if (config.methodGroups == null) { - config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) - } - if (!config.methodGroups!!.disabled.contains(group) && !config.methodGroups!!.enabled.contains(group)) { - config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus(group) - changed = true - } - } - } - if (changed) updateMethods(buildMethods(config, chain)) - } - } - private fun upstreamStart() { if (getOptions().disableValidation) { log.warn("Disable validation for upstream ${this.getId()}")