Skip to content

Commit

Permalink
Handle trace and debug bounds (#570)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Sep 16, 2024
1 parent f916f0a commit f739920
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 35 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class ChainEventMapper {
LowerBoundType.BLOCK -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK
LowerBoundType.TX -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TX
LowerBoundType.LOGS -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_LOGS
LowerBoundType.TRACE -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TRACE
}
}
}
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class StreamHead(
LowerBoundType.BLOCK -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK
LowerBoundType.TX -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TX
LowerBoundType.LOGS -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_LOGS
LowerBoundType.TRACE -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TRACE
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.LoggerFactory

abstract class EthereumLowerBoundErrorHandler : ErrorHandler {
protected val log = LoggerFactory.getLogger(this::class.java)

override fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) {
try {
if (canHandle(request, errorMessage)) {
parseTagParam(request, tagIndex(request.method))?.let {
upstream.updateLowerBound(it, type())
}
}
} catch (e: RuntimeException) {
log.warn("Couldn't update the {} lower bound of {}, reason - {}", type(), upstream.getId(), e.message)
}
}

protected abstract fun tagIndex(method: String): Int

protected abstract fun type(): LowerBoundType

private fun parseTagParam(request: ChainRequest, tagIndex: Int): Long? {
if (tagIndex != -1 && request.params is ListParams) {
val params = request.params.list
if (params.size >= tagIndex) {
val tag = params[tagIndex]
if (tag is String && tag.startsWith("0x")) {
return tag.substring(2).toLong(16)
}
}
}
return null
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundStateDetector.Companion.stateErrors
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.LoggerFactory

object EthereumStateLowerBoundErrorHandler : ErrorHandler {
private val log = LoggerFactory.getLogger(this::class.java)

object EthereumStateLowerBoundErrorHandler : EthereumLowerBoundErrorHandler() {
private val firstTagIndexMethods = setOf(
"eth_call",
"debug_traceCall",
Expand All @@ -25,36 +20,11 @@ object EthereumStateLowerBoundErrorHandler : ErrorHandler {

private val applicableMethods = firstTagIndexMethods + secondTagIndexMethods

override fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) {
try {
if (canHandle(request, errorMessage)) {
parseTagParam(request, tagIndex(request.method))?.let {
upstream.updateLowerBound(it, LowerBoundType.STATE)
}
}
} catch (e: RuntimeException) {
log.warn("Couldn't update the {} lower bound of {}, reason - {}", LowerBoundType.STATE, upstream.getId(), e.message)
}
}

override fun canHandle(request: ChainRequest, errorMessage: String?): Boolean {
return stateErrors.any { errorMessage?.contains(it) ?: false } && applicableMethods.contains(request.method)
}

private fun parseTagParam(request: ChainRequest, tagIndex: Int): Long? {
if (tagIndex != -1 && request.params is ListParams) {
val params = request.params.list
if (params.size >= tagIndex) {
val tag = params[tagIndex]
if (tag is String && tag.startsWith("0x")) {
return tag.substring(2).toLong(16)
}
}
}
return null
}

private fun tagIndex(method: String): Int {
override fun tagIndex(method: String): Int {
return if (firstTagIndexMethods.contains(method)) {
1
} else if (secondTagIndexMethods.contains(method)) {
Expand All @@ -63,4 +33,6 @@ object EthereumStateLowerBoundErrorHandler : ErrorHandler {
-1
}
}

override fun type(): LowerBoundType = LowerBoundType.STATE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundStateDetector.Companion.stateErrors
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType

object EthereumTraceLowerBoundErrorHandler : EthereumLowerBoundErrorHandler() {
private val zeroTagIndexMethods = setOf(
"trace_block",
"arbtrace_block",
"debug_traceBlockByNumber",
)
private val firstTagIndexMethods = setOf(
"trace_callMany",
"arbtrace_callMany",
"debug_traceCall",
)
private val secondTagIndexMethods = setOf(
"trace_call",
"arbtrace_call",
)

private val applicableMethods = zeroTagIndexMethods + firstTagIndexMethods + secondTagIndexMethods

private val errors = stateErrors
.plus(
setOf(
"historical state not available",
),
)
private val errorRegexp = Regex("block .* not found")

override fun canHandle(request: ChainRequest, errorMessage: String?): Boolean {
return (errors.any { errorMessage?.contains(it) ?: false } || (errorMessage?.matches(errorRegexp) ?: false)) &&
applicableMethods.contains(request.method)
}

override fun tagIndex(method: String): Int {
return if (firstTagIndexMethods.contains(method)) {
1
} else if (secondTagIndexMethods.contains(method)) {
2
} else if (zeroTagIndexMethods.contains(method)) {
0
} else {
-1
}
}

override fun type(): LowerBoundType = LowerBoundType.TRACE
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ interface ErrorHandler {
object UpstreamErrorHandler {
private val errorHandlers = listOf(
EthereumStateLowerBoundErrorHandler,
EthereumTraceLowerBoundErrorHandler,
)
private val errorHandlerExecutor = Executors.newFixedThreadPool(
2 * SchedulersConfig.threadsMultiplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,20 @@ class EthereumLowerBoundStateDetector(
throw IllegalStateException("No state data")
}
}
}.flatMap {
Flux.just(it, lowerBoundFromState(it, LowerBoundType.TRACE))
}
}

private fun lowerBoundFromState(stateLowerBoundData: LowerBoundData, newType: LowerBoundType): LowerBoundData {
val currentBound = lowerBounds.getLastBound(newType)
if (currentBound == null || stateLowerBoundData.lowerBound >= currentBound.lowerBound) {
return stateLowerBoundData.copy(type = newType)
}
return LowerBoundData(currentBound.lowerBound, newType)
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.STATE)
return setOf(LowerBoundType.STATE, LowerBoundType.TRACE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ data class LowerBoundData(
}

enum class LowerBoundType {
UNKNOWN, STATE, SLOT, BLOCK, TX, LOGS
UNKNOWN, STATE, SLOT, BLOCK, TX, LOGS, TRACE
}

fun BlockchainOuterClass.LowerBoundType.fromProtoType(): LowerBoundType {
Expand All @@ -32,5 +32,6 @@ fun BlockchainOuterClass.LowerBoundType.fromProtoType(): LowerBoundType {
BlockchainOuterClass.LowerBoundType.UNRECOGNIZED -> LowerBoundType.UNKNOWN
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TX -> LowerBoundType.TX
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_LOGS -> LowerBoundType.LOGS
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_TRACE -> LowerBoundType.TRACE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.Arguments.of
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mockito.mock
import org.mockito.kotlin.verify

class EthereumTraceLowerBoundErrorHandlerTest {

@ParameterizedTest
@MethodSource("requests")
fun `update lower bound`(request: ChainRequest) {
val upstream = mock<Upstream>()
val handler = EthereumTraceLowerBoundErrorHandler

handler.handle(upstream, request, "missing trie node d5648cc9aef48154159d53800f2f")

verify(upstream).updateLowerBound(213229736, LowerBoundType.TRACE)
}

@Test
fun `update lower bound base on regexp`() {
val upstream = mock<Upstream>()
val handler = EthereumTraceLowerBoundErrorHandler

handler.handle(upstream, ChainRequest("trace_block", ListParams("0xCB5A0A8")), "block #1 not found")

verify(upstream).updateLowerBound(213229736, LowerBoundType.TRACE)
}

companion object {
@JvmStatic
fun requests(): List<Arguments> =
listOf(
of(ChainRequest("trace_block", ListParams("0xCB5A0A8"))),
of(ChainRequest("arbtrace_block", ListParams("0xCB5A0A8"))),
of(ChainRequest("debug_traceBlockByNumber", ListParams("0xCB5A0A8", mapOf("tracer" to "tracer")))),
of(ChainRequest("trace_callMany", ListParams(arrayOf(mapOf("val" to 1)), "0xCB5A0A8"))),
of(ChainRequest("arbtrace_callMany", ListParams(arrayOf(mapOf("val" to 1)), "0xCB5A0A8"))),
of(ChainRequest("debug_traceCall", ListParams(mapOf("val" to 1), "0xCB5A0A8", mapOf("val" to 1)))),
of(ChainRequest("trace_call", ListParams(mapOf("val" to 1), arrayOf(mapOf("val" to 1)), "0xCB5A0A8"))),
of(ChainRequest("arbtrace_call", ListParams(mapOf("val" to 1), arrayOf(mapOf("val" to 1)), "0xCB5A0A8"))),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class RecursiveLowerBoundServiceTest {
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(15))
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TRACE }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.LOGS }
Expand All @@ -97,6 +98,7 @@ class RecursiveLowerBoundServiceTest {
.hasSameElementsAs(
listOf(
LowerBoundData(17964844L, LowerBoundType.STATE),
LowerBoundData(17964844L, LowerBoundType.TRACE),
LowerBoundData(17964844L, LowerBoundType.BLOCK),
LowerBoundData(17964844L, LowerBoundType.TX),
LowerBoundData(17964844L, LowerBoundType.LOGS),
Expand Down

0 comments on commit f739920

Please sign in to comment.