From 91910368a111e3b4284f3257e0bfac074d8b6214 Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Thu, 30 May 2024 12:50:51 +0300 Subject: [PATCH] add cosmos rpc support --- .../kotlin/chainsconfig.codegen.gradle.kts | 1 + .../io/emeraldpay/dshackle/BlockchainType.kt | 3 +- foundation/src/main/resources/chains.yaml | 25 +++- .../dshackle/upstream/CallTargetsHolder.kt | 3 + .../beaconchain/BeaconChainSpecific.kt | 4 +- .../upstream/calls/DefaultCosmosMethods.kt | 71 +++++++++ .../upstream/cosmos/CosmosChainSpecific.kt | 137 ++++++++++++++++++ .../cosmos/CosmosLowerBoundService.kt | 43 ++++++ .../cosmos/CosmosUpstreamSettingsDetector.kt | 37 +++++ .../ethereum/EthereumChainSpecific.kt | 4 +- .../upstream/ethereum/GenericWsHead.kt | 4 +- .../upstream/ethereum/WsSubscriptionsImpl.kt | 7 +- .../upstream/generic/ChainSpecific.kt | 5 +- .../upstream/near/NearChainSpecific.kt | 4 +- .../polkadot/PolkadotChainSpecific.kt | 4 +- .../upstream/solana/SolanaChainSpecific.kt | 4 +- .../starknet/StarknetChainSpecific.kt | 4 +- .../solana/SolanaChainSpecificTest.kt | 6 +- 18 files changed, 350 insertions(+), 16 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultCosmosMethods.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt diff --git a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts index 6a9072acc..cf8d318b9 100644 --- a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts +++ b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts @@ -126,6 +126,7 @@ open class CodeGen(private val config: ChainsConfig) { "solana" -> "BlockchainType.SOLANA" "near" -> "BlockchainType.NEAR" "eth-beacon-chain" -> "BlockchainType.ETHEREUM_BEACON_CHAIN" + "cosmos" -> "BlockchainType.COSMOS" else -> throw IllegalArgumentException("unknown blockchain type $type") } } diff --git a/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt index acfe2f9d2..55e94eac0 100644 --- a/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt +++ b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt @@ -10,7 +10,8 @@ enum class BlockchainType( POLKADOT(ApiType.JSON_RPC), SOLANA(ApiType.JSON_RPC), NEAR(ApiType.JSON_RPC), - ETHEREUM_BEACON_CHAIN(ApiType.REST); + ETHEREUM_BEACON_CHAIN(ApiType.REST), + COSMOS(ApiType.JSON_RPC); } enum class ApiType { diff --git a/foundation/src/main/resources/chains.yaml b/foundation/src/main/resources/chains.yaml index 82a33765d..36f335499 100644 --- a/foundation/src/main/resources/chains.yaml +++ b/foundation/src/main/resources/chains.yaml @@ -38,6 +38,7 @@ chain-settings: lags: syncing: 6 lagging: 1 + fork-choice: quorum chains: - id: Mainnet chain-id: 0x1 @@ -182,7 +183,7 @@ chain-settings: lags: syncing: 20 lagging: 10 - fork-choice: quorum + fork-choice: height chains: - id: Mainnet priority: 100 @@ -1403,3 +1404,25 @@ chain-settings: grpcId: 1055 short-names: [real] chain-id: 0x1b254 + - id: cosmos-hub + label: Cosmos Hub + type: cosmos + settings: + expected-block-time: 6s + lags: + syncing: 6 + lagging: 1 + chains: + - id: Mainnet + chain-id: 0x0 + short-names: [ cosmos-hub ] + code: COSMOS_HUB + grpcId: 1057 + priority: 100 + - id: Testnet + chain-id: 0x0 + short-names: [ cosmos-hub-testnet ] + code: COSMOS_HUB_TESTNET + grpcId: 10079 + priority: 10 + diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt index b67135233..b17a96373 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.BlockchainType.BITCOIN +import io.emeraldpay.dshackle.BlockchainType.COSMOS import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.BlockchainType.ETHEREUM_BEACON_CHAIN import io.emeraldpay.dshackle.BlockchainType.NEAR @@ -12,6 +13,7 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.DefaultBeaconChainMethods import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods +import io.emeraldpay.dshackle.upstream.calls.DefaultCosmosMethods import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods import io.emeraldpay.dshackle.upstream.calls.DefaultStarknetMethods @@ -34,6 +36,7 @@ class CallTargetsHolder { SOLANA -> DefaultSolanaMethods() NEAR -> DefaultNearMethods() ETHEREUM_BEACON_CHAIN -> DefaultBeaconChainMethods() + COSMOS -> DefaultCosmosMethods() UNKNOWN -> throw IllegalArgumentException("unknown chain") } callTargets[chain] = created diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt index 338a1e5cc..d0634e581 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainSpecific.kt @@ -11,6 +11,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector @@ -18,11 +19,12 @@ import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.RestParams +import reactor.core.publisher.Mono import java.math.BigInteger import java.time.Instant object BeaconChainSpecific : AbstractPollChainSpecific() { - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { throw NotImplementedError() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultCosmosMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultCosmosMethods.kt new file mode 100644 index 000000000..3f77a279c --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultCosmosMethods.kt @@ -0,0 +1,71 @@ +package io.emeraldpay.dshackle.upstream.calls + +import io.emeraldpay.dshackle.quorum.AlwaysQuorum +import io.emeraldpay.dshackle.quorum.BroadcastQuorum +import io.emeraldpay.dshackle.quorum.CallQuorum +import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException + +class DefaultCosmosMethods : CallMethods { + private val all = setOf( + "health", + "status", + "net_info", + "blockchain", + "block", + "block_by_hash", + "block_results", + "commit", + "validators", + "genesis", + "genesis_chunked", + // "dump_consensus_state", // not safe + // "consensus_state", // not safe + "consensus_params", + "unconfirmed_txs", + "num_unconfirmed_txs", + "tx_search", + "block_search", + "tx", + "check_tx", + "abci_info", + "abci_query", + ) + + private val add = setOf( + "broadcast_evidence", + "broadcast_tx_sync", + "broadcast_tx_async", + "broadcast_tx_commit", + ) + + private val allowedMethods: Set = all + add + + override fun createQuorumFor(method: String): CallQuorum { + if (add.contains(method)) { + return BroadcastQuorum() + } + return AlwaysQuorum() + } + + override fun isCallable(method: String): Boolean { + return allowedMethods.contains(method) + } + + override fun isHardcoded(method: String): Boolean { + return false + } + + override fun executeHardcoded(method: String): ByteArray { + throw RpcException(-32601, "Method not found") + } + + override fun getGroupMethods(groupName: String): Set = + when (groupName) { + "default" -> getSupportedMethods() + else -> emptyList() + }.toSet() + + override fun getSupportedMethods(): Set { + return allowedMethods.toSortedSet() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt new file mode 100644 index 000000000..8dfd719e2 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosChainSpecific.kt @@ -0,0 +1,137 @@ +package io.emeraldpay.dshackle.upstream.cosmos + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.SingleCallValidator +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability.OK +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific +import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams +import reactor.core.publisher.Mono +import java.math.BigInteger +import java.time.Instant + +object CosmosChainSpecific : AbstractPollChainSpecific() { + override fun latestBlockRequest(): ChainRequest = ChainRequest("block", ObjectParams()) + + override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { + val result = Global.objectMapper.readValue(data, CosmosBlockResult::class.java) + + return BlockContainer( + height = result.block.header.height.toLong(), + hash = BlockId.from(result.blockId.hash), + difficulty = BigInteger.ZERO, + timestamp = result.block.header.time, + full = false, + json = data, + parsed = result, + transactions = emptyList(), + upstreamId = upstreamId, + parentHash = BlockId.from(result.block.header.lastBlockId.hash), + ) + } + + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { + val event = Global.objectMapper.readValue(data, CosmosBlockEvent::class.java) + + return api.read(ChainRequest("block", ObjectParams("height" to event.data.value.header.height))).flatMap { + val blockData = it.getResult() + val result = Global.objectMapper.readValue(blockData, CosmosBlockResult::class.java) + Mono.just( + BlockContainer( + height = result.block.header.height.toLong(), + hash = BlockId.from(result.blockId.hash), + difficulty = BigInteger.ZERO, + timestamp = result.block.header.time, + full = false, + json = blockData, + parsed = result, + transactions = emptyList(), + upstreamId = upstreamId, + parentHash = BlockId.from(result.block.header.lastBlockId.hash), + ), + ) + } + } + + override fun listenNewHeadsRequest() = throw NotImplementedError() + // ChainRequest("subscribe", ListParams("tm.event = 'NewBlockHeader'")) + + override fun unsubscribeNewHeadsRequest(subId: String) = throw NotImplementedError() + // ChainRequest("unsubscribe", ListParams("tm.event = 'NewBlockHeader'")) + + override fun validator(chain: Chain, upstream: Upstream, options: Options, config: ChainConfig): UpstreamValidator { + return GenericUpstreamValidator( + upstream, + options, + SingleCallValidator( + ChainRequest("health", ListParams()), + ) { _ -> OK }, + ) + } + + override fun lowerBoundService(chain: Chain, upstream: Upstream) = + CosmosLowerBoundService(chain, upstream) + + override fun upstreamSettingsDetector(chain: Chain, upstream: Upstream) = + CosmosUpstreamSettingsDetector(upstream) +} + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosBlockResult( + @JsonProperty("block_id") var blockId: CosmosBlockId, + @JsonProperty("block") var block: CosmosBlockData, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosBlockId( + @JsonProperty("hash") var hash: String, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosHeader( + @JsonProperty("last_block_id") var lastBlockId: CosmosBlockId, + @JsonProperty("height") var height: String, + @JsonProperty("time") var time: Instant, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosStatus( + @JsonProperty("node_info") var nodeInfo: CosmosNodeInfo, + @JsonProperty("sync_info") var syncInfo: CosmosSyncInfo, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosNodeInfo( + @JsonProperty("version") var version: String, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosSyncInfo( + @JsonProperty("earliest_block_height") var earliestBlockHeight: String, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosBlockEvent( + @JsonProperty("data") var data: CosmosBlockEventData, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class CosmosBlockEventData( + @JsonProperty("value") var value: CosmosBlockData, +) + +data class CosmosBlockData( + @JsonProperty("header") var header: CosmosHeader, +) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt new file mode 100644 index 000000000..7f49e5d55 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt @@ -0,0 +1,43 @@ +package io.emeraldpay.dshackle.upstream.cosmos + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType.STATE +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Flux +import reactor.kotlin.core.publisher.toFlux + +class CosmosLowerBoundService( + chain: Chain, + private val upstream: Upstream, +) : LowerBoundService(chain, upstream) { + override fun detectors(): List { + return listOf(CosmosLowerBoundStateDetector(upstream)) + } +} + +class CosmosLowerBoundStateDetector( + private val upstream: Upstream, +) : LowerBoundDetector() { + + override fun period(): Long { + return 3 + } + + override fun internalDetectLowerBound(): Flux { + return upstream.getIngressReader().read(ChainRequest("status", ListParams())).map { + val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java) + LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE) + }.toFlux() + } + + override fun types(): Set { + return setOf(STATE) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt new file mode 100644 index 000000000..383a8998c --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosUpstreamSettingsDetector.kt @@ -0,0 +1,37 @@ +package io.emeraldpay.dshackle.upstream.cosmos + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.module.kotlin.readValue +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.BasicUpstreamSettingsDetector +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.NodeTypeRequest +import io.emeraldpay.dshackle.upstream.UNKNOWN_CLIENT_VERSION +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import reactor.core.publisher.Flux + +class CosmosUpstreamSettingsDetector( + upstream: Upstream, +) : BasicUpstreamSettingsDetector(upstream) { + override fun detectLabels(): Flux> { + return Flux.merge( + detectNodeType(), + ) + } + + override fun clientVersionRequest(): ChainRequest { + return ChainRequest("status", ListParams()) + } + + override fun parseClientVersion(data: ByteArray): String { + return Global.objectMapper.readValue(data).nodeInfo.version + } + + override fun nodeTypeRequest(): NodeTypeRequest = NodeTypeRequest(clientVersionRequest()) + + override fun clientType(node: JsonNode): String? = null + + override fun clientVersion(node: JsonNode): String? = + node.get("node_info")?.get("version")?.asText() ?: UNKNOWN_CLIENT_VERSION +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index a2edee9f4..1724bb6d4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -37,8 +37,8 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return BlockContainer.fromEthereumJson(data, upstreamId) } - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { - return parseBlock(data, upstreamId) + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { + return Mono.just(parseBlock(data, upstreamId)) } override fun latestBlockRequest() = diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 7d9c8f769..0e1051439 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -100,8 +100,8 @@ class GenericWsHead( private fun listenNewHeads(): Flux { return subscribe() - .map { - chainSpecific.parseHeader(it, "unknown") + .flatMap { + chainSpecific.getFromHeader(it, "unknown", api) } .timeout(wsHeadTimeout, Mono.error(RuntimeException("No response from subscribe to newHeads"))) .onErrorResume { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index 697202819..fc562340a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -46,7 +46,12 @@ class WsSubscriptionsImpl( log.warn("Failed to establish subscription: ${it.error?.message}") Mono.error(ChainException(it.id, it.error!!)) } else { - subscriptionId.set(it.getResultAsProcessedString()) + val id = if (it.getResultAsRawString() == "{}") { + request.id.toString() // in case empty result - match by request id + } else { + it.getResultAsProcessedString() + } + subscriptionId.set(id) messages } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 714283076..bf4e83c5d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.BlockchainType.BITCOIN +import io.emeraldpay.dshackle.BlockchainType.COSMOS import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.BlockchainType.ETHEREUM_BEACON_CHAIN import io.emeraldpay.dshackle.BlockchainType.NEAR @@ -27,6 +28,7 @@ import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector +import io.emeraldpay.dshackle.upstream.cosmos.CosmosChainSpecific import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService @@ -44,7 +46,7 @@ typealias LocalReaderBuilder = (CachingReader, CallMethods, Head, LogsOracle?) - typealias CachingReaderBuilder = (Multistream, Caches, Factory) -> CachingReader interface ChainSpecific { - fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer + fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono fun getLatestBlock(api: ChainReader, upstreamId: String): Mono @@ -80,6 +82,7 @@ object ChainSpecificRegistry { SOLANA -> SolanaChainSpecific NEAR -> NearChainSpecific ETHEREUM_BEACON_CHAIN -> BeaconChainSpecific + COSMOS -> CosmosChainSpecific BITCOIN -> throw IllegalArgumentException("bitcoin should use custom streams implementation") UNKNOWN -> throw IllegalArgumentException("unknown chain") } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt index 8256af6d7..2ad004010 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearChainSpecific.kt @@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.SingleCallValidator import io.emeraldpay.dshackle.upstream.Upstream @@ -19,6 +20,7 @@ import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams +import reactor.core.publisher.Mono import java.math.BigInteger import java.time.Instant import java.util.concurrent.TimeUnit @@ -41,7 +43,7 @@ object NearChainSpecific : AbstractPollChainSpecific() { ) } - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { throw NotImplementedError() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index ffdc75bda..da5bf15de 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -45,10 +45,10 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { return makeBlock(response.block.header, data, upstreamId) } - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { val header = Global.objectMapper.readValue(data, PolkadotHeader::class.java) - return makeBlock(header, data, upstreamId) + return Mono.just(makeBlock(header, data, upstreamId)) } private fun makeBlock(header: PolkadotHeader, data: ByteArray, upstreamId: String): BlockContainer { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt index f650a723e..1d5ddc3c1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -77,9 +77,9 @@ object SolanaChainSpecific : AbstractChainSpecific() { } } - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { val res = Global.objectMapper.readValue(data, SolanaWrapper::class.java) - return makeBlock(data, res.value.block, upstreamId, res.context.slot) + return Mono.just(makeBlock(data, res.value.block, upstreamId, res.context.slot)) } private fun makeBlock(raw: ByteArray, block: SolanaBlock, upstreamId: String, slot: Long): BlockContainer { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index da582e4d3..c2f241e19 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.SingleCallValidator import io.emeraldpay.dshackle.upstream.Upstream @@ -18,6 +19,7 @@ import io.emeraldpay.dshackle.upstream.generic.GenericUpstreamValidator import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono import java.math.BigInteger import java.time.Instant @@ -42,7 +44,7 @@ object StarknetChainSpecific : AbstractPollChainSpecific() { ) } - override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + override fun getFromHeader(data: ByteArray, upstreamId: String, api: ChainReader): Mono { throw NotImplementedError() } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt index 30387a81f..f2eafb137 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt @@ -1,8 +1,10 @@ package io.emeraldpay.dshackle.upstream.solana import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.reader.ChainReader import org.assertj.core.api.Assertions import org.junit.jupiter.api.Test +import org.mockito.kotlin.mock val example = """{ "context": { @@ -25,7 +27,9 @@ class SolanaChainSpecificTest { @Test fun parseBlock() { - val result = SolanaChainSpecific.parseHeader(example.toByteArray(), "1") + val reader = mock {} + + val result = SolanaChainSpecific.getFromHeader(example.toByteArray(), "1", reader).block()!! Assertions.assertThat(result.height).isEqualTo(101210751) Assertions.assertThat(result.hash).isEqualTo(BlockId.fromBase64("6ojMHjctdqfB55JDpEpqfHnP96fiaHEcvzEQ2NNcxzHP"))