Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autodetect method groups via rpc_modules #500

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ class Defaults {
val grpcServerKeepAliveTimeout: Long = 5
val grpcServerPermitKeepAliveTime: Long = 15
val grpcServerMaxConnectionIdle: Long = 3600
val multistreamUnavailableMethodDisableDuration: Long = 20 // minutes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ data class UpstreamsConfig(
)

data class MethodGroups(
val enabled: Set<String>,
val disabled: Set<String>,
var enabled: Set<String>,
var disabled: Set<String>,
)

data class Method(
Expand Down
16 changes: 14 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,20 @@ open class NativeCall(
CallResult.fail(id, 0, err, null),
)
}
.doOnNext { callRes -> completeSpan(callRes, requestCount) }
.doOnNext {
callRes ->
if (callRes.error?.message?.contains(Regex("method ([A-Za-z0-9_]+) does not exist/is not available")) == true) {
if (it is ValidCallContext<*>) {
if (it.payload is ParsedCallDetails) {
log.error("nativeCallResult method ${it.payload.method} of ${it.upstream.getId()} is not available, disabling")
val cm = (it.upstream.getMethods() as Multistream.DisabledCallMethods)
cm.disableMethodTemporarily(it.payload.method)
it.upstream.updateMethods(cm)
}
}
}
completeSpan(callRes, requestCount)
}
.doOnCancel {
tracer.currentSpan()?.tag(SPAN_STATUS_MESSAGE, SPAN_REQUEST_CANCELLED)?.end()
}
Expand Down Expand Up @@ -315,7 +328,6 @@ open class NativeCall(
""
}
val availableMethods = upstream.getMethods()

