From 43ab38cf453b2f406cfc86d45faaab1fc830be5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Fri, 26 Jul 2024 14:04:28 +0400 Subject: [PATCH] Rework tx lower bound logic --- .../ethereum/EthereumLowerBoundTxDetector.kt | 27 +++++++++++++++---- .../RecursiveLowerBoundServiceTest.kt | 20 ++++++++------ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt index 5e855c419..ffa2329fc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData @@ -29,16 +30,32 @@ class EthereumLowerBoundTxDetector( return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block -> upstream.getIngressReader() .read( - ChainRequest( - "eth_getBlockTransactionCountByNumber", - ListParams(block.toHex()), - ), + ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)), ) .doOnNext { - if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) { + if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) { throw IllegalStateException(NO_TX_DATA) } } + .handle { it, sink -> + val blockJson = BlockContainer.fromEthereumJson(it.getResult(), upstream.getId()) + if (blockJson.transactions.isEmpty()) { + sink.error(IllegalStateException(NO_TX_DATA)) + return@handle + } + sink.next(blockJson.transactions[0].toHexWithPrefix()) + } + .flatMap { tx -> + upstream.getIngressReader() + .read( + ChainRequest("eth_getTransactionByHash", ListParams(tx)), + ) + .doOnNext { + if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) { + throw IllegalStateException(NO_TX_DATA) + } + } + } } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt index ff39c5db5..437e5e7f7 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundTxDetector.Companion.MAX_OFFSET @@ -21,6 +22,8 @@ import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock import reactor.core.publisher.Mono import reactor.test.StepVerifier +import java.io.File +import java.nio.file.Files import java.time.Duration class RecursiveLowerBoundServiceTest { @@ -37,6 +40,9 @@ class RecursiveLowerBoundServiceTest { val head = mock { on { getCurrentHeight() } doReturn 18000000 } + val blockBytes = Files.readAllBytes( + File(this::class.java.getResource("/responses/get-by-number-response.json")!!.toURI()).toPath(), + ) val reader = mock { blocks.forEach { if (it == 17964844L) { @@ -45,10 +51,10 @@ class RecursiveLowerBoundServiceTest { } doReturn Mono.just(ChainResponse(ByteArray(0), null)) on { read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) - } doReturn Mono.just(ChainResponse(ByteArray(0), null)) + } doReturn Mono.just(ChainResponse(blockBytes, null)) on { - read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex()))) - } doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null)) + read(ChainRequest("eth_getTransactionByHash", ListParams("0x99e52a94cfdf83a5bdadcd2e25c71574a5a24fa4df56a33f9f8b5cb6fa0ac657"))) + } doReturn Mono.just(ChainResponse(ByteArray(0), null)) on { read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to it.toHex(), "toBlock" to it.toHex())))) } doReturn Mono.just(ChainResponse("[\"0x12\"]".toByteArray(), null)) @@ -56,13 +62,10 @@ class RecursiveLowerBoundServiceTest { on { read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex()))) } doReturn Mono.error(RuntimeException("missing trie node")) - on { - read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false))) - } doReturn Mono.error(RuntimeException("No block data")) for (block in it downTo it - MAX_OFFSET - 1) { on { - read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex()))) - } doReturn Mono.error(RuntimeException("No tx data")) + read(ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false))) + } doReturn Mono.just(ChainResponse(Global.nullValue, null)) on { read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to block.toHex(), "toBlock" to block.toHex())))) } doReturn Mono.error(RuntimeException("No logs data")) @@ -71,6 +74,7 @@ class RecursiveLowerBoundServiceTest { } } val upstream = mock { + on { getId() } doReturn "id" on { getHead() } doReturn head on { getIngressReader() } doReturn reader }