Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework tx lower bound logic #533

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
Expand Down Expand Up @@ -29,16 +30,32 @@ class EthereumLowerBoundTxDetector(
return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block ->
upstream.getIngressReader()
.read(
ChainRequest(
"eth_getBlockTransactionCountByNumber",
ListParams(block.toHex()),
),
ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)),
)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
}
}
.handle { it, sink ->
val blockJson = BlockContainer.fromEthereumJson(it.getResult(), upstream.getId())
if (blockJson.transactions.isEmpty()) {
sink.error(IllegalStateException(NO_TX_DATA))
return@handle
}
sink.next(blockJson.transactions[0].toHexWithPrefix())
}
.flatMap { tx ->
upstream.getIngressReader()
.read(
ChainRequest("eth_getTransactionByHash", ListParams(tx)),
)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundTxDetector.Companion.MAX_OFFSET
Expand All @@ -21,6 +22,8 @@ import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.io.File
import java.nio.file.Files
import java.time.Duration

class RecursiveLowerBoundServiceTest {
Expand All @@ -37,6 +40,9 @@ class RecursiveLowerBoundServiceTest {
val head = mock<Head> {
on { getCurrentHeight() } doReturn 18000000
}
val blockBytes = Files.readAllBytes(
File(this::class.java.getResource("/responses/get-by-number-response.json")!!.toURI()).toPath(),
)
val reader = mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
Expand All @@ -45,24 +51,21 @@ class RecursiveLowerBoundServiceTest {
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} doReturn Mono.just(ChainResponse(blockBytes, null))
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null))
read(ChainRequest("eth_getTransactionByHash", ListParams("0x99e52a94cfdf83a5bdadcd2e25c71574a5a24fa4df56a33f9f8b5cb6fa0ac657")))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to it.toHex(), "toBlock" to it.toHex()))))
} doReturn Mono.just(ChainResponse("[\"0x12\"]".toByteArray(), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.error(RuntimeException("No block data"))
for (block in it downTo it - MAX_OFFSET - 1) {
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex())))
} doReturn Mono.error(RuntimeException("No tx data"))
read(ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)))
} doReturn Mono.just(ChainResponse(Global.nullValue, null))
on {
read(ChainRequest("eth_getLogs", ListParams(mapOf("fromBlock" to block.toHex(), "toBlock" to block.toHex()))))
} doReturn Mono.error(RuntimeException("No logs data"))
Expand All @@ -71,6 +74,7 @@ class RecursiveLowerBoundServiceTest {
}
}
val upstream = mock<Upstream> {
on { getId() } doReturn "id"
on { getHead() } doReturn head
on { getIngressReader() } doReturn reader
}
Expand Down
Loading