if (!availableMethods.isAvailable(method)) {
val errorMessage = "The method $method is not available"
return Mono.just(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,24 @@ open class GenericUpstreamCreator(
chainConfig,
) ?: return UpstreamCreationData.default()

val methods = buildMethods(config, chain)

val hashUrl = connection.let {
if (it.connectorMode == GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url
}
val hash = getHash(nodeId, hashUrl!!, hashes)
val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b) }

val upstream = GenericUpstream(
config.id!!,
config,
chain,
hash,
options,
config.role,
methods,
QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)),
chainConfig,
connectorFactory,
cs::validator,
cs::upstreamSettingsDetector,
cs::upstreamRpcModulesDetector,
buildMethodsFun,
cs::lowerBoundService,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ abstract class UpstreamCreator(

protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods {
return if (config.methods != null || config.methodGroups != null) {
if (!config.methodGroups!!.disabled.contains("filter")) {
a10zn8 marked this conversation as resolved.
Show resolved Hide resolved
if (config.methodGroups == null) {
config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf())
} else {
config.methodGroups!!.enabled = config.methodGroups!!.enabled.plus("filter")
}
}
ManagedCallMethods(
delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)),
enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class DefaultUpstream(
defaultAvail: UpstreamAvailability,
private val options: ChainOptions.Options,
private val role: UpstreamsConfig.UpstreamRole,
private val targets: CallMethods?,
private var targets: CallMethods?,
private val node: QuorumForLabels.QuorumItem?,
private val chainConfig: ChainsConfig.ChainConfig,
private val chain: Chain,
Expand Down Expand Up @@ -147,6 +147,11 @@ abstract class DefaultUpstream(
return targets ?: throw IllegalStateException("Methods are not set")
}

override fun updateMethods(m: CallMethods) {
targets = m
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED)
}

override fun nodeId(): Byte = hash

open fun getQuorumByLabel(): QuorumForLabels {
Expand Down
70 changes: 68 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
*/
package io.emeraldpay.dshackle.upstream

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults.Companion.multistreamUnavailableMethodDisableDuration
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
Expand Down Expand Up @@ -68,7 +72,7 @@ abstract class Multistream(
private var cacheSubscription: Disposable? = null

@Volatile
private var callMethods: CallMethods? = null
private var callMethods: DisabledCallMethods? = null
private var callMethodsFactory: Factory<CallMethods> = Factory {
return@Factory callMethods ?: throw FunctorException("Not initialized yet")
}
Expand Down Expand Up @@ -227,7 +231,16 @@ abstract class Multistream(
val upstreams = getAll()
val availableUpstreams = upstreams.filter { it.isAvailable() }
availableUpstreams.map { it.getMethods() }.let {
callMethods = AggregatedCallMethods(it)
if (callMethods == null) {
callMethods = DisabledCallMethods(this, multistreamUnavailableMethodDisableDuration, AggregatedCallMethods(it))
} else {
callMethods = DisabledCallMethods(
this,
multistreamUnavailableMethodDisableDuration,
AggregatedCallMethods(it),
callMethods!!.disabledMethods,
)
}
}
capabilities = if (upstreams.isEmpty()) {
emptySet()
Expand Down Expand Up @@ -320,6 +333,10 @@ abstract class Multistream(
return callMethods ?: throw IllegalStateException("Methods are not initialized yet")
}

override fun updateMethods(m: CallMethods) {
onUpstreamsUpdated()
}

fun getMethodsFactory(): Factory<CallMethods> {
return callMethodsFactory
}
Expand Down Expand Up @@ -552,6 +569,55 @@ abstract class Multistream(

// --------------------------------------------------------------------------------------------------------

class DisabledCallMethods(private val multistream: Multistream, private val defaultDisableTimeout: Long, private val callMethods: CallMethods) : CallMethods {
var disabledMethods: Cache<String, Boolean> = Caffeine.newBuilder()
.removalListener { key: String?, _: Boolean?, cause ->
if (cause.wasEvicted() && key != null) {
multistream.log.info("${multistream.getId()} restoring method $key")
multistream.onUpstreamsUpdated()
}
}
.expireAfterWrite(Duration.ofMinutes(defaultDisableTimeout))
.build<String, Boolean>()

constructor(
multistream: Multistream,
defaultDisableTimeout: Long,
callMethods: CallMethods,
disabledMethodsCopy: Cache<String, Boolean>,
) : this(multistream, defaultDisableTimeout, callMethods) {
disabledMethods = disabledMethodsCopy
}

override fun createQuorumFor(method: String): CallQuorum {
return callMethods.createQuorumFor(method)
}

override fun isCallable(method: String): Boolean {
return callMethods.isCallable(method) && disabledMethods.getIfPresent(method) == null
}

override fun getSupportedMethods(): Set<String> {
return callMethods.getSupportedMethods() - disabledMethods.asMap().keys
}

override fun isHardcoded(method: String): Boolean {
return callMethods.isHardcoded(method)
}

override fun executeHardcoded(method: String): ByteArray {
return callMethods.executeHardcoded(method)
}

override fun getGroupMethods(groupName: String): Set<String> {
return callMethods.getGroupMethods(groupName)
}

fun disableMethodTemporarily(method: String) {
disabledMethods.put(method, true)
}
}

class UpstreamStatus(val upstream: Upstream, val status: UpstreamAvailability)

class FilterBestAvailability : java.util.function.Function<UpstreamStatus, UpstreamAvailability> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ interface Upstream : Lifecycle {
fun getLag(): Long?
fun getLabels(): Collection<UpstreamsConfig.Labels>
fun getMethods(): CallMethods
fun updateMethods(m: CallMethods)
fun getId(): String
fun getCapabilities(): Set<Capability>
fun isGrpc(): Boolean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.emeraldpay.dshackle.upstream

import com.fasterxml.jackson.core.type.TypeReference
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono

typealias UpstreamRpcModulesDetectorBuilder = (Upstream) -> UpstreamRpcModulesDetector?

abstract class UpstreamRpcModulesDetector(
a10zn8 marked this conversation as resolved.
Show resolved Hide resolved
private val upstream: Upstream,
) {
protected val log: Logger = LoggerFactory.getLogger(this::class.java)

open fun detectRpcModules(): Mono<HashMap<String, String>> {
return upstream.getIngressReader()
.read(rpcModulesRequest())
.flatMap(ChainResponse::requireResult)
.map(::parseRpcModules)
.onErrorResume {
log.warn("Can't detect rpc_modules of upstream ${upstream.getId()}, reason - {}", it.message)
Mono.just(HashMap())
}
}

protected abstract fun rpcModulesRequest(): ChainRequest

protected abstract fun parseRpcModules(data: ByteArray): HashMap<String, String>
}

class BasicEthUpstreamRpcModulesDetector(
upstream: Upstream,
) : UpstreamRpcModulesDetector(upstream) {
override fun rpcModulesRequest(): ChainRequest = ChainRequest("rpc_modules", ListParams())

override fun parseRpcModules(data: ByteArray): HashMap<String, String> {
return Global.objectMapper.readValue(data, object : TypeReference<HashMap<String, String>>() {})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.EgressSubscription
Expand All @@ -14,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
Expand Down Expand Up @@ -91,6 +93,10 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
return EthereumUpstreamValidator(chain, upstream, options, config)
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector {
return BasicEthUpstreamRpcModulesDetector(upstream)
}

override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService {
return EthereumLowerBoundService(chain, upstream)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.NoIngressSubscription
import io.emeraldpay.dshackle.upstream.NoopCachingReader
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.CallSelector
Expand Down Expand Up @@ -42,6 +43,10 @@ abstract class AbstractChainSpecific : ChainSpecific {
return null
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? {
return null
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return NoIngressSubscription()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific
Expand Down Expand Up @@ -64,6 +65,8 @@ interface ChainSpecific {

fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector?

fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector?

fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription

fun callSelector(caches: Caches): CallSelector?
Expand Down
Loading
Loading