Skip to content

Commit

Permalink
Rework tx lower bound logic (#533)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Jul 29, 2024
1 parent ee8181e commit 0095426
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
Expand Down Expand Up @@ -29,16 +30,32 @@ class EthereumLowerBoundTxDetector(
return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block ->
upstream.getIngressReader()
.read(
ChainRequest(
"eth_getBlockTransactionCountByNumber",
ListParams(block.toHex()),
),
ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)),
)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
}
}
.handle { it, sink ->
val blockJson = BlockContainer.fromEthereumJson(it.getResult(), upstream.getId())
if (blockJson.transactions.isEmpty()) {
sink.error(IllegalStateException(NO_TX_DATA))
return@handle
}
sink.next(blockJson.transactions[0].toHexWithPrefix())
}
.flatMap { tx ->
upstream.getIngressReader()
.read(
ChainRequest("eth_getTransactionByHash", ListParams(tx)),
)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundTxDetector.Companion.MAX_OFFSET
Expand All @@ -21,6 +22,8 @@ import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.io.File
import java.nio.file.Files
import java.time.Duration

class RecursiveLowerBoundServiceTest {
Expand All @@ -37,6 +40,9 @@ class RecursiveLowerBoundServiceTest {
val head = mock<Head> {
on { getCurrentHeight() } doReturn 18000000
}
val blockBytes = Files.readAllBytes(
File(this::class.java.getResource("/responses/get-by-number-response.json")!!.toURI()).toPath(),
)
val reader = mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
Expand All @@ -45,24 +51,21 @@ class RecursiveLowerBoundServiceTest {
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} doReturn Mono.just(ChainResponse(blockBytes, null))
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null))
read(ChainRequest("eth_getTransactionByHash", ListParams("0x99e52a94cfdf83a5bdadcd2e25c71574a5a24fa4df56a33f9f8b5cb6fa0ac657")))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to it.toHex(), "toBlock" to it.toHex()))))
} doReturn Mono.just(ChainResponse("[\"0x12\"]".toByteArray(), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.error(RuntimeException("No block data"))
for (block in it downTo it - MAX_OFFSET - 1) {
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex())))
} doReturn Mono.error(RuntimeException("No tx data"))
read(ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)))
} doReturn Mono.just(ChainResponse(Global.nullValue, null))
on {
read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to block.toHex(), "toBlock" to block.toHex()))))
} doReturn Mono.error(RuntimeException("No logs data"))
Expand All @@ -71,6 +74,7 @@ class RecursiveLowerBoundServiceTest {
}
}
val upstream = mock<Upstream> {
on { getId() } doReturn "id"
on { getHead() } doReturn head
on { getIngressReader() } doReturn reader
}
Expand Down

0 comments on commit 0095426

Please sign in to comment.