From 66b9b004542b6d783f18bff0e0d9f624fe6256d0 Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Thu, 2 Nov 2023 16:18:54 +0300 Subject: [PATCH] add base support of polkadot/substrate/vara chain (#332) --- .../kotlin/chainsconfig.codegen.gradle.kts | 24 +++- emerald-grpc | 2 +- .../io/emeraldpay/dshackle/BlockchainType.kt | 5 + .../org/drpc/chainsconfig/ChainsConfig.kt | 2 + .../drpc/chainsconfig/ChainsConfigReader.kt | 9 ++ foundation/src/main/resources/chains.yaml | 23 ++++ .../test/resources/configs/chains-basic.yaml | 1 + .../io/emeraldpay/dshackle/BlockchainType.kt | 22 --- .../dshackle/config/TokensConfig.kt | 2 +- .../config/context/MultistreamsConfig.kt | 3 +- .../monitoring/accesslog/AccessHandlerGrpc.kt | 47 ------- .../monitoring/accesslog/EventsBuilder.kt | 94 ------------- .../io/emeraldpay/dshackle/rpc/NativeCall.kt | 5 +- .../dshackle/rpc/NativeSubscribe.kt | 2 +- .../io/emeraldpay/dshackle/rpc/StreamHead.kt | 2 +- .../dshackle/startup/ConfiguredUpstreams.kt | 3 +- .../dshackle/upstream/CallTargetsHolder.kt | 17 ++- .../upstream/bitcoin/RemoteUnspentReader.kt | 27 +--- .../upstream/calls/DefaultPolkadotMethods.kt | 130 ++++++++++++++++++ .../ethereum/EthereumChainSpecific.kt | 11 +- .../{EthereumWsHead.kt => GenericWsHead.kt} | 66 ++------- .../upstream/ethereum/WsSubscriptions.kt | 3 +- .../upstream/ethereum/WsSubscriptionsImpl.kt | 9 +- .../subscribe/WebsocketPendingTxes.kt | 3 +- .../upstream/generic/ChainSpecific.kt | 22 ++- .../dshackle/upstream/generic/GenericHead.kt | 4 +- .../upstream/generic/GenericUpstream.kt | 2 +- .../generic/connectors/ConnectorFactory.kt | 1 - .../connectors/GenericConnectorFactory.kt | 3 - .../generic/connectors/GenericRpcConnector.kt | 9 +- .../generic/connectors/GenericWsConnector.kt | 8 +- .../dshackle/upstream/grpc/GrpcUpstreams.kt | 3 +- .../polkadot/PolkadotChainSpecific.kt | 122 ++++++++++++++++ .../starknet/StarknetChainSpecific.kt | 16 ++- .../EventsBuilderSubscribeBalanceSpec.groovy | 83 ----------- .../dshackle/test/ConnectorFactoryMock.groovy | 2 +- .../test/MultistreamHolderMock.groovy | 4 +- ...adSpec.groovy => GenericWsHeadSpec.groovy} | 44 +++--- .../ethereum/WsSubscriptionsImplSpec.groovy | 4 +- .../subscribe/WebsocketPendingTxesSpec.groovy | 3 +- .../polkadot/PolkadotChainSpecificTest.kt | 44 ++++++ .../starknet/StarknetChainSpecificTest.kt | 3 +- 42 files changed, 464 insertions(+), 425 deletions(-) create mode 100644 foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt delete mode 100644 src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt rename src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/{EthereumWsHead.kt => GenericWsHead.kt} (60%) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt delete mode 100644 src/test/groovy/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilderSubscribeBalanceSpec.groovy rename src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/{EthereumWsHeadSpec.groovy => GenericWsHeadSpec.groovy} (82%) create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecificTest.kt diff --git a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts index 2edf5238b..bdb8aab79 100644 --- a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts +++ b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts @@ -1,5 +1,6 @@ import com.squareup.kotlinpoet.* import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy +import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.ChainsConfigReader import io.emeraldpay.dshackle.foundation.ChainOptionsReader @@ -18,7 +19,7 @@ open class CodeGen(private val config: ChainsConfig) { builder.addEnumConstant( "UNSPECIFIED", TypeSpec.anonymousClassBuilder() - .addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()") + .addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()", "BlockchainType.UNKNOWN") .build(), ) for (chain in config) { @@ -27,13 +28,14 @@ open class CodeGen(private val config: ChainsConfig) { .replace(' ', '_'), TypeSpec.anonymousClassBuilder() .addSuperclassConstructorParameter( - "%L, %S, %S, %S, %L, %L", + "%L, %S, %S, %S, %L, %L, %L", chain.grpcId, chain.code, chain.blockchain.replaceFirstChar { it.uppercase() } + " " + chain.id.replaceFirstChar { it.uppercase() }, chain.chainId, "BigInteger(\"" + chain.netVersion + "\")", "listOf(" + chain.shortNames.map { "\"${it}\"" }.joinToString() + ")", + type(chain.type) ) .build(), ) @@ -65,6 +67,7 @@ open class CodeGen(private val config: ChainsConfig) { .addParameter("chainId", String::class) .addParameter("netVersion", BigInteger::class) .addParameter("shortNames", List::class.asClassName().parameterizedBy(String::class.asClassName())) + .addParameter("type", BlockchainType::class) .build(), ) .addProperty( @@ -96,12 +99,27 @@ open class CodeGen(private val config: ChainsConfig) { PropertySpec.builder("shortNames", List::class.asClassName().parameterizedBy(String::class.asClassName())) .initializer("shortNames") .build(), - ), + ) + .addProperty( + PropertySpec.builder("type", BlockchainType::class) + .initializer("type") + .build(), + ) ).build() return FileSpec.builder("io.emeraldpay.dshackle", "Chain") .addType(chainType) .build() } + + private fun type(type: String): String { + return when(type) { + "eth" -> "BlockchainType.ETHEREUM" + "bitcoin" -> "BlockchainType.BITCOIN" + "starknet" -> "BlockchainType.STARKNET" + "polkadot" -> "BlockchainType.POLKADOT" + else -> throw IllegalArgumentException("unknown blockchain type $type") + } + } } open class ChainsCodeGenTask : DefaultTask() { diff --git a/emerald-grpc b/emerald-grpc index 931ec0ff1..3a98f870f 160000 --- a/emerald-grpc +++ b/emerald-grpc @@ -1 +1 @@ -Subproject commit 931ec0ff1e3049b40280c66154a3dcc4a51f2373 +Subproject commit 3a98f870ff6098c62297f59761514e1e3c0b7783 diff --git a/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt new file mode 100644 index 000000000..3882ffece --- /dev/null +++ b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt @@ -0,0 +1,5 @@ +package io.emeraldpay.dshackle + +enum class BlockchainType { + UNKNOWN, BITCOIN, ETHEREUM, STARKNET, POLKADOT; +} diff --git a/foundation/src/main/kotlin/org/drpc/chainsconfig/ChainsConfig.kt b/foundation/src/main/kotlin/org/drpc/chainsconfig/ChainsConfig.kt index e043a2495..9cca90b0e 100644 --- a/foundation/src/main/kotlin/org/drpc/chainsconfig/ChainsConfig.kt +++ b/foundation/src/main/kotlin/org/drpc/chainsconfig/ChainsConfig.kt @@ -30,6 +30,7 @@ data class ChainsConfig(private val chains: List) : Iterable) : Iterable val blockchain = getValueAsString(protocol, "id") ?: throw IllegalArgumentException("Blockchain id is not defined") + val type = getValueAsString(protocol, "type") + ?: throw IllegalArgumentException("undefined type for $blockchain") val settings = mergeMappingNode(default, getMapping(protocol, "settings")) acc.plus( getList(protocol, "chains")?.let { chains -> @@ -38,6 +40,10 @@ class ChainsConfigReader( ScalarNode(Tag.STR, "blockchain", null, null, DumperOptions.ScalarStyle.LITERAL), ScalarNode(Tag.STR, blockchain, null, null, DumperOptions.ScalarStyle.LITERAL), ), + NodeTuple( + ScalarNode(Tag.STR, "type", null, null, DumperOptions.ScalarStyle.LITERAL), + ScalarNode(Tag.STR, type, null, null, DumperOptions.ScalarStyle.LITERAL), + ), ), chain.flowStyle, ), @@ -78,6 +84,8 @@ class ChainsConfigReader( val netVersion = getValueAsLong(node, "net-version")?.toBigInteger() ?: BigInteger(chainId.drop(2), 16) val shortNames = getListOfString(node, "short-names") ?: throw IllegalArgumentException("undefined shortnames for $blockchain") + val type = getValueAsString(node, "type") + ?: throw IllegalArgumentException("undefined type for $blockchain") return ChainsConfig.ChainConfig( expectedBlockTime = expectedBlockTime, syncingLagSize = lags.first, @@ -91,6 +99,7 @@ class ChainsConfigReader( shortNames = shortNames, id = id, blockchain = blockchain, + type = type ) } diff --git a/foundation/src/main/resources/chains.yaml b/foundation/src/main/resources/chains.yaml index 8b816c1c9..33fe1387d 100644 --- a/foundation/src/main/resources/chains.yaml +++ b/foundation/src/main/resources/chains.yaml @@ -641,3 +641,26 @@ chain-settings: short-names: [ astar-zkatana ] chain-id: 0x133e40 grpcId: 10035 + - id: vara + label: varanet + type: polkadot + settings: + expected-block-time: 3s + options: + validate-peers: false + lags: + syncing: 10 + lagging: 5 + chains: + - id: Mainnet + priority: 1 + code: VARA_MAINNET + short-names: [ vara ] + chain-id: 0x0 + grpcId: 1027 + - id: Testnet + priority: 1 + code: VARA_TESTMET + short-names: [ vara-testnet ] + chain-id: 0x0 + grpcId: 10036 diff --git a/foundation/src/test/resources/configs/chains-basic.yaml b/foundation/src/test/resources/configs/chains-basic.yaml index e40d9b232..8ba3e3af9 100644 --- a/foundation/src/test/resources/configs/chains-basic.yaml +++ b/foundation/src/test/resources/configs/chains-basic.yaml @@ -3,6 +3,7 @@ version: v1 chain-settings: protocols: - id: fantom + type: eth settings: expected-block-time: 10s options: diff --git a/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt b/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt deleted file mode 100644 index 8d3421a82..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt +++ /dev/null @@ -1,22 +0,0 @@ -package io.emeraldpay.dshackle - -enum class BlockchainType { - BITCOIN, ETHEREUM, STARKNET; - - companion object { - val bitcoin = setOf(Chain.BITCOIN__MAINNET, Chain.BITCOIN__TESTNET) - - val starknet = setOf(Chain.STARKNET__MAINNET, Chain.STARKNET__TESTNET, Chain.STARKNET__TESTNET_2) - - @JvmStatic - fun from(chain: Chain): BlockchainType { - return if (bitcoin.contains(chain)) { - BITCOIN - } else if (starknet.contains(chain)) { - STARKNET - } else { - ETHEREUM - } - } - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/TokensConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/TokensConfig.kt index 4e8faadea..f24b19d3a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/TokensConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/TokensConfig.kt @@ -41,7 +41,7 @@ class TokensConfig( type == null -> type address.isNullOrBlank() -> "address" blockchain != null && - (BlockchainType.from(blockchain!!) == BlockchainType.ETHEREUM) && + (blockchain!!.type == BlockchainType.ETHEREUM) && !Address.isValidAddress(address) -> "address" else -> null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt index 98c77954b..b3d2afdd0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt @@ -1,6 +1,5 @@ package io.emeraldpay.dshackle.config.context -import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.CachesFactory @@ -30,7 +29,7 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) return Chain.entries .filterNot { it == Chain.UNSPECIFIED } .map { chain -> - if (BlockchainType.from(chain) == BITCOIN) { + if (chain.type == BITCOIN) { bitcoinMultistream(chain, cachesFactory, headScheduler) } else { genericMultistream(chain, cachesFactory, headScheduler, tracer) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/AccessHandlerGrpc.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/AccessHandlerGrpc.kt index eef4618b2..3c6a75509 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/AccessHandlerGrpc.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/AccessHandlerGrpc.kt @@ -43,14 +43,10 @@ class AccessHandlerGrpc( ): ServerCall.Listener { return when (val method = call.methodDescriptor.bareMethodName) { "SubscribeHead" -> processSubscribeHead(call, headers, next) - "SubscribeBalance" -> processSubscribeBalance(call, headers, next, true) - "SubscribeTxStatus" -> processSubscribeTxStatus(call, headers, next) - "GetBalance" -> processSubscribeBalance(call, headers, next, false) "NativeCall" -> processNativeCall(call, headers, next) "NativeSubscribe" -> processNativeSubscribe(call, headers, next) "Describe" -> processDescribe(call, headers, next) "SubscribeStatus" -> processStatus(call, headers, next) - "EstimateFee" -> processEstimateFee(call, headers, next) else -> { log.warn("unsupported method `{}`", method) next.startCall(call, headers) @@ -90,35 +86,6 @@ class AccessHandlerGrpc( ) } - @Suppress("UNCHECKED_CAST") - private fun processSubscribeBalance( - call: ServerCall, - headers: Metadata, - next: ServerCallHandler, - subscribe: Boolean, - ): ServerCall.Listener { - return process( - call, - headers, - next, - EventsBuilder.SubscribeBalance(subscribe) as EventsBuilder.RequestReply<*, ReqT, RespT>, - ) - } - - @Suppress("UNCHECKED_CAST") - private fun processSubscribeTxStatus( - call: ServerCall, - headers: Metadata, - next: ServerCallHandler, - ): ServerCall.Listener { - return process( - call, - headers, - next, - EventsBuilder.TxStatus() as EventsBuilder.RequestReply<*, ReqT, RespT>, - ) - } - @Suppress("UNCHECKED_CAST") private fun processNativeCall( call: ServerCall, @@ -175,20 +142,6 @@ class AccessHandlerGrpc( ) } - @Suppress("UNCHECKED_CAST") - private fun processEstimateFee( - call: ServerCall, - headers: Metadata, - next: ServerCallHandler, - ): ServerCall.Listener { - return process( - call, - headers, - next, - EventsBuilder.EstimateFee() as EventsBuilder.RequestReply<*, ReqT, RespT>, - ) - } - open class StdCallListener>( val next: ServerCall.Listener, val builder: EB, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilder.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilder.kt index 52216a97b..31e3f1b49 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilder.kt @@ -32,7 +32,6 @@ import java.net.InetAddress import java.net.InetSocketAddress import java.time.Duration import java.time.Instant -import java.util.Locale import java.util.UUID class EventsBuilder { @@ -244,69 +243,6 @@ class EventsBuilder { } } - class SubscribeBalance(val subscribe: Boolean) : - Base(), - RequestReply { - - private var index = 0 - private var balanceRequest: Events.BalanceRequest? = null - - override fun getT(): SubscribeBalance { - return this - } - - override fun onRequest(msg: BlockchainOuterClass.BalanceRequest) { - balanceRequest = Events.BalanceRequest( - msg.asset.code.uppercase(Locale.getDefault()), - msg.address.addrTypeCase.name, - ) - } - - override fun onReply(msg: BlockchainOuterClass.AddressBalance): Events.SubscribeBalance { - if (balanceRequest == null) { - throw IllegalStateException("Request is not initialized") - } - val addressBalance = Events.AddressBalance(msg.asset.code, msg.address.address) - val chain = Chain.byId(msg.asset.chain.number) - return Events.SubscribeBalance( - chain, - UUID.randomUUID(), - subscribe, - requestDetails, - balanceRequest!!, - addressBalance, - index++, - ) - } - } - - class TxStatus : - Base(), - RequestReply { - private var index = 0 - private var txStatusRequest: Events.TxStatusRequest? = null - - override fun onRequest(msg: BlockchainOuterClass.TxStatusRequest) { - this.txStatusRequest = Events.TxStatusRequest(msg.txId) - withChain(msg.chainValue) - } - - override fun onReply(msg: BlockchainOuterClass.TxStatus): Events.TxStatus { - return Events.TxStatus( - chain, - UUID.randomUUID(), - requestDetails, - txStatusRequest!!, - Events.TxStatusResponse(msg.confirmations), - index++, - ) - } - - override fun getT(): TxStatus { - return this - } - } - class NativeCall(private val startTs: Instant) : Base(), RequestReply { @@ -496,34 +432,4 @@ class EventsBuilder { ) } } - - class EstimateFee : - Base(), - RequestReply { - - private var mode: String = "UNKNOWN" - private var blocks: Int = 0 - - override fun getT(): EstimateFee { - return this - } - - override fun onRequest(msg: BlockchainOuterClass.EstimateFeeRequest) { - this.chain = Chain.byId(msg.chain.number) - this.mode = msg.mode.name - this.blocks = msg.blocks - } - - override fun onReply(msg: BlockchainOuterClass.EstimateFeeResponse): Events.EstimateFee { - return Events.EstimateFee( - blockchain = chain, - request = requestDetails, - id = UUID.randomUUID(), - estimateFee = Events.EstimateFeeDetails( - mode = mode, - blocks = blocks, - ), - ) - } - } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index 3b3db8427..29eca620a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -19,7 +19,6 @@ package io.emeraldpay.dshackle.rpc import com.fasterxml.jackson.databind.ObjectMapper import com.google.protobuf.ByteString import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global @@ -84,7 +83,7 @@ open class NativeCall( @EventListener fun onUpstreamChangeEvent(event: UpstreamChangeEvent) { multistreamHolder.getUpstream(event.chain).let { up -> - if (BlockchainType.from(up.chain) == ETHEREUM) { + if (up.chain.type == ETHEREUM) { ethereumCallSelectors.putIfAbsent( event.chain, EthereumCallSelector(up.caches), @@ -306,7 +305,7 @@ open class NativeCall( } // for ethereum the actual block needed for the call may be specified in the call parameters val callSpecificMatcher: Mono = - if (BlockchainType.from(upstream.chain) == ETHEREUM) { + if (upstream.chain.type == ETHEREUM) { ethereumCallSelectors[chain]?.getMatcher(method, params, upstream.getHead(), passthrough) } else { null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt index 93eb16d3e..5657097e3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt @@ -57,7 +57,7 @@ open class NativeSubscribe( fun start(request: BlockchainOuterClass.NativeSubscribeRequest): Publisher { val chain = Chain.byId(request.chainValue) - if (BlockchainType.from(chain) != BlockchainType.ETHEREUM) { + if (chain.type != BlockchainType.ETHEREUM) { return Mono.error(UnsupportedOperationException("Native subscribe is not supported for ${chain.chainCode}")) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt index a8a074372..a573b320c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt @@ -43,7 +43,7 @@ class StreamHead( .getFlux() .map { asProto(chain, it!!) } .onErrorContinue { t, _ -> - log.warn("Head subscription error: ${t.message}") + log.warn("Head subscription error", t) } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 6fff9716e..c59e35427 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.startup import brave.grpc.GrpcTracing import com.google.common.annotations.VisibleForTesting -import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.Chain @@ -135,7 +134,7 @@ open class ConfiguredUpstreams( .merge(defaultOptions[chain] ?: ChainOptions.PartialOptions.getDefaults()) .merge(up.options ?: ChainOptions.PartialOptions()) .buildOptions() - val upstream = when (BlockchainType.from(chain)) { + val upstream = when (chain.type) { BITCOIN -> { buildBitcoinUpstream( up.cast(BitcoinConnection::class.java), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt index 10a8db830..de90454ec 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt @@ -1,10 +1,15 @@ package io.emeraldpay.dshackle.upstream -import io.emeraldpay.dshackle.BlockchainType +import io.emeraldpay.dshackle.BlockchainType.BITCOIN +import io.emeraldpay.dshackle.BlockchainType.ETHEREUM +import io.emeraldpay.dshackle.BlockchainType.POLKADOT +import io.emeraldpay.dshackle.BlockchainType.STARKNET +import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods +import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods import io.emeraldpay.dshackle.upstream.calls.DefaultStarknetMethods import org.springframework.stereotype.Component @@ -17,10 +22,12 @@ class CallTargetsHolder { } private fun setupDefaultMethods(chain: Chain): CallMethods { - val created = when (BlockchainType.from(chain)) { - BlockchainType.BITCOIN -> DefaultBitcoinMethods() - BlockchainType.ETHEREUM -> DefaultEthereumMethods(chain) - BlockchainType.STARKNET -> DefaultStarknetMethods(chain) + val created = when (chain.type) { + BITCOIN -> DefaultBitcoinMethods() + ETHEREUM -> DefaultEthereumMethods(chain) + STARKNET -> DefaultStarknetMethods(chain) + POLKADOT -> DefaultPolkadotMethods() + UNKNOWN -> throw IllegalArgumentException("unknown chain") } callTargets[chain] = created return created diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt index 609f847bf..696448c45 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt @@ -1,22 +1,15 @@ package io.emeraldpay.dshackle.upstream.bitcoin -import io.emeraldpay.api.proto.BlockchainOuterClass.BalanceRequest import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.bitcoin.data.SimpleUnspent -import io.emeraldpay.dshackle.upstream.grpc.BitcoinGrpcUpstream import org.bitcoinj.core.Address -import org.slf4j.LoggerFactory import reactor.core.publisher.Mono class RemoteUnspentReader( val upstreams: BitcoinMultistream, ) : UnspentReader { - companion object { - private val log = LoggerFactory.getLogger(RemoteUnspentReader::class.java) - } - private val selector = Selector.MultiMatcher( listOf( Selector.GrpcMatcher(), @@ -27,24 +20,6 @@ class RemoteUnspentReader( override fun read(key: Address): Mono> { val apis = upstreams.getApiSource(selector) apis.request(1) - return Mono.from(apis) - .map { up -> - up.cast(BitcoinGrpcUpstream::class.java).remote - } - .flatMapMany { - val request = BalanceRequest.newBuilder() - .build() - it.getBalance(request) - } - .map { resp -> - resp.utxoList.map { utxo -> - SimpleUnspent( - utxo.txId, - utxo.index.toInt(), - utxo.balance.toLong(), - ) - } - } - .reduce(List::plus) + return Mono.empty() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt new file mode 100644 index 000000000..274843dd8 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * Copyright (c) 2019 ETCDEV GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.calls + +import io.emeraldpay.dshackle.quorum.AlwaysQuorum +import io.emeraldpay.dshackle.quorum.BroadcastQuorum +import io.emeraldpay.dshackle.quorum.CallQuorum +import io.emeraldpay.etherjar.rpc.RpcException + +/** + * Default configuration for Ethereum based RPC. Defines optimal Quorum strategies for different methods, and provides + * hardcoded results for base methods, such as `net_version`, `web3_clientVersion` and similar + */ +class DefaultPolkadotMethods : CallMethods { + + private val all = setOf( + "author_pendingExtrinsics", + "author_removeExtrinsic", + "chain_getBlock", + "chain_getBlockHash", + "chain_getFinalisedHead", + "chain_getFinalizedHead", + "chain_getHead", + "chain_getHeader", + "chain_getRuntimeVersion", + "chain_subscribeAllHeads", + "chain_subscribeFinalizedHeads", + "chain_subscribeNewHeads", + "chain_subscribeRuntimeVersion", + "chain_unsubscribeAllHeads", + "chain_unsubscribeFinalisedHeads", + "chain_unsubscribeFinalizedHeads", + "chain_unsubscribeNewHead", + "chain_unsubscribeNewHeads", + "chain_unsubscribeRuntimeVersion", + "childstate_getKeys", + "childstate_getKeysPaged", + "childstate_getKeysPagedAt", + "childstate_getStorage", + "childstate_getStorageEntries", + "childstate_getStorageHash", + "childstate_getStorageSize", + "gear_calculateHandleGas", + "gear_calculateInitCreateGas", + "gear_calculateInitUploadGas", + "gear_calculateReplyGas", + "gear_readMetahash", + "gear_readState", + "gear_readStateBatch", + "gear_readStateUsingWasm", + "gear_readStateUsingWasmBatch", + "grandpa_proveFinality", + "grandpa_roundState", + "payment_queryFeeDetails", + "payment_queryInfo", + "state_call", + "state_callAt", + "state_getChildReadProof", + "state_getKeys", + "state_getKeysPaged", + "state_getKeysPagedAt", + "state_getMetadata", + "state_getPairs", + "state_getReadProof", + "state_getRuntimeVersion", + "state_getStorage", + "state_getStorageAt", + "state_getStorageHash", + "state_getStorageHashAt", + "state_getStorageSize", + "state_getStorageSizeAt", + "state_queryStorage", + "state_queryStorageAt", + "state_traceBlock", + "state_trieMigrationStatus", + "subscribe_newHead", + "system_chain", + "unsubscribe_newHead", + ) + + private val add = setOf( + "author_submitExtrinsic", + ) + + private val allowedMethods: Set = all + add + + override fun createQuorumFor(method: String): CallQuorum { + return when { + add.contains(method) -> BroadcastQuorum() + all.contains(method) -> AlwaysQuorum() + else -> AlwaysQuorum() + } + } + + override fun isCallable(method: String): Boolean { + return allowedMethods.contains(method) + } + + override fun isHardcoded(method: String): Boolean { + return false + } + + override fun executeHardcoded(method: String): ByteArray { + throw RpcException(-32601, "Method not found") + } + + override fun getGroupMethods(groupName: String): Set = + when (groupName) { + "default" -> getSupportedMethods() + else -> emptyList() + }.toSet() + + override fun getSupportedMethods(): Set { + return allowedMethods.toSortedSet() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index f00f0eb20..008329b0d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -22,17 +22,22 @@ import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler object EthereumChainSpecific : ChainSpecific { - override fun parseBlock(data: JsonRpcResponse, upstreamId: String): BlockContainer { - return BlockContainer.fromEthereumJson(data.getResult(), upstreamId) + override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { + return BlockContainer.fromEthereumJson(data, upstreamId) + } + + override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + return parseBlock(data, upstreamId) } override fun latestBlockRequest() = JsonRpcRequest("eth_getBlockByNumber", listOf("latest", false)) + override fun listenNewHeadsRequest(): JsonRpcRequest = JsonRpcRequest("eth_subscribe", listOf("newHeads")) + override fun localReaderBuilder( cachingReader: CachingReader, methods: CallMethods, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt similarity index 60% rename from src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt rename to src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 0b11eed40..fded48743 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -16,41 +16,32 @@ */ package io.emeraldpay.dshackle.upstream.ethereum -import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.SilentException -import io.emeraldpay.dshackle.ThrottledLogger import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericHead -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse -import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.rpc.json.TransactionRefJson import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger -class EthereumWsHead( +class GenericWsHead( forkChoice: ForkChoice, blockValidator: BlockValidator, private val api: JsonRpcReader, private val wsSubscriptions: WsSubscriptions, - private val skipEnhance: Boolean, private val wsConnectionResubscribeScheduler: Scheduler, - private val headScheduler: Scheduler, + headScheduler: Scheduler, private val upstream: DefaultUpstream, - chainSpecific: ChainSpecific, + private val chainSpecific: ChainSpecific, ) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { private var connectionId: String? = null @@ -98,50 +89,7 @@ class EthereumWsHead( Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it) } .map { - val block = Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson - if (!block.checkExtraData() && skipEnhance) { - ThrottledLogger.log(log, "$upstreamId recieved block with empty extradata through ws subscription") - } - return@map block - } - .flatMap { block -> - // newHeads returns incomplete blocks, i.e. without some fields and without transaction hashes, - // so we need to fetch the full block data - if (!skipEnhance && ( - block.difficulty == null || - block.transactions == null || - block.transactions.isEmpty() || - block.totalDifficulty == null - ) - ) { - EthereumBlockEnricher.enrich( - block.hash, - object : - Reader { - override fun read(key: BlockHash): Mono { - return api.read(JsonRpcRequest("eth_getBlockByHash", listOf(block.hash.toHex(), false))) - .flatMap { resp -> - if (resp.isNull()) { - Mono.error(SilentException("Received null for block ${block.hash}")) - } else { - Mono.just(resp) - } - } - .flatMap(JsonRpcResponse::requireResult) - .map { - val parsedBlock = BlockContainer.fromEthereumJson(it, upstreamId) - if (parsedBlock.parsed is BlockJson<*> && !parsedBlock.parsed.checkExtraData() && !skipEnhance) { - ThrottledLogger.log(log, "$upstreamId recieved block with empty extradata from block enrichment") - } - return@map parsedBlock - } - } - }, - headScheduler, - ) - } else { - Mono.just(BlockContainer.from(block)) - } + chainSpecific.parseHeader(it, "unknown") } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) .onErrorResume { @@ -158,9 +106,11 @@ class EthereumWsHead( noHeadUpdatesSink.tryEmitComplete() } + private val ids = AtomicInteger(1) + private fun subscribe(): Flux { return try { - wsSubscriptions.subscribe("newHeads") + wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement())) .also { connectionId = it.connectionId if (!connected) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt index 229855f17..ad8cfe235 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt @@ -15,6 +15,7 @@ */ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import reactor.core.publisher.Flux /** @@ -37,7 +38,7 @@ interface WsSubscriptions { /** * Subscribe on remote */ - fun subscribe(method: String): SubscribeData + fun subscribe(request: JsonRpcRequest): SubscribeData fun connectionInfoFlux(): Flux diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index c9dabe069..8712282e0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -20,7 +20,6 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference class WsSubscriptionsImpl( @@ -31,9 +30,7 @@ class WsSubscriptionsImpl( private val log = LoggerFactory.getLogger(WsSubscriptionsImpl::class.java) } - private val ids = AtomicLong(1) - - override fun subscribe(method: String): WsSubscriptions.SubscribeData { + override fun subscribe(request: JsonRpcRequest): WsSubscriptions.SubscribeData { val subscriptionId = AtomicReference("") val conn = wsPool.getConnection() val messages = conn.getSubscribeResponses() @@ -41,10 +38,10 @@ class WsSubscriptionsImpl( .filter { it.result != null } // should never happen .map { it.result!! } - val messageFlux = conn.callRpc(JsonRpcRequest("eth_subscribe", listOf(method), ids.incrementAndGet())) + val messageFlux = conn.callRpc(request) .flatMapMany { if (it.hasError()) { - log.warn("Failed to establish ETH Subscription: ${it.error?.message}") + log.warn("Failed to establish subscription: ${it.error?.message}") Mono.error(JsonRpcException(it.id, it.error!!)) } else { subscriptionId.set(it.getResultAsProcessedString()) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt index e23fc96d2..a529ddcbc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxes.kt @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe import io.emeraldpay.dshackle.upstream.SubscriptionConnect import io.emeraldpay.dshackle.upstream.ethereum.EthereumEgressSubscription import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.etherjar.domain.TransactionId import org.slf4j.LoggerFactory import reactor.core.publisher.Flux @@ -33,7 +34,7 @@ class WebsocketPendingTxes( } override fun createConnection(): Flux { - return wsSubscriptions.subscribe(EthereumEgressSubscription.METHOD_PENDING_TXES) + return wsSubscriptions.subscribe(JsonRpcRequest("eth_subscribe", listOf(EthereumEgressSubscription.METHOD_PENDING_TXES))) .data .timeout(Duration.ofSeconds(60), Mono.empty()) .map { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index fd69eb60f..dccab9666 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -1,7 +1,10 @@ package io.emeraldpay.dshackle.upstream.generic -import io.emeraldpay.dshackle.BlockchainType +import io.emeraldpay.dshackle.BlockchainType.BITCOIN +import io.emeraldpay.dshackle.BlockchainType.ETHEREUM +import io.emeraldpay.dshackle.BlockchainType.POLKADOT import io.emeraldpay.dshackle.BlockchainType.STARKNET +import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig @@ -17,8 +20,8 @@ import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific +import io.emeraldpay.dshackle.upstream.polkadot.PolkadotChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific import org.apache.commons.collections4.Factory import org.springframework.cloud.sleuth.Tracer @@ -30,10 +33,14 @@ typealias LocalReaderBuilder = (CachingReader, CallMethods, Head) -> Mono) -> CachingReader interface ChainSpecific { - fun parseBlock(data: JsonRpcResponse, upstreamId: String): BlockContainer + fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer + + fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer fun latestBlockRequest(): JsonRpcRequest + fun listenNewHeadsRequest(): JsonRpcRequest + fun localReaderBuilder(cachingReader: CachingReader, methods: CallMethods, head: Head): Mono fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription @@ -51,9 +58,12 @@ object ChainSpecificRegistry { @JvmStatic fun resolve(chain: Chain): ChainSpecific { - if (BlockchainType.from(chain) == STARKNET) { - return StarknetChainSpecific + return when (chain.type) { + ETHEREUM -> EthereumChainSpecific + STARKNET -> StarknetChainSpecific + POLKADOT -> PolkadotChainSpecific + BITCOIN -> throw IllegalArgumentException("bitcoin should use custom streams implementation") + UNKNOWN -> throw IllegalArgumentException("unknown chain") } - return EthereumChainSpecific } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt index ca5326847..cb2ef1ea8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt @@ -37,7 +37,9 @@ open class GenericHead( return api.read(chainSpecific.latestBlockRequest()) .subscribeOn(headScheduler) .timeout(Defaults.timeout, Mono.error(Exception("Block data not received"))) - .map { chainSpecific.parseBlock(it, upstreamId) } + .map { + chainSpecific.parseBlock(it.getResult(), upstreamId) + } .onErrorResume { err -> log.error("Failed to fetch latest block: ${err.message} $upstreamId", err) Mono.empty() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 3be3e9d9e..bfa0fbb7c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -49,7 +49,7 @@ open class GenericUpstream( private var validationSettingsSubscription: Disposable? = null private val hasLiveSubscriptionHead: AtomicBoolean = AtomicBoolean(false) - protected val connector: GenericConnector = connectorFactory.create(this, chain, true) + protected val connector: GenericConnector = connectorFactory.create(this, chain) private var livenessSubscription: Disposable? = null private val labelsDetector = labelsDetectorBuilder(chain, this.getIngressReader()) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/ConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/ConnectorFactory.kt index dd90e29e5..a20bfb22d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/ConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/ConnectorFactory.kt @@ -7,7 +7,6 @@ interface ConnectorFactory { fun create( upstream: DefaultUpstream, chain: Chain, - skipEnhance: Boolean, ): GenericConnector fun isValid(): Boolean diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt index f52640ec5..3d7c447e5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt @@ -48,7 +48,6 @@ open class GenericConnectorFactory( override fun create( upstream: DefaultUpstream, chain: Chain, - skipEnhance: Boolean, ): GenericConnector { val specific = ChainSpecificRegistry.resolve(chain) if (wsFactory != null && connectorType == WS_ONLY) { @@ -57,7 +56,6 @@ open class GenericConnectorFactory( upstream, forkChoice, blockValidator, - skipEnhance, wsConnectionResubscribeScheduler, headScheduler, expectedBlockTime, @@ -74,7 +72,6 @@ open class GenericConnectorFactory( upstream, forkChoice, blockValidator, - skipEnhance, wsConnectionResubscribeScheduler, headScheduler, expectedBlockTime, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 6b2732f92..165fb2370 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -10,7 +10,7 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.MergedHead -import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead +import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.NoEthereumIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool @@ -37,7 +37,6 @@ class GenericRpcConnector( upstream: DefaultUpstream, forkChoice: ForkChoice, blockValidator: BlockValidator, - skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, expectedBlockTime: Duration, @@ -71,12 +70,11 @@ class GenericRpcConnector( RPC_REQUESTS_WITH_MIXED_HEAD -> { val wsHead = - EthereumWsHead( + GenericWsHead( AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), - skipEnhance, wsConnectionResubscribeScheduler, headScheduler, upstream, @@ -97,12 +95,11 @@ class GenericRpcConnector( } RPC_REQUESTS_WITH_WS_HEAD -> { - EthereumWsHead( + GenericWsHead( AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), - skipEnhance, wsConnectionResubscribeScheduler, headScheduler, upstream, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index 2caa57b97..ea885a943 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -6,7 +6,7 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsHead +import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory @@ -24,7 +24,6 @@ class GenericWsConnector( upstream: DefaultUpstream, forkChoice: ForkChoice, blockValidator: BlockValidator, - skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, expectedBlockTime: Duration, @@ -32,19 +31,18 @@ class GenericWsConnector( ) : GenericConnector { private val pool: WsConnectionPool private val reader: JsonRpcReader - private val head: EthereumWsHead + private val head: GenericWsHead private val subscriptions: EthereumIngressSubscription private val liveness: HeadLivenessValidator init { pool = wsFactory.create(upstream) reader = JsonRpcWsClient(pool) val wsSubscriptions = WsSubscriptionsImpl(pool) - head = EthereumWsHead( + head = GenericWsHead( forkChoice, blockValidator, reader, wsSubscriptions, - skipEnhance, wsConnectionResubscribeScheduler, headScheduler, upstream, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 1cf32a483..e1f4a5ae9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -24,7 +24,6 @@ import io.emeraldpay.api.proto.Common import io.emeraldpay.api.proto.Common.ChainRef.UNRECOGNIZED import io.emeraldpay.api.proto.ReactorAuthGrpc import io.emeraldpay.api.proto.ReactorBlockchainGrpc -import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Defaults @@ -248,7 +247,7 @@ class GrpcUpstreams( private fun getOrCreate(chain: Chain): UpstreamChangeEvent { val metrics = makeMetrics(chain) - val creator = if (BlockchainType.from(chain) != BITCOIN) { + val creator = if (chain.type != BITCOIN) { { ch: Chain, rpcClient: JsonRpcGrpcClient -> GenericGrpcUpstream( id, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt new file mode 100644 index 000000000..714d19d34 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -0,0 +1,122 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.NoopCachingReader +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder +import io.emeraldpay.dshackle.upstream.generic.ChainSpecific +import io.emeraldpay.dshackle.upstream.generic.GenericUpstream +import io.emeraldpay.dshackle.upstream.generic.LocalReader +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import org.springframework.cloud.sleuth.Tracer +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import java.math.BigInteger +import java.time.Instant + +object PolkadotChainSpecific : ChainSpecific { + override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { + val response = Global.objectMapper.readValue(data, PolkadotBlockResponse::class.java) + + return makeBlock(response.block.header, data, upstreamId) + } + + override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + val header = Global.objectMapper.readValue(data, PolkadotHeader::class.java) + + return makeBlock(header, data, upstreamId) + } + + private fun makeBlock(header: PolkadotHeader, data: ByteArray, upstreamId: String): BlockContainer { + return BlockContainer( + height = header.number.substring(2).toLong(16), + hash = BlockId.from(header.parentHash), // todo + difficulty = BigInteger.ZERO, + timestamp = Instant.EPOCH, + full = false, + json = data, + parsed = header, + transactions = emptyList(), + upstreamId = upstreamId, + parentHash = BlockId.from(header.parentHash), + ) + } + + override fun latestBlockRequest(): JsonRpcRequest = + JsonRpcRequest("chain_getBlock", listOf()) + + override fun listenNewHeadsRequest(): JsonRpcRequest = + JsonRpcRequest("chain_subscribeNewHeads", listOf()) + + override fun localReaderBuilder( + cachingReader: CachingReader, + methods: CallMethods, + head: Head, + ): Mono { + return Mono.just(LocalReader(methods)) + } + + override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { + return { _ -> EmptyEgressSubscription } + } + + override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { + return { _, _, _ -> NoopCachingReader } + } + + override fun validator( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): UpstreamValidator? { + return null + } + + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { + return null + } + + override fun subscriptionTopics(upstream: GenericUpstream): List { + return emptyList() + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +data class PolkadotBlockResponse( + @JsonProperty("block") var block: PolkadotBlock, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class PolkadotBlock( + @JsonProperty("header") var header: PolkadotHeader, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class PolkadotHeader( + @JsonProperty("parentHash") var parentHash: String, + @JsonProperty("number") var number: String, + @JsonProperty("stateRoot") var stateRoot: String, + @JsonProperty("extrinsicsRoot") var extrinsicsRoot: String, + @JsonProperty("digest") var digest: PolkadotDigest, +) + +data class PolkadotDigest( + @JsonProperty("logs") var logs: List, +) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index 96ddafe08..fb1178bb1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -24,7 +24,6 @@ import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.generic.LocalReader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler @@ -32,9 +31,8 @@ import java.math.BigInteger import java.time.Instant object StarknetChainSpecific : ChainSpecific { - override fun parseBlock(data: JsonRpcResponse, upstreamId: String): BlockContainer { - val raw = data.getResult() - val block = Global.objectMapper.readValue(raw, StarknetBlock::class.java) + override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { + val block = Global.objectMapper.readValue(data, StarknetBlock::class.java) return BlockContainer( height = block.number, @@ -42,7 +40,7 @@ object StarknetChainSpecific : ChainSpecific { difficulty = BigInteger.ZERO, timestamp = block.timestamp, full = false, - json = raw, + json = data, parsed = block, transactions = emptyList(), upstreamId = upstreamId, @@ -50,9 +48,17 @@ object StarknetChainSpecific : ChainSpecific { ) } + override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + throw NotImplementedError() + } + override fun latestBlockRequest(): JsonRpcRequest = JsonRpcRequest("starknet_getBlockWithTxHashes", listOf("latest")) + override fun listenNewHeadsRequest(): JsonRpcRequest { + throw NotImplementedError() + } + override fun localReaderBuilder( cachingReader: CachingReader, methods: CallMethods, diff --git a/src/test/groovy/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilderSubscribeBalanceSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilderSubscribeBalanceSpec.groovy deleted file mode 100644 index 79fbf85b7..000000000 --- a/src/test/groovy/io/emeraldpay/dshackle/monitoring/accesslog/EventsBuilderSubscribeBalanceSpec.groovy +++ /dev/null @@ -1,83 +0,0 @@ -package io.emeraldpay.dshackle.monitoring.accesslog - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common -import io.emeraldpay.dshackle.Chain -import spock.lang.Specification - -class EventsBuilderSubscribeBalanceSpec extends Specification { - - def "Basic ethereum event"() { - setup: - def request = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressSingle( - Common.SingleAddress.newBuilder() - .setAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D") - ) - ) - .setAsset( - Common.Asset.newBuilder() - .setChainValue(100) - .setCode("ETHER") - ) - .build() - def resp = BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(Common.SingleAddress.newBuilder() - .setAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D")) - .setAsset(Common.Asset.newBuilder() - .setChainValue(100) - .setCode("ETHER")) - .setBalance("1234560000000000000") - .build() - when: - def act = new EventsBuilder.SubscribeBalance(true).tap { - it.onRequest(request) - }.onReply(resp) - then: - act.index == 0 - act.blockchain == Chain.ETHEREUM__MAINNET - act.balanceRequest.asset == "ETHER" - act.balanceRequest.addressType == "ADDRESS_SINGLE" - act.addressBalance.asset == "ETHER" - act.addressBalance.address == "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D" - } - - def "Basic bitcoin event"() { - setup: - def request = BlockchainOuterClass.BalanceRequest.newBuilder() - .setAddress( - Common.AnyAddress.newBuilder() - .setAddressSingle( - Common.SingleAddress.newBuilder() - .setAddress("1NDyJtNTjmwk5xPNhjgAMu4HDHigtobu1s") - ) - ) - .setAsset( - Common.Asset.newBuilder() - .setChainValue(1) - .setCode("BTC") - ) - .build() - def resp = BlockchainOuterClass.AddressBalance.newBuilder() - .setAddress(Common.SingleAddress.newBuilder() - .setAddress("1NDyJtNTjmwk5xPNhjgAMu4HDHigtobu1s")) - .setAsset(Common.Asset.newBuilder() - .setChainValue(1) - .setCode("BTC")) - .setBalance("12345600000000") - .build() - when: - def act = new EventsBuilder.SubscribeBalance(true).tap { - it.onRequest(request) - }.onReply(resp) - then: - act.index == 0 - act.blockchain == Chain.BITCOIN__MAINNET - act.balanceRequest.asset == "BTC" - act.balanceRequest.addressType == "ADDRESS_SINGLE" - act.addressBalance.asset == "BTC" - act.addressBalance.address == "1NDyJtNTjmwk5xPNhjgAMu4HDHigtobu1s" - } -} diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/ConnectorFactoryMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/ConnectorFactoryMock.groovy index 0af719000..598f1eb54 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/ConnectorFactoryMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/ConnectorFactoryMock.groovy @@ -23,7 +23,7 @@ class ConnectorFactoryMock implements ConnectorFactory { return true } - GenericConnector create(DefaultUpstream upstream, Chain chain, boolean skipEnhance) { + GenericConnector create(DefaultUpstream upstream, Chain chain) { return new GenericConnectorMock(api, head) } } \ No newline at end of file diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy index 883e1f209..430aeaf16 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy @@ -42,7 +42,7 @@ class MultistreamHolderMock implements MultistreamHolder { Multistream addUpstream(@NotNull Chain chain, @NotNull Upstream up) { if (!upstreams.containsKey(chain)) { - if (BlockchainType.from(chain) == BlockchainType.ETHEREUM) { + if (chain.type == BlockchainType.ETHEREUM) { if (up instanceof GenericMultistream) { upstreams[chain] = up } else if (up instanceof GenericUpstream) { @@ -57,7 +57,7 @@ class MultistreamHolderMock implements MultistreamHolder { throw new IllegalArgumentException("Unsupported upstream type ${up.class}") } upstreams[chain].start() - } else if (BlockchainType.from(chain) == BlockchainType.BITCOIN) { + } else if (chain.type == BlockchainType.BITCOIN) { if (up instanceof BitcoinMultistream) { upstreams[chain] = up } else if (up instanceof BitcoinRpcUpstream) { diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy similarity index 82% rename from src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy rename to src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy index 1cd1bfc53..23268d8a3 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice import io.emeraldpay.etherjar.domain.BlockHash import io.emeraldpay.etherjar.domain.TransactionId import io.emeraldpay.etherjar.rpc.json.TransactionRefJson +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks @@ -38,7 +39,7 @@ import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit -class EthereumWsHeadSpec extends Specification { +class GenericWsHeadSpec extends Specification { BlockHash parent = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") DefaultUpstream upstream = new GenericUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api()) @@ -50,38 +51,29 @@ class EthereumWsHeadSpec extends Specification { block.hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") block.parentHash = parent block.timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) - block.transactions = [ - new TransactionRefJson(TransactionId.from("0x29229361dc5aa1ec66c323dc7a299e2b61a8c8dd2a3522d41255ec10eca25dd8")), - new TransactionRefJson(TransactionId.from("0xebe8f22a55a9e26892a8545b93cbb2bfa4fd81c3184e50e5cf6276025bb42b93")) - ] block.uncles = [] block.totalDifficulty = BigInteger.ONE - def headBlock = block.copy().tap { - it.transactions = null - }.with { + def headBlock = block.copy().with { Global.objectMapper.writeValueAsBytes(it) } def apiMock = TestingCommons.api() - apiMock.answerOnce("eth_getBlockByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200", false], block) def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> Flux.empty() } - def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, false, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def res = BlockContainer.from(block) when: def act = head.listenNewHeads().blockFirst() then: - act == BlockContainer.from(block) - act.transactions.size() == 2 - act.transactions[0].toHexWithPrefix() == "0x29229361dc5aa1ec66c323dc7a299e2b61a8c8dd2a3522d41255ec10eca25dd8" - act.transactions[1].toHexWithPrefix() == "0xebe8f22a55a9e26892a8545b93cbb2bfa4fd81c3184e50e5cf6276025bb42b93" + act == res - 1 * ws.subscribe("newHeads") >> new WsSubscriptions.SubscribeData( + 1 * ws.subscribe(_) >> new WsSubscriptions.SubscribeData( Flux.fromIterable([headBlock]), "id" ) } @@ -99,19 +91,17 @@ class EthereumWsHeadSpec extends Specification { } def apiMock = TestingCommons.api() - apiMock.answerOnce("eth_getBlockByHash", ["0x29229361dc5aa1ec66c323dc7a299e2b61a8c8dd2a3522d41255ec10eca25dd8", false], null) - apiMock.answerOnce("eth_blockNumber", [], Mono.empty()) def connectionInfoSink = Sinks.many().multicast().directBestEffort() def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() - 2 * subscribe("newHeads") >>> [ + 2 * subscribe(_) >>> [ new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id"), new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id") ] } - def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) when: def act = head.getFlux() @@ -159,13 +149,13 @@ class EthereumWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() - 2 * subscribe("newHeads") >>> [ + 2 * subscribe(_) >>> [ new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id") ] } - def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) when: def act = head.getFlux() @@ -200,12 +190,12 @@ class EthereumWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() - 1 * subscribe("newHeads") >>> [ + 1 * subscribe(_) >>> [ new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), ] } - def head = new EthereumWsHead( new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead( new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) when: def act = head.getFlux() @@ -239,12 +229,12 @@ class EthereumWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() - 1 * subscribe("newHeads") >>> [ + 1 * subscribe(_) >>> [ new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), ] } - def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) when: def act = head.getFlux() @@ -291,13 +281,13 @@ class EthereumWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() - 2 * subscribe("newHeads") >>> [ + 2 * subscribe(_) >>> [ new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id"), ] } - def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) when: def act = head.getFlux() diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImplSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImplSpec.groovy index ee129c72c..714f17263 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImplSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImplSpec.groovy @@ -45,7 +45,7 @@ class WsSubscriptionsImplSpec extends Specification { def ws = new WsSubscriptionsImpl(pool) when: - def act = ws.subscribe("foo_bar") + def act = ws.subscribe(new JsonRpcRequest("eth_subscribe", ["foo_bar"])) .data .map { new String(it) } .take(3) @@ -83,7 +83,7 @@ class WsSubscriptionsImplSpec extends Specification { def ws = new WsSubscriptionsImpl(pool) when: - def act = ws.subscribe("foo_bar") + def act = ws.subscribe(new JsonRpcRequest("eth_subscribe", ["foo_bar"])) .data .map { new String(it) } .take(3) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy index 34ebc89e3..4db0450fd 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy @@ -15,6 +15,7 @@ */ package io.emeraldpay.dshackle.upstream.ethereum.subscribe +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import reactor.core.publisher.Flux @@ -40,7 +41,7 @@ class WebsocketPendingTxesSpec extends Specification { .collectList().block(Duration.ofSeconds(1)) then: - 1 * ws.subscribe("newPendingTransactions") >> new WsSubscriptions.SubscribeData( + 1 * ws.subscribe(new JsonRpcRequest("eth_subscribe", ["newPendingTransactions"])) >> new WsSubscriptions.SubscribeData( Flux.fromIterable(responses), "id" ) txes.collect {it.toHex() } == [ diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecificTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecificTest.kt new file mode 100644 index 000000000..7327b1a85 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecificTest.kt @@ -0,0 +1,44 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import io.emeraldpay.dshackle.data.BlockId +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.Test + +val example = """ +{ + "block": { + "header": { + "parentHash": "0xb52a9b51fb698a891cf378b990b0b6a5743e52fa5175b44a8a6d4e0b2cfd0a53", + "number": "0x1121bbc", + "stateRoot": "0x66835be7bb9d6e698a6785f3c03215d468fa1feddce7b3c8c97e8313df7207ca", + "extrinsicsRoot": "0x0103947815e820645daee747c608219f5745b0de688955a46816a7f016fc661f", + "digest": { + "logs": [ + "0x0642414245b50103f9000000722de01000000000d871116ec87ad9b8b45d73ccb9d1b7ba5fd11507dade591dcfa2559948b8314cbc20a9e0cfeedfafbb01dd9778bf3a57e426c83a668e79917952110fef280b01d452047028f36edb8a2e4983cc1207b06e638db5736a49203dcb678b882f1e0f", + "0x05424142450101a056f81200f7ce9a09ffd3342648c54f8ecdf8045436ad27388947a41f681d33c2da0990c313b04d106b76d515e38f36540fb860927caf6f9f7c7e3d7233a88a" + ] + } + }, + "extrinsics": [ + "0x280403000be01f29868b01", + "", + "0x4102840088e9f05bf9ce1ae5cf584ba3d43982bccf957db3e97d781220e5ddbdb5de6a1501e2775e3339265fab8c47f900e1a3d627e77d5ab36b8f3ff7de4ae6c7c9a37769201dc94a19d7fa74afea15e802ea7285b0919d95b45c00c295d50f71adb6788c45011400050300066a8fcd5add9fdc97485581b00862bddc2f5c7fef7d0a64bb9052d8534ecd4507001ea6a752", + "0x450284001c273165b790274c4232543b81ac6510525abe9ee8cb47b36544f371fb0cae49019c5677183aa6e23da0f9310ca90f7f6f0798a55d329cbccf36be7c64025f1407eafa798c5274203f66b604ced0c0bb4dc68e0e71db4d3f1d0599f29d4563b28945030c000503009cda5b3931d308d24e236f6e3ab5c50a99fd4922bd80f3a3b9bad5b0c8d93a350b009c62337201", + "0xb9018400c0d998fb9f36dc65132dd06a845548d564e1db500fec59a8926a8fc75a8ba4460138f470ba7cdee1cd52271bf24456deb6ac041c33a081ec719704a578c1dd2d5feb99322b9de9276f683b70e7f592b4ff84c635e50fdd57f29e8ef6317f8621887503ac0007010300286bee" + ] + }, + "justifications": null + } +""".trimIndent() + +class PolkadotChainSpecificTest { + @Test + fun parseResponse() { + val result = PolkadotChainSpecific.parseBlock(example.toByteArray(), "1") + + Assertions.assertThat(result.height).isEqualTo(17963964) + Assertions.assertThat(result.hash).isEqualTo(BlockId.from("0xb52a9b51fb698a891cf378b990b0b6a5743e52fa5175b44a8a6d4e0b2cfd0a53")) + Assertions.assertThat(result.upstreamId).isEqualTo("1") + Assertions.assertThat(result.parentHash).isEqualTo(BlockId.from("0xb52a9b51fb698a891cf378b990b0b6a5743e52fa5175b44a8a6d4e0b2cfd0a53")) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecificTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecificTest.kt index f48122f56..5b5c66bc4 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecificTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecificTest.kt @@ -1,7 +1,6 @@ package io.emeraldpay.dshackle.upstream.starknet import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.assertj.core.api.Assertions import org.junit.jupiter.api.Test @@ -23,7 +22,7 @@ val example = """ class StarknetChainSpecificTest { @Test fun parseResponse() { - val result = StarknetChainSpecific.parseBlock(JsonRpcResponse.ok(example), "1") + val result = StarknetChainSpecific.parseBlock(example.toByteArray(), "1") Assertions.assertThat(result.height).isEqualTo(304789) Assertions.assertThat(result.hash).isEqualTo(BlockId.from("046fa6638dc7fae06cece980ce4195436a79ef314ca49d99e0cef552d6f13c4e"))