diff --git a/build.gradle b/build.gradle index 10a1e22a3..d4f151016 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,5 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget + import java.time.Instant import java.time.ZoneId import java.time.format.DateTimeFormatter @@ -124,14 +126,10 @@ dependencies { } compileKotlin { - kotlinOptions { - jvmTarget = "20" - } + compilerOptions.jvmTarget.set(JvmTarget.JVM_20) } compileTestKotlin { - kotlinOptions { - jvmTarget = "20" - } + compilerOptions.jvmTarget.set(JvmTarget.JVM_20) } test { diff --git a/foundation/gradle/wrapper/gradle-wrapper.jar b/foundation/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..7454180f2 Binary files /dev/null and b/foundation/gradle/wrapper/gradle-wrapper.jar differ diff --git a/foundation/gradle/wrapper/gradle-wrapper.properties b/foundation/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..db9a6b825 --- /dev/null +++ b/foundation/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/foundation/src/main/kotlin/io/emeraldpay/dshackle/foundation/ChainOptions.kt b/foundation/src/main/kotlin/io/emeraldpay/dshackle/foundation/ChainOptions.kt index 6c83289bd..b1920892b 100644 --- a/foundation/src/main/kotlin/io/emeraldpay/dshackle/foundation/ChainOptions.kt +++ b/foundation/src/main/kotlin/io/emeraldpay/dshackle/foundation/ChainOptions.kt @@ -16,12 +16,23 @@ class ChainOptions { val validateChain: Boolean, ) - open class DefaultOptions : PartialOptions() { - var chains: List? = null + data class DefaultOptions( + var chains: List? = null, var options: PartialOptions? = null - } + ) - open class PartialOptions { + data class PartialOptions( + var disableValidation: Boolean? = null, + var disableUpstreamValidation: Boolean? = null, + var validationInterval: Int? = null, + var timeout: Duration? = null, + var providesBalance: Boolean? = null, + var validatePeers: Boolean? = null, + var validateCalllimit: Boolean? = null, + var minPeers: Int? = null, + var validateSyncing: Boolean? = null, + var validateChain: Boolean? = null + ) { companion object { @JvmStatic fun getDefaults(): PartialOptions { @@ -31,29 +42,6 @@ class ChainOptions { } } - var disableValidation: Boolean? = null - var disableUpstreamValidation: Boolean? = null - var validationInterval: Int? = null - set(value) { - require(value == null || value > 0) { - "validation-interval must be a positive number: $value" - } - field = value - } - var timeout: Duration? = null - var providesBalance: Boolean? = null - var validatePeers: Boolean? = null - var validateCalllimit: Boolean? = null - var minPeers: Int? = null - set(value) { - require(value == null || value >= 0) { - "min-peers must be a positive number: $value" - } - field = value - } - var validateSyncing: Boolean? = null - var validateChain: Boolean? = null - fun merge(overwrites: PartialOptions?): PartialOptions { if (overwrites == null) { return this diff --git a/src/main/kotlin/io/emeraldpay/dshackle/FileResolver.kt b/src/main/kotlin/io/emeraldpay/dshackle/FileResolver.kt index de1e73c90..403bf948d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/FileResolver.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/FileResolver.kt @@ -35,4 +35,6 @@ open class FileResolver( } return File(baseDir, path) } + + fun file() = baseDir } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/DynamicMergeFlux.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/DynamicMergeFlux.kt index 0a51b89d2..f11432602 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/commons/DynamicMergeFlux.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/commons/DynamicMergeFlux.kt @@ -28,6 +28,7 @@ class DynamicMergeFlux(private val scheduler: Scheduler) { fun stop() { sources.forEach { (_, d) -> d.dispose() } + sources.clear() merge.emitComplete { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt index f5024fbb0..fa3ada24b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt @@ -23,21 +23,23 @@ import java.util.Arrays import java.util.Locale import java.util.concurrent.ConcurrentHashMap -open class UpstreamsConfig { - var defaultOptions: MutableList = ArrayList() - var upstreams: MutableList> = ArrayList>() - - class Upstream { - var id: String? = null - var nodeId: Int? = null - var chain: String? = null - var options: ChainOptions.PartialOptions? = null - var isEnabled = true - var connection: T? = null - val labels = Labels() - var methods: Methods? = null - var methodGroups: MethodGroups? = null - var role: UpstreamRole = UpstreamRole.PRIMARY +data class UpstreamsConfig( + var defaultOptions: MutableList = ArrayList(), + var upstreams: MutableList> = ArrayList(), +) { + + data class Upstream( + var id: String? = null, + var nodeId: Int? = null, + var chain: String? = null, + var options: ChainOptions.PartialOptions? = null, + var isEnabled: Boolean = true, + var connection: T? = null, + val labels: Labels = Labels(), + var methods: Methods? = null, + var methodGroups: MethodGroups? = null, + var role: UpstreamRole = UpstreamRole.PRIMARY, + ) { @Suppress("UNCHECKED_CAST") fun cast(type: Class): Upstream { @@ -56,9 +58,9 @@ open class UpstreamsConfig { open class UpstreamConnection - open class RpcConnection : UpstreamConnection() { - var rpc: HttpEndpoint? = null - } + open class RpcConnection( + open var rpc: HttpEndpoint? = null, + ) : UpstreamConnection() class GrpcConnection : UpstreamConnection() { var host: String? = null @@ -68,9 +70,11 @@ open class UpstreamsConfig { var upstreamRating: Int = 0 } - class EthereumConnection : RpcConnection() { - var ws: WsEndpoint? = null - var connectorMode: String? = null + data class EthereumConnection( + override var rpc: HttpEndpoint? = null, + var ws: WsEndpoint? = null, + var connectorMode: String? = null, + ) : RpcConnection(rpc) { fun resolveMode(): ConnectorMode { return if (connectorMode == null) { @@ -87,27 +91,28 @@ open class UpstreamsConfig { } } - class BitcoinConnection : RpcConnection() { - var esplora: HttpEndpoint? = null - var zeroMq: BitcoinZeroMq? = null - } + data class BitcoinConnection( + override var rpc: HttpEndpoint? = null, + var esplora: HttpEndpoint? = null, + var zeroMq: BitcoinZeroMq? = null, + ) : RpcConnection() - class EthereumPosConnection : UpstreamConnection() { - var execution: EthereumConnection? = null - var upstreamRating: Int = 0 - } + data class EthereumPosConnection( + var execution: EthereumConnection? = null, + var upstreamRating: Int = 0, + ) : UpstreamConnection() data class BitcoinZeroMq( val host: String = "127.0.0.1", val port: Int, ) - class HttpEndpoint(val url: URI) { + data class HttpEndpoint(val url: URI) { var basicAuth: AuthConfig.ClientBasicAuth? = null var tls: AuthConfig.ClientTlsAuth? = null } - class WsEndpoint(val url: URI) { + data class WsEndpoint(val url: URI) { var origin: URI? = null var basicAuth: AuthConfig.ClientBasicAuth? = null var frameSize: Int? = null @@ -159,17 +164,17 @@ open class UpstreamsConfig { } } - class Methods( + data class Methods( val enabled: Set, val disabled: Set, ) - class MethodGroups( + data class MethodGroups( val enabled: Set, val disabled: Set, ) - class Method( + data class Method( val name: String, val quorum: String? = null, val static: String? = null, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt new file mode 100644 index 000000000..e5ced0455 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt @@ -0,0 +1,28 @@ +package io.emeraldpay.dshackle.config.reload + +import io.emeraldpay.dshackle.Config +import io.emeraldpay.dshackle.FileResolver +import io.emeraldpay.dshackle.config.MainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.config.UpstreamsConfigReader +import io.emeraldpay.dshackle.foundation.ChainOptionsReader +import org.springframework.stereotype.Component + +@Component +class ReloadConfigService( + private val config: Config, + fileResolver: FileResolver, + private val mainConfig: MainConfig, + +) { + private val optionsReader = ChainOptionsReader() + private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader) + + fun readUpstreamsConfig() = upstreamsConfigReader.read(config.getConfigPath().inputStream())!! + + fun currentUpstreamsConfig() = mainConfig.upstreams!! + + fun updateUpstreamsConfig(newConfig: UpstreamsConfig) { + mainConfig.upstreams = newConfig + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt new file mode 100644 index 000000000..7d1fcb3fe --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt @@ -0,0 +1,157 @@ +package io.emeraldpay.dshackle.config.reload + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global.Companion.chainById +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.foundation.ChainOptions +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import sun.misc.Signal +import sun.misc.SignalHandler +import java.util.concurrent.locks.ReentrantLock +import java.util.stream.Collectors + +@Component +class ReloadConfigSetup( + private val reloadConfigService: ReloadConfigService, + private val reloadConfigUpstreamService: ReloadConfigUpstreamService, +) : SignalHandler { + + companion object { + private val log = LoggerFactory.getLogger(this::class.java) + private val signalHup = Signal("HUP") + } + + private val reloadLock = ReentrantLock() + + init { + Signal.handle(signalHup, this) + } + + override fun handle(sig: Signal) { + if (sig == signalHup) { + try { + handle() + } catch (e: Exception) { + log.warn("Config is not reloaded, cause - ${e.message}", e) + } + } + } + + private fun handle() { + if (reloadLock.tryLock()) { + try { + log.info("Reloading config...") + + reloadConfig() + + log.info("Config is reloaded") + } finally { + reloadLock.unlock() + } + } else { + log.warn("Reloading is in progress") + } + } + + private fun reloadConfig() { + val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig() + val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig() + + val chainsToReload = analyzeDefaultOptions( + currentUpstreamsConfig.defaultOptions, + newUpstreamsConfig.defaultOptions, + ) + val upstreamsAnalyzeData = analyzeUpstreams( + currentUpstreamsConfig.upstreams, + newUpstreamsConfig.upstreams, + ) + + val upstreamsToRemove = upstreamsAnalyzeData.removed + .plus(upstreamsAnalyzeData.reloaded) + .filterNot { chainsToReload.contains(it.second) } + val upstreamsToAdd = upstreamsAnalyzeData.added + .plus(upstreamsAnalyzeData.reloaded.map { it.first }) + + reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig) + + reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig) + } + + private fun analyzeUpstreams( + currentUpstreams: List>, + newUpstreams: List>, + ): UpstreamAnalyzeData { + if (currentUpstreams == newUpstreams) { + return UpstreamAnalyzeData() + } + val reloaded = mutableSetOf>() + val removed = mutableSetOf>() + val currentUpstreamsMap = currentUpstreams.associateBy { it.id!! to chainById(it.chain) } + val newUpstreamsMap = newUpstreams.associateBy { it.id!! to chainById(it.chain) } + + currentUpstreamsMap.forEach { + val newUpstream = newUpstreamsMap[it.key] + if (newUpstream == null) { + removed.add(it.key) + } else if (newUpstream != it.value) { + reloaded.add(it.key) + } + } + + val added = newUpstreamsMap.minus(currentUpstreamsMap.keys).mapTo(mutableSetOf()) { it.key.first } + + return UpstreamAnalyzeData(added, removed, reloaded) + } + + private fun analyzeDefaultOptions( + currentDefaultOptions: List, + newDefaultOptions: List, + ): Set { + val chainsToReload = mutableSetOf() + + val currentOptions = getChainOptions(currentDefaultOptions) + val newOptions = getChainOptions(newDefaultOptions) + + if (currentOptions == newOptions) { + return emptySet() + } + + val removed = mutableSetOf() + + currentOptions.forEach { + val newChainOption = newOptions[it.key] + if (newChainOption == null) { + removed.add(chainById(it.key)) + } else if (newChainOption != it.value) { + chainsToReload.add(chainById(it.key)) + } + } + + val added = newOptions.minus(currentOptions.keys).map { chainById(it.key) } + + return chainsToReload.plus(added).plus(removed) + } + + private fun getChainOptions( + defaultOptions: List, + ): Map> { + return defaultOptions.stream() + .flatMap { options -> options.chains?.stream()?.map { it to options.options } } + .collect( + Collectors.groupingBy( + { it.first }, + Collectors.mapping( + { it.second }, + Collectors.toUnmodifiableList(), + ), + ), + ) + } + + private data class UpstreamAnalyzeData( + val added: Set = emptySet(), + val removed: Set> = emptySet(), + val reloaded: Set> = emptySet(), + ) +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt new file mode 100644 index 000000000..a877782e7 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt @@ -0,0 +1,79 @@ +package io.emeraldpay.dshackle.config.reload + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.startup.ConfiguredUpstreams +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent +import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder +import org.springframework.context.ApplicationEventPublisher +import org.springframework.stereotype.Component + +@Component +class ReloadConfigUpstreamService( + private val eventPublisher: ApplicationEventPublisher, + private val multistreamHolder: CurrentMultistreamHolder, + private val configuredUpstreams: ConfiguredUpstreams, +) { + + fun reloadUpstreams( + chainsToReload: Set, + upstreamsToRemove: List>, + upstreamsToAdd: Set, + newUpstreamsConfig: UpstreamsConfig, + ) { + val usedChains = removeUpstreams(chainsToReload, upstreamsToRemove) + + addUpstreams(newUpstreamsConfig, chainsToReload, upstreamsToAdd) + + usedChains.forEach { + multistreamHolder.getUpstream(it) + .run { + if (!this.haveUpstreams() && this.isRunning()) { + this.stop() + } + } + } + } + + private fun removeUpstreams( + chainsToReload: Set, + upstreamsToRemove: List>, + ): Set { + val usedChains = mutableSetOf() + + chainsToReload.forEach { + usedChains.add(it) + multistreamHolder.getUpstream(it) + .getAll() + .forEach { up -> + eventPublisher.publishEvent(UpstreamChangeEvent(it, up, UpstreamChangeEvent.ChangeType.REMOVED)) + } + } + upstreamsToRemove.forEach { pair -> + usedChains.add(pair.second) + multistreamHolder.getUpstream(pair.second) + .getAll() + .find { pair.first == it.getId() } + ?.let { + eventPublisher.publishEvent(UpstreamChangeEvent(pair.second, it, UpstreamChangeEvent.ChangeType.REMOVED)) + } + } + + return usedChains + } + + private fun addUpstreams( + newUpstreamsConfig: UpstreamsConfig, + chainsToReload: Set, + upstreamsToAdd: Set, + ) { + val configToReload = UpstreamsConfig( + newUpstreamsConfig.defaultOptions, + newUpstreamsConfig.upstreams.filter { + chainsToReload.contains(Global.chainById(it.chain)) || upstreamsToAdd.contains(it.id) + }.toMutableList(), + ) + configuredUpstreams.processUpstreams(configToReload) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index fb3dc88e4..50e0d8eba 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -102,6 +102,10 @@ open class ConfiguredUpstreams( override fun run(args: ApplicationArguments) { log.debug("Starting upstreams") + processUpstreams(this.config) + } + + fun processUpstreams(config: UpstreamsConfig) { val defaultOptions = buildDefaultOptions(config) config.upstreams.parallelStream().forEach { up -> if (!up.isEnabled) { @@ -239,7 +243,7 @@ open class ConfiguredUpstreams( options, config.role, methods, - QuorumForLabels.QuorumItem(1, config.labels), + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), connectorFactory, chainConf, true, @@ -289,7 +293,7 @@ open class ConfiguredUpstreams( ?: "bitcoin-${seq.getAndIncrement()}", chain, directApi, head, options, config.role, - QuorumForLabels.QuorumItem(1, config.labels), + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), methods, esplora, chainConf, ) upstream.start() @@ -328,7 +332,7 @@ open class ConfiguredUpstreams( chain, options, config.role, methods, - QuorumForLabels.QuorumItem(1, config.labels), + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), connectorFactory, chainConf, false, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt index bd66465dc..8e6b9275c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt @@ -24,7 +24,7 @@ import io.emeraldpay.dshackle.upstream.Upstream /** * An update event to the list of currently available upstreams. */ -class UpstreamChangeEvent( +data class UpstreamChangeEvent( /** * Target blockchain */ diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DynamicMergedHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DynamicMergedHead.kt index f4b676003..893ec02d6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DynamicMergedHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DynamicMergedHead.kt @@ -9,11 +9,13 @@ import reactor.core.scheduler.Scheduler open class DynamicMergedHead( forkChoice: ForkChoice, private val label: String = "", - headScheduler: Scheduler, + private val headScheduler: Scheduler, ) : AbstractHead(forkChoice, headScheduler, upstreamId = label), Lifecycle { private var subscription: Disposable? = null - private val dynamicFlux: DynamicMergeFlux = DynamicMergeFlux(headScheduler) + + @Volatile + private var dynamicFlux: DynamicMergeFlux = DynamicMergeFlux(headScheduler) override fun isRunning(): Boolean { return subscription != null @@ -30,6 +32,7 @@ open class DynamicMergedHead( override fun stop() { super.stop() dynamicFlux.stop() + dynamicFlux = DynamicMergeFlux(headScheduler) subscription?.dispose() subscription = null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 74273f600..629ea3920 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -81,7 +81,6 @@ abstract class Multistream( @Volatile private var quorumLabels: List? = null - private val removed: MutableMap = HashMap() private val meters: MutableMap> = HashMap() private val addedUpstreams = Sinks.many() .multicast() @@ -169,7 +168,6 @@ abstract class Multistream( }.also { if (it) { upstreams.add(upstream) - removed.remove(upstream.getId()) addHead(upstream) monitorUpstream(upstream) } @@ -179,7 +177,7 @@ abstract class Multistream( upstreams.removeIf { up -> (up.getId() == id).also { if (it) { - removed[id] = up + up.stop() } } }.also { @@ -392,17 +390,17 @@ abstract class Multistream( } catch (e: Exception) { log.warn("Head processing error: ${e.javaClass} ${e.message}") } - val statuses = upstreams.asSequence().plus(removed.values).map { it.getStatus() } + val statuses = upstreams.asSequence().map { it.getStatus() } .groupBy { it } .map { "${it.key.name}/${it.value.size}" } .joinToString(",") - val lag = upstreams.plus(removed.values).joinToString(", ") { + val lag = upstreams.joinToString(", ") { // by default, when no lag is available it uses Long.MAX_VALUE, and it doesn't make sense to print // status with such value. use NA (as Not Available) instead val value = it.getLag() value?.toString() ?: "NA" } - val weak = upstreams.plus(removed.values) + val weak = upstreams .filter { it.getStatus() != UpstreamAvailability.OK } .joinToString(", ") { it.getId() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index b1752d3b8..817832349 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -22,7 +22,7 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.calls.CallMethods import reactor.core.publisher.Flux -interface Upstream { +interface Upstream : Lifecycle { fun isAvailable(): Boolean fun getStatus(): UpstreamAvailability fun observeStatus(): Flux diff --git a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy index d09685267..c3b66f95c 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy @@ -25,7 +25,7 @@ class ConfiguredUpstreamsSpec extends Specification { def callTargetsHolder = new CallTargetsHolder() def configurer = new ConfiguredUpstreams( Stub(FileResolver), - Stub(UpstreamsConfig), + new UpstreamsConfig(), Stub(CompressionConfig), callTargetsHolder, Mock(ApplicationEventPublisher), @@ -59,7 +59,7 @@ class ConfiguredUpstreamsSpec extends Specification { def callTargetsHolder = new CallTargetsHolder() def configurer = new ConfiguredUpstreams( Stub(FileResolver), - Stub(UpstreamsConfig), + new UpstreamsConfig(), Stub(CompressionConfig), callTargetsHolder, Mock(ApplicationEventPublisher), @@ -92,7 +92,7 @@ class ConfiguredUpstreamsSpec extends Specification { def callTargetsHolder = new CallTargetsHolder() def configurer = new ConfiguredUpstreams( Stub(FileResolver), - Stub(UpstreamsConfig), + new UpstreamsConfig(), Stub(CompressionConfig), callTargetsHolder, Mock(ApplicationEventPublisher), @@ -120,7 +120,7 @@ class ConfiguredUpstreamsSpec extends Specification { def callTargetsHolder = new CallTargetsHolder() def configurer = new ConfiguredUpstreams( Stub(FileResolver), - Stub(UpstreamsConfig), + new UpstreamsConfig(), Stub(CompressionConfig), callTargetsHolder, Mock(ApplicationEventPublisher), @@ -153,7 +153,7 @@ class ConfiguredUpstreamsSpec extends Specification { def callTargetsHolder = new CallTargetsHolder() def configurer = new ConfiguredUpstreams( Stub(FileResolver), - Stub(UpstreamsConfig), + new UpstreamsConfig(), Stub(CompressionConfig), callTargetsHolder, Mock(ApplicationEventPublisher), diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt new file mode 100644 index 000000000..1886f26f2 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt @@ -0,0 +1,161 @@ +package io.emeraldpay.dshackle.config.reload + +import io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET +import io.emeraldpay.dshackle.Chain.POLYGON__MAINNET +import io.emeraldpay.dshackle.Config +import io.emeraldpay.dshackle.FileResolver +import io.emeraldpay.dshackle.config.MainConfig +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.config.UpstreamsConfigReader +import io.emeraldpay.dshackle.foundation.ChainOptionsReader +import io.emeraldpay.dshackle.startup.ConfiguredUpstreams +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent +import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Upstream +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.ArgumentCaptor +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.context.ApplicationEventPublisher +import org.springframework.util.ResourceUtils +import sun.misc.Signal +import java.io.File + +class ReloadConfigTest { + private val fileResolver = FileResolver(File("")) + private val mainConfig = MainConfig() + + private val optionsReader = ChainOptionsReader() + private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader) + + private val config = mock() + private val reloadConfigService = ReloadConfigService(config, fileResolver, mainConfig) + private val applicationEventPublisher = mock() + private val configuredUpstreams = mock() + + @BeforeEach + fun setupTests() { + mainConfig.upstreams = null + } + + @Test + fun `reload upstreams changes`() { + val up1 = upstream("local1") + val up2 = upstream("local2") + val up3 = upstream("local3") + + val msEth = mock { + on { getAll() } doReturn listOf(up1, up2) + } + val msPoly = mock { + on { getAll() } doReturn listOf(up3) + } + val newConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-changed.yaml") + whenever(config.getConfigPath()).thenReturn(newConfigFile) + + val currentMultistreamHolder = mock { + on { getUpstream(ETHEREUM__MAINNET) } doReturn msEth + on { getUpstream(POLYGON__MAINNET) } doReturn msPoly + } + val reloadConfigUpstreamService = ReloadConfigUpstreamService( + applicationEventPublisher, + currentMultistreamHolder, + configuredUpstreams, + ) + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + + val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream() + val initialConfig = upstreamsConfigReader.read(initialConfigIs)!! + val newConfig = upstreamsConfigReader.read(newConfigFile.inputStream())!! + mainConfig.upstreams = initialConfig + + reloadConfig.handle(Signal("HUP")) + + val captor = ArgumentCaptor.forClass(UpstreamChangeEvent::class.java) + verify(applicationEventPublisher, times(2)).publishEvent(captor.capture()) + verify(configuredUpstreams).processUpstreams( + UpstreamsConfig( + newConfig.defaultOptions, + mutableListOf(newConfig.upstreams[0], newConfig.upstreams[2]), + ), + ) + + assertEquals(3, mainConfig.upstreams!!.upstreams.size) + assertEquals(newConfig, mainConfig.upstreams) + assertEquals( + UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.REMOVED), + captor.allValues[0], + ) + assertEquals( + UpstreamChangeEvent(POLYGON__MAINNET, up3, UpstreamChangeEvent.ChangeType.REMOVED), + captor.allValues[1], + ) + } + + @Test + fun `stop multistream of there are no upstreams left`() { + val up1 = upstream("local1") + val up2 = upstream("local2") + val up3 = upstream("local3") + + val msEth = mock { + on { getAll() } doReturn listOf(up1, up2) + on { haveUpstreams() } doReturn false + on { isRunning() } doReturn true + } + val msPoly = mock { + on { getAll() } doReturn listOf(up3) + } + + val newConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-changed-upstreams-removed.yaml") + whenever(config.getConfigPath()).thenReturn(newConfigFile) + + val currentMultistreamHolder = mock { + on { getUpstream(ETHEREUM__MAINNET) } doReturn msEth + on { getUpstream(POLYGON__MAINNET) } doReturn msPoly + } + val reloadConfigUpstreamService = ReloadConfigUpstreamService( + applicationEventPublisher, + currentMultistreamHolder, + configuredUpstreams, + ) + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream() + val initialConfig = upstreamsConfigReader.read(initialConfigIs)!! + val newConfig = upstreamsConfigReader.read(newConfigFile.inputStream())!! + mainConfig.upstreams = initialConfig + + reloadConfig.handle(Signal("HUP")) + + val captor = ArgumentCaptor.forClass(UpstreamChangeEvent::class.java) + verify(applicationEventPublisher, times(2)).publishEvent(captor.capture()) + verify(configuredUpstreams).processUpstreams( + UpstreamsConfig( + newConfig.defaultOptions, + mutableListOf(), + ), + ) + verify(msEth).stop() + assertEquals(1, mainConfig.upstreams!!.upstreams.size) + assertEquals(newConfig, mainConfig.upstreams) + assertEquals( + UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.REMOVED), + captor.allValues[0], + ) + assertEquals( + UpstreamChangeEvent(ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.REMOVED), + captor.allValues[1], + ) + } + + private fun upstream(id: String): Upstream = + mock { + on { getId() } doReturn id + } +} diff --git a/src/test/resources/configs/upstreams-changed-upstreams-removed.yaml b/src/test/resources/configs/upstreams-changed-upstreams-removed.yaml new file mode 100644 index 000000000..67ff9b421 --- /dev/null +++ b/src/test/resources/configs/upstreams-changed-upstreams-removed.yaml @@ -0,0 +1,13 @@ +cluster: + upstreams: + - id: local3 + chain: polygon + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" \ No newline at end of file diff --git a/src/test/resources/configs/upstreams-changed.yaml b/src/test/resources/configs/upstreams-changed.yaml new file mode 100644 index 000000000..7adb44ada --- /dev/null +++ b/src/test/resources/configs/upstreams-changed.yaml @@ -0,0 +1,34 @@ +cluster: + upstreams: + - id: local1 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + - id: local2 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local3 + chain: polygon + labels: + new: label + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" \ No newline at end of file diff --git a/src/test/resources/configs/upstreams-initial.yaml b/src/test/resources/configs/upstreams-initial.yaml new file mode 100644 index 000000000..d5ec41efb --- /dev/null +++ b/src/test/resources/configs/upstreams-initial.yaml @@ -0,0 +1,35 @@ +cluster: + upstreams: + - id: local1 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local2 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local3 + chain: polygon + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" \ No newline at end of file