Skip to content

Commit

Permalink
Revert " Autodetect module groups, temporarily ban unavailable method…
Browse files Browse the repository at this point in the history
…s" (#502)

This reverts commit b08c04f.
  • Loading branch information
msizov authored Jun 7, 2024
1 parent b08c04f commit 2b94078
Show file tree
Hide file tree
Showing 13 changed files with 12 additions and 201 deletions.
1 change: 0 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ class Defaults {
val grpcServerKeepAliveTimeout: Long = 5
val grpcServerPermitKeepAliveTime: Long = 15
val grpcServerMaxConnectionIdle: Long = 3600
val multistreamUnavailableMethodDisableDuration: Long = 20 // minutes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ data class UpstreamsConfig(
)

data class MethodGroups(
var enabled: Set<String>,
var disabled: Set<String>,
val enabled: Set<String>,
val disabled: Set<String>,
)

data class Method(
Expand Down
16 changes: 2 additions & 14 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,7 @@ open class NativeCall(
CallResult.fail(id, 0, err, null),
)
}
.doOnNext {
callRes ->
if (callRes.error?.message?.contains(Regex("method ([A-Za-z0-9_]+) does not exist/is not available")) == true) {
if (it is ValidCallContext<*>) {
if (it.payload is ParsedCallDetails) {
log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling")
val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods)
cm.disableMethodTemporarily(it.payload.method)
it.upstream.updateMethods(cm)
}
}
}
completeSpan(callRes, requestCount)
}
.doOnNext { callRes -> completeSpan(callRes, requestCount) }
.doOnCancel {
tracer.currentSpan()?.tag(SPAN_STATUS_MESSAGE, SPAN_REQUEST_CANCELLED)?.end()
}
Expand Down Expand Up @@ -328,6 +315,7 @@ open class NativeCall(
""
}
val availableMethods = upstream.getMethods()

