Skip to content

Commit

Permalink
Check if config was reloaded, remove all upstream metrics if it was r… (
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Oct 25, 2023
1 parent 788db75 commit 8aa89a1
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ class ReloadConfigSetup(
try {
log.info("Reloading config...")

reloadConfig()

log.info("Config is reloaded")
if (reloadConfig()) {
log.info("Config is reloaded")
} else {
log.info("There is nothing to reload, config is the same")
}
} finally {
reloadLock.unlock()
}
Expand All @@ -54,10 +56,14 @@ class ReloadConfigSetup(
}
}

private fun reloadConfig() {
private fun reloadConfig(): Boolean {
val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig()
val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig()

if (newUpstreamsConfig == currentUpstreamsConfig) {
return false
}

val chainsToReload = analyzeDefaultOptions(
currentUpstreamsConfig.defaultOptions,
newUpstreamsConfig.defaultOptions,
Expand All @@ -76,6 +82,8 @@ class ReloadConfigSetup(
reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig)

reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig)

return true
}

private fun analyzeUpstreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component

@Component
class ReloadConfigUpstreamService(
open class ReloadConfigUpstreamService(
private val eventPublisher: ApplicationEventPublisher,
private val multistreamHolder: CurrentMultistreamHolder,
private val configuredUpstreams: ConfiguredUpstreams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse

typealias JsonRpcReader = Reader<JsonRpcRequest, JsonRpcResponse>

interface JsonRpcHttpReader : JsonRpcReader {
fun onStop()
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.HttpEndpoint
import io.emeraldpay.dshackle.config.UpstreamsConfig.RpcConnection
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
import io.emeraldpay.dshackle.upstream.Head
Expand Down Expand Up @@ -343,7 +342,7 @@ open class ConfiguredUpstreams(
log.warn("Upstream doesn't have API configuration")
return null
}
val directApi: JsonRpcReader = httpFactory.create(config.id, chain)
val directApi = httpFactory.create(config.id, chain)
val esplora = conn.esplora?.let { endpoint ->
val tls = endpoint.tls?.let { tls ->
tls.ca?.let { ca ->
Expand Down
23 changes: 14 additions & 9 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Metrics
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -32,7 +33,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

abstract class AbstractHead @JvmOverloads constructor(
Expand All @@ -57,14 +57,24 @@ abstract class AbstractHead @JvmOverloads constructor(
private var future: Future<*>? = null
private val delayed = AtomicBoolean(false)

private val metrics = mutableSetOf<Meter>()

init {
val className = this.javaClass.simpleName
Gauge.builder("stuck_head", delayed) {
if (it.get()) 1.0 else 0.0
}.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry)
}
.tag("upstream", upstreamId)
.tag("class", className)
.register(Metrics.globalRegistry)
.also { metrics.add(it) }
Gauge.builder("current_head", forkChoice) {
it.getHead()?.height?.toDouble() ?: 0.0
}.tag("upstream", upstreamId).tag("class", className).register(Metrics.globalRegistry)
}
.tag("upstream", upstreamId)
.tag("class", className)
.register(Metrics.globalRegistry)
.also { metrics.add(it) }
}

fun follow(source: Flux<BlockContainer>): Disposable {
Expand Down Expand Up @@ -147,6 +157,7 @@ abstract class AbstractHead @JvmOverloads constructor(
it.cancel(true)
}
future = null
metrics.forEach { Metrics.globalRegistry.remove(it) }
}

protected open fun onNoHeadUpdates() {
Expand Down Expand Up @@ -176,10 +187,4 @@ abstract class AbstractHead @JvmOverloads constructor(
)
}
}

private fun toHeadCountMetric(counter: AtomicInteger, status: String) {
Gauge.builder("head_count", counter) {
it.get().toDouble()
}.tag("class", this.javaClass.simpleName).tag("status", status).register(Metrics.globalRegistry)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader

interface HttpFactory {
fun create(id: String?, chain: Chain): JsonRpcReader
fun create(id: String?, chain: Chain): JsonRpcHttpReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.AuthConfig
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcHttpClient
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import io.micrometer.core.instrument.Counter
Expand All @@ -15,7 +15,7 @@ open class HttpRpcFactory(
private val basicAuth: AuthConfig.ClientBasicAuth?,
private val tls: ByteArray?,
) : HttpFactory {
override fun create(id: String?, chain: Chain): JsonRpcReader {
override fun create(id: String?, chain: Chain): JsonRpcHttpReader {
val metricsTags = listOf(
// "unknown" is not supposed to happen
Tag.of("upstream", id ?: "unknown"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.Capability
Expand All @@ -32,7 +33,7 @@ import reactor.core.Disposable
open class BitcoinRpcUpstream(
id: String,
chain: Chain,
private val directApi: JsonRpcReader,
private val directApi: JsonRpcHttpReader,
private val head: Head,
options: ChainOptions.Options,
role: UpstreamsConfig.UpstreamRole,
Expand Down Expand Up @@ -115,5 +116,6 @@ open class BitcoinRpcUpstream(
head.stop()
}
validatorSubscription?.dispose()
directApi.onStop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.generic.connectors

import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
Expand Down Expand Up @@ -31,7 +32,7 @@ import java.time.Duration

class GenericRpcConnector(
connectorType: ConnectorMode,
private val directReader: JsonRpcReader,
private val directReader: JsonRpcHttpReader,
wsFactory: WsConnectionPoolFactory?,
upstream: DefaultUpstream,
forkChoice: ForkChoice,
Expand Down Expand Up @@ -137,6 +138,7 @@ class GenericRpcConnector(
head.stop()
}
pool?.close()
directReader.onStop()
}

override fun getIngressReader(): JsonRpcReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package io.emeraldpay.dshackle.upstream.rpcclient

import io.emeraldpay.dshackle.config.AuthConfig
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.etherjar.rpc.RpcException
import io.emeraldpay.etherjar.rpc.RpcResponseError
import io.micrometer.core.instrument.Metrics
import io.netty.buffer.Unpooled
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaders
Expand Down Expand Up @@ -47,7 +48,7 @@ class JsonRpcHttpClient(
private val metrics: RpcMetrics,
basicAuth: AuthConfig.ClientBasicAuth? = null,
tlsCAAuth: ByteArray? = null,
) : JsonRpcReader {
) : JsonRpcHttpReader {

private val parser = ResponseRpcParser()
private val httpClient: HttpClient
Expand Down Expand Up @@ -104,6 +105,11 @@ class JsonRpcHttpClient(
}.single()
}

override fun onStop() {
Metrics.globalRegistry.remove(metrics.timer)
Metrics.globalRegistry.remove(metrics.fails)
}

override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
val startTime = StopWatch()
return Mono.just(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.JsonRpcHttpReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.test.EthereumApiStub
import io.emeraldpay.dshackle.test.TestingCommons
Expand Down Expand Up @@ -51,7 +52,7 @@ class FilteredApisSpec extends Specification {
[test: "baz"]
].collect {
def httpFactory = Mock(HttpFactory) {
create(_, _) >> TestingCommons.api().tap { it.id = "${i++}" }
create(_, _) >> Stub(JsonRpcHttpReader)
}
def connectorFactory = new GenericConnectorFactory(
GenericConnectorFactory.ConnectorMode.RPC_ONLY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.ArgumentCaptor
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
Expand Down Expand Up @@ -99,7 +101,7 @@ class ReloadConfigTest {
}

@Test
fun `stop multistream of there are no upstreams left`() {
fun `stop multistream if there are no upstreams left`() {
val up1 = upstream("local1")
val up2 = upstream("local2")
val up3 = upstream("local3")
Expand Down Expand Up @@ -154,6 +156,25 @@ class ReloadConfigTest {
)
}

@Test
fun `reload the same config cause to nothing`() {
val initialConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml")
val initialConfig = upstreamsConfigReader.read(initialConfigFile.inputStream())!!
mainConfig.upstreams = initialConfig

val reloadConfigUpstreamService = mock<ReloadConfigUpstreamService>()

val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService)

whenever(config.getConfigPath()).thenReturn(initialConfigFile)

reloadConfig.handle(Signal("HUP"))

verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any())

assertEquals(initialConfig, mainConfig.upstreams)
}

private fun upstream(id: String): Upstream =
mock {
on { getId() } doReturn id
Expand Down

0 comments on commit 8aa89a1

Please sign in to comment.