diff --git a/build.gradle b/build.gradle index e74b817e3..72622b8a5 100644 --- a/build.gradle +++ b/build.gradle @@ -190,7 +190,7 @@ jib { } } container { - jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview'] + jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview'] mainClass = 'io.emeraldpay.dshackle.StarterKt' args = [] ports = ['2448', '2449', '8545'] diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt index 727b13153..50e700345 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt @@ -25,6 +25,7 @@ class Defaults { const val maxMetadataSize = 16384 val timeout: Duration = Duration.ofSeconds(60) val timeoutInternal: Duration = timeout.dividedBy(4) + val internalCallsTimeout = Duration.ofSeconds(3) val retryConnection: Duration = Duration.ofSeconds(10) val grpcServerKeepAliveTime: Long = 15 // seconds val grpcServerKeepAliveTimeout: Long = 5 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index a174b0d6c..9f5a99d29 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -12,7 +12,9 @@ import reactor.core.scheduler.Scheduler import reactor.core.scheduler.Schedulers import java.util.concurrent.Executor import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Configuration open class SchedulersConfig { @@ -79,17 +81,24 @@ open class SchedulersConfig { } private fun makePool(name: String, size: Int, monitoringConfig: MonitoringConfig): ExecutorService { - val pool = Executors.newFixedThreadPool(size, CustomizableThreadFactory("$name-")) + val cachedPool = ThreadPoolExecutor( + size, + size * threadsMultiplier, + 60L, + TimeUnit.SECONDS, + SynchronousQueue(), + CustomizableThreadFactory("$name-"), + ) return if (monitoringConfig.enableExtended) { ExecutorServiceMetrics.monitor( Metrics.globalRegistry, - pool, + cachedPool, name, Tag.of("reactor_scheduler_id", "_"), ) } else { - pool + cachedPool } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt index d7667e87b..d64a9f1c8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt @@ -178,11 +178,11 @@ class QuorumRequestReader( src.onErrorResume { err -> errorHandler.handle(api, key, err.message) - val msgError = "Error during call upstream ${api.getId()} with method ${key.method}" + val msgError = "Error during call upstream ${api.getId()} with method ${key.method}, reason - ${err.message}" if (err is ChainCallUpstreamException) { - log.debug(msgError, err) + log.debug(msgError) } else { - log.warn(msgError, err) + log.warn(msgError) } // when the call failed with an error we want to notify the quorum because diff --git a/src/main/kotlin/io/emeraldpay/dshackle/reader/CompoundReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/reader/CompoundReader.kt index 336442643..722c7bfb3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/reader/CompoundReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/reader/CompoundReader.kt @@ -41,7 +41,7 @@ class CompoundReader ( .flatMap({ rdr -> rdr.read(key) .timeout(Defaults.timeoutInternal, Mono.empty()) - .doOnError { t -> log.warn("Failed to read from $rdr", t) } + .doOnError { t -> log.warn("Failed to read from {}, reason - {}", rdr, t.message) } }, 1,) .next() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt index 1ea637792..69cfaff9c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt @@ -15,6 +15,7 @@ import java.io.ByteArrayInputStream import java.security.KeyStore import java.security.cert.CertificateFactory import java.security.cert.X509Certificate +import java.time.Duration import java.util.Base64 import java.util.function.Consumer import java.util.function.Function @@ -31,7 +32,8 @@ abstract class HttpReader( init { val connectionProvider = ConnectionProvider.builder("dshackleConnectionPool") .maxConnections(1500) - .pendingAcquireMaxCount(10000) + .pendingAcquireMaxCount(1000) + .pendingAcquireTimeout(Duration.ofSeconds(10)) .build() var build = HttpClient.create(connectionProvider) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt index 8037870ac..f26470307 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamSettingsDetector.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout import io.emeraldpay.dshackle.Global import org.slf4j.LoggerFactory import reactor.core.publisher.Flux @@ -17,15 +18,25 @@ abstract class UpstreamSettingsDetector( ) { protected val log = LoggerFactory.getLogger(this::class.java) - abstract fun detectLabels(): Flux> + fun detectLabels(): Flux> { + return internalDetectLabels() + .timeout(internalCallsTimeout) + .onErrorResume { + log.warn("Couldn't detect lables of upstream {}, message - {}", upstream.getId(), it.message) + Flux.empty() + } + } + + protected abstract fun internalDetectLabels(): Flux> fun detectClientVersion(): Mono { return upstream.getIngressReader() .read(clientVersionRequest()) .flatMap(ChainResponse::requireResult) .map(::parseClientVersion) + .timeout(internalCallsTimeout) .onErrorResume { - log.warn("Can't detect the client version of upstream ${upstream.getId()}, reason - {}", it.message) + log.warn("Can't detect the client version of upstream {}, reason - {}", upstream.getId(), it.message) Mono.just(UNKNOWN_CLIENT_VERSION) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainUpstreamSettingsDetector.kt index 8d8af4ef2..e9799a3c1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainUpstreamSettingsDetector.kt @@ -21,7 +21,7 @@ class BeaconChainUpstreamSettingsDetector( ) } - override fun detectLabels(): Flux> { + override fun internalDetectLabels(): Flux> { return Flux.merge( detectNodeType(), ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt index a5cdc691c..b45f44e33 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream.cosmos import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream @@ -31,10 +32,13 @@ class CosmosLowerBoundStateDetector( } override fun internalDetectLowerBound(): Flux { - return upstream.getIngressReader().read(ChainRequest("status", ListParams())).map { - val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java) - LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE) - }.toFlux() + return upstream.getIngressReader() + .read(ChainRequest("status", ListParams())) + .timeout(Defaults.internalCallsTimeout) + .map { + val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java) + LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE) + }.toFlux() } override fun types(): Set { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt index db8434145..61f6ac123 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt @@ -14,7 +14,7 @@ import reactor.core.publisher.Flux class CosmosUpstreamSettingsDetector( upstream: Upstream, ) : BasicUpstreamSettingsDetector(upstream) { - override fun detectLabels(): Flux> { + override fun internalDetectLabels(): Flux> { return Flux.merge( detectNodeType(), ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt index afffb61d1..0c8722f69 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream @@ -72,6 +73,7 @@ class EthereumFinalizationDetector : FinalizationDetector { upstream .getIngressReader() .read(req) + .timeout(Defaults.internalCallsTimeout) .flatMap { it.requireResult().flatMap { result -> val block = Global.objectMapper.readValue>(result) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt index ea37df1a7..572902ab1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.upstream.ChainCallError import io.emeraldpay.dshackle.upstream.ChainCallUpstreamException import io.emeraldpay.dshackle.upstream.ChainRequest @@ -45,6 +46,7 @@ class EthereumLowerBoundBlockDetector( ListParams(block.toHex(), false), ), ) + .timeout(Defaults.internalCallsTimeout) .doOnNext { if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) { throw IllegalStateException(NO_BLOCK_DATA) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt index ed875bd2c..5349c7ce6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData @@ -39,6 +40,7 @@ class EthereumLowerBoundLogsDetector( ), ), ) + .timeout(Defaults.internalCallsTimeout) .doOnNext { if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResult().contentEquals("[]".toByteArray()))) { throw IllegalStateException(NO_LOGS_DATA) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt index 7fb6b985d..24ef33fb8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse @@ -60,7 +61,7 @@ class EthereumLowerBoundStateDetector( "eth_getBalance", ListParams(ZERO_ADDRESS, block.toHex()), ), - ) + ).timeout(Defaults.internalCallsTimeout) }.doOnNext { if (it.hasResult() && it.getResult().contentEquals(Global.nullValue)) { throw IllegalStateException("No state data") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt index d86c74328..a4c2d6df5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream @@ -32,6 +33,7 @@ class EthereumLowerBoundTxDetector( .read( ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)), ) + .timeout(Defaults.internalCallsTimeout) .doOnNext { if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) { throw IllegalStateException(NO_TX_DATA) @@ -50,6 +52,7 @@ class EthereumLowerBoundTxDetector( .read( ChainRequest("eth_getTransactionByHash", ListParams(tx)), ) + .timeout(Defaults.internalCallsTimeout) .doOnNext { if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) { throw IllegalStateException(NO_TX_DATA) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt index 140472a81..76d3f756e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetector.kt @@ -11,7 +11,6 @@ import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import kotlin.text.toBigInteger const val ZERO_ADDRESS = "0x0000000000000000000000000000000000000000" @@ -21,7 +20,7 @@ class EthereumUpstreamSettingsDetector( ) : BasicEthUpstreamSettingsDetector(_upstream) { private val blockNumberReader = EthereumArchiveBlockNumberReader(upstream.getIngressReader()) - override fun detectLabels(): Flux> { + override fun internalDetectLabels(): Flux> { return Flux.merge( detectNodeType(), detectArchiveNode(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt index dacd06b67..aa921ed0b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AggregatedUpstreamValidator.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.generic +import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream @@ -21,8 +22,9 @@ class AggregatedUpstreamValidator( ) { a -> a.map { it as UpstreamAvailability } } .map(::resolve) .defaultIfEmpty(UpstreamAvailability.OK) // upstream is OK on case there are no validators + .timeout(internalCallsTimeout) .onErrorResume { - log.error("Error during upstream validation for ${upstream.getId()}", it) + log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message) Mono.just(UpstreamAvailability.UNAVAILABLE) } } @@ -33,13 +35,16 @@ class AggregatedUpstreamValidator( ) { a -> a.map { it as ValidateUpstreamSettingsResult } } .map(::resolve) .defaultIfEmpty(ValidateUpstreamSettingsResult.UPSTREAM_VALID) + .timeout(internalCallsTimeout) .onErrorResume { - log.error("Error during upstream validation for ${upstream.getId()}", it) + log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message) Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR) } } override fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult { - return validateUpstreamSettings().block() ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID + return runCatching { + validateUpstreamSettings().block(internalCallsTimeout) ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID + }.getOrDefault(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt index 49e44743d..8412cc11c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt @@ -46,12 +46,15 @@ class GenericRpcHead( override fun start() { super.start() refreshSubscription?.dispose() + val base = Flux.interval(interval) + .onBackpressureDrop() .publishOn(headScheduler) .filter { !isSyncing } - .flatMap { + .concatMap { getLatestBlock(api) } + refreshSubscription = super.follow(base) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 4d57763c1..29f4bf0a0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels @@ -35,10 +36,13 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundServiceBuilder import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.springframework.context.Lifecycle +import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Sinks +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import java.util.function.Supplier @@ -192,6 +196,7 @@ open class GenericUpstream( }, headLivenessState.asFlux(), ) + .subscribeOn(upstreamSettingsScheduler) .distinctUntilChanged() .subscribe { when (it) { @@ -238,29 +243,35 @@ open class GenericUpstream( clientVersion.set(it) }, ) + .subscribeOn(settingsScheduler) }.subscribe() } private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { - val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block() ?: HashMap() - log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") - if (rpcDetector.size != 0) { - var changed = false - for ((group, _) in rpcDetector) { - if (group == "trace" || group == "debug" || group == "filter") { - if (config.methodGroups == null) { - config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) - } else { - val disabled = config.methodGroups!!.disabled - val enabled = config.methodGroups!!.enabled - if (!disabled.contains(group) && !enabled.contains(group)) { - config.methodGroups!!.enabled = enabled.plus(group) - changed = true + try { + val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block(Defaults.internalCallsTimeout) + ?: HashMap() + log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") + if (rpcDetector.size != 0) { + var changed = false + for ((group, _) in rpcDetector) { + if (group == "trace" || group == "debug" || group == "filter") { + if (config.methodGroups == null) { + config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf()) + } else { + val disabled = config.methodGroups!!.disabled + val enabled = config.methodGroups!!.enabled + if (!disabled.contains(group) && !enabled.contains(group)) { + config.methodGroups!!.enabled = enabled.plus(group) + changed = true + } } } } + if (changed) updateMethods(buildMethods(config, getChain())) } - if (changed) updateMethods(buildMethods(config, getChain())) + } catch (e: RuntimeException) { + log.error("Couldn't detect rpc modules of upstream {} due to error {}", getId(), e.message) } } @@ -331,14 +342,17 @@ open class GenericUpstream( private fun detectFinalization() { finalizationDetectorSubscription = - finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain()).subscribe { - sendUpstreamStateEvent(UPDATED) - } + finalizationDetector.detectFinalization(this, chainConfig.expectedBlockTime, getChain()) + .subscribeOn(finalizationScheduler) + .subscribe { + sendUpstreamStateEvent(UPDATED) + } } private fun detectLowerBlock() { lowerBlockDetectorSubscription = lowerBoundService.detectLowerBounds() + .subscribeOn(lowerScheduler) .subscribe { sendUpstreamStateEvent(UPDATED) } @@ -359,4 +373,15 @@ open class GenericUpstream( } fun isValid(): Boolean = isUpstreamValid.get() + + companion object { + private val upstreamSettingsScheduler = + Schedulers.fromExecutor(Executors.newFixedThreadPool(4, CustomizableThreadFactory("upstreamSettings-"))) + private val finalizationScheduler = + Schedulers.fromExecutor(Executors.newFixedThreadPool(4, CustomizableThreadFactory("finalization-"))) + private val settingsScheduler = + Schedulers.fromExecutor(Executors.newFixedThreadPool(4, CustomizableThreadFactory("settings-"))) + private val lowerScheduler = + Schedulers.fromExecutor(Executors.newFixedThreadPool(4, CustomizableThreadFactory("lowerBound-"))) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt index e3c9e12fd..51dd40186 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.near +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream @@ -18,17 +19,20 @@ class NearLowerBoundStateDetector( } override fun internalDetectLowerBound(): Flux { - return upstream.getIngressReader().read(ChainRequest("status", ListParams())).map { - val resp = Global.objectMapper.readValue(it.getResult(), NearStatus::class.java) - resp.syncInfo.earliestHeight - }.flatMapMany { - Flux.fromIterable( - listOf( - LowerBoundData(it, LowerBoundType.STATE), - LowerBoundData(it, LowerBoundType.BLOCK), - ), - ) - } + return upstream.getIngressReader() + .read(ChainRequest("status", ListParams())) + .timeout(Defaults.internalCallsTimeout) + .map { + val resp = Global.objectMapper.readValue(it.getResult(), NearStatus::class.java) + resp.syncInfo.earliestHeight + }.flatMapMany { + Flux.fromIterable( + listOf( + LowerBoundData(it, LowerBoundType.STATE), + LowerBoundData(it, LowerBoundType.BLOCK), + ), + ) + } } override fun types(): Set { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt index 408eb580d..120667fa1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearUpstreamSettingsDetector.kt @@ -16,7 +16,7 @@ import reactor.core.publisher.Flux class NearUpstreamSettingsDetector( upstream: Upstream, ) : BasicUpstreamSettingsDetector(upstream) { - override fun detectLabels(): Flux> { + override fun internalDetectLabels(): Flux> { return Flux.merge( detectNodeType(), ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt index 2192953ae..36b33dc49 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.polkadot +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream @@ -34,6 +35,7 @@ class PolkadotLowerBoundStateDetector( ListParams(block.toHex()), // in polkadot state methods work only with hash ), ) + .timeout(Defaults.internalCallsTimeout) .flatMap(ChainResponse::requireResult) .map { String(it, 1, it.size - 2) @@ -44,7 +46,7 @@ class PolkadotLowerBoundStateDetector( "state_getMetadata", ListParams(it), ), - ) + ).timeout(Defaults.internalCallsTimeout) } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt index cfa679ff6..e79045b7b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.solana +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse @@ -28,7 +29,7 @@ class SolanaLowerBoundSlotDetector( .flatMap { it.read( ChainRequest("getFirstAvailableBlock", ListParams()), // in case of solana we talk about the slot of the lowest confirmed block - ) + ).timeout(Defaults.internalCallsTimeout) } .flatMap(ChainResponse::requireResult) .map { @@ -53,6 +54,7 @@ class SolanaLowerBoundSlotDetector( ), ), ) + .timeout(Defaults.internalCallsTimeout) .flatMap(ChainResponse::requireResult) .flatMapMany { blockData -> val block = Global.objectMapper.readValue(blockData, SolanaBlock::class.java) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt index 2e4d15300..b0e1ee99e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaUpstreamSettingsDetector.kt @@ -16,7 +16,7 @@ import reactor.core.publisher.Flux class SolanaUpstreamSettingsDetector( upstream: Upstream, ) : BasicUpstreamSettingsDetector(upstream) { - override fun detectLabels(): Flux> { + override fun internalDetectLabels(): Flux> { return Flux.merge( detectNodeType(), ) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetectorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetectorSpec.groovy index 67c93bddf..8c7b8fd72 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetectorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamSettingsDetectorSpec.groovy @@ -40,7 +40,7 @@ class EthereumUpstreamSettingsDetectorSpec extends Specification { def detector = new EthereumUpstreamSettingsDetector(up, Chain.ETHEREUM__MAINNET) when: - def act = detector.detectLabels() + def act = detector.internalDetectLabels() then: StepVerifier.create(act) .expectNext( @@ -72,7 +72,7 @@ class EthereumUpstreamSettingsDetectorSpec extends Specification { def detector = new EthereumUpstreamSettingsDetector(up, Chain.ETHEREUM__MAINNET) when: - def act = detector.detectLabels() + def act = detector.internalDetectLabels() then: StepVerifier.create(act) .expectNext( @@ -113,7 +113,7 @@ class EthereumUpstreamSettingsDetectorSpec extends Specification { } def detector = new EthereumUpstreamSettingsDetector(up, Chain.ETHEREUM__MAINNET) when: - def act = detector.detectLabels() + def act = detector.internalDetectLabels() then: StepVerifier.create(act) .expectNext(new Pair("archive", "false")) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt index 49ddb9877..b405bad2a 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt @@ -444,10 +444,10 @@ class GenericWsHeadTest { StepVerifier.create(wsHead.headLiveness()) .expectNext(HeadLivenessState.FATAL_ERROR) .thenCancel() - .verify(Duration.ofSeconds(1)) + .verify(Duration.ofSeconds(2)) } .thenCancel() - .verify(Duration.ofSeconds(1)) + .verify(Duration.ofSeconds(3)) verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams()))