if (!availableMethods.isAvailable(method)) {
val errorMessage = "The method $method is not available"
return Mono.just(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,25 @@ open class GenericUpstreamCreator(
chainConfig,
) ?: return UpstreamCreationData.default()

val methods = buildMethods(config, chain)

val hashUrl = connection.let {
if (it.connectorMode == GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url
}
val hash = getHash(nodeId, hashUrl!!, hashes)
val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b) }

val upstream = GenericUpstream(
config,
config.id!!,
chain,
hash,
options,
config.role,
methods,
QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)),
chainConfig,
connectorFactory,
cs::validator,
cs::upstreamSettingsDetector,
cs::upstreamRpcModulesDetector,
buildMethodsFun,
cs::lowerBoundService,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ abstract class UpstreamCreator(

protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods {
return if (config.methods != null || config.methodGroups != null) {
if (!config.methodGroups!!.disabled.contains("filter")) {
if (config.methodGroups == null) {
config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf())
} else {
config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus("filter")
}
}
ManagedCallMethods(
delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)),
enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class DefaultUpstream(
defaultAvail: UpstreamAvailability,
private val options: ChainOptions.Options,
private val role: UpstreamsConfig.UpstreamRole,
private var targets: CallMethods?,
private val targets: CallMethods?,
private val node: QuorumForLabels.QuorumItem?,
private val chainConfig: ChainsConfig.ChainConfig,
private val chain: Chain,
Expand Down Expand Up @@ -147,11 +147,6 @@ abstract class DefaultUpstream(
return targets ?: throw IllegalStateException("Methods are not set")
}

override fun updateMethods(m: CallMethods) {
targets = m
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED)
}

override fun nodeId(): Byte = hash

open fun getQuorumByLabel(): QuorumForLabels {
Expand Down
70 changes: 2 additions & 68 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
*/
package io.emeraldpay.dshackle.upstream

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults.Companion.multistreamUnavailableMethodDisableDuration
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
Expand Down Expand Up @@ -72,7 +68,7 @@ abstract class Multistream(
private var cacheSubscription: Disposable? = null

@Volatile
private var callMethods: DisabledCallMethods? = null
private var callMethods: CallMethods? = null
private var callMethodsFactory: Factory<CallMethods> = Factory {
return@Factory callMethods ?: throw FunctorException("Not initialized yet")
}
Expand Down Expand Up @@ -231,16 +227,7 @@ abstract class Multistream(
val upstreams = getAll()
val availableUpstreams = upstreams.filter { it.isAvailable() }
availableUpstreams.map { it.getMethods() }.let {
if (callMethods == null) {
callMethods = DisabledCallMethods(this, multistreamUnavailableMethodDisableDuration, AggregatedCallMethods(it))
} else {
callMethods = DisabledCallMethods(
this,
multistreamUnavailableMethodDisableDuration,
AggregatedCallMethods(it),
callMethods!!.disabledMethods,
)
}
callMethods = AggregatedCallMethods(it)
}
capabilities = if (upstreams.isEmpty()) {
emptySet()
Expand Down Expand Up @@ -333,10 +320,6 @@ abstract class Multistream(
return callMethods ?: throw IllegalStateException("Methods are not initialized yet")
}

override fun updateMethods(m: CallMethods) {
onUpstreamsUpdated()
}

fun getMethodsFactory(): Factory<CallMethods> {
return callMethodsFactory
}
Expand Down Expand Up @@ -569,55 +552,6 @@ abstract class Multistream(

// --------------------------------------------------------------------------------------------------------

class DisabledCallMethods(private val multistream: Multistream, private val defaultDisableTimeout: Long, private val callMethods: CallMethods) : CallMethods {
var disabledMethods: Cache<String, Boolean> = Caffeine.newBuilder()
.removalListener { key: String?, _: Boolean?, cause ->
if (cause.wasEvicted() && key != null) {
multistream.log.info("${multistream.getId()} restoring method $key")
multistream.onUpstreamsUpdated()
}
}
.expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout))
.build<String, Boolean>()

constructor(
multistream: Multistream,
defaultDisableTimeout: Long,
callMethods: CallMethods,
disabledMethodsCopy: Cache<String, Boolean>,
) : this(multistream, defaultDisableTimeout, callMethods) {
disabledMethods = disabledMethodsCopy
}

override fun createQuorumFor(method: String): CallQuorum {
return callMethods.createQuorumFor(method)
}

override fun isCallable(method: String): Boolean {
return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null
}

override fun getSupportedMethods(): Set<String> {
return callMethods.getSupportedMethods() - disabledMethods.asMap().keys
}

override fun isHardcoded(method: String): Boolean {
return callMethods.isHardcoded(method)
}

override fun executeHardcoded(method: String): ByteArray {
return callMethods.executeHardcoded(method)
}

override fun getGroupMethods(groupName: String): Set<String> {
return callMethods.getGroupMethods(groupName)
}

fun disableMethodTemporarily(method: String) {
disabledMethods.put(method, true)
}
}

class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability)

class FilterBestAvailability : java.util.function.Function<UpstreamStatus, UpstreamAvailability> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ interface Upstream : Lifecycle {
fun getLag(): Long?
fun getLabels(): Collection<UpstreamsConfig.Labels>
fun getMethods(): CallMethods
fun updateMethods(m: CallMethods)
fun getId(): String
fun getCapabilities(): Set<Capability>
fun isGrpc(): Boolean
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.EgressSubscription
Expand All @@ -15,7 +14,6 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
Expand Down Expand Up @@ -93,10 +91,6 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
return EthereumUpstreamValidator(chain, upstream, options, config)
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector {
return BasicEthUpstreamRpcModulesDetector(upstream)
}

override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService {
return EthereumLowerBoundService(chain, upstream)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.NoIngressSubscription
import io.emeraldpay.dshackle.upstream.NoopCachingReader
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.CallSelector
Expand Down Expand Up @@ -43,10 +42,6 @@ abstract class AbstractChainSpecific : ChainSpecific {
return null
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? {
return null
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return NoIngressSubscription()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific
Expand Down Expand Up @@ -65,8 +64,6 @@ interface ChainSpecific {

fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector?

fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector?

fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription

fun callSelector(caches: Caches): CallSelector?
Expand Down
Loading

0 comments on commit 2b94078

Please sign in to comment.