diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1d2266530..d7fe359f3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -53,6 +53,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver jackson-datatype-jdk8 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jdk8", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } +jackson-yaml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml", version.ref = "jackson" } java-websocket = "org.java-websocket:Java-WebSocket:1.5.1" @@ -124,7 +125,7 @@ mockito-inline = "org.mockito:mockito-inline:4.0.0" apache-commons = ["commons-io", "apache-commons-lang3", "apache-commons-collections4"] grpc = ["grpc-protobuf", "grpc-stub", "grpc-netty", "grpc-proto-util", "grpc-services"] httpcomponents = ["httpcomponents-httpmime", "httpcomponents-httpclient"] -jackson = ["jackson-core", "jackson-databind", "jackson-datatype-jdk8", "jackson-datatype-jsr310", "jackson-module-kotlin"] +jackson = ["jackson-core", "jackson-databind", "jackson-datatype-jdk8", "jackson-datatype-jsr310", "jackson-module-kotlin", "jackson-yaml"] kotlin = ["kotlin-stdlib-jdk8", "kotlin-reflect"] netty = ["netty-common", "netty-transport", "netty-handler-core", "netty-handler-proxy", "netty-resolver-core", "netty-resolver-dns", "netty-codec-core", "netty-codec-http", "netty-codec-http2", "netty-buffer", "netty-tcnative-core"] reactor = ["reactor-core", "reactor-netty", "reactor-extra", "reactor-kotlin"] diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Global.kt b/src/main/kotlin/io/emeraldpay/dshackle/Global.kt index 32118b677..3cefefc96 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Global.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Global.kt @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.Version import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.datatype.jdk8.Jdk8Module import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule @@ -64,6 +65,9 @@ class Global { @JvmStatic val objectMapper: ObjectMapper = createObjectMapper() + @JvmStatic + val yamlMapper: ObjectMapper = ObjectMapper(YAMLFactory()) + var version: String = "DEV" val control: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/HotConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/HotConfig.kt new file mode 100644 index 000000000..dd6065ea5 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/HotConfig.kt @@ -0,0 +1,28 @@ +package io.emeraldpay.dshackle.config.context + +import io.emeraldpay.dshackle.config.hot.AutoReloadbleConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.io.ClassPathResource +import java.util.function.Supplier + +@Configuration +open class HotConfig { + @Bean + open fun hotVersionsConfig( + @Value("\${compatibility.url}") + url: String, + @Value("\${compatibility.enabled}") + enabled: Boolean, + ): Supplier { + return if (enabled) { + val initialContent = + ClassPathResource("public/compatible-clients.yaml").inputStream.readBytes().toString(Charsets.UTF_8) + AutoReloadbleConfig(initialContent, url, CompatibleVersionsRules::class.java).also { it.start() } + } else { + Supplier { null } + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/hot/AutoReloadbleConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/hot/AutoReloadbleConfig.kt new file mode 100644 index 000000000..be60bc7bf --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/hot/AutoReloadbleConfig.kt @@ -0,0 +1,50 @@ +package io.emeraldpay.dshackle.config.hot + +import io.emeraldpay.dshackle.Global +import org.slf4j.LoggerFactory +import org.springframework.web.client.RestTemplate +import reactor.core.publisher.Flux +import java.time.Duration +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Supplier + +class AutoReloadbleConfig( + val initialContent: String, + val configUrl: String, + private val type: Class, +) : Supplier { + companion object { + private val log = LoggerFactory.getLogger(AutoReloadbleConfig::class.java) + } + + private val restTemplate = RestTemplate() + private val instance = AtomicReference() + + fun reload() { + try { + val response = restTemplate.getForObject(configUrl, String::class.java) + if (response != null) { + instance.set(parseConfig(response)) + } + } catch (e: Exception) { + log.error("Failed to reload config from $configUrl for type $type", e) + } + } + + fun start() { + instance.set(parseConfig(initialContent)) + Flux.interval( + Duration.ofSeconds(600), + ).subscribe { + reload() + } + } + + private fun parseConfig(config: String): T { + return Global.yamlMapper.readValue(config, type) + } + + override fun get(): T { + return instance.get() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRules.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRules.kt new file mode 100644 index 000000000..1d2445675 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRules.kt @@ -0,0 +1,20 @@ +package io.emeraldpay.dshackle.config.hot + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CompatibleVersionsRules( + @JsonProperty("rules") + val rules: List, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CompatibleVersionsRule( + @JsonProperty("client") + val client: String, + @JsonProperty("blacklist") + val blacklist: List?, + @JsonProperty("whitelist") + val whitelist: List?, +) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index bcd773f2a..fb739d8e1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -520,7 +520,7 @@ open class NativeCall( "reason", reason, "chain", - ctx.upstream.chain.chainCode, + ctx.upstream.getChain().chainCode, ).increment() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt index 3322a06d6..319361fbe 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt @@ -29,7 +29,7 @@ class SubscribeChainStatus( // we need to track not only multistreams with upstreams but all of them // because upstreams can be added in runtime with hot config reload multistreamHolder.all() - .filter { Common.ChainRef.forNumber(it.chain.id) != null } + .filter { Common.ChainRef.forNumber(it.getChain().id) != null } .map { ms -> Flux.concat( // the first event must be filled with all fields @@ -52,7 +52,7 @@ class SubscribeChainStatus( .map { events -> val response = BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() val chainDescription = BlockchainOuterClass.ChainDescription.newBuilder() - .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .setChain(Common.ChainRef.forNumber(ms.getChain().id)) events.forEach { chainDescription.addChainEvent(processMsEvent(it)) @@ -100,7 +100,7 @@ class SubscribeChainStatus( BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() .setChainDescription( BlockchainOuterClass.ChainDescription.newBuilder() - .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .setChain(Common.ChainRef.forNumber(ms.getChain().id)) .addChainEvent(chainEventMapper.mapHead(it)) .build(), ) @@ -112,7 +112,7 @@ class SubscribeChainStatus( return BlockchainOuterClass.SubscribeChainStatusResponse.newBuilder() .setChainDescription( BlockchainOuterClass.ChainDescription.newBuilder() - .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .setChain(Common.ChainRef.forNumber(ms.getChain().id)) .addChainEvent(chainEventMapper.chainStatus(ms.getStatus())) .addChainEvent(chainEventMapper.mapHead(head)) .addChainEvent(chainEventMapper.supportedMethods(ms.getMethods().getSupportedMethods())) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt index b81c99fa8..c7e8477aa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt @@ -120,7 +120,7 @@ class SubscribeNodeStatus( .setDescription( NodeDescription.newBuilder() .setNodeId(upstream.nodeId().toInt()) - .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .setChain(Common.ChainRef.forNumber(ms.getChain().id)) .build(), ) .setNodeId(upstream.getId()) @@ -148,7 +148,7 @@ class SubscribeNodeStatus( private fun buildDescription(ms: Multistream, up: Upstream): NodeDescription.Builder { val builder = NodeDescription.newBuilder() - .setChain(Common.ChainRef.forNumber(ms.chain.id)) + .setChain(Common.ChainRef.forNumber(ms.getChain().id)) .setNodeId(up.nodeId().toInt()) .addAllNodeLabels( up.getLabels().map { nodeLabels -> diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/EthereumUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/EthereumUpstreamCreator.kt index fd7dabdb7..54df5dbc1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/EthereumUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/EthereumUpstreamCreator.kt @@ -4,9 +4,11 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.IndexConfig import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.upstream.CallTargetsHolder import org.springframework.stereotype.Component +import java.util.function.Supplier @Component class EthereumUpstreamCreator( @@ -14,7 +16,8 @@ class EthereumUpstreamCreator( indexConfig: IndexConfig, callTargets: CallTargetsHolder, connectorFactoryCreatorResolver: ConnectorFactoryCreatorResolver, -) : GenericUpstreamCreator(chainsConfig, indexConfig, callTargets, connectorFactoryCreatorResolver) { + versionRules: Supplier, +) : GenericUpstreamCreator(chainsConfig, indexConfig, callTargets, connectorFactoryCreatorResolver, versionRules) { override fun createUpstream( upstreamsConfig: UpstreamsConfig.Upstream<*>, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index 243f4d369..945e438c5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.IndexConfig import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.BlockValidator @@ -13,6 +14,7 @@ import io.emeraldpay.dshackle.upstream.generic.ChainSpecificRegistry import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory import org.springframework.stereotype.Component +import java.util.function.Supplier @Component open class GenericUpstreamCreator( @@ -20,6 +22,7 @@ open class GenericUpstreamCreator( indexConfig: IndexConfig, callTargets: CallTargetsHolder, private val connectorFactoryCreatorResolver: ConnectorFactoryCreatorResolver, + private val versionRules: Supplier, ) : UpstreamCreator(chainsConfig, indexConfig, callTargets) { private val hashes: MutableMap = HashMap() @@ -85,6 +88,7 @@ open class GenericUpstreamCreator( buildMethodsFun, cs::lowerBoundService, cs::finalizationDetectorBuilder, + versionRules, ) upstream.start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamFactory.kt index 9bef5c111..5671c671c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamFactory.kt @@ -43,6 +43,6 @@ class UpstreamFactory( config: UpstreamsConfig.Upstream, chainsConfig: ChainsConfig, ): GrpcUpstreams { - return grpcUpstreamCreator.creatGrpcUpstream(config, chainsConfig) + return grpcUpstreamCreator.createGrpcUpstream(config, chainsConfig) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/BasicEthUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/BasicEthUpstreamValidator.kt deleted file mode 100644 index 602bfb2a2..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/BasicEthUpstreamValidator.kt +++ /dev/null @@ -1,94 +0,0 @@ -package io.emeraldpay.dshackle.upstream - -import io.emeraldpay.dshackle.Defaults -import io.emeraldpay.dshackle.foundation.ChainOptions -import reactor.core.publisher.Mono -import java.util.concurrent.TimeoutException -import java.util.function.Supplier - -abstract class BasicEthUpstreamValidator( - upstream: Upstream, - options: ChainOptions.Options, -) : UpstreamValidator(upstream, options) { - - override fun validate(): Mono { - return Mono.zip( - validatorFunctions().map { it.get() }, - ) { a -> a.map { it as UpstreamAvailability } } - .map(::resolve) - .defaultIfEmpty(UpstreamAvailability.UNAVAILABLE) - .onErrorResume { - log.error("Error during upstream validation for ${upstream.getId()}", it) - Mono.just(UpstreamAvailability.UNAVAILABLE) - } - } - - protected fun validateSyncing(): Mono { - if (!options.validateSyncing) { - return Mono.just(UpstreamAvailability.OK) - } - val validateSyncingRequest = validateSyncingRequest() - return upstream.getIngressReader() - .read(validateSyncingRequest.request) - .flatMap(ChainResponse::requireResult) - .map { validateSyncingRequest.mapper(it) } - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.warn("No response for ${validateSyncingRequest.request.method} from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for Syncing"))), - ) - .map { - upstream.getHead().onSyncingNode(it) - if (it) { - UpstreamAvailability.SYNCING - } else { - UpstreamAvailability.OK - } - } - .doOnError { err -> log.error("Error during syncing validation for ${upstream.getId()}", err) } - .onErrorReturn(UpstreamAvailability.UNAVAILABLE) - } - - protected fun validatePeers(): Mono { - if (!options.validatePeers || options.minPeers == 0) { - return Mono.just(UpstreamAvailability.OK) - } - val validatePeersRequest = validatePeersRequest() - return upstream - .getIngressReader() - .read(validatePeersRequest.request) - .flatMap(ChainResponse::checkError) - .map { validatePeersRequest.mapper(it) } - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.warn("No response for ${validatePeersRequest.request.method} from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for Peers"))), - ) - .map { count -> - val minPeers = options.minPeers - if (count < minPeers) { - UpstreamAvailability.IMMATURE - } else { - UpstreamAvailability.OK - } - } - .doOnError { err -> log.error("Error during peer count validation for ${upstream.getId()}", err) } - .onErrorReturn(UpstreamAvailability.UNAVAILABLE) - } - - protected abstract fun validateSyncingRequest(): ValidateSyncingRequest - - protected abstract fun validatePeersRequest(): ValidatePeersRequest - - protected abstract fun validatorFunctions(): List>> - - data class ValidateSyncingRequest( - val request: ChainRequest, - val mapper: (ByteArray) -> Boolean, - ) - - data class ValidatePeersRequest( - val request: ChainRequest, - val mapper: (ChainResponse) -> Int, - ) -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt index dca5e1ba4..c3d340564 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt @@ -28,7 +28,7 @@ open class CurrentMultistreamHolder( private val log = LoggerFactory.getLogger(CurrentMultistreamHolder::class.java) - private val chainMapping = multistreams.associateBy { it.chain } + private val chainMapping = multistreams.associateBy { it.getChain() } override fun getUpstream(chain: Chain): Multistream { return chainMapping.getValue(chain) @@ -37,7 +37,7 @@ open class CurrentMultistreamHolder( override fun getAvailable(): List { return chainMapping.values.asSequence() .filter { it.haveUpstreams() } - .map { it.chain } + .map { it.getChain() } .toList() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 7bbc62e12..f8f57a037 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -175,4 +175,8 @@ abstract class DefaultUpstream( } data class Status(val lag: Long?, val avail: UpstreamAvailability, val status: UpstreamAvailability) + + override fun getChain(): Chain { + return chain + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 70d276c30..e1a58f8bd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap * Aggregation of multiple upstreams responding to a single blockchain */ abstract class Multistream( - val chain: Chain, + private val chain: Chain, val caches: Caches, val callSelector: CallSelector?, multistreamEventsScheduler: Scheduler, @@ -530,4 +530,8 @@ abstract class Multistream( } abstract fun getEgressSubscription(): EgressSubscription + + override fun getChain(): Chain { + return chain + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index b4aca8b73..4e9e67fe6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -16,6 +16,7 @@ */ package io.emeraldpay.dshackle.upstream +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.ChainReader @@ -55,6 +56,8 @@ interface Upstream : Lifecycle { fun getUpstreamSettingsData(): UpstreamSettingsData? fun updateLowerBound(lowerBound: Long, type: LowerBoundType) + fun getChain(): Chain + fun cast(selfType: Class): T fun nodeId(): Byte diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt index 69b6a0e18..8037870ac 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt @@ -68,7 +68,7 @@ abstract class BasicUpstreamSettingsDetector( } abstract class BasicEthUpstreamSettingsDetector( - upstream: Upstream, + val upstream: Upstream, ) : BasicUpstreamSettingsDetector(upstream) { abstract fun mapping(node: JsonNode): String diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt index 55d622369..69d043eaa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt @@ -1,27 +1,34 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator +import io.emeraldpay.dshackle.upstream.UpstreamAvailability.OK +import io.emeraldpay.dshackle.upstream.UpstreamAvailability.UNAVAILABLE +import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException +import java.util.function.Supplier -typealias UpstreamValidatorBuilder = (Chain, Upstream, ChainOptions.Options, ChainConfig) -> UpstreamValidator? +typealias UpstreamValidatorBuilder = (Chain, Upstream, ChainOptions.Options, ChainConfig, Supplier) -> UpstreamValidator? abstract class UpstreamValidator( val upstream: Upstream, val options: ChainOptions.Options, ) { - protected val log = LoggerFactory.getLogger(this::class.java) - fun start(): Flux { return Flux.interval( Duration.ZERO, Duration.ofSeconds(options.validationInterval.toLong()), - ).subscribeOn(EthereumUpstreamValidator.scheduler) + ).subscribeOn(scheduler) .flatMap { validate() } @@ -49,6 +56,12 @@ abstract class UpstreamValidator( val cp = Comparator { res1: ValidateUpstreamSettingsResult, res2: ValidateUpstreamSettingsResult -> if (res1.priority < res2.priority) -1 else 1 } return results.sortedWith(cp).last() } + + @JvmStatic + protected val log: Logger = LoggerFactory.getLogger(UpstreamValidator::class.java) + + val scheduler = + Schedulers.fromExecutor(Executors.newFixedThreadPool(4, CustomizableThreadFactory("upstream-validator"))) } } @@ -58,7 +71,63 @@ enum class ValidateUpstreamSettingsResult(val priority: Int) { UPSTREAM_FATAL_SETTINGS_ERROR(2), } -data class SingleCallValidator( +interface SingleValidator { + fun validate(onError: T): Mono +} + +class GenericSingleCallValidator( val method: ChainRequest, + val upstream: Upstream, val check: (ByteArray) -> T, -) +) : SingleValidator { + + companion object { + @JvmStatic + val log: Logger = LoggerFactory.getLogger(GenericSingleCallValidator::class.java) + } + override fun validate(onError: T): Mono { + return upstream.getIngressReader() + .read(method) + .flatMap(ChainResponse::requireResult) + .map { check(it) } + .timeout( + Defaults.timeoutInternal, + Mono.fromCallable { log.warn("No response for ${method.method} from ${upstream.getId()}") } + .then(Mono.error(TimeoutException("Validation timeout for ${method.method}"))), + ) + .doOnError { err -> log.error("Error during ${method.method} validation for ${upstream.getId()}", err) } + .onErrorReturn(onError) + } +} + +class VersionValidator( + private val upstream: Upstream, + private val versionsConfig: Supplier, +) : SingleValidator { + companion object { + private val log = LoggerFactory.getLogger(VersionValidator::class.java) + } + override fun validate(onError: UpstreamAvailability): Mono { + if (upstream.getUpstreamSettingsData() == null) { + // for now - just skip validation + log.info("Empty settings for upstream ${upstream.getId()}, skipping version validation") + return Mono.just(OK) + } + val type = upstream.getLabels().first().getOrDefault("client_type", "unknown") + val version = upstream.getLabels().first().getOrDefault("client_version", "unknown") + val rule = versionsConfig.get()!!.rules.find { it.client == type } + if (rule == null) { + log.info("No rules for client type $type, skipping validation for upstream ${upstream.getId()}") + return Mono.just(OK) + } + if (!rule.whitelist.isNullOrEmpty() && !rule.whitelist.contains(version)) { + log.warn("Version $version is in not in defined whitelist for $type, please change client version for upstream ${upstream.getId()}") + return Mono.just(UNAVAILABLE) + } + if (!rule.blacklist.isNullOrEmpty() && rule.blacklist.contains(version)) { + log.warn("Version $version is in defined blacklist for $type, please change client version for upstream ${upstream.getId()}") + return Mono.just(UNAVAILABLE) + } + return Mono.just(OK) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt index d0634e581..f9065b099 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt @@ -1,5 +1,7 @@ package io.emeraldpay.dshackle.upstream.beaconchain +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.DeserializationContext import com.fasterxml.jackson.databind.JsonDeserializer @@ -7,15 +9,18 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.config.ChainsConfig +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector -import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.RestParams @@ -61,13 +66,54 @@ object BeaconChainSpecific : AbstractPollChainSpecific() { return BeaconChainUpstreamSettingsDetector(upstream) } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, - options: ChainOptions.Options, - config: ChainsConfig.ChainConfig, - ): UpstreamValidator { - return BeaconChainValidator(upstream, options) + options: Options, + config: ChainConfig, + ): List> { + var validators = listOf( + GenericSingleCallValidator( + ChainRequest("GET#/eth/v1/node/health", RestParams.emptyParams()), + upstream, + ) { _ -> UpstreamAvailability.OK }, + ) + if (options.validateSyncing) { + validators += GenericSingleCallValidator( + ChainRequest("GET#/eth/v1/node/syncing", RestParams.emptyParams()), + upstream, + ) { data -> + val syncing = Global.objectMapper.readValue(data, BeaconChainSyncing::class.java).data.isSyncing + upstream.getHead().onSyncingNode(syncing) + if (syncing) { + UpstreamAvailability.SYNCING + } else { + UpstreamAvailability.OK + } + } + } + if (options.validatePeers && options.minPeers > 0) { + validators += GenericSingleCallValidator( + ChainRequest("GET#/eth/v1/node/peer_count", RestParams.emptyParams()), + upstream, + ) { data -> + val connected = Global.objectMapper.readValue( + data, + BeaconChainPeers::class.java, + ).data.connected.toInt() + if (connected < options.minPeers) UpstreamAvailability.IMMATURE else UpstreamAvailability.OK + } + } + return validators + } + + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return emptyList() } override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { @@ -94,3 +140,25 @@ class BeaconChainBlockHeaderDeserializer : JsonDeserializer>> { - return listOf( - Supplier { validateSyncing() }, - Supplier { validateHealth() }, - Supplier { validatePeers() }, - ) - } - - override fun validateUpstreamSettings(): Mono { - return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) - } - - private fun validateHealth(): Mono { - return upstream.getIngressReader() - .read(ChainRequest("GET#/eth/v1/node/health", RestParams.emptyParams())) - .flatMap(ChainResponse::requireResult) - .map { UpstreamAvailability.OK } - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.warn("No response for /eth/v1/node/health from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for /eth/v1/node/health"))), - ) - .doOnError { err -> log.error("Error during /eth/v1/node/health validation for ${upstream.getId()}", err) } - .onErrorReturn(UpstreamAvailability.UNAVAILABLE) - } - - override fun validateSyncingRequest(): ValidateSyncingRequest { - return ValidateSyncingRequest( - ChainRequest("GET#/eth/v1/node/syncing", RestParams.emptyParams()), - ) { bytes -> Global.objectMapper.readValue(bytes, BeaconChainSyncing::class.java).data.isSyncing } - } - - override fun validatePeersRequest(): ValidatePeersRequest { - return ValidatePeersRequest( - ChainRequest("GET#/eth/v1/node/peer_count", RestParams.emptyParams()), - ) { resp -> - Global.objectMapper.readValue( - resp.getResult(), - BeaconChainPeers::class.java, - ).data.connected.toInt() - } - } - - private data class BeaconChainSyncing( - @JsonProperty("data") - val data: BeaconChainSyncingData, - ) - - @JsonIgnoreProperties(ignoreUnknown = true) - private data class BeaconChainSyncingData( - @JsonProperty("is_syncing") - val isSyncing: Boolean, - ) - - private data class BeaconChainPeers( - @JsonProperty("data") - val data: BeaconChainPeersData, - ) - - @JsonIgnoreProperties(ignoreUnknown = true) - private data class BeaconChainPeersData( - @JsonProperty("connected") - val connected: String, - ) -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt index 2ba847c19..e495da824 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt @@ -96,7 +96,7 @@ open class BitcoinMultistream( apis.request(1) return Mono.from(apis) .map(Upstream::getIngressReader) - .switchIfEmpty(Mono.error(Exception("No API available for $chain"))) + .switchIfEmpty(Mono.error(Exception("No API available for ${getChain()}"))) } override fun getLocalReader(): Mono { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt index 1852588ca..8d762627b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -110,7 +110,7 @@ open class BitcoinRpcUpstream( } override fun start() { - log.info("Configured for ${chain.chainName}") + log.info("Configured for ${getChain().chainName}") if (head is Lifecycle) { if (!head.isRunning()) { head.start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt index 2f9cdf9df..7c7d682db 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt @@ -26,7 +26,7 @@ import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods abstract class BitcoinUpstream( id: String, - val chain: Chain, + private val chain: Chain, options: ChainOptions.Options, role: UpstreamsConfig.UpstreamRole, callMethods: CallMethods, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt index 1ec56c6e0..e1094ab9c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt @@ -10,13 +10,13 @@ import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamAvailability.OK -import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific -import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams import org.slf4j.LoggerFactory @@ -75,28 +75,38 @@ object CosmosChainSpecific : AbstractPollChainSpecific() { override fun unsubscribeNewHeadsRequest(subId: String) = throw NotImplementedError() // ChainRequest("unsubscribe", ListParams("tm.event = 'NewBlockHeader'")) - override fun validator(chain: Chain, upstream: Upstream, options: Options, config: ChainConfig): UpstreamValidator { - return GenericUpstreamValidator( - upstream, - options, - listOf( - SingleCallValidator( - ChainRequest("health", ListParams()), - ) { _ -> OK }, - ), - listOf( - SingleCallValidator( - ChainRequest("status", ListParams()), - ) { data -> - val resp = Global.objectMapper.readValue(data, CosmosStatus::class.java) - if (chain.chainId.isNotEmpty() && resp.nodeInfo.network.lowercase() != chain.chainId.lowercase()) { - ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR - } else { - ValidateUpstreamSettingsResult.UPSTREAM_VALID - } - }, - ), + override fun upstreamValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("health", ListParams()), + upstream, + ) { _ -> OK }, + ) + } + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("status", ListParams()), + upstream, + ) { data -> + val resp = Global.objectMapper.readValue(data, CosmosStatus::class.java) + if (chain.chainId.isNotEmpty() && resp.nodeInfo.network.lowercase() != chain.chainId.lowercase()) { + ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR + } else { + ValidateUpstreamSettingsResult.UPSTREAM_VALID + } + }, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt index 383a8998c..db8434145 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt @@ -30,8 +30,8 @@ class CosmosUpstreamSettingsDetector( override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest()) - override fun clientType(node: JsonNode): String? = null + override fun clientType(node: JsonNode): String = "cosmos" - override fun clientVersion(node: JsonNode): String? = + override fun clientVersion(node: JsonNode): String = node.get("node_info")?.get("version")?.asText() ?: UNKNOWN_CLIENT_VERSION } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 8776eb3d6..dc9332def 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer @@ -10,14 +11,17 @@ import io.emeraldpay.dshackle.upstream.BasicEthUpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector -import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector import io.emeraldpay.dshackle.upstream.calls.EthereumCallSelector @@ -31,11 +35,16 @@ import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler object EthereumChainSpecific : AbstractPollChainSpecific() { + + private val log: Logger = LoggerFactory.getLogger(EthereumChainSpecific::class.java) + override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { return BlockContainer.fromEthereumJson(data, upstreamId) } @@ -85,13 +94,61 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return { ms, caches, methodsFactory -> EthereumCachingReader(ms, caches, methodsFactory, tracer) } } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, options: Options, config: ChainConfig, - ): UpstreamValidator { - return EthereumUpstreamValidator(chain, upstream, options, config) + ): List> { + var validators = emptyList>() + if (options.validateSyncing) { + validators += GenericSingleCallValidator( + ChainRequest("eth_syncing", ListParams()), + upstream, + ) { data -> + val raw = Global.objectMapper.readTree(data) + if (raw.isBoolean) { + upstream.getHead().onSyncingNode(raw.asBoolean()) + if (raw.asBoolean()) { + UpstreamAvailability.SYNCING + } else { + UpstreamAvailability.OK + } + } else { + log.warn("Received syncing object ${raw.toPrettyString()} for upstream ${upstream.getId()}") + UpstreamAvailability.SYNCING + } + } + } + if (options.validatePeers) { + validators += GenericSingleCallValidator( + ChainRequest("net_peerCount", ListParams()), + upstream, + ) { data -> + val peers = Integer.decode(String(data).trim('"')) + if (peers < options.minPeers) UpstreamAvailability.IMMATURE else UpstreamAvailability.OK + } + } + return validators + } + + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + val limitValidator = callLimitValidatorFactory(upstream, options, config, chain) + + return listOf( + ChainIdValidator(upstream, chain), + OldBlockValidator(upstream), + GasPriceValidator(upstream, config), + ) + if (limitValidator.isEnabled()) { + listOf(limitValidator) + } else { + emptyList() + } } override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt index 6225c5fc1..430f70e6e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt @@ -15,9 +15,9 @@ import reactor.core.publisher.Mono const val ZERO_ADDRESS = "0x0000000000000000000000000000000000000000" class EthereumUpstreamSettingsDetector( - private val upstream: Upstream, + private val _upstream: Upstream, private val chain: Chain, -) : BasicEthUpstreamSettingsDetector(upstream) { +) : BasicEthUpstreamSettingsDetector(_upstream) { private val blockNumberReader = EthereumArchiveBlockNumberReader(upstream.getIngressReader()) override fun detectLabels(): Flux> { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index 204c35565..ef04afff2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -16,101 +16,140 @@ */ package io.emeraldpay.dshackle.upstream.ethereum -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.upstream.BasicEthUpstreamValidator import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.ethereum.domain.Address import io.emeraldpay.dshackle.upstream.ethereum.hex.HexData import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionCallJson import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import org.springframework.scheduling.concurrent.CustomizableThreadFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory import reactor.core.publisher.Mono -import reactor.core.scheduler.Schedulers import reactor.kotlin.extra.retry.retryRandomBackoff import java.time.Duration -import java.util.concurrent.Executors import java.util.concurrent.TimeoutException -import java.util.function.Supplier +interface CallLimitValidator : SingleValidator { + fun isEnabled(): Boolean +} + +abstract class AbstractCallLimitValidator( + private val upstream: Upstream, + private val options: ChainOptions.Options, +) : CallLimitValidator { -open class EthereumUpstreamValidator @JvmOverloads constructor( - private val chain: Chain, - upstream: Upstream, - options: ChainOptions.Options, - private val config: ChainConfig, -) : BasicEthUpstreamValidator(upstream, options) { companion object { - val scheduler = - Schedulers.fromExecutor(Executors.newCachedThreadPool(CustomizableThreadFactory("ethereum-validator"))) + @JvmStatic + val log: Logger = LoggerFactory.getLogger(AbstractCallLimitValidator::class.java) } - - private val objectMapper: ObjectMapper = Global.objectMapper - - override fun validateSyncingRequest(): ValidateSyncingRequest { - return ValidateSyncingRequest( - ChainRequest("eth_syncing", ListParams()), - ) { bytes -> - val raw = Global.objectMapper.readTree(bytes) - if (raw.isBoolean) { - raw.asBoolean() - } else { - log.warn("Received syncing object ${raw.toPrettyString()} for upstream ${upstream.getId()}") - true + override fun validate(onError: ValidateUpstreamSettingsResult): Mono { + return upstream.getIngressReader() + .read(createRequest()) + .flatMap(ChainResponse::requireResult) + .map { ValidateUpstreamSettingsResult.UPSTREAM_VALID } + .onErrorResume { + if (isLimitError(it)) { + log.warn( + "Error: ${it.message}. Node ${upstream.getId()} is probably incorrectly configured. " + + "You need to set up your return limit to at least ${options.callLimitSize}. " + + "Erigon config example: https://github.com/ledgerwatch/erigon/blob/d014da4dc039ea97caf04ed29feb2af92b7b129d/cmd/utils/flags.go#L369", + ) + Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR) + } else { + Mono.error(it) + } } - } + .timeout( + Defaults.timeoutInternal, + Mono.fromCallable { log.error("No response for eth_call limit check from ${upstream.getId()}") } + .then(Mono.error(TimeoutException("Validation timeout for call limit"))), + ) + .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> + log.warn( + "error during validateCallLimit for ${upstream.getId()}, iteration ${ctx.iteration()}, " + + "message ${ctx.exception().message}", + ) + } + .onErrorReturn(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) } - override fun validatePeersRequest(): ValidatePeersRequest { - return ValidatePeersRequest( - ChainRequest("net_peerCount", ListParams()), - ) { resp -> Integer.decode(resp.getResultAsProcessedString()) } - } + abstract fun createRequest(): ChainRequest - override fun validatorFunctions(): List>> { - return listOf( - Supplier { validateSyncing() }, - Supplier { validatePeers() }, - ) - } + abstract fun isLimitError(err: Throwable): Boolean +} - override fun validateUpstreamSettings(): Mono { - if (options.disableUpstreamValidation) { - return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) - } - return Mono.zip( - validateChain(), - validateOldBlocks(), - ).map { - listOf(it.t1, it.t2).maxOf { it } - } - } +class EthCallLimitValidator( + upstream: Upstream, + private val options: ChainOptions.Options, + private val config: ChainConfig, +) : AbstractCallLimitValidator(upstream, options) { + override fun isEnabled() = options.validateCallLimit && config.callLimitContract != null - override fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult { - if (options.disableUpstreamValidation) { - return ValidateUpstreamSettingsResult.UPSTREAM_VALID - } - return Mono.zip( - validateChain(), - validateOldBlocks(), - validateCallLimit(), - validateGasPrice(), - ).map { - listOf(it.t1, it.t2, it.t3, it.t4).maxOf { it } - }.block() ?: ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR + override fun createRequest() = ChainRequest( + "eth_call", + ListParams( + TransactionCallJson( + Address.from(config.callLimitContract), + // contract like https://github.com/drpcorg/dshackle/pull/246 + // meta + size in hex + HexData.from("0xd8a26e3a" + options.callLimitSize.toString(16).padStart(64, '0')), + ), + "latest", + ), + ) + + override fun isLimitError(err: Throwable): Boolean = + err.message != null && err.message!!.contains("rpc.returndata.limit") +} + +class ZkSyncCallLimitValidator( + private val upstream: Upstream, + private val options: ChainOptions.Options, +) : AbstractCallLimitValidator(upstream, options) { + private val method = "debug_traceBlockByNumber" + + override fun isEnabled() = + options.validateCallLimit && upstream.getMethods().getSupportedMethods().contains(method) + + override fun createRequest() = ChainRequest( + method, + ListParams("0x1b73b2b", mapOf("tracer" to "callTracer")), + ) + + override fun isLimitError(err: Throwable): Boolean = + err.message != null && err.message!!.contains("response size should not greater than") +} + +fun callLimitValidatorFactory( + upstream: Upstream, + options: ChainOptions.Options, + config: ChainConfig, + chain: Chain, +): CallLimitValidator { + return if (listOf(Chain.ZKSYNC__MAINNET).contains(chain)) { + ZkSyncCallLimitValidator(upstream, options) + } else { + EthCallLimitValidator(upstream, options, config) } +} - private fun validateChain(): Mono { - if (!options.validateChain) { - return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) - } +class ChainIdValidator( + private val upstream: Upstream, + private val chain: Chain, +) : SingleValidator { + + companion object { + @JvmStatic + val log: Logger = LoggerFactory.getLogger(ChainIdValidator::class.java) + } + override fun validate(onError: ValidateUpstreamSettingsResult): Mono { return Mono.zip( chainId(), netVersion(), @@ -135,46 +174,46 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( } .onErrorResume { log.error("Error during chain validation", it) - Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) + Mono.just(onError) } } - private fun validateCallLimit(): Mono { - val validator = callLimitValidatorFactory(upstream, options, config, chain) - if (!validator.isEnabled()) { - return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) - } + private fun chainId(): Mono { return upstream.getIngressReader() - .read(validator.createRequest()) - .flatMap(ChainResponse::requireResult) - .map { ValidateUpstreamSettingsResult.UPSTREAM_VALID } - .onErrorResume { - if (validator.isLimitError(it)) { - log.warn( - "Error: ${it.message}. Node ${upstream.getId()} is probably incorrectly configured. " + - "You need to set up your return limit to at least ${options.callLimitSize}. " + - "Erigon config example: https://github.com/ledgerwatch/erigon/blob/d014da4dc039ea97caf04ed29feb2af92b7b129d/cmd/utils/flags.go#L369", - ) - Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR) - } else { - Mono.error(it) - } + .read(ChainRequest("eth_chainId", ListParams())) + .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> + log.warn( + "error during chainId retrieving for ${upstream.getId()}, iteration ${ctx.iteration()}, " + + "message ${ctx.exception().message}", + ) } - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.error("No response for eth_call limit check from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for call limit"))), - ) + .doOnError { log.error("Error during execution 'eth_chainId' - ${it.message} for ${upstream.getId()}") } + .flatMap(ChainResponse::requireStringResult) + } + + private fun netVersion(): Mono { + return upstream.getIngressReader() + .read(ChainRequest("net_version", ListParams())) .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> log.warn( - "error during validateCallLimit for ${upstream.getId()}, iteration ${ctx.iteration()}, " + + "error during netVersion retrieving for ${upstream.getId()}, iteration ${ctx.iteration()}, " + "message ${ctx.exception().message}", ) } - .onErrorReturn(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) + .doOnError { log.error("Error during execution 'net_version' - ${it.message} for ${upstream.getId()}") } + .flatMap(ChainResponse::requireStringResult) } +} + +class OldBlockValidator( + private val upstream: Upstream, +) : SingleValidator { - private fun validateOldBlocks(): Mono { + companion object { + @JvmStatic + val log: Logger = LoggerFactory.getLogger(OldBlockValidator::class.java) + } + override fun validate(onError: ValidateUpstreamSettingsResult): Mono { return EthereumArchiveBlockNumberReader(upstream.getIngressReader()) .readArchiveBlock() .flatMap { @@ -202,11 +241,18 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) } } +} + +class GasPriceValidator( + private val upstream: Upstream, + private val config: ChainConfig, +) : SingleValidator { - private fun validateGasPrice(): Mono { - if (!options.validateGasPrice) { - return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) - } + companion object { + @JvmStatic + val log: Logger = LoggerFactory.getLogger(GasPriceValidator::class.java) + } + override fun validate(onError: ValidateUpstreamSettingsResult): Mono { return upstream.getIngressReader() .read(ChainRequest("eth_gasPrice", ListParams())) .flatMap(ChainResponse::requireStringResult) @@ -224,93 +270,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( } .onErrorResume { err -> log.warn("Error during gasPrice validation", err) - Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) - } - } - - private fun chainId(): Mono { - return upstream.getIngressReader() - .read(ChainRequest("eth_chainId", ListParams())) - .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> - log.warn( - "error during chainId retrieving for ${upstream.getId()}, iteration ${ctx.iteration()}, " + - "message ${ctx.exception().message}", - ) - } - .doOnError { log.error("Error during execution 'eth_chainId' - ${it.message} for ${upstream.getId()}") } - .flatMap(ChainResponse::requireStringResult) - } - - private fun netVersion(): Mono { - return upstream.getIngressReader() - .read(ChainRequest("net_version", ListParams())) - .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> - log.warn( - "error during netVersion retrieving for ${upstream.getId()}, iteration ${ctx.iteration()}, " + - "message ${ctx.exception().message}", - ) + Mono.just(onError) } - .doOnError { log.error("Error during execution 'net_version' - ${it.message} for ${upstream.getId()}") } - .flatMap(ChainResponse::requireStringResult) - } -} - -interface CallLimitValidator { - fun isEnabled(): Boolean - fun createRequest(): ChainRequest - fun isLimitError(err: Throwable): Boolean -} - -class EthCallLimitValidator( - private val options: ChainOptions.Options, - private val config: ChainConfig, -) : CallLimitValidator { - override fun isEnabled() = options.validateCallLimit && config.callLimitContract != null - - override fun createRequest() = ChainRequest( - "eth_call", - ListParams( - TransactionCallJson( - Address.from(config.callLimitContract), - // contract like https://github.com/drpcorg/dshackle/pull/246 - // meta + size in hex - HexData.from("0xd8a26e3a" + options.callLimitSize.toString(16).padStart(64, '0')), - ), - "latest", - ), - ) - - override fun isLimitError(err: Throwable): Boolean = - err.message != null && err.message!!.contains("rpc.returndata.limit") -} - -class ZkSyncCallLimitValidator( - private val upstream: Upstream, - private val options: ChainOptions.Options, -) : CallLimitValidator { - private val method = "debug_traceBlockByNumber" - - override fun isEnabled() = - options.validateCallLimit && upstream.getMethods().getSupportedMethods().contains(method) - - override fun createRequest() = ChainRequest( - method, - ListParams("0x1b73b2b", mapOf("tracer" to "callTracer")), - ) - - override fun isLimitError(err: Throwable): Boolean = - err.message != null && err.message!!.contains("response size should not greater than") -} - -fun callLimitValidatorFactory( - upstream: Upstream, - options: ChainOptions.Options, - config: ChainConfig, - chain: Chain, -): CallLimitValidator { - return if (listOf(Chain.ZKSYNC__MAINNET).contains(chain)) { - ZkSyncCallLimitValidator(upstream, options) - } else { - EthCallLimitValidator(options, config) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt index d998cd7a6..3d67537a1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceLogs.kt @@ -43,7 +43,7 @@ class ProduceLogs( } constructor(upstream: Multistream) : - this((upstream.getCachingReader() as EthereumCachingReader).logsByHash(), (upstream as Multistream).chain) + this((upstream.getCachingReader() as EthereumCachingReader).logsByHash(), (upstream as Multistream).getChain()) // need to keep history of recent messages in case they get removed. cannot rely on // any other cache or upstream because if when it gets removed it's unavailable in any other source diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index 5f7c53635..bc37f5e08 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -2,7 +2,10 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest @@ -14,9 +17,14 @@ import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.NoopCachingReader +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult +import io.emeraldpay.dshackle.upstream.VersionValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions @@ -25,6 +33,7 @@ import io.emeraldpay.dshackle.upstream.finalization.NoopFinalizationDetector import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler +import java.util.function.Supplier abstract class AbstractChainSpecific : ChainSpecific { override fun localReaderBuilder( @@ -66,6 +75,47 @@ abstract class AbstractChainSpecific : ChainSpecific { override fun callSelector(caches: Caches): CallSelector? { return null } + + override fun validator( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + versionRules: Supplier, + ): UpstreamValidator { + val validators = if (options.disableValidation) { + emptyList() + } else { + upstreamValidators(chain, upstream, options, config) + + if (versionRules.get() != null) listOf(VersionValidator(upstream, versionRules)) else listOf() + } + + val settingsValidators = if (options.disableUpstreamValidation) { + emptyList() + } else { + upstreamSettingsValidators(chain, upstream, options, config) + } + + return AggregatedUpstreamValidator( + upstream, + options, + validators, + settingsValidators, + ) + } + + abstract fun upstreamValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> + abstract fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> } abstract class AbstractPollChainSpecific : AbstractChainSpecific() { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt similarity index 53% rename from src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstreamValidator.kt rename to src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt index 4a5474294..dacd06b67 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt @@ -1,51 +1,35 @@ package io.emeraldpay.dshackle.upstream.generic -import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.upstream.ChainResponse -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import reactor.core.publisher.Mono -import java.util.concurrent.TimeoutException -class GenericUpstreamValidator( +class AggregatedUpstreamValidator( upstream: Upstream, options: ChainOptions.Options, - private val validators: List>, - private val startupValidators: List>, + private val validators: List>, + private val startupValidators: List>, ) : UpstreamValidator(upstream, options) { override fun validate(): Mono { return Mono.zip( - validators.map { exec(it, UpstreamAvailability.UNAVAILABLE) }, + validators.map { it.validate(UpstreamAvailability.UNAVAILABLE) }, ) { a -> a.map { it as UpstreamAvailability } } .map(::resolve) - .defaultIfEmpty(UpstreamAvailability.UNAVAILABLE) + .defaultIfEmpty(UpstreamAvailability.OK) // upstream is OK on case there are no validators .onErrorResume { log.error("Error during upstream validation for ${upstream.getId()}", it) Mono.just(UpstreamAvailability.UNAVAILABLE) } } - fun exec(validator: SingleCallValidator, onError: T): Mono { - return upstream.getIngressReader() - .read(validator.method) - .flatMap(ChainResponse::requireResult) - .map { validator.check(it) } - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.warn("No response for ${validator.method.method} from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for ${validator.method.method}"))), - ) - .doOnError { err -> log.error("Error during ${validator.method.method} validation for ${upstream.getId()}", err) } - .onErrorReturn(onError) - } override fun validateUpstreamSettings(): Mono { return Mono.zip( - startupValidators.map { exec(it, ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) }, + startupValidators.map { it.validate(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) }, ) { a -> a.map { it as ValidateUpstreamSettingsResult } } .map(::resolve) .defaultIfEmpty(ValidateUpstreamSettingsResult.UPSTREAM_VALID) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index ec852d438..7ea9b39b2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -12,6 +12,7 @@ import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.ChainReader @@ -42,6 +43,7 @@ import org.apache.commons.collections4.Factory import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler +import java.util.function.Supplier typealias SubscriptionBuilder = (Multistream) -> EgressSubscription typealias LocalReaderBuilder = (CachingReader, CallMethods, Head, LogsOracle?) -> Mono @@ -70,7 +72,13 @@ interface ChainSpecific { fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder - fun validator(chain: Chain, upstream: Upstream, options: ChainOptions.Options, config: ChainConfig): UpstreamValidator + fun validator( + chain: Chain, + upstream: Upstream, + options: ChainOptions.Options, + config: ChainConfig, + versionRules: Supplier, + ): UpstreamValidator fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector? diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 326db67a2..4f5051cc6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels +import io.emeraldpay.dshackle.config.hot.CompatibleVersionsRules import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.startup.QuorumForLabels @@ -35,10 +36,11 @@ import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import java.util.function.Supplier open class GenericUpstream( id: String, - val chain: Chain, + chain: Chain, hash: Byte, options: ChainOptions.Options, role: UpstreamsConfig.UpstreamRole, @@ -50,6 +52,7 @@ open class GenericUpstream( upstreamSettingsDetectorBuilder: UpstreamSettingsDetectorBuilder, lowerBoundServiceBuilder: LowerBoundServiceBuilder, finalizationDetectorBuilder: FinalizationDetectorBuilder, + versionRules: Supplier, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig, chain), Lifecycle { constructor( @@ -66,12 +69,13 @@ open class GenericUpstream( buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods, lowerBoundServiceBuilder: LowerBoundServiceBuilder, finalizationDetectorBuilder: FinalizationDetectorBuilder, - ) : this(config.id!!, chain, hash, options, config.role, buildMethods(config, chain), node, chainConfig, connectorFactory, validatorBuilder, upstreamSettingsDetectorBuilder, lowerBoundServiceBuilder, finalizationDetectorBuilder) { + versionRules: Supplier, + ) : this(config.id!!, chain, hash, options, config.role, buildMethods(config, chain), node, chainConfig, connectorFactory, validatorBuilder, upstreamSettingsDetectorBuilder, lowerBoundServiceBuilder, finalizationDetectorBuilder, versionRules) { rpcModulesDetector = upstreamRpcModulesDetectorBuilder(this) detectRpcModules(config, buildMethods) } - private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) + private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig, versionRules) private var validatorSubscription: Disposable? = null private var validationSettingsSubscription: Disposable? = null private var lowerBlockDetectorSubscription: Disposable? = null @@ -142,7 +146,7 @@ open class GenericUpstream( } override fun start() { - log.info("Configured for ${chain.chainName}") + log.info("Configured for ${getChain().chainName}") connector.start() if (validator != null) { @@ -208,17 +212,23 @@ open class GenericUpstream( } private fun detectSettings() { - settingsDetector?.detectLabels() - ?.subscribe { label -> - updateLabels(label) - sendUpstreamStateEvent(UPDATED) - } - - settingsDetector?.detectClientVersion() - ?.subscribe { - log.info("Detected node version $it for upstream ${getId()}") - clientVersion.set(it) - } + Flux.interval( + Duration.ZERO, + Duration.ofSeconds(getOptions().validationInterval.toLong() * 5), + ).flatMap { + Flux.merge( + settingsDetector?.detectLabels() + ?.doOnNext { label -> + updateLabels(label) + sendUpstreamStateEvent(UPDATED) + }, + settingsDetector?.detectClientVersion() + ?.doOnNext { + log.info("Detected node version $it for upstream ${getId()}") + clientVersion.set(it) + }, + ) + }.subscribe() } private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { @@ -242,7 +252,7 @@ open class GenericUpstream( } } } - if (changed) updateMethods(buildMethods(config, chain)) + if (changed) updateMethods(buildMethods(config, getChain())) } } @@ -308,7 +318,7 @@ open class GenericUpstream( private fun detectFinalization() { finalizationDetectorSubscription = - finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, chain).subscribe { + finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain()).subscribe { sendUpstreamStateEvent(UPDATED) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt index 1a96a75ba..f0dff1ba5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt @@ -41,7 +41,7 @@ class GrpcUpstreamCreator( ) } - fun creatGrpcUpstream( + fun createGrpcUpstream( config: UpstreamsConfig.Upstream, chainsConfig: ChainsConfig, ): GrpcUpstreams { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt index 1facfb95a..f1f4c7760 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt @@ -10,14 +10,13 @@ import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector -import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific -import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams @@ -56,29 +55,33 @@ object NearChainSpecific : AbstractPollChainSpecific() { throw NotImplementedError() } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, options: Options, config: ChainConfig, - ): UpstreamValidator { - return GenericUpstreamValidator( - upstream, - options, - listOf( - SingleCallValidator( - ChainRequest("status", ListParams()), - ) { data -> - validate(data) - }, - ), - listOf( - SingleCallValidator( - ChainRequest("status", ListParams()), - ) { data -> - validateSettings(data, chain) - }, - ), + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("status", ListParams()), + upstream, + ) { data -> validate(data) }, + ) + } + + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("status", ListParams()), + upstream, + ) { data -> + validateSettings(data, chain) + }, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt index 85edb9415..408eb580d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt @@ -44,8 +44,8 @@ class NearUpstreamSettingsDetector( override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest()) - override fun clientType(node: JsonNode): String? = null + override fun clientType(node: JsonNode): String = "near" - override fun clientVersion(node: JsonNode): String? = + override fun clientVersion(node: JsonNode): String = node.get("version")?.get("version")?.asText() ?: UNKNOWN_CLIENT_VERSION } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 5c87b469e..720758b65 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -12,14 +12,14 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods @@ -27,7 +27,6 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericEgressSubscription import io.emeraldpay.dshackle.upstream.generic.GenericIngressSubscription -import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.generic.LocalReader import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams @@ -89,29 +88,35 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { return { ms -> GenericEgressSubscription(ms, headScheduler) } } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, options: Options, config: ChainConfig, - ): UpstreamValidator { - return GenericUpstreamValidator( - upstream, - options, - listOf( - SingleCallValidator( - ChainRequest("system_health", ListParams()), - ) { data -> - validate(data, options.minPeers, upstream.getId()) - }, - ), - listOf( - SingleCallValidator( - ChainRequest("system_chain", ListParams()), - ) { data -> - validateSettings(data, chain) - }, - ), + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("system_health", ListParams()), + upstream, + ) { data -> + validate(data, options.minPeers, upstream.getId()) + }, + ) + } + + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("system_chain", ListParams()), + upstream, + ) { data -> + validateSettings(data, chain) + }, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt index 40710dbdc..388d76c19 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -12,18 +12,18 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.DefaultSolanaMethods import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector -import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.generic.AbstractChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericEgressSubscription import io.emeraldpay.dshackle.upstream.generic.GenericIngressSubscription -import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory @@ -115,32 +115,37 @@ object SolanaChainSpecific : AbstractChainSpecific() { return ChainRequest("blockUnsubscribe", ListParams(subId)) } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, options: Options, config: ChainConfig, - ): UpstreamValidator { - return GenericUpstreamValidator( - upstream, - options, - listOf( - SingleCallValidator( - ChainRequest("getHealth", ListParams()), - ) { data -> - val resp = String(data) - if (resp == "\"ok\"") { - UpstreamAvailability.OK - } else { - log.warn("Upstream {} validation failed, solana status is {}", upstream.getId(), resp) - UpstreamAvailability.UNAVAILABLE - } - }, - ), - listOf(), + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("getHealth", ListParams()), + upstream, + ) { data -> + val resp = String(data) + if (resp == "\"ok\"") { + UpstreamAvailability.OK + } else { + log.warn("Upstream {} validation failed, solana status is {}", upstream.getId(), resp) + UpstreamAvailability.UNAVAILABLE + } + }, ) } + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf() + } + override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return SolanaLowerBoundService(chain, upstream) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt index da4386e0b..2e4d15300 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt @@ -38,8 +38,8 @@ class SolanaUpstreamSettingsDetector( override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest()) - override fun clientVersion(node: JsonNode): String? = + override fun clientVersion(node: JsonNode): String = node.get("solana-core")?.textValue() ?: UNKNOWN_CLIENT_VERSION - override fun clientType(node: JsonNode): String? = null + override fun clientType(node: JsonNode): String = "solana" // todo - solana have different types of nodes, need to detect } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index b68829993..69da6f7cf 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -10,12 +10,12 @@ import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.GenericSingleCallValidator +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific -import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory @@ -56,26 +56,31 @@ object StarknetChainSpecific : AbstractPollChainSpecific() { throw NotImplementedError() } - override fun validator( + override fun upstreamValidators( chain: Chain, upstream: Upstream, options: Options, config: ChainConfig, - ): UpstreamValidator { - return GenericUpstreamValidator( - upstream, - options, - listOf( - SingleCallValidator( - ChainRequest("starknet_syncing", ListParams()), - ) { data -> - validate(data, config.laggingLagSize, upstream.getId()) - }, - ), - listOf(), + ): List> { + return listOf( + GenericSingleCallValidator( + ChainRequest("starknet_syncing", ListParams()), + upstream, + ) { data -> + validate(data, config.laggingLagSize, upstream.getId()) + }, ) } + override fun upstreamSettingsValidators( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): List> { + return listOf() + } + override fun lowerBoundService(chain: Chain, upstream: Upstream): LowerBoundService { return StarknetLowerBoundService(chain, upstream) } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0e1c8ee99..db5bfba84 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,4 +20,8 @@ spring: spans: collect: - long-span-threshold: ${LONG_SPAN_THRESHOLD:1000} \ No newline at end of file + long-span-threshold: ${LONG_SPAN_THRESHOLD:1000} + +compatibility: + enabled: ${DSHACKLE_COMPATIBILITY_ENABLED:true} + url: ${DSHACKLE_COMPATIBILITY_URL:https://raw.githubusercontent.com/drpcorg/public/main/compatible-clients.yaml} diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy index fdfd9a772..52621d6f7 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy @@ -78,6 +78,7 @@ class GenericUpstreamMock extends GenericUpstream { io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&upstreamSettingsDetector, io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&lowerBoundService, io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&finalizationDetectorBuilder, + [get: { null }] as java.util.function.Supplier, ) this.ethereumHeadMock = this.getHead() as EthereumHeadMock setLag(0) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index bf126a0ad..d0e5da75d 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -78,7 +78,8 @@ class FilteredApisSpec extends Specification { cs.&validator, cs.&upstreamSettingsDetector, cs.&lowerBoundService, - cs.&finalizationDetectorBuilder + cs.&finalizationDetectorBuilder, + [get: { null }] as java.util.function.Supplier, ) } def matcher = new Selector.LabelMatcher("test", ["foo"]) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy deleted file mode 100644 index 284c3b5dc..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy +++ /dev/null @@ -1,493 +0,0 @@ -/** - * Copyright (c) 2022 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - - -import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.test.ApiReaderMock -import io.emeraldpay.dshackle.test.TestingCommons -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.ChainCallError -import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.ChainResponse -import io.emeraldpay.dshackle.upstream.rpcclient.ListParams -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.ethereum.domain.Address -import io.emeraldpay.dshackle.upstream.ethereum.hex.HexData -import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionCallJson -import reactor.core.publisher.Mono -import reactor.util.function.Tuples -import spock.lang.Specification - -import java.time.Duration - -import static io.emeraldpay.dshackle.Chain.BSC__MAINNET -import static io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET -import static io.emeraldpay.dshackle.Chain.OPTIMISM__MAINNET -import static io.emeraldpay.dshackle.upstream.UpstreamAvailability.* -import static io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -import static io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -import static io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID -import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig - -class EthereumUpstreamValidatorSpec extends Specification { - - def conf = ChainConfig.defaultWithContract("0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96") - - def "Resolve to final availability"() { - setup: - - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, Stub(Upstream), ChainOptions.PartialOptions.getDefaults().buildOptions(), conf) - expect: - validator.resolve(Tuples.of(sync, peers)) == exp - where: - exp | sync | peers - OK | OK | OK - IMMATURE | OK | IMMATURE - UNAVAILABLE | OK | UNAVAILABLE - SYNCING | SYNCING | OK - SYNCING | SYNCING | IMMATURE - UNAVAILABLE | SYNCING | UNAVAILABLE - UNAVAILABLE | UNAVAILABLE | OK - UNAVAILABLE | UNAVAILABLE | IMMATURE - UNAVAILABLE | UNAVAILABLE | UNAVAILABLE - } - - def "Doesnt check eth_syncing when disabled"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateSyncing = false - }.buildOptions() - def up = Mock(Upstream) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateSyncing().block(Duration.ofSeconds(1)) - then: - act == OK - 0 * up.getIngressReader() - } - - def "Syncing is OK when false returned from upstream"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateSyncing = true - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("eth_syncing", [], false) - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateSyncing().block(Duration.ofSeconds(1)) - then: - act == OK - } - - def "Execute onSyncingNode with result of eth_syncing"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateSyncing = true - }.buildOptions() - def up = Mock(Upstream) { - 2 * getIngressReader() >> Mock(Reader) { reader -> - 2 * reader.read(_) >>> [ - Mono.just(new ChainResponse('true'.getBytes(), null)), - Mono.just(new ChainResponse('false'.getBytes(), null)) - ] - } - 2 * getHead() >> Mock(Head) { head -> - 1 * head.onSyncingNode(true) - 1 * head.onSyncingNode(false) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateSyncing().block(Duration.ofSeconds(1)) - def act2 = validator.validateSyncing().block(Duration.ofSeconds(1)) - then: - act == SYNCING - act2 == OK - } - - def "Syncing is SYNCING when state returned from upstream"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateSyncing = true - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("eth_syncing", [], [startingBlock: 100, currentBlock: 50]) - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateSyncing().block(Duration.ofSeconds(1)) - then: - act == SYNCING - } - - def "Syncing is UNAVAILABLE when error returned from upstream"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateSyncing = true - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("eth_syncing", [], new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unavailable")) - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateSyncing().block(Duration.ofSeconds(1)) - then: - act == UNAVAILABLE - } - - def "Doesnt validate peers when disabled"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = false - it.minPeers = 10 - }.buildOptions() - def up = Mock(Upstream) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == OK - 0 * up.getApi() - } - - def "Doesnt validate peers when zero peers is expected"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = true - it.minPeers = 0 - }.buildOptions() - def up = Mock(Upstream) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == OK - 0 * up.getIngressReader() - } - - def "Peers is IMMATURE when state returned too few peers"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = true - it.minPeers = 10 - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("net_peerCount", [], "0x5") - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == IMMATURE - } - - def "Peers is OK when state returned exactly min peers"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = true - it.minPeers = 10 - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("net_peerCount", [], "0xa") - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == OK - } - - def "Peers is OK when state returned more than enough peers"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = true - it.minPeers = 10 - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("net_peerCount", [], "0xff") - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == OK - } - - def "Peers is UNAVAILABLE when state returned error"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validatePeers = true - it.minPeers = 10 - }.buildOptions() - def up = TestingCommons.upstream( - new ApiReaderMock().tap { - answer("net_peerCount", [], new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unavailable")) - } - ) - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validatePeers().block(Duration.ofSeconds(1)) - then: - act == UNAVAILABLE - } - - def "Doesnt validate chan and callLimit when disabled"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateCallLimit = false - it.validateChain = false - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 2 * getIngressReader() >> - Mock(Reader) { - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) - - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_VALID - } - - def "Upstream is valid if not error from call limit check"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateChain = false - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 3 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_call", new ListParams([new TransactionCallJson( - Address.from("0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96"), - HexData.from("0xd8a26e3a00000000000000000000000000000000000000000000000000000000000f4240") - ), "latest"]))) >> Mono.just(new ChainResponse("0x00000000000000000000".getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) -//"0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96 - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_VALID - } - - def "Upstream is not valid if error returned on call limit check"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateChain = false - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 3 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_call", new ListParams([new TransactionCallJson( - Address.from("0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96"), - HexData.from("0xd8a26e3a00000000000000000000000000000000000000000000000000000000000f4240") - ), "latest"]))) >> Mono.just(new ChainResponse(null, new ChainCallError(1, "Too long"))) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) -// "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96" - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_SETTINGS_ERROR - } - - def "Upstream is valid if gas price is equal to expected"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateChain = false - it.validateCallLimit = false - }.buildOptions() - def conf = ChainConfig.defaultWithGasPriceCondition(["ne 3000000000", "ne 5000000000"]) - def up = Mock(Upstream) { - 3 * getIngressReader() >> - Mock(Reader) { - 1 * read(new ChainRequest("eth_gasPrice", new ListParams())) >> Mono.just(new ChainResponse('"0x3b9aca00"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(BSC__MAINNET, up, options, conf) - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_VALID - } - - def "Upstream is NOT valid if gas price is different from expected"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateChain = false - it.validateCallLimit = false - }.buildOptions() - def conf = ChainConfig.defaultWithGasPriceCondition(["eq 1000000000"]) - def up = Mock(Upstream) { - 3 * getIngressReader() >> - Mock(Reader) { - 1 * read(new ChainRequest("eth_gasPrice", new ListParams())) >> Mono.just(new ChainResponse('"0xb2d05e00"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(BSC__MAINNET, up, options, conf) - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_FATAL_SETTINGS_ERROR - } - - def "Upstream is valid if chain settings are valid"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateCallLimit = false - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 4 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_chainId", new ListParams())) >> Mono.just(new ChainResponse('"0x1"'.getBytes(), null)) - 1 * read(new ChainRequest("net_version", new ListParams())) >> Mono.just(new ChainResponse('"1"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) -// "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96" - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_VALID - } - - def "Upstream is not valid - specified optimism but got ethereum"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateCallLimit = false - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 4 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_chainId", new ListParams())) >> Mono.just(new ChainResponse('"0x1"'.getBytes(), null)) - 1 * read(new ChainRequest("net_version", new ListParams())) >> Mono.just(new ChainResponse('"1"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(OPTIMISM__MAINNET, up, options, conf) -// "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96" - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_FATAL_SETTINGS_ERROR - } - - def "Upstream is valid if all setting are valid"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap{ - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 5 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_chainId", new ListParams())) >> Mono.just(new ChainResponse('"0x1"'.getBytes(), null)) - 1 * read(new ChainRequest("net_version", new ListParams())) >> Mono.just(new ChainResponse('"1"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_call", new ListParams([new TransactionCallJson( - Address.from("0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96"), - HexData.from("0xd8a26e3a00000000000000000000000000000000000000000000000000000000000f4240") - ), "latest"]))) >> Mono.just(new ChainResponse("0x00000000000000000000".getBytes(), null)) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) -// "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96" - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_VALID - } - - def "Upstream is not valid if there are errors"() { - setup: - def options = ChainOptions.PartialOptions.getDefaults().tap { - it.validateGasPrice = false - }.buildOptions() - def up = Mock(Upstream) { - 5 * getIngressReader() >> Mock(Reader) { - 1 * read(new ChainRequest("eth_chainId", new ListParams())) >> Mono.just(new ChainResponse(null, new ChainCallError(1, "Too long"))) - 1 * read(new ChainRequest("net_version", new ListParams())) >> Mono.just(new ChainResponse(null, new ChainCallError(1, "Too long"))) - 1 * read(new ChainRequest("eth_call", new ListParams([new TransactionCallJson( - Address.from("0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96"), - HexData.from("0xd8a26e3a00000000000000000000000000000000000000000000000000000000000f4240") - ), "latest"]))) >> Mono.just(new ChainResponse(null, new ChainCallError(1, "Too long"))) - 1 * read(new ChainRequest("eth_blockNumber", new ListParams())) >> Mono.just(new ChainResponse('"0x10ff9be"'.getBytes(), null)) - 1 * read(new ChainRequest("eth_getBlockByNumber", new ListParams(["0x10fd2ae", false]))) >> - Mono.just(new ChainResponse('"result"'.getBytes(), null)) - } - } - def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, conf) -// "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96" - when: - def act = validator.validateUpstreamSettingsOnStartup() - then: - act == UPSTREAM_SETTINGS_ERROR - } - - -} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/IntegrationTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/IntegrationTest.kt index 95417097d..5617472e0 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/IntegrationTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/IntegrationTest.kt @@ -14,6 +14,7 @@ import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder import org.assertj.core.api.Assertions import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest @@ -29,6 +30,7 @@ import java.net.URI @SpringBootTest(properties = ["spring.main.allow-bean-definition-overriding=true"]) @Import(Config::class) @ActiveProfiles("integration-test") +@Disabled class IntegrationTest { @Autowired diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRulesTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRulesTest.kt new file mode 100644 index 000000000..4af6e4e5e --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/hot/CompatibleVersionsRulesTest.kt @@ -0,0 +1,45 @@ +package io.emeraldpay.dshackle.config.hot + +import io.emeraldpay.dshackle.Global +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class CompatibleVersionsRulesTest { + @Test + fun `test parsing`() { + val raw = """ + rules: + - client: "client1" + blacklist: + - 1.0.0 + - 1.0.1 + whitelist: + - 1.0.2 + - 1.0.3 + - client: "client2" + blacklist: + - 1.0.0 + - 1.0.1 + whitelist: + - 1.0.2 + - 1.0.3 + """.trimIndent() + + val rules = Global.yamlMapper.readValue(raw, CompatibleVersionsRules::class.java)!!.rules + assertEquals(2, rules.size) + assertEquals("client1", rules[0].client) + assertEquals(2, rules[0].blacklist!!.size) + assertEquals("1.0.0", rules[0].blacklist!![0]) + assertEquals("1.0.1", rules[0].blacklist!![1]) + assertEquals(2, rules[0].whitelist!!.size) + assertEquals("1.0.2", rules[0].whitelist!![0]) + assertEquals("1.0.3", rules[0].whitelist!![1]) + assertEquals("client2", rules[1].client) + assertEquals(2, rules[1].blacklist!!.size) + assertEquals("1.0.0", rules[1].blacklist!![0]) + assertEquals("1.0.1", rules[1].blacklist!![1]) + assertEquals(2, rules[1].whitelist!!.size) + assertEquals("1.0.2", rules[1].whitelist!![0]) + assertEquals("1.0.3", rules[1].whitelist!![1]) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt index 7b95f0e51..93bd62c9a 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt @@ -47,7 +47,7 @@ class SubscribeChainStatusTest { on { getFlux() } doReturn Flux.error(IllegalStateException()) } val ms = mock { - on { chain } doReturn Chain.ETHEREUM__MAINNET + on { getChain() } doReturn Chain.ETHEREUM__MAINNET on { getHead() } doReturn head on { stateEvents() } doReturn Flux.empty() } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainValidatorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainValidatorTest.kt index 7de9099f7..0f3d87a6f 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainValidatorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainValidatorTest.kt @@ -1,5 +1,7 @@ package io.emeraldpay.dshackle.upstream.beaconchain +import io.emeraldpay.dshackle.Chain.ETH_BEACON_CHAIN__MAINNET +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainCallError @@ -39,7 +41,13 @@ class BeaconChainValidatorTest { on { getIngressReader() } doReturn reader on { getHead() } doReturn mock() } - val validator = BeaconChainValidator(upstream, ChainOptions.PartialOptions.getDefaults().buildOptions()) + val validator = BeaconChainSpecific.validator( + ETH_BEACON_CHAIN__MAINNET, + upstream, + ChainOptions.PartialOptions.getDefaults().buildOptions(), + ChainConfig.default(), + { null }, + ) val result = validator.validate().block()