diff --git a/src/main/kotlin/io/emeraldpay/dshackle/cache/CachesFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/cache/CachesFactory.kt index 9eff19f75..81ff29419 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/cache/CachesFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/cache/CachesFactory.kt @@ -97,7 +97,6 @@ open class CachesFactory( caches.setReceipts(ReceiptRedisCache(redis.reactive(), chain)) caches.setHeightByHash(HeightByHashRedisCache(redis.reactive(), chain)) } - caches.setCacheEnabled(cacheConfig.requestsCacheEnabled) return caches.build() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfig.kt index 3c681de22..3ce10690d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfig.kt @@ -17,8 +17,6 @@ package io.emeraldpay.dshackle.config class CacheConfig { - var requestsCacheEnabled = true - var redis: Redis? = null class Redis( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfigReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfigReader.kt index 593d7b8b7..9e7c43aea 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfigReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/CacheConfigReader.kt @@ -16,21 +16,13 @@ package io.emeraldpay.dshackle.config import io.emeraldpay.dshackle.foundation.YamlConfigReader -import org.slf4j.LoggerFactory import org.yaml.snakeyaml.nodes.MappingNode class CacheConfigReader : YamlConfigReader() { - companion object { - private val log = LoggerFactory.getLogger(CacheConfigReader::class.java) - } - override fun read(input: MappingNode?): CacheConfig? { return getMapping(input, "cache")?.let { node -> val config = CacheConfig() - getValueAsBool(node, "requests-cache-enabled")?.let { - config.requestsCacheEnabled = it - } getMapping(node, "redis")?.let { redisNode -> val redis = CacheConfig.Redis() val enabled = getValueAsBool(redisNode, "enabled") ?: true @@ -50,7 +42,7 @@ class CacheConfigReader : YamlConfigReader() { config.redis = redis } } - if (config.redis == null && config.requestsCacheEnabled) { + if (config.redis == null) { return null } config diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt index 0d7827ffd..98c77954b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt @@ -2,16 +2,12 @@ package io.emeraldpay.dshackle.config.context import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.BITCOIN -import io.emeraldpay.dshackle.BlockchainType.EVM_POS -import io.emeraldpay.dshackle.BlockchainType.EVM_POW -import io.emeraldpay.dshackle.BlockchainType.STARKNET import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.CachesFactory import io.emeraldpay.dshackle.upstream.CallTargetsHolder import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream -import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream +import io.emeraldpay.dshackle.upstream.generic.ChainSpecificRegistry import io.emeraldpay.dshackle.upstream.generic.GenericMultistream import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.config.ConfigurableListableBeanFactory @@ -31,14 +27,13 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) headScheduler: Scheduler, tracer: Tracer, ): List { - return Chain.values() + return Chain.entries .filterNot { it == Chain.UNSPECIFIED } .map { chain -> - when (BlockchainType.from(chain)) { - EVM_POS -> ethereumPosMultistream(chain, cachesFactory, headScheduler, tracer) - EVM_POW -> ethereumMultistream(chain, cachesFactory, headScheduler, tracer) - BITCOIN -> bitcoinMultistream(chain, cachesFactory, headScheduler) - STARKNET -> genericMultistream(chain, cachesFactory, headScheduler) + if (BlockchainType.from(chain) == BITCOIN) { + bitcoinMultistream(chain, cachesFactory, headScheduler) + } else { + genericMultistream(chain, cachesFactory, headScheduler, tracer) } } } @@ -47,47 +42,18 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) chain: Chain, cachesFactory: CachesFactory, headScheduler: Scheduler, + tracer: Tracer, ): Multistream { val name = "multi-$chain" + val cs = ChainSpecificRegistry.resolve(chain) return GenericMultistream( chain, CopyOnWriteArrayList(), cachesFactory.getCaches(chain), headScheduler, - ).also { register(it, name) } - } - - private fun ethereumMultistream( - chain: Chain, - cachesFactory: CachesFactory, - headScheduler: Scheduler, - tracer: Tracer, - ): EthereumMultistream { - val name = "multi-ethereum-$chain" - - return EthereumMultistream( - chain, - CopyOnWriteArrayList(), - cachesFactory.getCaches(chain), - headScheduler, - tracer, - ).also { register(it, name) } - } - - open fun ethereumPosMultistream( - chain: Chain, - cachesFactory: CachesFactory, - headScheduler: Scheduler, - tracer: Tracer, - ): EthereumPosMultiStream { - val name = "multi-ethereum-pos-$chain" - - return EthereumPosMultiStream( - chain, - CopyOnWriteArrayList(), - cachesFactory.getCaches(chain), - headScheduler, - tracer, + cs.makeCachingReaderBuilder(tracer), + cs::localReaderBuilder, + cs.subscriptionBuilder(headScheduler), ).also { register(it, name) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 536bdec9e..0c5e1a247 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -76,7 +76,6 @@ open class NativeCall( private val log = LoggerFactory.getLogger(NativeCall::class.java) private val objectMapper: ObjectMapper = Global.objectMapper - private val localRouterEnabled = config.cache?.requestsCacheEnabled ?: true private val passthrough = config.passthrough var rpcReaderFactory: RpcReaderFactory = RpcReaderFactory.default() @@ -359,7 +358,7 @@ open class NativeCall( if (method in DefaultEthereumMethods.newFilterMethods) CreateFilterDecorator() else NoneResultDecorator() fun fetch(ctx: ValidCallContext): Mono { - return ctx.upstream.getLocalReader(localRouterEnabled) + return ctx.upstream.getLocalReader() .flatMap { api -> SpannedReader(api, tracer, LOCAL_READER) .read(JsonRpcRequest(ctx.payload.method, ctx.payload.params, ctx.nonce, ctx.forwardedSelector)) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 2c15e3455..d19b1e95d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -21,8 +21,6 @@ import com.google.common.annotations.VisibleForTesting import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.BlockchainType.EVM_POS -import io.emeraldpay.dshackle.BlockchainType.EVM_POW -import io.emeraldpay.dshackle.BlockchainType.STARKNET import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.FileResolver @@ -52,13 +50,12 @@ import io.emeraldpay.dshackle.upstream.bitcoin.ExtractBlock import io.emeraldpay.dshackle.upstream.bitcoin.ZMQServer import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.ManagedCallMethods -import io.emeraldpay.dshackle.upstream.ethereum.EthereumBlockValidator -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeRpcUpstream import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionFactory import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice +import io.emeraldpay.dshackle.upstream.generic.ChainSpecificRegistry import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD @@ -75,6 +72,7 @@ import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Component import reactor.core.scheduler.Scheduler import reactor.core.scheduler.Schedulers +import java.lang.IllegalStateException import java.net.URI import java.util.concurrent.Executor import java.util.concurrent.Executors @@ -138,40 +136,38 @@ open class ConfiguredUpstreams( .merge(up.options ?: ChainOptions.PartialOptions()) .buildOptions() val upstream = when (BlockchainType.from(chain)) { - EVM_POS -> { - buildEthereumPosUpstream( - up.nodeId, - up.cast(EthereumPosConnection::class.java), + BITCOIN -> { + buildBitcoinUpstream( + up.cast(BitcoinConnection::class.java), chain, options, chainConfig, ) } - EVM_POW -> { - buildEthereumUpstream( + + EVM_POS -> { + val posConn = up.cast(EthereumPosConnection::class.java) + + buildGenericUpstream( up.nodeId, - up.cast(RpcConnection::class.java), - chain, - options, - chainConfig, - ) - } - BITCOIN -> { - buildBitcoinUpstream( - up.cast(BitcoinConnection::class.java), + up, + posConn.connection?.execution ?: throw IllegalStateException("Empty execution config"), chain, options, chainConfig, + posConn.connection?.upstreamRating ?: 0, ) } - STARKNET -> { - buildStarknetUpstream( + else -> { + buildGenericUpstream( up.nodeId, - up.cast(RpcConnection::class.java), + up, + up.connection as RpcConnection, chain, options, chainConfig, + 0, ) } } @@ -224,31 +220,30 @@ open class ConfiguredUpstreams( } } - private fun buildStarknetUpstream( + private fun buildGenericUpstream( nodeId: Int?, - config: UpstreamsConfig.Upstream, + config: UpstreamsConfig.Upstream<*>, + connection: RpcConnection, chain: Chain, options: Options, chainConfig: ChainConfig, + nodeRating: Int, ): Upstream? { if (config.connection == null) { log.warn("Upstream doesn't have connection configuration") return null } - val connection = config.connection!! + val cs = ChainSpecificRegistry.resolve(chain) val connectorFactory = buildConnectorFactory( config.id!!, connection, chain, - NoChoiceWithPriorityForkChoice(0, config.id!!), + NoChoiceWithPriorityForkChoice(nodeRating, config.id!!), BlockValidator.ALWAYS_VALID, chainConfig, - ) - if (connectorFactory == null) { - return null - } + ) ?: return null val methods = buildMethods(config, chain) @@ -268,6 +263,9 @@ open class ConfiguredUpstreams( chainConfig, connectorFactory, eventPublisher, + cs::validator, + cs::labelDetector, + cs::subscriptionTopics, ) upstream.start() @@ -278,58 +276,6 @@ open class ConfiguredUpstreams( return upstream } - private fun buildEthereumPosUpstream( - nodeId: Int?, - config: UpstreamsConfig.Upstream, - chain: Chain, - options: Options, - chainConf: ChainConfig, - ): Upstream? { - val conn = config.connection!! - val execution = conn.execution - if (execution == null) { - log.warn("Upstream doesn't have execution layer configuration") - return null - } - val urls = ArrayList() - val connectorFactory = buildConnectorFactory( - config.id!!, - execution, - chain, - NoChoiceWithPriorityForkChoice(conn.upstreamRating, config.id!!), - BlockValidator.ALWAYS_VALID, - chainConf, - ) - val methods = buildMethods(config, chain) - if (connectorFactory == null) { - return null - } - - val hashUrl = conn.execution!!.let { - if (it.connectorMode == RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url - } - val hash = getHash(nodeId, hashUrl!!) - val upstream = EthereumLikeRpcUpstream( - config.id!!, - hash, - chain, - options, - config.role, - methods, - QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), - connectorFactory, - chainConf, - true, - eventPublisher, - ) - upstream.start() - if (!upstream.isRunning) { - log.debug("Upstream ${upstream.getId()} is not running, it can't be added") - return null - } - return upstream - } - private fun buildBitcoinUpstream( config: UpstreamsConfig.Upstream, chain: Chain, @@ -373,45 +319,6 @@ open class ConfiguredUpstreams( return upstream } - private fun buildEthereumUpstream( - nodeId: Int?, - config: UpstreamsConfig.Upstream, - chain: Chain, - options: Options, - chainConf: ChainConfig, - ): Upstream? { - val conn = config.connection!! - val methods = buildMethods(config, chain) - - val connectorFactory = buildConnectorFactory( - config.id!!, - conn, - chain, - MostWorkForkChoice(), - EthereumBlockValidator(), - chainConf, - ) - if (connectorFactory == null) { - return null - } - - val hashUrl = if (conn.connectorMode == RPC_REQUESTS_WITH_MIXED_HEAD.name) conn.rpc?.url ?: conn.ws?.url else conn.ws?.url ?: conn.rpc?.url - val upstream = EthereumLikeRpcUpstream( - config.id!!, - getHash(nodeId, hashUrl!!), - chain, - options, config.role, - methods, - QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), - connectorFactory, - chainConf, - false, - eventPublisher, - ) - upstream.start() - return upstream - } - private fun buildGrpcUpstream( nodeId: Int?, config: UpstreamsConfig.Upstream, @@ -460,13 +367,13 @@ open class ConfiguredUpstreams( private fun buildHttpFactory(conn: HttpEndpoint?, urls: ArrayList? = null): HttpRpcFactory? { return conn?.let { endpoint -> - val tls = conn?.tls?.let { tls -> + val tls = conn.tls?.let { tls -> tls.ca?.let { ca -> fileResolver.resolve(ca).readBytes() } } urls?.add(endpoint.url) - HttpRpcFactory(endpoint.url.toString(), conn?.basicAuth, tls) + HttpRpcFactory(endpoint.url.toString(), conn.basicAuth, tls) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractChainFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractChainFees.kt deleted file mode 100644 index 11ec0dd27..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractChainFees.kt +++ /dev/null @@ -1,131 +0,0 @@ -package io.emeraldpay.dshackle.upstream - -import com.github.benmanes.caffeine.cache.Caffeine -import io.emeraldpay.api.proto.BlockchainOuterClass -import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.time.Duration -import java.util.EnumMap -import java.util.function.Function - -abstract class AbstractChainFees( - private val heightLimit: Int, - private val upstreams: Multistream, - extractTx: (B) -> List?, -) : ChainFees { - - companion object { - private val log = LoggerFactory.getLogger(AbstractChainFees::class.java) - } - - private val txSource = EnumMap>(ChainFees.Mode::class.java) - - init { - txSource[ChainFees.Mode.AVG_TOP] = TxAtTop(extractTx) - txSource[ChainFees.Mode.MIN_ALWAYS] = TxAtBottom(extractTx) - txSource[ChainFees.Mode.AVG_MIDDLE] = TxAtMiddle(extractTx) - txSource[ChainFees.Mode.AVG_LAST] = TxAtBottom(extractTx) - txSource[ChainFees.Mode.AVG_T5] = TxAtPos(extractTx, 5) - txSource[ChainFees.Mode.AVG_T20] = TxAtPos(extractTx, 20) - txSource[ChainFees.Mode.AVG_T50] = TxAtPos(extractTx, 50) - } - - override fun estimate(mode: ChainFees.Mode, blocks: Int): Mono { - return usingBlocks(blocks) - .flatMap { readFeesAt(it, mode) } - .transform(feeAggregation(mode)) - .next() - .map(getResponseBuilder()) - } - - // --- - - private val feeCache = Caffeine.newBuilder() - .expireAfterWrite(Duration.ofMinutes(60)) - .build, F>() - - fun usingBlocks(exp: Int): Flux { - val useBlocks = exp.coerceAtMost(heightLimit).coerceAtLeast(1) - - val height = upstreams.getHead().getCurrentHeight() - ?: return Mono.fromCallable { log.warn("Upstream is not ready. No current height") }.thenMany(Mono.empty()) // TODO or throw an exception to build a gRPC error? - val startBlock: Int = height.toInt() - useBlocks + 1 - if (startBlock < 0) { - log.warn("Blockchain doesn't have enough blocks. Height: $height") - return Flux.empty() - } - - return Flux.range(startBlock, useBlocks).map { it.toLong() } - } - - fun readFeesAt(height: Long, mode: ChainFees.Mode): Mono { - val current = feeCache.getIfPresent(Pair(height, mode)) - if (current != null) { - return Mono.just(current) - } - val txSelector = txSourceFor(mode) - return readFeesAt(height, txSelector).doOnNext { - // TODO it may be EMPTY for some blocks (ex. a no tx block), so nothing gets cached and goes to do the same call each time. so do cache empty values to avoid useless requests - feeCache.put(Pair(height, mode), it!!) - } - } - - open fun txSourceFor(mode: ChainFees.Mode): TxAt { - return txSource[mode] ?: throw IllegalStateException("No TS Source for mode $mode") - } - - abstract fun readFeesAt(height: Long, selector: TxAt): Mono - abstract fun feeAggregation(mode: ChainFees.Mode): Function, Mono> - abstract fun getResponseBuilder(): Function - - abstract class TxAt(private val extractTx: Function?>) { - fun get(block: B): TR? { - val txes = extractTx.apply(block) ?: return null - return get(txes) - } - - abstract fun get(transactions: List): TR? - } - - class TxAtPos(extractTx: Function?>, private val pos: Int) : TxAt(extractTx) { - - override fun get(transactions: List): TR? { - val index = pos.coerceAtMost(transactions.size - 1) - if (index < 0) { - return null - } - return transactions[transactions.size - index - 1] - } - } - - class TxAtTop(extractTx: Function?>) : TxAt(extractTx) { - override fun get(transactions: List): TR? { - if (transactions.isEmpty()) { - return null - } - return transactions[0] - } - } - - class TxAtBottom(extractTx: Function?>) : TxAt(extractTx) { - override fun get(transactions: List): TR? { - if (transactions.isEmpty()) { - return null - } - return transactions.last() - } - } - - class TxAtMiddle(extractTx: Function?>) : TxAt(extractTx) { - override fun get(transactions: List): TR? { - if (transactions.isEmpty()) { - return null - } - if (transactions.size == 1) { - return transactions[0] - } - return transactions[transactions.size / 2] - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt index ce67e5ee7..c9e0030f4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CachingReader.kt @@ -1,3 +1,15 @@ package io.emeraldpay.dshackle.upstream -interface CachingReader +interface CachingReader : Lifecycle + +object NoopCachingReader : CachingReader { + override fun start() { + } + + override fun stop() { + } + + override fun isRunning(): Boolean { + return true + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainFees.kt deleted file mode 100644 index 9cb860920..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainFees.kt +++ /dev/null @@ -1,35 +0,0 @@ -package io.emeraldpay.dshackle.upstream - -import io.emeraldpay.api.proto.BlockchainOuterClass -import reactor.core.publisher.Mono - -interface ChainFees { - - companion object { - fun extractMode(req: BlockchainOuterClass.EstimateFeeRequest): Mode? { - return when (req.mode!!) { - BlockchainOuterClass.FeeEstimationMode.INVALID -> null - BlockchainOuterClass.FeeEstimationMode.AVG_LAST -> Mode.AVG_LAST - BlockchainOuterClass.FeeEstimationMode.AVG_T5 -> Mode.AVG_T5 - BlockchainOuterClass.FeeEstimationMode.AVG_T20 -> Mode.AVG_T20 - BlockchainOuterClass.FeeEstimationMode.AVG_T50 -> Mode.AVG_T50 - BlockchainOuterClass.FeeEstimationMode.MIN_ALWAYS -> Mode.MIN_ALWAYS - BlockchainOuterClass.FeeEstimationMode.AVG_MIDDLE -> Mode.AVG_MIDDLE - BlockchainOuterClass.FeeEstimationMode.AVG_TOP -> Mode.AVG_TOP - BlockchainOuterClass.FeeEstimationMode.UNRECOGNIZED -> null - } - } - } - - fun estimate(mode: Mode, blocks: Int): Mono - - enum class Mode { - AVG_LAST, - AVG_T5, - AVG_T20, - AVG_T50, - MIN_ALWAYS, - AVG_MIDDLE, - AVG_TOP, - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EgressSubscription.kt index 4c83c4de3..c46612a29 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EgressSubscription.kt @@ -22,7 +22,7 @@ interface EgressSubscription { fun subscribe(topic: String, params: Any?, matcher: Selector.Matcher): Flux } -class EmptyEgressSubscription : EgressSubscription { +object EmptyEgressSubscription : EgressSubscription { override fun getAvailableTopics(): List { return emptyList() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/LabelsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/LabelsDetector.kt new file mode 100644 index 000000000..d9884bd6a --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/LabelsDetector.kt @@ -0,0 +1,10 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.reader.JsonRpcReader +import reactor.core.publisher.Flux + +typealias LabelsDetectorBuilder = (Chain, JsonRpcReader) -> LabelsDetector? +interface LabelsDetector { + fun detectLabels(): Flux> +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 5552619fc..215500cb9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -51,10 +51,12 @@ import kotlin.concurrent.withLock */ abstract class Multistream( val chain: Chain, - private val upstreams: MutableList, val caches: Caches, ) : Upstream, Lifecycle, HasEgressSubscription { + abstract fun getUpstreams(): MutableList + abstract fun addUpstreamInternal(u: Upstream) + companion object { private const val metrics = "upstreams" } @@ -93,30 +95,6 @@ abstract class Multistream( .multicast() .directBestEffort() - init { - UpstreamAvailability.values().forEach { status -> - Metrics.gauge( - "$metrics.availability", - listOf(Tag.of("chain", chain.chainCode), Tag.of("status", status.name.lowercase())), - this, - ) { - upstreams.count { it.getStatus() == status }.toDouble() - } - } - - Metrics.gauge( - "$metrics.connected", - listOf(Tag.of("chain", chain.chainCode)), - this, - ) { - upstreams.size.toDouble() - } - - upstreams.forEach { up -> - monitorUpstream(up) - } - } - override fun getSubscriptionTopics(): List { return getEgressSubscription().getAvailableTopics() } @@ -157,25 +135,25 @@ abstract class Multistream( * Get list of all underlying upstreams */ open fun getAll(): List { - return upstreams + return getUpstreams() } /** * Add an upstream */ fun addUpstream(upstream: Upstream): Boolean = - upstreams.none { + getUpstreams().none { it.getId() == upstream.getId() }.also { if (it) { - upstreams.add(upstream) + addUpstreamInternal(upstream) addHead(upstream) monitorUpstream(upstream) } } fun removeUpstream(id: String): Boolean = - upstreams.removeIf { up -> + getUpstreams().removeIf { up -> (up.getId() == id).also { if (it) { up.stop() @@ -196,13 +174,13 @@ abstract class Multistream( if (seq >= Int.MAX_VALUE / 2) { seq = 0 } - return FilteredApis(chain, upstreams, matcher, i) + return FilteredApis(chain, getUpstreams(), matcher, i) } /** * Finds an API that leverages caches and other optimizations/transformations of the request. */ - abstract fun getLocalReader(localEnabled: Boolean): Mono + abstract fun getLocalReader(): Mono override fun getIngressReader(): JsonRpcReader { throw NotImplementedError("Immediate direct API is not implemented for Aggregated Upstream") @@ -235,6 +213,7 @@ abstract class Multistream( lagObserver = null upstreams[0].setLag(0) } + upstreams.size > 1 -> if (lagObserver == null) lagObserver = makeLagObserver() } } @@ -389,17 +368,17 @@ abstract class Multistream( } catch (e: Exception) { log.warn("Head processing error: ${e.javaClass} ${e.message}") } - val statuses = upstreams.asSequence().map { it.getStatus() } + val statuses = getUpstreams().asSequence().map { it.getStatus() } .groupBy { it } .map { "${it.key.name}/${it.value.size}" } .joinToString(",") - val lag = upstreams.joinToString(", ") { + val lag = getUpstreams().joinToString(", ") { // by default, when no lag is available it uses Long.MAX_VALUE, and it doesn't make sense to print // status with such value. use NA (as Not Available) instead val value = it.getLag() value?.toString() ?: "NA" } - val weak = upstreams + val weak = getUpstreams() .filter { it.getStatus() != UpstreamAvailability.OK } .joinToString(", ") { it.getId() } @@ -424,6 +403,7 @@ abstract class Multistream( onUpstreamsUpdated() updateUpstreams.emitNext(event.upstream) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } } + UpstreamChangeEvent.ChangeType.ADDED -> { if (!started) { start() @@ -441,6 +421,7 @@ abstract class Multistream( } } } + UpstreamChangeEvent.ChangeType.REMOVED -> { removeUpstream(event.upstream.getId()).takeIf { it }?.let { try { @@ -458,10 +439,10 @@ abstract class Multistream( } fun haveUpstreams(): Boolean = - upstreams.isNotEmpty() + getUpstreams().isNotEmpty() fun hasMatchingUpstream(matcher: Selector.LabelSelectorMatcher): Boolean { - return upstreams.any { matcher.matches(it) } + return getUpstreams().any { matcher.matches(it) } } fun subscribeAddedUpstreams(): Flux = @@ -469,12 +450,16 @@ abstract class Multistream( fun subscribeRemovedUpstreams(): Flux = removedUpstreams.asFlux() + fun subscribeUpdatedUpstreams(): Flux = updateUpstreams.asFlux() abstract fun makeLagObserver(): HeadLagObserver - open fun tryProxySubscribe(matcher: Selector.Matcher, request: BlockchainOuterClass.NativeSubscribeRequest): Flux? = null + open fun tryProxySubscribe( + matcher: Selector.Matcher, + request: BlockchainOuterClass.NativeSubscribeRequest, + ): Flux? = null abstract fun getCachingReader(): CachingReader? diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt new file mode 100644 index 000000000..995ad5e18 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/UpstreamValidator.kt @@ -0,0 +1,25 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.foundation.ChainOptions +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +typealias UpstreamValidatorBuilder = (Chain, Upstream, ChainOptions.Options, ChainConfig) -> UpstreamValidator? + +interface UpstreamValidator { + fun start(): Flux + + fun validateUpstreamSettings(): Mono + + fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult { + return validateUpstreamSettings().block() ?: ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR + } +} + +enum class ValidateUpstreamSettingsResult { + UPSTREAM_VALID, + UPSTREAM_SETTINGS_ERROR, + UPSTREAM_FATAL_SETTINGS_ERROR, +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt index 4e5ef5791..03f2bcdb9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt @@ -43,7 +43,7 @@ open class BitcoinMultistream( private val sourceUpstreams: MutableList, caches: Caches, private val headScheduler: Scheduler, -) : Multistream(chain, sourceUpstreams as MutableList, caches), Lifecycle { +) : Multistream(chain, caches), Lifecycle { private var head: Head = EmptyHead() private var esplora = sourceUpstreams.find { it.esploraClient != null }?.esploraClient @@ -51,6 +51,13 @@ open class BitcoinMultistream( private var addressActiveCheck: AddressActiveCheck? = null private var xpubAddresses: XpubAddresses? = null private var callRouter: LocalCallRouter = LocalCallRouter(DefaultBitcoinMethods(), reader) + override fun getUpstreams(): MutableList { + return sourceUpstreams + } + + override fun addUpstreamInternal(u: Upstream) { + sourceUpstreams.add(u as BitcoinUpstream) + } override fun init() { if (sourceUpstreams.size > 0) { @@ -59,11 +66,6 @@ open class BitcoinMultistream( super.init() } - open val upstreams: List - get() { - return sourceUpstreams - } - open fun getXpubAddresses(): XpubAddresses? { return xpubAddresses } @@ -103,7 +105,7 @@ open class BitcoinMultistream( .switchIfEmpty(Mono.error(Exception("No API available for $chain"))) } - override fun getLocalReader(localEnabled: Boolean): Mono { + override fun getLocalReader(): Mono { return Mono.just(callRouter) } @@ -146,7 +148,7 @@ open class BitcoinMultistream( } override fun getEgressSubscription(): EgressSubscription { - return EmptyEgressSubscription() + return EmptyEgressSubscription } override fun isRunning(): Boolean { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinReader.kt index f7c99796b..d076ea9b0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinReader.kt @@ -44,7 +44,7 @@ open class BitcoinReader( private val unspentReader: UnspentReader = if (esploraClient != null) { EsploraUnspentReader(esploraClient, head) - } else if (upstreams.upstreams.any { it.isGrpc() && it.getCapabilities().contains(Capability.BALANCE) }) { + } else if (upstreams.getUpstreams().any { it.isGrpc() && it.getCapabilities().contains(Capability.BALANCE) }) { RemoteUnspentReader(upstreams) } else { RpcUnspentReader(upstreams) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ERC20Balance.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ERC20Balance.kt deleted file mode 100644 index cf8892795..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ERC20Balance.kt +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.upstream.ApiSource -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse -import io.emeraldpay.etherjar.domain.Address -import io.emeraldpay.etherjar.erc20.ERC20Token -import io.emeraldpay.etherjar.hex.Hex32 -import io.emeraldpay.etherjar.hex.HexQuantity -import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import java.math.BigInteger - -/** - * Query for a ERC20 token balance for an address - */ -open class ERC20Balance { - - companion object { - private val log = LoggerFactory.getLogger(ERC20Balance::class.java) - } - - open fun getBalance(upstreams: EthereumPosMultiStream, token: ERC20Token, address: Address): Mono { - return upstreams - // use only up-to-date upstreams - .getApiSource(Selector.HeightMatcher(upstreams.getHead().getCurrentHeight() ?: 0)) - .let { getBalance(it, token, address) } - } - - open fun getBalance(apis: ApiSource, token: ERC20Token, address: Address): Mono { - apis.request(1) - return Flux.from(apis) - .flatMap { - getBalance(it.cast(EthereumLikeRpcUpstream::class.java), token, address) - } - .doOnNext { - apis.resolve() - } - .next() - } - - open fun getBalance(upstream: EthereumLikeRpcUpstream, token: ERC20Token, address: Address): Mono { - return upstream - .getIngressReader() - .read(prepareEthCall(token, address, upstream.getHead())) - .flatMap(JsonRpcResponse::requireStringResult) - .map { Hex32.from(it).asQuantity().value } - } - - fun prepareEthCall(token: ERC20Token, target: Address, head: Head): JsonRpcRequest { - val call = token - .readBalanceOf(target) - .toJson() - val height = head.getCurrentHeight()?.let { HexQuantity.from(it).toHex() } ?: "latest" - return JsonRpcRequest("eth_call", listOf(call, height)) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt index 4af3d69ee..7a5066507 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt @@ -34,7 +34,6 @@ import io.emeraldpay.dshackle.reader.RekeyingReader import io.emeraldpay.dshackle.reader.SpannedReader import io.emeraldpay.dshackle.reader.TransformingReader import io.emeraldpay.dshackle.upstream.CachingReader -import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result @@ -58,7 +57,7 @@ open class EthereumCachingReader( private val caches: Caches, callMethodsFactory: Factory, private val tracer: Tracer, -) : Lifecycle, CachingReader { +) : CachingReader { private val objectMapper: ObjectMapper = Global.objectMapper private val balanceCache = CurrentBlockCache() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 557c93161..e28cfd720 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -1,9 +1,29 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector +import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific +import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import org.springframework.cloud.sleuth.Tracer +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler object EthereumChainSpecific : ChainSpecific { override fun parseBlock(data: JsonRpcResponse, upstreamId: String): BlockContainer { @@ -11,4 +31,56 @@ object EthereumChainSpecific : ChainSpecific { } override fun latestBlockRequest() = JsonRpcRequest("eth_getBlockByNumber", listOf("latest", false)) + override fun localReaderBuilder( + cachingReader: CachingReader, + methods: CallMethods, + head: Head, + ): Mono { + return Mono.just(EthereumLocalReader(cachingReader as EthereumCachingReader, methods, head)) + } + + override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { + return { ms -> +// val pendingTxes: PendingTxesSource = (ms.upstreams as MutableList) +// .mapNotNull { +// it.getIngressSubscription().getPendingTxes() +// }.let { +// if (it.isEmpty()) { +// NoPendingTxes() +// } else if (it.size == 1) { +// it.first() +// } else { +// AggregatedPendingTxes(it) +// } +// } +// return + EmptyEgressSubscription + } + } + + override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { + return { ms, caches, methodsFactory -> EthereumCachingReader(ms, caches, methodsFactory, tracer) } + } + + override fun validator( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): UpstreamValidator? { + return EthereumUpstreamValidator(chain, upstream, options, config) + } + + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { + return EthereumLabelsDetector(reader, chain) + } + + override fun subscriptionTopics(upstream: GenericUpstream): List { + val subs = if (upstream.getCapabilities().contains(Capability.WS_HEAD)) { + listOf(EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_LOGS) + } else { + listOf() + } + return upstream.getIngressSubscription().getAvailableTopics().plus(subs).toSet().toList() + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFees.kt deleted file mode 100644 index 1c01e0b01..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFees.kt +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.upstream.AbstractChainFees -import io.emeraldpay.dshackle.upstream.ChainFees -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot -import io.emeraldpay.etherjar.domain.Wei -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import org.slf4j.LoggerFactory -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.util.function.Tuples -import java.util.function.Function - -abstract class EthereumFees( - upstreams: Multistream, - private val reader: EthereumCachingReader, - heightLimit: Int, -) : AbstractChainFees, TransactionRefJson, TransactionJsonSnapshot>(heightLimit, upstreams, extractTx), ChainFees { - - companion object { - private val log = LoggerFactory.getLogger(EthereumFees::class.java) - - private val extractTx = { block: BlockJson -> - block.transactions - } - } - - abstract fun extractFee(block: BlockJson, tx: TransactionJsonSnapshot): EthereumFee - - override fun readFeesAt(height: Long, selector: TxAt, TransactionRefJson>): Mono { - return reader.blocksByHeightParsed().read(height) - .flatMap { block -> - Mono.justOrEmpty(selector.get(block)) - .cast(TransactionRefJson::class.java) - .flatMap { reader.txByHash().read(it.hash) } - .map { tx -> extractFee(block, tx) } - } - } - - override fun feeAggregation(mode: ChainFees.Mode): Function, Mono> { - if (mode == ChainFees.Mode.MIN_ALWAYS) { - return Function { src -> - src.reduce { a, b -> - EthereumFee( - a.max.coerceAtLeast(b.max), - a.priority.coerceAtLeast(b.priority), - a.paid.coerceAtLeast(b.paid), - Wei.ZERO, - ) - } - } - } - return Function { src -> - src.map { Tuples.of(1, it) } - .reduce { a, b -> - Tuples.of(a.t1 + b.t1, a.t2.plus(b.t2)) - }.map { - EthereumFee(it.t2.max / it.t1, it.t2.priority / it.t1, it.t2.paid / it.t1, it.t2.base / it.t1) - } - } - } - - // --- - - data class EthereumFee(val max: Wei, val priority: Wei, val paid: Wei, val base: Wei) { - fun plus(o: EthereumFee): EthereumFee { - return EthereumFee(max + o.max, priority + o.priority, paid + o.paid, base + o.base) - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLegacyFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLegacyFees.kt deleted file mode 100644 index ab7780503..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLegacyFees.kt +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot -import io.emeraldpay.etherjar.domain.Wei -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import org.slf4j.LoggerFactory -import java.util.function.Function - -class EthereumLegacyFees(upstreams: EthereumMultistream, reader: EthereumCachingReader, heightLimit: Int) : - EthereumFees(upstreams, reader, heightLimit) { - - companion object { - private val log = LoggerFactory.getLogger(EthereumLegacyFees::class.java) - } - - private val toGrpc: Function = Function { - BlockchainOuterClass.EstimateFeeResponse.newBuilder() - .setEthereumStd( - BlockchainOuterClass.EthereumStdFees.newBuilder() - .setFee(it.paid.amount.toString()), - ) - .build() - } - - override fun extractFee(block: BlockJson, tx: TransactionJsonSnapshot): EthereumFee { - return EthereumFee(tx.gasPrice, tx.gasPrice, tx.gasPrice, Wei.ZERO) - } - - override fun getResponseBuilder(): Function { - return toGrpc - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt deleted file mode 100644 index d2041cb5b..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2019 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.cache.Caches -import io.emeraldpay.dshackle.cache.CachesEnabled -import io.emeraldpay.dshackle.config.ChainsConfig -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.startup.QuorumForLabels -import io.emeraldpay.dshackle.startup.UpstreamChangeEvent -import io.emeraldpay.dshackle.upstream.Capability -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector -import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory -import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector -import org.springframework.context.ApplicationEventPublisher -import org.springframework.context.Lifecycle -import reactor.core.Disposable -import reactor.core.publisher.Flux -import java.time.Duration -import java.util.concurrent.atomic.AtomicBoolean - -open class EthereumLikeRpcUpstream( - id: String, - hash: Byte, - val chain: Chain, - options: ChainOptions.Options, - role: UpstreamsConfig.UpstreamRole, - targets: CallMethods?, - private val node: QuorumForLabels.QuorumItem?, - connectorFactory: ConnectorFactory, - chainConfig: ChainsConfig.ChainConfig, - skipEnhance: Boolean, - private val eventPublisher: ApplicationEventPublisher?, -) : EthereumLikeUpstream(id, hash, options, role, targets, node, chainConfig), Lifecycle, Upstream, CachesEnabled { - private val validator: EthereumUpstreamValidator = EthereumUpstreamValidator(chain, this, getOptions(), chainConfig.callLimitContract) - protected val connector: GenericConnector = connectorFactory.create(this, chain, skipEnhance) - private val labelsDetector = EthereumLabelsDetector(this.getIngressReader(), chain) - private var hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false) - - private var validatorSubscription: Disposable? = null - private var livenessSubscription: Disposable? = null - private var validationSettingsSubscription: Disposable? = null - - override fun getCapabilities(): Set { - return if (hasLiveSubscriptionHead.get()) { - setOf(Capability.RPC, Capability.BALANCE, Capability.WS_HEAD) - } else { - setOf(Capability.RPC, Capability.BALANCE) - } - } - - override fun setCaches(caches: Caches) { - if (connector is CachesEnabled) { - connector.setCaches(caches) - } - } - - override fun start() { - log.info("Configured for ${chain.chainName}") - connector.start() - val validSettingsResult = validator.validateUpstreamSettingsOnStartup() - when (validSettingsResult) { - UPSTREAM_FATAL_SETTINGS_ERROR -> { - connector.stop() - log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") - return - } - UPSTREAM_SETTINGS_ERROR -> { - validateUpstreamSettings() - } - else -> { - upstreamStart() - labelsDetector.detectLabels() - .toStream() - .forEach { updateLabels(it) } - } - } - } - - private fun validateUpstreamSettings() { - validationSettingsSubscription = Flux.interval( - Duration.ofSeconds(10), - Duration.ofSeconds(20), - ).flatMap { - validator.validateUpstreamSettings() - }.subscribe { - when (it) { - UPSTREAM_FATAL_SETTINGS_ERROR -> { - connector.stop() - log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") - disposeValidationSettingsSubscription() - } - UPSTREAM_VALID -> { - upstreamStart() - labelsDetector.detectLabels() - .subscribe { label -> updateLabels(label) } - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.ADDED)) - disposeValidationSettingsSubscription() - } - else -> { - log.warn("Continue validation of upstream ${getId()}") - } - } - } - } - - private fun upstreamStart() { - if (getOptions().disableValidation) { - log.warn("Disable validation for upstream ${this.getId()}") - this.setLag(0) - this.setStatus(UpstreamAvailability.OK) - } else { - log.debug("Start validation for upstream ${this.getId()}") - validatorSubscription = validator.start() - .subscribe(this::setStatus) - } - livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ - hasLiveSubscriptionHead.set(it) - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) - }, { - log.debug("Error while checking live subscription for ${getId()}", it) - },) - } - - private fun updateLabels(label: Pair) { - log.info("Detected label ${label.first} with value ${label.second} for upstream ${getId()}") - node?.labels?.let { labels -> - labels[label.first] = label.second - } - } - - override fun getIngressSubscription(): EthereumIngressSubscription { - return connector.getIngressSubscription() - } - - override fun getSubscriptionTopics(): List { - val subs = if (getCapabilities().contains(Capability.WS_HEAD)) { - listOf(EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_LOGS) - } else { - listOf() - } - return getIngressSubscription().getAvailableTopics().plus(subs).toSet().toList() - } - - override fun getHead(): Head { - return connector.getHead() - } - - override fun stop() { - validatorSubscription?.dispose() - validatorSubscription = null - livenessSubscription?.dispose() - livenessSubscription = null - disposeValidationSettingsSubscription() - connector.stop() - } - - override fun isRunning(): Boolean { - return connector.isRunning() && validationSettingsSubscription == null - } - - override fun getIngressReader(): JsonRpcReader { - return connector.getIngressReader() - } - - override fun isGrpc(): Boolean { - return false - } - - @Suppress("UNCHECKED_CAST") - override fun cast(selfType: Class): T { - if (!selfType.isAssignableFrom(this.javaClass)) { - throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") - } - return this as T - } - - private fun disposeValidationSettingsSubscription() { - validationSettingsSubscription?.dispose() - validationSettingsSubscription = null - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeUpstream.kt deleted file mode 100644 index c02e58785..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeUpstream.kt +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2019 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.config.ChainsConfig -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.startup.QuorumForLabels -import io.emeraldpay.dshackle.upstream.Capability -import io.emeraldpay.dshackle.upstream.DefaultUpstream -import io.emeraldpay.dshackle.upstream.calls.CallMethods - -abstract class EthereumLikeUpstream( - id: String, - hash: Byte, - options: ChainOptions.Options, - role: UpstreamsConfig.UpstreamRole, - targets: CallMethods?, - private val node: QuorumForLabels.QuorumItem?, - val chainConfig: ChainsConfig.ChainConfig, -) : DefaultUpstream(id, hash, options, role, targets, node, chainConfig) { - - private val capabilities = setOf(Capability.RPC, Capability.BALANCE) - - override fun getCapabilities(): Set { - return capabilities - } - - override fun getLabels(): Collection { - return node?.let { listOf(it.labels) } ?: emptyList() - } - - abstract fun getIngressSubscription(): EthereumIngressSubscription -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt index 03b51881a..26212358d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt @@ -41,7 +41,6 @@ class EthereumLocalReader( private val reader: EthereumCachingReader, private val methods: CallMethods, private val head: Head, - private val localEnabled: Boolean, ) : JsonRpcReader { override fun read(key: JsonRpcRequest): Mono { @@ -49,9 +48,6 @@ class EthereumLocalReader( return Mono.just(methods.executeHardcoded(key.method)) .map { JsonRpcResponse(it, null) } } - if (!localEnabled) { - return Mono.empty() - } if (!methods.isCallable(key.method)) { return Mono.error(RpcException(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unsupported method")) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt deleted file mode 100644 index 79986fa5c..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2020 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.cache.Caches -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.upstream.DistanceExtractor -import io.emeraldpay.dshackle.upstream.DynamicMergedHead -import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.EmptyHead -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.HeadLagObserver -import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.MergedHead -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource -import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice -import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice -import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstream -import io.emeraldpay.etherjar.domain.BlockHash -import org.springframework.cloud.sleuth.Tracer -import org.springframework.util.ConcurrentReferenceHashMap -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.scheduler.Scheduler - -@Suppress("UNCHECKED_CAST") -open class EthereumMultistream( - chain: Chain, - val upstreams: MutableList, - caches: Caches, - private val headScheduler: Scheduler, - tracer: Tracer, -) : Multistream(chain, upstreams as MutableList, caches) { - - private var head: DynamicMergedHead = DynamicMergedHead( - PriorityForkChoice(), - "ETH Multistream of ${chain.chainCode}", - headScheduler, - ) - - private val filteredHeads: MutableMap = - ConcurrentReferenceHashMap(16, ConcurrentReferenceHashMap.ReferenceType.WEAK) - - private val reader: EthereumCachingReader = EthereumCachingReader(this, this.caches, getMethodsFactory(), tracer) - private var subscribe = EthereumEgressSubscription(this, headScheduler, NoPendingTxes()) - - init { - this.init() - } - - override fun init() { - if (upstreams.size > 0) { - upstreams.forEach { addHead(it) } - } - super.init() - } - - override fun onUpstreamsUpdated() { - super.onUpstreamsUpdated() - - val pendingTxes: PendingTxesSource = upstreams - .mapNotNull { - it.getIngressSubscription().getPendingTxes() - }.let { - if (it.isEmpty()) { - NoPendingTxes() - } else if (it.size == 1) { - it.first() - } else { - AggregatedPendingTxes(it) - } - } - subscribe = EthereumEgressSubscription(this, headScheduler, pendingTxes) - } - - override fun start() { - super.start() - head.start() - onHeadUpdated(head) - reader.start() - } - - override fun stop() { - super.stop() - reader.stop() - filteredHeads.clear() - } - - override fun addHead(upstream: Upstream) { - val newHead = upstream.getHead() - if (newHead is Lifecycle && !newHead.isRunning()) { - newHead.start() - } - head.addHead(upstream) - } - - override fun removeHead(upstreamId: String) { - head.removeHead(upstreamId) - } - - override fun makeLagObserver(): HeadLagObserver = - HeadLagObserver(head, upstreams, DistanceExtractor::extractPowDistance, headScheduler, 6).apply { - start() - } - - override fun isRunning(): Boolean { - return super.isRunning() || reader.isRunning() - } - - override fun getCachingReader(): EthereumCachingReader { - return reader - } - - override fun getHead(): Head { - return head - } - - override fun tryProxySubscribe( - matcher: Selector.Matcher, - request: BlockchainOuterClass.NativeSubscribeRequest, - ): Flux? = - upstreams.filter { - matcher.matches(it) - }.takeIf { ups -> - ups.size == 1 && ups.all { it.isGrpc() } - }?.map { - it as GrpcUpstream - }?.map { - it.getBlockchainApi().nativeSubscribe(request) - }?.let { - Flux.merge(it) - } - - override fun getLabels(): Collection { - return upstreams.flatMap { it.getLabels() } - } - - @Suppress("UNCHECKED_CAST") - override fun cast(selfType: Class): T { - if (!selfType.isAssignableFrom(this.javaClass)) { - throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") - } - return this as T - } - - override fun getEgressSubscription(): EgressSubscription { - return subscribe - } - - override fun getLocalReader(localEnabled: Boolean): Mono { - return Mono.just(EthereumLocalReader(reader, getMethods(), getHead(), localEnabled)) - } - - override fun getHead(mather: Selector.Matcher): Head = - filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> - upstreams.filter { mather.matches(it) } - .apply { - log.debug("Found $size upstreams matching [${mather.describeInternal()}]") - }.let { - val selected = it.map { it.getHead() } - when (it.size) { - 0 -> EmptyHead() - 1 -> selected.first() - else -> MergedHead(selected, MostWorkForkChoice(), headScheduler, "Eth head ${it.map { it.getId() }}").apply { - start() - } - } - } - } - - override fun getEnrichedHead(mather: Selector.Matcher): Head = - filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> - upstreams.filter { mather.matches(it) } - .apply { - log.debug("Found $size upstreams matching [${mather.describeInternal()}]") - }.let { - val selected = it.map { source -> source.getHead() } - EnrichedMergedHead( - selected, - getHead(), - headScheduler, - object : - Reader { - override fun read(key: BlockHash): Mono { - return reader.blocksByHashAsCont().read(key).map { res -> res.data } - } - }, - ) - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFees.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFees.kt deleted file mode 100644 index d5488be73..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFees.kt +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot -import io.emeraldpay.etherjar.domain.Wei -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import org.slf4j.LoggerFactory -import java.util.function.Function - -class EthereumPriorityFees(upstreams: Multistream, reader: EthereumCachingReader, heightLimit: Int) : - EthereumFees(upstreams, reader, heightLimit) { - - companion object { - private val log = LoggerFactory.getLogger(EthereumPriorityFees::class.java) - } - - private val toGrpc: Function = - Function { - BlockchainOuterClass.EstimateFeeResponse.newBuilder() - .setEthereumExtended( - BlockchainOuterClass.EthereumExtFees.newBuilder() - .setMax(it.max.amount.toString()) - .setPriority(it.priority.amount.toString()) - .setExpect(it.paid.amount.toString()), - ) - .build() - } - - override fun extractFee(block: BlockJson, tx: TransactionJsonSnapshot): EthereumFee { - val baseFee = block.baseFeePerGas ?: Wei.ZERO - if (tx.type == 2) { - // an EIP-1559 Transaction provides Max and Priority fee - val paid = (baseFee + tx.maxPriorityFeePerGas).coerceAtMost(tx.maxFeePerGas) - return EthereumFee(tx.maxFeePerGas, tx.maxPriorityFeePerGas, paid, baseFee) - } - return EthereumFee(tx.gasPrice, (tx.gasPrice - baseFee).coerceAtLeast(Wei.ZERO), tx.gasPrice, baseFee) - } - - override fun getResponseBuilder(): Function { - return toGrpc - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index b41f9322d..d07a47e2e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -20,12 +20,12 @@ import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.etherjar.domain.Address @@ -47,8 +47,8 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( private val chain: Chain, private val upstream: Upstream, private val options: ChainOptions.Options, - private val callLimitContract: String? = null, -) { + private val config: ChainConfig, +) : UpstreamValidator { companion object { private val log = LoggerFactory.getLogger(EthereumUpstreamValidator::class.java) val scheduler = @@ -127,7 +127,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( .onErrorReturn(UpstreamAvailability.UNAVAILABLE) } - fun start(): Flux { + override fun start(): Flux { return Flux.interval( Duration.ZERO, Duration.ofSeconds(options.validationInterval.toLong()), @@ -140,13 +140,9 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( } } - fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult { - return validateUpstreamSettings().block() ?: UPSTREAM_FATAL_SETTINGS_ERROR - } - - fun validateUpstreamSettings(): Mono { + override fun validateUpstreamSettings(): Mono { if (options.disableUpstreamValidation) { - return Mono.just(UPSTREAM_VALID) + return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) } return Mono.zip( validateChain(), @@ -159,7 +155,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( private fun validateChain(): Mono { if (!options.validateChain) { - return Mono.just(UPSTREAM_VALID) + return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) } return Mono.zip( chainId(), @@ -178,20 +174,20 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( } if (isChainValid) { - UPSTREAM_VALID + ValidateUpstreamSettingsResult.UPSTREAM_VALID } else { - UPSTREAM_FATAL_SETTINGS_ERROR + ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR } } .onErrorResume { log.error("Error during chain validation", it) - Mono.just(UPSTREAM_SETTINGS_ERROR) + Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) } } private fun validateCallLimit(): Mono { - if (!options.validateCallLimit || callLimitContract == null) { - return Mono.just(UPSTREAM_VALID) + if (!options.validateCallLimit || config.callLimitContract == null) { + return Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) } return upstream.getIngressReader() .read( @@ -199,7 +195,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "eth_call", listOf( TransactionCallJson( - Address.from(callLimitContract), + Address.from(config.callLimitContract), // calling contract with param 200_000, meaning it will generate 200k symbols or response // f4240 + metadata — ~1 million HexData.from("0xd8a26e3a00000000000000000000000000000000000000000000000000000000000f4240"), @@ -209,7 +205,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( ), ) .flatMap(JsonRpcResponse::requireResult) - .map { UPSTREAM_VALID } + .map { ValidateUpstreamSettingsResult.UPSTREAM_VALID } .onErrorResume { if (it.message != null && it.message!!.contains("rpc.returndata.limit")) { log.warn( @@ -217,7 +213,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "You need to set up your return limit to at least 1_100_000. " + "Erigon config example: https://github.com/ledgerwatch/erigon/blob/d014da4dc039ea97caf04ed29feb2af92b7b129d/cmd/utils/flags.go#L369", ) - Mono.just(UPSTREAM_FATAL_SETTINGS_ERROR) + Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR) } else { Mono.error(it) } @@ -233,7 +229,7 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "message ${ctx.exception().message}", ) } - .onErrorReturn(UPSTREAM_SETTINGS_ERROR) + .onErrorReturn(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) } private fun validateOldBlocks(): Mono { @@ -257,11 +253,11 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( "Node ${upstream.getId()} probably is synced incorrectly, it is not possible to get old blocks", ) } - UPSTREAM_VALID + ValidateUpstreamSettingsResult.UPSTREAM_VALID } .onErrorResume { log.warn("Error during old blocks validation", it) - Mono.just(UPSTREAM_VALID) + Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) } } @@ -290,10 +286,4 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( .doOnError { log.error("Error during execution 'net_version' - ${it.message} for ${upstream.getId()}") } .flatMap(JsonRpcResponse::requireStringResult) } - - enum class ValidateUpstreamSettingsResult { - UPSTREAM_VALID, - UPSTREAM_SETTINGS_ERROR, - UPSTREAM_FATAL_SETTINGS_ERROR, - } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumLabelsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumLabelsDetector.kt index 9bf33c49a..29067ced8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumLabelsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumLabelsDetector.kt @@ -5,6 +5,7 @@ import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global.Companion.objectMapper import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.ethereum.EthereumArchiveBlockNumberReader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse @@ -14,10 +15,10 @@ import reactor.core.publisher.Mono class EthereumLabelsDetector( private val reader: JsonRpcReader, private val chain: Chain, -) { +) : LabelsDetector { private val blockNumberReader = EthereumArchiveBlockNumberReader(reader) - fun detectLabels(): Flux> { + override fun detectLabels(): Flux> { return Flux.merge( detectNodeType(), detectArchiveNode(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt deleted file mode 100644 index 46b14e47b..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2020 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.cache.Caches -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.upstream.DistanceExtractor -import io.emeraldpay.dshackle.upstream.DynamicMergedHead -import io.emeraldpay.dshackle.upstream.EmptyHead -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.HeadLagObserver -import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.MergedHead -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource -import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice -import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstream -import io.emeraldpay.etherjar.domain.BlockHash -import org.springframework.cloud.sleuth.Tracer -import org.springframework.util.ConcurrentReferenceHashMap -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.scheduler.Scheduler - -@Suppress("UNCHECKED_CAST") -open class EthereumPosMultiStream( - chain: Chain, - val upstreams: MutableList, - caches: Caches, - private val headScheduler: Scheduler, - tracer: Tracer, -) : Multistream(chain, upstreams as MutableList, caches) { - - private var head: DynamicMergedHead = DynamicMergedHead( - PriorityForkChoice(), - "ETH Pos Multistream of ${chain.chainCode}", - headScheduler, - ) - - private val reader: EthereumCachingReader = EthereumCachingReader(this, this.caches, getMethodsFactory(), tracer) - private var subscribe = EthereumEgressSubscription(this, headScheduler, NoPendingTxes()) - private val filteredHeads: MutableMap = - ConcurrentReferenceHashMap(16, ConcurrentReferenceHashMap.ReferenceType.WEAK) - - init { - this.init() - } - - override fun init() { - if (upstreams.size > 0) { - upstreams.forEach { addHead(it) } - } - super.init() - } - - override fun start() { - super.start() - head.start() - onHeadUpdated(head) - reader.start() - } - - override fun stop() { - super.stop() - reader.stop() - filteredHeads.clear() - } - - override fun addHead(upstream: Upstream) { - val newHead = upstream.getHead() - if (newHead is Lifecycle && !newHead.isRunning()) { - newHead.start() - } - head.addHead(upstream) - } - - override fun removeHead(upstreamId: String) { - head.removeHead(upstreamId) - } - - override fun isRunning(): Boolean { - return super.isRunning() || reader.isRunning() - } - - override fun makeLagObserver(): HeadLagObserver = - HeadLagObserver(head, upstreams, DistanceExtractor::extractPriorityDistance, headScheduler, 6).apply { - start() - } - - override fun getCachingReader(): EthereumCachingReader { - return reader - } - - override fun getHead(): Head { - return head - } - - override fun tryProxySubscribe( - matcher: Selector.Matcher, - request: BlockchainOuterClass.NativeSubscribeRequest, - ): Flux? = - upstreams.filter { - matcher.matches(it) - }.takeIf { ups -> - ups.size == 1 && ups.all { it.isGrpc() } - }?.map { - it as GrpcUpstream - }?.map { - it.proxySubscribe(request) - }?.let { - Flux.merge(it) - } - - override fun getLabels(): Collection { - return upstreams.flatMap { it.getLabels() } - } - - @Suppress("UNCHECKED_CAST") - override fun cast(selfType: Class): T { - if (!selfType.isAssignableFrom(this.javaClass)) { - throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") - } - return this as T - } - - override fun getLocalReader(localEnabled: Boolean): Mono { - return Mono.just(EthereumLocalReader(reader, getMethods(), getHead(), localEnabled)) - } - - override fun getEgressSubscription(): EthereumEgressSubscription { - return subscribe - } - - override fun getHead(mather: Selector.Matcher): Head = - if (mather == Selector.empty || mather == Selector.anyLabel) { - head - } else { - filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> - upstreams.filter { mather.matches(it) } - .apply { - log.debug("Found $size upstreams matching [${mather.describeInternal()}]") - } - .let { - val selected = it.map { it.getHead() } - when (it.size) { - 0 -> EmptyHead() - 1 -> selected.first() - else -> MergedHead( - selected, - PriorityForkChoice(), - headScheduler, - "ETH head for ${it.map { it.getId() }}", - ).apply { - start() - } - } - } - } - } - - override fun getEnrichedHead(mather: Selector.Matcher): Head = - filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> - upstreams.filter { mather.matches(it) } - .apply { - log.debug("Found $size upstreams matching [${mather.describeInternal()}]") - }.let { - val selected = it.map { source -> source.getHead() } - EnrichedMergedHead( - selected, - getHead(), - headScheduler, - object : - Reader { - override fun read(key: BlockHash): Mono { - return reader.blocksByHashAsCont().read(key).map { res -> res.data } - } - }, - ) - } - } - - override fun onUpstreamsUpdated() { - super.onUpstreamsUpdated() - - val pendingTxes: PendingTxesSource = upstreams - .mapNotNull { - it.getIngressSubscription().getPendingTxes() - }.let { - if (it.isEmpty()) { - NoPendingTxes() - } else if (it.size == 1) { - it.first() - } else { - AggregatedPendingTxes(it) - } - } - subscribe = EthereumEgressSubscription(this, headScheduler, pendingTxes) - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 90e90e137..cb9538a46 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -3,16 +3,48 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.STARKNET import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.cache.Caches +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific +import org.apache.commons.collections4.Factory +import org.springframework.cloud.sleuth.Tracer +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler + +typealias SubscriptionBuilder = (Multistream) -> EgressSubscription +typealias LocalReaderBuilder = (CachingReader, CallMethods, Head) -> Mono +typealias CachingReaderBuilder = (Multistream, Caches, Factory) -> CachingReader interface ChainSpecific { fun parseBlock(data: JsonRpcResponse, upstreamId: String): BlockContainer fun latestBlockRequest(): JsonRpcRequest + + fun localReaderBuilder(cachingReader: CachingReader, methods: CallMethods, head: Head): Mono + + fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription + + fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder + + fun validator(chain: Chain, upstream: Upstream, options: ChainOptions.Options, config: ChainConfig): UpstreamValidator? + + fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? + + fun subscriptionTopics(upstream: GenericUpstream): List } object ChainSpecificRegistry { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt index 3e351ce2c..10e3f7b10 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt @@ -24,24 +24,40 @@ import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.DynamicMergedHead import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription +import io.emeraldpay.dshackle.upstream.EmptyHead import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.HeadLagObserver import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.MergedHead import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.Selector.Matcher import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice +import org.springframework.util.ConcurrentReferenceHashMap +import org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.WEAK import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler -@Suppress("UNCHECKED_CAST") -open class GenericMultistream( +class GenericMultistream( chain: Chain, - val upstreams: MutableList, + private val upstreams: MutableList, caches: Caches, private val headScheduler: Scheduler, -) : Multistream(chain, upstreams as MutableList, caches) { + cachingReaderBuilder: CachingReaderBuilder, + private val localReaderBuilder: LocalReaderBuilder, + private val subscriptionBuilder: SubscriptionBuilder, +) : Multistream(chain, caches) { + + private val cachingReader = cachingReaderBuilder(this, caches, getMethodsFactory()) + + override fun getUpstreams(): MutableList { + return upstreams + } + + override fun addUpstreamInternal(u: Upstream) { + upstreams.add(u as GenericUpstream) + } private var head: DynamicMergedHead = DynamicMergedHead( PriorityForkChoice(), @@ -53,6 +69,8 @@ open class GenericMultistream( this.init() } + private var subscription: EgressSubscription = subscriptionBuilder(this) + override fun init() { if (upstreams.size > 0) { upstreams.forEach { addHead(it) } @@ -60,10 +78,20 @@ open class GenericMultistream( super.init() } + private val filteredHeads: MutableMap = + ConcurrentReferenceHashMap(16, WEAK) + override fun start() { super.start() head.start() onHeadUpdated(head) + cachingReader.start() + } + + override fun stop() { + super.stop() + cachingReader.stop() + filteredHeads.clear() } override fun addHead(upstream: Upstream) { @@ -78,17 +106,45 @@ open class GenericMultistream( head.removeHead(upstreamId) } + override fun isRunning(): Boolean { + return super.isRunning() || cachingReader.isRunning() + } + override fun makeLagObserver(): HeadLagObserver = HeadLagObserver(head, upstreams, DistanceExtractor::extractPriorityDistance, headScheduler, 6).apply { start() } override fun getCachingReader(): CachingReader? { - return null + return cachingReader } override fun getHead(mather: Matcher): Head { - return getHead() + if (mather == Selector.empty || mather == Selector.anyLabel) { + return head + } else { + return filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> + upstreams.filter { mather.matches(it) } + .apply { + log.debug("Found $size upstreams matching [${mather.describeInternal()}]") + } + .let { + val selected = it.map { it.getHead() } + when (it.size) { + 0 -> EmptyHead() + 1 -> selected.first() + else -> MergedHead( + selected, + PriorityForkChoice(), + headScheduler, + "Head for ${it.map { it.getId() }}", + ).apply { + start() + } + } + } + } + } } override fun getHead(): Head { @@ -111,11 +167,16 @@ open class GenericMultistream( return this as T } - override fun getLocalReader(localEnabled: Boolean): Mono { - return Mono.just(LocalReader(getMethods())) + override fun getLocalReader(): Mono { + return localReaderBuilder(cachingReader, getMethods(), getHead()) } override fun getEgressSubscription(): EgressSubscription { - return EmptyEgressSubscription() + return subscription + } + + override fun onUpstreamsUpdated() { + super.onUpstreamsUpdated() + subscription = subscriptionBuilder(this) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index b849206f7..977f73de6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -12,14 +12,21 @@ import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.LabelsDetectorBuilder import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector import org.springframework.context.ApplicationEventPublisher import org.springframework.context.Lifecycle import reactor.core.Disposable +import reactor.core.publisher.Flux +import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean class GenericUpstream( @@ -30,14 +37,23 @@ class GenericUpstream( role: UpstreamsConfig.UpstreamRole, targets: CallMethods?, private val node: QuorumForLabels.QuorumItem?, - val chainConfig: ChainsConfig.ChainConfig, + chainConfig: ChainsConfig.ChainConfig, connectorFactory: ConnectorFactory, private val eventPublisher: ApplicationEventPublisher?, + validatorBuilder: UpstreamValidatorBuilder, + labelsDetectorBuilder: LabelsDetectorBuilder, + private val subscriptionTopics: (GenericUpstream) -> List, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig), Lifecycle { + private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) + private var validatorSubscription: Disposable? = null + private var validationSettingsSubscription: Disposable? = null + private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false) private val connector: GenericConnector = connectorFactory.create(this, chain, true) private var livenessSubscription: Disposable? = null + private val labelsDetector = labelsDetectorBuilder(chain, this.getIngressReader()) + override fun getHead(): Head { return connector.getHead() } @@ -51,10 +67,7 @@ class GenericUpstream( } override fun getSubscriptionTopics(): List { - // should be implemented in next iterations - // starknet doesn't have any subscriptions at all - // polkadot serves subscriptions like separate json-rpc methods - return emptyList() + return subscriptionTopics(this) } // outdated, looks like applicable only for bitcoin and our ws_head trick @@ -83,19 +96,107 @@ class GenericUpstream( log.info("Configured for ${chain.chainName}") connector.start() + if (validator != null) { + val validSettingsResult = validator.validateUpstreamSettingsOnStartup() + when (validSettingsResult) { + ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> { + connector.stop() + log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") + return + } + ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -> { + validateUpstreamSettings() + } + else -> { + upstreamStart() + } + } + } else { + upstreamStart() + } + } + + private fun validateUpstreamSettings() { + if (validator != null) { + validationSettingsSubscription = Flux.interval( + Duration.ofSeconds(10), + Duration.ofSeconds(20), + ).flatMap { + validator.validateUpstreamSettings() + }.subscribe { + when (it) { + ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> { + connector.stop() + disposeValidationSettingsSubscription() + } + + ValidateUpstreamSettingsResult.UPSTREAM_VALID -> { + upstreamStart() + eventPublisher?.publishEvent( + UpstreamChangeEvent( + chain, + this, + UpstreamChangeEvent.ChangeType.ADDED, + ), + ) + disposeValidationSettingsSubscription() + } + + else -> { + log.warn("Continue validation of upstream ${getId()}") + } + } + } + } + } + + private fun detectLabels() { + labelsDetector?.detectLabels()?.subscribe { label -> updateLabels(label) } + } + + private fun upstreamStart() { + if (getOptions().disableValidation) { + log.warn("Disable validation for upstream ${this.getId()}") + this.setLag(0) + this.setStatus(UpstreamAvailability.OK) + } else { + log.debug("Start validation for upstream ${this.getId()}") + validatorSubscription = validator?.start() + ?.subscribe(this::setStatus) + } livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ hasLiveSubscriptionHead.set(it) - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) }, { log.debug("Error while checking live subscription for ${getId()}", it) },) + detectLabels() } override fun stop() { + validatorSubscription?.dispose() + validatorSubscription = null livenessSubscription?.dispose() livenessSubscription = null + disposeValidationSettingsSubscription() connector.stop() } + private fun disposeValidationSettingsSubscription() { + validationSettingsSubscription?.dispose() + validationSettingsSubscription = null + } + + private fun updateLabels(label: Pair) { + log.info("Detected label ${label.first} with value ${label.second} for upstream ${getId()}") + node?.labels?.let { labels -> + labels[label.first] = label.second + } + } + + fun getIngressSubscription(): EthereumIngressSubscription { + return connector.getIngressSubscription() + } + override fun isRunning() = connector.isRunning() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt deleted file mode 100644 index eb6fdeb3b..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Copyright (c) 2020 EmeraldPay, Inc - * Copyright (c) 2019 ETCDEV GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.grpc - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.ReactorBlockchainGrpc.ReactorBlockchainStub -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Defaults -import io.emeraldpay.dshackle.config.ChainsConfig -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.data.BlockContainer -import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.foundation.ChainOptions -import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.startup.QuorumForLabels -import io.emeraldpay.dshackle.upstream.BuildInfo -import io.emeraldpay.dshackle.upstream.Capability -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeUpstream -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumDshackleIngressSubscription -import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.rpc.RpcException -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.scheduler.Scheduler -import java.math.BigInteger -import java.time.Instant -import java.util.Locale -import java.util.concurrent.TimeoutException -import java.util.function.Function - -open class EthereumGrpcUpstream( - private val parentId: String, - hash: Byte, - role: UpstreamsConfig.UpstreamRole, - private val chain: Chain, - private val remote: ReactorBlockchainStub, - client: JsonRpcGrpcClient, - overrideLabels: UpstreamsConfig.Labels?, - chainConfig: ChainsConfig.ChainConfig, - headScheduler: Scheduler, -) : EthereumLikeUpstream( - "${parentId}_${chain.chainCode.lowercase(Locale.getDefault())}", - hash, - ChainOptions.PartialOptions.getDefaults().buildOptions(), - role, - null, - null, - chainConfig, -), - GrpcUpstream, - Lifecycle { - - private val blockConverter: Function = Function { value -> - val parentHash = - if (value.parentBlockId.isBlank()) { - null - } else { - BlockId.from(BlockHash.from("0x" + value.parentBlockId)) - } - val block = BlockContainer( - value.height, - BlockId.from(BlockHash.from("0x" + value.blockId)), - BigInteger(1, value.weight.toByteArray()), - Instant.ofEpochMilli(value.timestamp), - false, - null, - null, - parentHash, - ) - block - } - - override fun getSubscriptionTopics(): List { - return subscriptionTopics - } - - private val reloadBlock: Function> = Function { existingBlock -> - // head comes without transaction data - // need to download transactions for the block - defaultReader.read(JsonRpcRequest("eth_getBlockByHash", listOf(existingBlock.hash.toHexWithPrefix(), false))) - .flatMap(JsonRpcResponse::requireResult) - .map { - BlockContainer.fromEthereumJson(it, getId()) - } - .timeout(timeout, Mono.error(TimeoutException("Timeout from upstream"))) - .doOnError { t -> - setStatus(UpstreamAvailability.UNAVAILABLE) - val msg = "Failed to download block data for chain $chain on $parentId" - if (t is RpcException || t is TimeoutException) { - log.warn("$msg. Message: ${t.message}") - } else { - log.error(msg, t) - } - } - } - - private val upstreamStatus = GrpcUpstreamStatus(overrideLabels) - private val grpcHead = GrpcHead( - getId(), - chain, - this, - remote, - blockConverter, - reloadBlock, - MostWorkForkChoice(), - headScheduler, - ) - private var capabilities: Set = emptySet() - private val buildInfo: BuildInfo = BuildInfo() - private var subscriptionTopics = listOf() - - private val defaultReader: JsonRpcReader = client.getReader() - var timeout = Defaults.timeout - private val ethereumSubscriptions = EthereumDshackleIngressSubscription(chain, remote) - - override fun getBlockchainApi(): ReactorBlockchainStub { - return remote - } - - override fun proxySubscribe(request: BlockchainOuterClass.NativeSubscribeRequest): Flux = - remote.nativeSubscribe(request) - - override fun start() { - } - - override fun isRunning(): Boolean { - return true - } - - override fun stop() { - } - - override fun getBuildInfo(): BuildInfo { - return buildInfo - } - - override fun update(conf: BlockchainOuterClass.DescribeChain, buildInfo: BlockchainOuterClass.BuildInfo): Boolean { - val newBuildInfo = BuildInfo.extract(buildInfo) - val buildInfoChanged = this.buildInfo.update(newBuildInfo) - val newCapabilities = RemoteCapabilities.extract(conf) - val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also { - capabilities = newCapabilities - } - conf.status?.let { status -> onStatus(status) } - val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also { - subscriptionTopics = conf.supportedSubscriptionsList - } - return buildInfoChanged || upstreamStatusChanged || subsChanged - } - - override fun getQuorumByLabel(): QuorumForLabels { - return upstreamStatus.getNodes() - } - - // ------------------------------------------------------------------------------------------ - - override fun getLabels(): Collection { - return upstreamStatus.getLabels() - } - - override fun getIngressSubscription(): EthereumIngressSubscription { - return ethereumSubscriptions - } - - override fun getMethods(): CallMethods { - return upstreamStatus.getCallMethods() - } - - override fun isAvailable(): Boolean { - return super.isAvailable() && grpcHead.getCurrent() != null && getQuorumByLabel().getAll().any { - it.quorum > 0 - } - } - - override fun getHead(): Head { - return grpcHead - } - - override fun getIngressReader(): JsonRpcReader { - return defaultReader - } - - @Suppress("UNCHECKED_CAST") - override fun cast(selfType: Class): T { - if (!selfType.isAssignableFrom(this.javaClass)) { - throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") - } - return this as T - } - - override fun getCapabilities(): Set { - return capabilities - } - - override fun isGrpc(): Boolean { - return true - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt similarity index 92% rename from src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt rename to src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt index f5710a58e..56f291836 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt @@ -28,13 +28,11 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.BuildInfo import io.emeraldpay.dshackle.upstream.Capability +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeUpstream -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumDshackleIngressSubscription import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import io.emeraldpay.etherjar.domain.BlockHash @@ -45,7 +43,7 @@ import java.time.Instant import java.util.Locale import java.util.function.Function -open class EthereumPosGrpcUpstream( +open class GenericGrpcUpstream( parentId: String, hash: Byte, role: UpstreamsConfig.UpstreamRole, @@ -56,7 +54,7 @@ open class EthereumPosGrpcUpstream( overrideLabels: UpstreamsConfig.Labels?, chainConfig: ChainsConfig.ChainConfig, headScheduler: Scheduler, -) : EthereumLikeUpstream( +) : DefaultUpstream( "${parentId}_${chain.chainCode.lowercase(Locale.getDefault())}", hash, ChainOptions.PartialOptions.getDefaults().buildOptions(), @@ -103,7 +101,8 @@ open class EthereumPosGrpcUpstream( private val buildInfo: BuildInfo = BuildInfo() private val defaultReader: JsonRpcReader = client.getReader() - private val ethereumSubscriptions = EthereumDshackleIngressSubscription(chain, remote) + + // private val ethereumSubscriptions = EthereumDshackleIngressSubscription(chain, remote) private var subscriptionTopics = listOf() override fun start() { @@ -155,10 +154,6 @@ open class EthereumPosGrpcUpstream( return upstreamStatus.getLabels() } - override fun getIngressSubscription(): EthereumIngressSubscription { - return ethereumSubscriptions - } - override fun getMethods(): CallMethods { return upstreamStatus.getCallMethods() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 83e8f1b7f..1cf32a483 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -25,6 +25,7 @@ import io.emeraldpay.api.proto.Common.ChainRef.UNRECOGNIZED import io.emeraldpay.api.proto.ReactorAuthGrpc import io.emeraldpay.api.proto.ReactorBlockchainGrpc import io.emeraldpay.dshackle.BlockchainType +import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.FileResolver @@ -34,7 +35,6 @@ import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.startup.UpstreamChangeEvent import io.emeraldpay.dshackle.upstream.DefaultUpstream -import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.grpc.auth.AuthException import io.emeraldpay.dshackle.upstream.grpc.auth.ClientAuthenticationInterceptor @@ -246,42 +246,37 @@ class GrpcUpstreams( return sslContext.build() } - private val creators: Map DefaultUpstream> = mapOf( - BlockchainType.EVM_POW to { chain, rpcClient -> - EthereumGrpcUpstream( - id, - hash, - role, - chain, - client, - rpcClient, - labels, - chainsConfig.resolve(chain.chainName), - headScheduler, - ) - }, - BlockchainType.EVM_POS to { chain, rpcClient -> - EthereumPosGrpcUpstream( - id, - hash, - role, - chain, - client, - rpcClient, - nodeRating, - labels, - chainsConfig.resolve(chain.chainName), - headScheduler, - ) - }, - BlockchainType.BITCOIN to { chain, rpcClient -> - BitcoinGrpcUpstream(id, role, chain, client, rpcClient, labels, chainsConfig.resolve(chain.chainCode), headScheduler) - }, - ) - private fun getOrCreate(chain: Chain): UpstreamChangeEvent { val metrics = makeMetrics(chain) - val creator = creators.getValue(BlockchainType.from(chain)) + val creator = if (BlockchainType.from(chain) != BITCOIN) { + { ch: Chain, rpcClient: JsonRpcGrpcClient -> + GenericGrpcUpstream( + id, + hash, + role, + ch, + client, + rpcClient, + nodeRating, + labels, + chainsConfig.resolve(chain.chainName), + headScheduler, + ) + } + } else { + { ch: Chain, rpcClient: JsonRpcGrpcClient -> + BitcoinGrpcUpstream( + id, + role, + chain, + client, + rpcClient, + labels, + chainsConfig.resolve(chain.chainCode), + headScheduler, + ) + } + } return getOrCreate(chain, metrics, creator) } @@ -315,7 +310,7 @@ class GrpcUpstreams( val rpcClient = JsonRpcGrpcClient(client, chain, metrics) val created = creator(chain, rpcClient) known[chain] = created - if (created is Lifecycle) created.start() + created.start() UpstreamChangeEvent(chain, created, UpstreamChangeEvent.ChangeType.ADDED) } else { UpstreamChangeEvent(chain, current, UpstreamChangeEvent.ChangeType.REVALIDATED) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index 45679d69a..96ddafe08 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -2,12 +2,32 @@ package io.emeraldpay.dshackle.upstream.starknet import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonProperty +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.NoopCachingReader +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific +import io.emeraldpay.dshackle.upstream.generic.GenericUpstream +import io.emeraldpay.dshackle.upstream.generic.LocalReader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import org.springframework.cloud.sleuth.Tracer +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler import java.math.BigInteger import java.time.Instant @@ -30,7 +50,41 @@ object StarknetChainSpecific : ChainSpecific { ) } - override fun latestBlockRequest(): JsonRpcRequest = JsonRpcRequest("starknet_getBlockWithTxHashes", listOf("latest")) + override fun latestBlockRequest(): JsonRpcRequest = + JsonRpcRequest("starknet_getBlockWithTxHashes", listOf("latest")) + + override fun localReaderBuilder( + cachingReader: CachingReader, + methods: CallMethods, + head: Head, + ): Mono { + return Mono.just(LocalReader(methods)) + } + + override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { + return { _ -> EmptyEgressSubscription } + } + + override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { + return { _, _, _ -> NoopCachingReader } + } + + override fun validator( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): UpstreamValidator? { + return null + } + + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { + return null + } + + override fun subscriptionTopics(upstream: GenericUpstream): List { + return emptyList() + } } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy index 09328b419..f476d84a0 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy @@ -31,7 +31,7 @@ import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeRpcUpstream import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.grpc.EthereumPosGrpcUpstream +import io.emeraldpay.dshackle.upstream.grpc.GenericGrpcUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.etherjar.domain.BlockHash @@ -205,13 +205,13 @@ class MultistreamSpec extends Specification { .setMethod("newHeads") .build() - def up1 = Mock(EthereumPosGrpcUpstream) { + def up1 = Mock(GenericGrpcUpstream) { 1 * isGrpc() >> true 1 * getId() >> "internal" 1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "internal"))] 1 * proxySubscribe(call) >> Flux.just("{}") } - def up2 = Mock(EthereumPosGrpcUpstream) { + def up2 = Mock(GenericGrpcUpstream) { 1 * getId() >> "external" 1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "external"))] } @@ -235,7 +235,7 @@ class MultistreamSpec extends Specification { .setMethod("newHeads") .build() - def up2 = Mock(EthereumPosGrpcUpstream) { + def up2 = Mock(GenericGrpcUpstream) { 1 * isGrpc() >> false 1 * getId() >> "2" 1 * getLabels() >> [UpstreamsConfig.Labels.fromMap(Collections.singletonMap("provider", "internal"))] diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/ERC20BalanceSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/ERC20BalanceSpec.groovy deleted file mode 100644 index 2c684a7d1..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/ERC20BalanceSpec.groovy +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock -import io.emeraldpay.dshackle.test.ReaderMock -import io.emeraldpay.dshackle.upstream.ApiSource -import io.emeraldpay.dshackle.upstream.FilteredApis -import io.emeraldpay.dshackle.upstream.Selector -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse -import io.emeraldpay.etherjar.domain.Address -import io.emeraldpay.etherjar.erc20.ERC20Token -import io.emeraldpay.etherjar.hex.HexData -import io.emeraldpay.etherjar.rpc.json.TransactionCallJson -import spock.lang.Specification - -import java.time.Duration - -class ERC20BalanceSpec extends Specification { - - def "Gets balance from upstream"() { - setup: - ReaderMock api = new ReaderMock() - .with( - new JsonRpcRequest("eth_call", [ - new TransactionCallJson().tap { json -> - json.setTo(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")) - json.setData(HexData.from("0x70a0823100000000000000000000000016c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - }, - "latest" - ]), - JsonRpcResponse.ok('"0x0000000000000000000000000000000000000000000000000000001f28d72868"') - ) - - EthereumLikeRpcUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, api) - ERC20Token token = new ERC20Token(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")) - ERC20Balance query = new ERC20Balance() - - when: - def act = query.getBalance(upstream, token, Address.from("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - .block(Duration.ofSeconds(1)) - - then: - act.toLong() == 0x1f28d72868 - } - - def "Gets balance from api source"() { - setup: - ReaderMock api = new ReaderMock() - .with( - new JsonRpcRequest("eth_call", [ - new TransactionCallJson().tap { json -> - json.setTo(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")) - json.setData(HexData.from("0x70a0823100000000000000000000000016c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - }, - "latest" - ]), - JsonRpcResponse.ok('"0x0000000000000000000000000000000000000000000000000000001f28d72868"') - ) - - EthereumLikeRpcUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, api) - ERC20Token token = new ERC20Token(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")) - ERC20Balance query = new ERC20Balance() - - ApiSource apiSource = new FilteredApis( - Chain.ETHEREUM__MAINNET, [upstream], Selector.empty - ) - - when: - def act = query.getBalance(apiSource, token, Address.from("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")) - .block(Duration.ofSeconds(1)) - - then: - act.toLong() == 0x1f28d72868 - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFeesSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFeesSpec.groovy deleted file mode 100644 index 5deb8f7d5..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumPriorityFeesSpec.groovy +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright (c) 2021 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum - -import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.upstream.ChainFees -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot -import io.emeraldpay.etherjar.domain.TransactionId -import io.emeraldpay.etherjar.domain.Wei -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import spock.lang.Specification - -import java.time.Duration - -class EthereumPriorityFeesSpec extends Specification { - - def "Extract fee from EIP1559 tx"() { - setup: - // 13756007 - def block = new BlockJson().tap { - it.baseFeePerGas = new Wei(104197355513) - } - // 0x5da50f35a51e56ecd4313417b1c30f9c088222f3f8763701effe14f3dd18b6cc - def tx = new TransactionJsonSnapshot().tap { - it.type = 2 - it.maxFeePerGas = Wei.ofUnits(999, Wei.Unit.GWEI) - it.maxPriorityFeePerGas = Wei.ofUnits(5.0001, Wei.Unit.GWEI) - } - - def fees = new EthereumPriorityFees(Stub(EthereumMultistream), Stub(EthereumCachingReader), 10) - when: - def act = fees.extractFee(block, tx) - then: - act.priority == Wei.ofUnits(5.0001, Wei.Unit.GWEI) - act.max == Wei.ofUnits(999, Wei.Unit.GWEI) - act.base == new Wei(104197355513) - } - - def "Extract fee from legacy tx"() { - setup: - // 13756007 - def block = new BlockJson().tap { - it.baseFeePerGas = new Wei(104197355513) - } - // 0x1f507982bef0f11a8304287d41f228b5f1dda1114a446ee781c3d95ef4a7b891 - def tx = new TransactionJsonSnapshot().tap { - it.type = 0 - // 109.564020111 Gwei - it.gasPrice = Wei.from("0x198286458f") - } - - def fees = new EthereumPriorityFees(Stub(EthereumMultistream), Stub(EthereumCachingReader), 10) - when: - def act = fees.extractFee(block, tx) - then: - // as difference between base minimum and actually paid - act.priority == Wei.ofUnits(5.366664598, Wei.Unit.GWEI) - act.max == Wei.from("0x198286458f") - act.base == new Wei(104197355513) - } - - def "Calculates average fee"() { - setup: - def inputs = [ - new EthereumFees.EthereumFee(Wei.ofEthers(1), Wei.ofEthers(0.5), Wei.ofEthers(0.75), Wei.ofEthers(0.5)), - new EthereumFees.EthereumFee(Wei.ofEthers(0.75), Wei.ofEthers(0.5), Wei.ofEthers(0.75), Wei.ofEthers(0.5)), - new EthereumFees.EthereumFee(Wei.ofEthers(0.6), Wei.ofEthers(0.2), Wei.ofEthers(0.6), Wei.ofEthers(0.5)), - ] - def fees = new EthereumPriorityFees(Stub(EthereumMultistream), Stub(EthereumCachingReader), 10) - when: - def act = Flux.fromIterable(inputs) - .transform(fees.feeAggregation(ChainFees.Mode.AVG_LAST)) - .next().block(Duration.ofSeconds(1)) - then: - act.priority == Wei.ofEthers(0.4) // 0.5 + 0.5 + 0.2 - act.paid == Wei.ofEthers(0.7) // 0.75 + 0.75 + 0.6 - } - - def "Estimate"() { - setup: - def block1 = new BlockJson().tap { - it.baseFeePerGas = new Wei(92633661632) - it.transactions = [ - new TransactionRefJson(TransactionId.from("0x00000000fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - new TransactionRefJson(TransactionId.from("0x11111111fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - new TransactionRefJson(TransactionId.from("0x22222222fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - ] - } - def block2 = new BlockJson().tap { - it.baseFeePerGas = new Wei(104197355513) - it.transactions = [ - new TransactionRefJson(TransactionId.from("0x33333333fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - new TransactionRefJson(TransactionId.from("0x44444444fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - new TransactionRefJson(TransactionId.from("0x55555555fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")), - ] - } - def tx1 = new TransactionJsonSnapshot().tap { - it.type = 2 - it.maxFeePerGas = Wei.ofUnits(150, Wei.Unit.GWEI) - it.maxPriorityFeePerGas = Wei.ofUnits(3, Wei.Unit.GWEI) - } - def tx2 = new TransactionJsonSnapshot().tap { - it.type = 2 - it.maxFeePerGas = Wei.ofUnits(200, Wei.Unit.GWEI) - it.maxPriorityFeePerGas = Wei.ofUnits(6, Wei.Unit.GWEI) - } - - def ups = Mock(EthereumMultistream) { - 1 * getHead() >> Mock(Head) { - 1 * getCurrentHeight() >> 13756007 - } - } - def reader = Mock(EthereumCachingReader) { - _ * it.blocksByHeightParsed() >> Mock(Reader) { - 1 * it.read(13756006) >> Mono.just(block1) - 1 * it.read(13756007) >> Mono.just(block2) - } - _ * it.txByHash() >> Mock(Reader) { - 1 * it.read(TransactionId.from("0x22222222fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")) >> Mono.just(tx1) - 1 * it.read(TransactionId.from("0x55555555fad596cad644b785a8a74f6580ceec9ae13c8aa174f819c0223b8c77")) >> Mono.just(tx2) - } - } - def fees = new EthereumPriorityFees(ups, reader, 10) - when: - def act = fees.estimate(ChainFees.Mode.AVG_LAST, 2).block(Duration.ofSeconds(1)) - - then: - act.hasEthereumExtended() - act.ethereumExtended.priority == "4500000000" - act.ethereumExtended.max == "175000000000" - act.ethereumExtended.expect == "102915508572" - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy deleted file mode 100644 index c181fbe16..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Copyright (c) 2019 ETCDEV GmbH - * Copyright (c) 2020 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.grpc - -import com.fasterxml.jackson.databind.ObjectMapper -import com.google.protobuf.ByteString -import io.emeraldpay.api.proto.BlockchainGrpc -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.config.ChainsConfig -import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.test.MockGrpcServer -import io.emeraldpay.dshackle.test.TestingCommons -import io.emeraldpay.dshackle.upstream.BuildInfo -import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient -import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics -import io.emeraldpay.etherjar.domain.BlockHash -import io.grpc.stub.StreamObserver -import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.Timer -import reactor.core.scheduler.Schedulers -import spock.lang.Specification - -import java.time.Duration -import java.time.Instant -import java.util.concurrent.CompletableFuture - -class EthereumGrpcUpstreamSpec extends Specification { - - MockGrpcServer mockServer = new MockGrpcServer() - ObjectMapper objectMapper = Global.objectMapper - RpcMetrics metrics = new RpcMetrics( - Timer.builder("test1").register(TestingCommons.meterRegistry), - Counter.builder("test2").register(TestingCommons.meterRegistry) - ) - BlockHash parent = BlockHash.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - - def hash = (byte)123 - def buildInfo = new BuildInfo("v0.0.1-test") - - def "Subscribe to head"() { - setup: - def callData = [:] - def chain = Chain.ETHEREUM__MAINNET - def api = TestingCommons.api() - def block1 = new BlockJson().with { - it.number = 650246 - it.hash = BlockHash.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - it.totalDifficulty = new BigInteger("35bbde5595de6456", 16) - it.timestamp = Instant.now() - it.parentHash = parent - return it - } - api.answer("eth_getBlockByHash", [block1.hash.toHex(), false], block1) - def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { - @Override - void nativeCall(BlockchainOuterClass.NativeCallRequest request, StreamObserver responseObserver) { - api.nativeCall(request, responseObserver) - } - - @Override - void subscribeHead(Common.Chain request, StreamObserver responseObserver) { - callData.chain = request.getTypeValue() - responseObserver.onNext( - BlockchainOuterClass.ChainHead.newBuilder() - .setBlockId(block1.hash.toHex().substring(2)) - .setHeight(block1.number) - .setParentBlockId(parent.toHex().substring(2)) - .setWeight(ByteString.copyFrom(block1.totalDifficulty.toByteArray())) - .build() - ) - } - }) - def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default(), Schedulers.boundedElastic()) - upstream.setLag(0) - upstream.update( - BlockchainOuterClass.DescribeChain.newBuilder() - .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) - .addAllSupportedMethods(["eth_getBlockByHash"]) - .build(), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - when: - new Thread({ Thread.sleep(50); upstream.head.start() }).start() - def h = upstream.head.getFlux().next().block(Duration.ofSeconds(1)) - then: - callData.chain == Chain.ETHEREUM__MAINNET.id - upstream.status == UpstreamAvailability.OK - upstream.getBuildInfo() == buildInfo - h.hash == BlockId.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - } - - def "Follows difficulty, ignores less difficult"() { - setup: - def api = TestingCommons.api() - def block1 = new BlockJson().with { - it.number = 650246 - it.hash = BlockHash.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - it.totalDifficulty = new BigInteger("35bbde5595de6456", 16) - it.timestamp = Instant.now() - it.parentHash = parent - return it - } - def block2 = new BlockJson().with { - it.number = 650247 - it.hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec891521a") - it.totalDifficulty = new BigInteger("35bbde5595de6455", 16) - it.timestamp = Instant.now() - it.parentHash = parent - return it - } - api.answer("eth_getBlockByHash", [block1.hash.toHex(), false], block1) - api.answer("eth_getBlockByHash", [block2.hash.toHex(), false], block2) - def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { - @Override - void nativeCall(BlockchainOuterClass.NativeCallRequest request, StreamObserver responseObserver) { - api.nativeCall(request, responseObserver) - } - - @Override - void subscribeHead(Common.Chain request, StreamObserver responseObserver) { - new Thread({ - responseObserver.onNext( - BlockchainOuterClass.ChainHead.newBuilder() - .setBlockId(block1.hash.toHex().substring(2)) - .setHeight(block1.number) - .setParentBlockId(parent.toHex().substring(2)) - .setWeight(ByteString.copyFrom(block1.totalDifficulty.toByteArray())) - .build() - ) - Thread.sleep(100) - responseObserver.onNext( - BlockchainOuterClass.ChainHead.newBuilder() - .setBlockId(block2.hash.toHex().substring(2)) - .setHeight(block2.number) - .setParentBlockId(parent.toHex().substring(2)) - .setWeight(ByteString.copyFrom(block2.totalDifficulty.toByteArray())) - .build() - ) - }).start() - } - }) - def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, Chain.ETHEREUM__MAINNET, client, new JsonRpcGrpcClient(client, Chain.ETHEREUM__MAINNET, metrics), null, ChainsConfig.ChainConfig.default(), Schedulers.boundedElastic()) - upstream.setLag(0) - upstream.update( - BlockchainOuterClass.DescribeChain.newBuilder() - .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) - .addAllSupportedMethods(["eth_getBlockByHash"]) - .build(), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - when: - new Thread({ Thread.sleep(50); upstream.head.start() }).start() - def h = upstream.head.getFlux().take(Duration.ofSeconds(1)).last().block(Duration.ofSeconds(2)) - then: - upstream.status == UpstreamAvailability.OK - upstream.getBuildInfo() == buildInfo - h.hash == BlockId.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - h.height == 650246 - } - - def "Follows difficulty"() { - setup: - def callData = [:] - def finished = new CompletableFuture() - def chain = Chain.ETHEREUM__MAINNET - def api = TestingCommons.api() - def block1 = new BlockJson().with { - it.number = 650246 - it.hash = BlockHash.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7") - it.totalDifficulty = new BigInteger("35bbde5595de6456", 16) - it.timestamp = Instant.now() - it.parentHash = parent - return it - } - def block2 = new BlockJson().with { - it.number = 650247 - it.hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec891521a") - it.totalDifficulty = new BigInteger("35bbde5595de6457", 16) - it.timestamp = Instant.now() - it.parentHash = parent - return it - } - api.answer("eth_getBlockByHash", [block1.hash.toHex(), false], block1) - api.answer("eth_getBlockByHash", [block2.hash.toHex(), false], block2) - def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { - @Override - void nativeCall(BlockchainOuterClass.NativeCallRequest request, StreamObserver responseObserver) { - api.nativeCall(request, responseObserver) - } - - @Override - void subscribeHead(Common.Chain request, StreamObserver responseObserver) { - responseObserver.onNext( - BlockchainOuterClass.ChainHead.newBuilder() - .setBlockId(block1.hash.toHex().substring(2)) - .setHeight(block1.number) - .setParentBlockId(parent.toHex().substring(2)) - .setWeight(ByteString.copyFrom(block1.totalDifficulty.toByteArray())) - .build() - ) - responseObserver.onNext( - BlockchainOuterClass.ChainHead.newBuilder() - .setBlockId(block2.hash.toHex().substring(2)) - .setHeight(block2.number) - .setParentBlockId(parent.toHex().substring(2)) - .setWeight(ByteString.copyFrom(block2.totalDifficulty.toByteArray())) - .build() - ) - finished.complete(true) - } - }) - def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default(), Schedulers.boundedElastic()) - upstream.setLag(0) - upstream.update( - BlockchainOuterClass.DescribeChain.newBuilder() - .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) - .addAllSupportedMethods(["eth_getBlockByHash"]) - .build(), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - when: - new Thread({ Thread.sleep(50); upstream.head.start() }).start() - finished.get() - def h = upstream.head.getFlux().take(Duration.ofSeconds(1)).last().block(Duration.ofSeconds(2)) - then: - upstream.status == UpstreamAvailability.OK - upstream.getBuildInfo() == buildInfo - h.hash == BlockId.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec891521a") - h.height == 650247 - } - - private BlockchainOuterClass.DescribeChain describe(List methods) { - return BlockchainOuterClass.DescribeChain.newBuilder() - .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) - .addAllSupportedMethods(methods) - .build() - } -}