Skip to content

Commit

Permalink
Fix reload config
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Nov 15, 2023
1 parent 2e80da5 commit 739d9b4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ class ReloadConfigSetup(
)

val upstreamsToRemove = upstreamsAnalyzeData.removed
.plus(upstreamsAnalyzeData.reloaded)
.filterNot { chainsToReload.contains(it.second) }
.toSet()
val upstreamsToAdd = upstreamsAnalyzeData.added
.plus(upstreamsAnalyzeData.reloaded.map { it.first })

reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig)

Expand Down Expand Up @@ -107,9 +106,12 @@ class ReloadConfigSetup(
}
}

val added = newUpstreamsMap.minus(currentUpstreamsMap.keys).mapTo(mutableSetOf()) { it.key.first }
val added = newUpstreamsMap
.minus(currentUpstreamsMap.keys)
.mapTo(mutableSetOf()) { it.key }
.plus(reloaded)

return UpstreamAnalyzeData(added, removed, reloaded)
return UpstreamAnalyzeData(added, removed.plus(reloaded))
}

private fun analyzeDefaultOptions(
Expand Down Expand Up @@ -158,8 +160,7 @@ class ReloadConfigSetup(
}

private data class UpstreamAnalyzeData(
val added: Set<String> = emptySet(),
val added: Set<Pair<String, Chain>> = emptySet(),
val removed: Set<Pair<String, Chain>> = emptySet(),
val reloaded: Set<Pair<String, Chain>> = emptySet(),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,30 @@ open class ReloadConfigUpstreamService(

fun reloadUpstreams(
chainsToReload: Set<Chain>,
upstreamsToRemove: List<Pair<String, Chain>>,
upstreamsToAdd: Set<String>,
upstreamsToRemove: Set<Pair<String, Chain>>,
upstreamsToAdd: Set<Pair<String, Chain>>,
newUpstreamsConfig: UpstreamsConfig,
) {
val usedChains = removeUpstreams(chainsToReload, upstreamsToRemove)

addUpstreams(newUpstreamsConfig, chainsToReload, upstreamsToAdd)
addUpstreams(newUpstreamsConfig, chainsToReload, upstreamsToAdd.map { it.first }.toSet())

usedChains.forEach { chain ->
val upstreamsToRemovePerChain = upstreamsToRemove.filter { it.second == chain }
val upstreamsToAddPerChain = upstreamsToAdd.filter { it.second == chain }

usedChains.forEach {
multistreamHolder.getUpstream(it)
.run {
if (!this.haveUpstreams() && this.isRunning()) {
if (upstreamsToAddPerChain.isEmpty() && upstreamsToRemovePerChain.isNotEmpty()) {
multistreamHolder.getUpstream(chain)
.run {
this.stop()
}
}
}
}
}

private fun removeUpstreams(
chainsToReload: Set<Chain>,
upstreamsToRemove: List<Pair<String, Chain>>,
upstreamsToRemove: Set<Pair<String, Chain>>,
): Set<Chain> {
val usedChains = mutableSetOf<Chain>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ open class GenericMultistream(
head.removeHead(upstreamId)
}

override fun isRunning(): Boolean {
return super.isRunning() || cachingReader.isRunning()
}

override fun makeLagObserver(): HeadLagObserver =
HeadLagObserver(head, upstreams, DistanceExtractor::extractPriorityDistance, headScheduler, 6).apply {
start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package io.emeraldpay.dshackle.config.reload

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET
import io.emeraldpay.dshackle.Chain.POLYGON__MAINNET
import io.emeraldpay.dshackle.Config
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfigReader
import io.emeraldpay.dshackle.foundation.ChainOptionsReader
import io.emeraldpay.dshackle.startup.ConfiguredUpstreams
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.generic.ChainSpecificRegistry
import io.emeraldpay.dshackle.upstream.generic.GenericMultistream
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
Expand All @@ -22,9 +29,13 @@ import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.cloud.sleuth.Tracer
import org.springframework.util.ResourceUtils
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import sun.misc.Signal
import java.io.File
import java.util.concurrent.Executors

class ReloadConfigTest {
private val fileResolver = FileResolver(File(""))
Expand Down Expand Up @@ -82,25 +93,28 @@ class ReloadConfigTest {
mutableListOf(newConfig.upstreams[0], newConfig.upstreams[2]),
),
)
verify(msEth, never()).stop()
verify(msPoly, never()).stop()

assertEquals(3, mainConfig.upstreams!!.upstreams.size)
assertEquals(newConfig, mainConfig.upstreams)
}

@Test
fun `stop multistream if there are no upstreams left`() {
fun `stop multistream if all upstreams are removed from it`() {
val up1 = upstream("local1")
val up2 = upstream("local2")
val up3 = upstream("local3")

val msEth = mock<Multistream> {
on { getAll() } doReturn listOf(up1, up2)
on { haveUpstreams() } doReturn false
on { isRunning() } doReturn true
}
val msPoly = mock<Multistream> {
on { getAll() } doReturn listOf(up3)
}
val msEth = multistream(ETHEREUM__MAINNET)
.also {
it.processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.ADDED))
it.processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.ADDED))
}
val msPoly = multistream(POLYGON__MAINNET)
.also {
it.processUpstreamsEvents(UpstreamChangeEvent(POLYGON__MAINNET, up3, UpstreamChangeEvent.ChangeType.ADDED))
}

val newConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-changed-upstreams-removed.yaml")
whenever(config.getConfigPath()).thenReturn(newConfigFile)
Expand All @@ -121,15 +135,15 @@ class ReloadConfigTest {

reloadConfig.handle(Signal("HUP"))

verify(msEth).processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.REMOVED))
verify(msEth).processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.REMOVED))
verify(configuredUpstreams).processUpstreams(
UpstreamsConfig(
newConfig.defaultOptions,
mutableListOf(),
),
)
verify(msEth).stop()

assertFalse(msEth.isRunning())
assertTrue(msPoly.isRunning())
assertEquals(1, mainConfig.upstreams!!.upstreams.size)
assertEquals(newConfig, mainConfig.upstreams)
}
Expand All @@ -153,8 +167,29 @@ class ReloadConfigTest {
assertEquals(initialConfig, mainConfig.upstreams)
}

private fun upstream(id: String): Upstream =
private fun multistream(chain: Chain): Multistream {
val cs = ChainSpecificRegistry.resolve(chain)
return GenericMultistream(
chain,
Schedulers.fromExecutor(Executors.newFixedThreadPool(6)),
null,
ArrayList(),
Caches.default(),
Schedulers.boundedElastic(),
cs.makeCachingReaderBuilder(mock<Tracer>()),
cs::localReaderBuilder,
cs.subscriptionBuilder(Schedulers.boundedElastic())
)
}

private fun upstream(id: String): GenericUpstream =
mock {
val head = mock<Head> {
on { getFlux() } doReturn Flux.empty()
}
on { getId() } doReturn id
on { getHead() } doReturn head
on { observeState() } doReturn Flux.empty()
on { observeStatus() } doReturn Flux.empty()
}
}

0 comments on commit 739d9b4

Please sign in to comment.