From 14b146bac077e55db4cbc25ac208e8788cb52e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Wed, 25 Oct 2023 14:59:57 +0400 Subject: [PATCH 1/2] Check if config was reloaded, remove all upstream metrics if it was removed from ms --- .../config/reload/ReloadConfigSetup.kt | 16 ++++++++++--- .../reload/ReloadConfigUpstreamService.kt | 2 +- .../dshackle/reader/JsonRpcReader.kt | 4 ++++ .../dshackle/startup/ConfiguredUpstreams.kt | 3 +-- .../dshackle/upstream/AbstractHead.kt | 23 +++++++++++-------- .../dshackle/upstream/HttpFactory.kt | 4 ++-- .../dshackle/upstream/HttpRpcFactory.kt | 4 ++-- .../upstream/bitcoin/BitcoinRpcUpstream.kt | 4 +++- .../generic/connectors/GenericRpcConnector.kt | 4 +++- .../upstream/rpcclient/JsonRpcHttpClient.kt | 10 ++++++-- .../dshackle/upstream/FilteredApisSpec.groovy | 3 ++- .../config/reload/ReloadConfigTest.kt | 23 ++++++++++++++++++- 12 files changed, 75 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt index 7d1fcb3fe..5f3ad6b12 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt @@ -43,9 +43,13 @@ class ReloadConfigSetup( try { log.info("Reloading config...") - reloadConfig() + val reloaded = reloadConfig() - log.info("Config is reloaded") + if (reloaded) { + log.info("Config is reloaded") + } else { + log.info("There is nothing to reload, config is the same") + } } finally { reloadLock.unlock() } @@ -54,10 +58,14 @@ class ReloadConfigSetup( } } - private fun reloadConfig() { + private fun reloadConfig(): Boolean { val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig() val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig() + if (newUpstreamsConfig == currentUpstreamsConfig) { + return false + } + val chainsToReload = analyzeDefaultOptions( currentUpstreamsConfig.defaultOptions, newUpstreamsConfig.defaultOptions, @@ -76,6 +84,8 @@ class ReloadConfigSetup( reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig) reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig) + + return true } private fun analyzeUpstreams( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt index a877782e7..7f32ca159 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt @@ -10,7 +10,7 @@ import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Component @Component -class ReloadConfigUpstreamService( +open class ReloadConfigUpstreamService( private val eventPublisher: ApplicationEventPublisher, private val multistreamHolder: CurrentMultistreamHolder, private val configuredUpstreams: ConfiguredUpstreams, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/reader/JsonRpcReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/reader/JsonRpcReader.kt index 00017c7ec..89333b6ee 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/reader/JsonRpcReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/reader/JsonRpcReader.kt @@ -4,3 +4,7 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse typealias JsonRpcReader = Reader + +interface JsonRpcHttpReader : JsonRpcReader { + fun onStop() +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 4ea6f14b9..2c15e3455 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -38,7 +38,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.HttpEndpoint import io.emeraldpay.dshackle.config.UpstreamsConfig.RpcConnection import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.foundation.ChainOptions.Options -import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.CallTargetsHolder import io.emeraldpay.dshackle.upstream.Head @@ -343,7 +342,7 @@ open class ConfiguredUpstreams( log.warn("Upstream doesn't have API configuration") return null } - val directApi: JsonRpcReader = httpFactory.create(config.id, chain) + val directApi = httpFactory.create(config.id, chain) val esplora = conn.esplora?.let { endpoint -> val tls = endpoint.tls?.let { tls -> tls.ca?.let { ca -> diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt index 7dba456d5..3ab362dee 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Metrics import org.slf4j.LoggerFactory import reactor.core.Disposable @@ -32,7 +33,6 @@ import java.util.concurrent.Executors import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock abstract class AbstractHead @JvmOverloads constructor( @@ -57,14 +57,24 @@ abstract class AbstractHead @JvmOverloads constructor( private var future: Future<*>? = null private val delayed = AtomicBoolean(false) + private val metrics = mutableSetOf() + init { val className = this.javaClass.simpleName Gauge.builder("stuck_head", delayed) { if (it.get()) 1.0 else 0.0 - }.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry) + } + .tag("upstream", upstreamId) + .tag("class", className) + .register(Metrics.globalRegistry) + .also { metrics.add(it) } Gauge.builder("current_head", forkChoice) { it.getHead()?.height?.toDouble() ?: 0.0 - }.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry) + } + .tag("upstream", upstreamId) + .tag("class", className) + .register(Metrics.globalRegistry) + .also { metrics.add(it) } } fun follow(source: Flux): Disposable { @@ -147,6 +157,7 @@ abstract class AbstractHead @JvmOverloads constructor( it.cancel(true) } future = null + metrics.forEach { Metrics.globalRegistry.remove(it) } } protected open fun onNoHeadUpdates() { @@ -176,10 +187,4 @@ abstract class AbstractHead @JvmOverloads constructor( ) } } - - private fun toHeadCountMetric(counter: AtomicInteger, status: String) { - Gauge.builder("head_count", counter) { - it.get().toDouble() - }.tag("class", this.javaClass.simpleName).tag("status", status).register(Metrics.globalRegistry) - } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpFactory.kt index e0257d493..11289871b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpFactory.kt @@ -1,8 +1,8 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader interface HttpFactory { - fun create(id: String?, chain: Chain): JsonRpcReader + fun create(id: String?, chain: Chain): JsonRpcHttpReader } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpRpcFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpRpcFactory.kt index 136502273..c325b1a30 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpRpcFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpRpcFactory.kt @@ -2,7 +2,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.AuthConfig -import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcHttpClient import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics import io.micrometer.core.instrument.Counter @@ -15,7 +15,7 @@ open class HttpRpcFactory( private val basicAuth: AuthConfig.ClientBasicAuth?, private val tls: ByteArray?, ) : HttpFactory { - override fun create(id: String?, chain: Chain): JsonRpcReader { + override fun create(id: String?, chain: Chain): JsonRpcHttpReader { val metricsTags = listOf( // "unknown" is not supposed to happen Tag.of("upstream", id ?: "unknown"), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt index 2d9b5d8a8..18c05c95c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.Capability @@ -32,7 +33,7 @@ import reactor.core.Disposable open class BitcoinRpcUpstream( id: String, chain: Chain, - private val directApi: JsonRpcReader, + private val directApi: JsonRpcHttpReader, private val head: Head, options: ChainOptions.Options, role: UpstreamsConfig.UpstreamRole, @@ -115,5 +116,6 @@ open class BitcoinRpcUpstream( head.stop() } validatorSubscription?.dispose() + directApi.onStop() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 3292d45ca..43ee5ea06 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.generic.connectors import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream @@ -31,7 +32,7 @@ import java.time.Duration class GenericRpcConnector( connectorType: ConnectorMode, - private val directReader: JsonRpcReader, + private val directReader: JsonRpcHttpReader, wsFactory: WsConnectionPoolFactory?, upstream: DefaultUpstream, forkChoice: ForkChoice, @@ -137,6 +138,7 @@ class GenericRpcConnector( head.stop() } pool?.close() + directReader.onStop() } override fun getIngressReader(): JsonRpcReader { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt index abd9601dc..e111176de 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt @@ -16,9 +16,10 @@ package io.emeraldpay.dshackle.upstream.rpcclient import io.emeraldpay.dshackle.config.AuthConfig -import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader import io.emeraldpay.etherjar.rpc.RpcException import io.emeraldpay.etherjar.rpc.RpcResponseError +import io.micrometer.core.instrument.Metrics import io.netty.buffer.Unpooled import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaders @@ -47,7 +48,7 @@ class JsonRpcHttpClient( private val metrics: RpcMetrics, basicAuth: AuthConfig.ClientBasicAuth? = null, tlsCAAuth: ByteArray? = null, -) : JsonRpcReader { +) : JsonRpcHttpReader { private val parser = ResponseRpcParser() private val httpClient: HttpClient @@ -104,6 +105,11 @@ class JsonRpcHttpClient( }.single() } + override fun onStop() { + Metrics.globalRegistry.remove(metrics.timer) + Metrics.globalRegistry.remove(metrics.fails) + } + override fun read(key: JsonRpcRequest): Mono { val startTime = StopWatch() return Mono.just(key) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index a69eba342..dd8812ef8 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.reader.JsonRpcHttpReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.test.EthereumApiStub import io.emeraldpay.dshackle.test.TestingCommons @@ -51,7 +52,7 @@ class FilteredApisSpec extends Specification { [test: "baz"] ].collect { def httpFactory = Mock(HttpFactory) { - create(_, _) >> TestingCommons.api().tap { it.id = "${i++}" } + create(_, _) >> Stub(JsonRpcHttpReader) } def connectorFactory = new GenericConnectorFactory( GenericConnectorFactory.ConnectorMode.RPC_ONLY, diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt index 1886f26f2..458fa265e 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt @@ -17,8 +17,10 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.ArgumentCaptor +import org.mockito.kotlin.any import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock +import org.mockito.kotlin.never import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever @@ -99,7 +101,7 @@ class ReloadConfigTest { } @Test - fun `stop multistream of there are no upstreams left`() { + fun `stop multistream if there are no upstreams left`() { val up1 = upstream("local1") val up2 = upstream("local2") val up3 = upstream("local3") @@ -154,6 +156,25 @@ class ReloadConfigTest { ) } + @Test + fun `reload the same config cause to nothing`() { + val initialConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml") + val initialConfig = upstreamsConfigReader.read(initialConfigFile.inputStream())!! + mainConfig.upstreams = initialConfig + + val reloadConfigUpstreamService = mock() + + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + + whenever(config.getConfigPath()).thenReturn(initialConfigFile) + + reloadConfig.handle(Signal("HUP")) + + verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any()) + + assertEquals(initialConfig, mainConfig.upstreams) + } + private fun upstream(id: String): Upstream = mock { on { getId() } doReturn id From 418fe6db3f26ef4a0505a2eb49cc7c11c4301ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Wed, 25 Oct 2023 15:20:57 +0400 Subject: [PATCH 2/2] Refactoring --- .../io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt index 5f3ad6b12..47ad7f164 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt @@ -43,9 +43,7 @@ class ReloadConfigSetup( try { log.info("Reloading config...") - val reloaded = reloadConfig() - - if (reloaded) { + if (reloadConfig()) { log.info("Config is reloaded") } else { log.info("There is nothing to reload, config is the same")