Skip to content

Commit

Permalink
Tx lower bound (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Jun 4, 2024
1 parent f9e2e7f commit 160dd34
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class EthereumLowerBoundService(
return listOf(
EthereumLowerBoundStateDetector(upstream),
EthereumLowerBoundBlockDetector(upstream),
EthereumLowerBoundTxDetector(upstream),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound
import io.emeraldpay.dshackle.upstream.lowerbound.toHex
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux

const val MAX_OFFSET = 20

class EthereumLowerBoundTxDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.TX, setOf("No tx data"), lowerBounds)

override fun period(): Long {
return 3
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block ->
upstream.getIngressReader()
.read(
ChainRequest(
"eth_getBlockTransactionCountByNumber",
ListParams(block.toHex()),
),
)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) {
throw IllegalStateException("No tx data")
}
}
}
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.TX)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import reactor.kotlin.core.publisher.toFlux
import reactor.util.retry.Retry
import reactor.util.retry.RetryBackoffSpec
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

class RecursiveLowerBound(
private val upstream: Upstream,
Expand All @@ -21,18 +22,7 @@ class RecursiveLowerBound(
private val log = LoggerFactory.getLogger(this::class.java)

fun recursiveDetectLowerBound(hasData: (Long) -> Mono<ChainResponse>): Flux<LowerBoundData> {
return Mono.just(upstream.getHead())
.flatMap {
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else if (!lowerBounds.contains(type)) {
Mono.just(LowerBoundBinarySearch(0, currentHeight))
} else {
// next calculations will be carried out only within the last range
Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight))
}
}
return initialRange()
.expand { data ->
if (data.found) {
Mono.empty()
Expand All @@ -41,22 +31,106 @@ class RecursiveLowerBound(

if (data.left > data.right) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundBinarySearch(current, true))
Mono.just(LowerBoundBinarySearchData(current, true))
} else {
hasData(middle)
.retryWhen(retrySpec(nonRetryableErrors))
.retryWhen(retrySpec(middle, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundBinarySearchData(data.left, middle - 1, middle) }
.onErrorReturn(
LowerBoundBinarySearchData(
middle + 1,
data.right,
data.current,
),
)
}
}
}
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux()
}

fun recursiveDetectLowerBoundWithOffset(maxLimit: Int, hasData: (Long) -> Mono<ChainResponse>): Flux<LowerBoundData> {
val visitedBlocks = HashSet<Long>()
return Mono.justOrEmpty(lowerBounds[type]?.lowerBound)
.flatMapMany {
// at first, we try to check the current bound to prevent huge calculations
hasData(it!!)
.retryWhen(retrySpec(it, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundData(lowerBounds[type]!!.lowerBound, type) }
.onErrorResume { Mono.empty() }
}.switchIfEmpty(
initialRange()
.expand { data ->
if (data.found) {
Mono.empty()
} else {
val middle = middleBlock(data)

if (data.left > data.right) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundBinarySearchData(current, true))
} else {
hasData(middle)
.retryWhen(retrySpec(middle, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundBinarySearchData(data.left, middle - 1, middle) }
.onErrorResume {
if (middle < 0) {
Mono.just(LowerBoundBinarySearchData(middle + 1, data.right, data.current))
} else {
shiftLeftAndSearch(data, middle, visitedBlocks, maxLimit, hasData)
}
}
}
}
}
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux(),
)
}

private fun shiftLeftAndSearch(
currentData: LowerBoundBinarySearchData,
currentMiddle: Long,
visitedBlocks: HashSet<Long>,
maxLimit: Int,
hasData: (Long) -> Mono<ChainResponse>,
): Mono<LowerBoundBinarySearchData> {
val count = AtomicInteger(0)
return Mono.just(LowerBoundBinarySearchData(currentMiddle - 1, false))
.expand { currentBlock ->
if (currentBlock.found) {
// to avoid extra handling
Mono.empty()
} else {
if (visitedBlocks.contains(currentBlock.current) || currentBlock.current < 0) {
// if this block has been already seen there is no need to check it again
Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true))
} else {
hasData(currentBlock.current)
.retryWhen(retrySpec(currentBlock.current, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { true }
.onErrorReturn(false)
.map {
if (it) {
LowerBoundBinarySearch(data.left, middle - 1, middle)
// we found data at once and return it
LowerBoundBinarySearchData(currentData.left, currentBlock.current - 1, currentBlock.current, true)
}
.onErrorResume {
// otherwise we go the left until we reach the specified limit
count.incrementAndGet()
if (count.get() in 1..maxLimit) {
visitedBlocks.add(currentBlock.current)
Mono.just(LowerBoundBinarySearchData(currentBlock.current - 1, false))
} else {
LowerBoundBinarySearch(
middle + 1,
data.right,
data.current,
)
Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true))
}
}
}
Expand All @@ -65,11 +139,27 @@ class RecursiveLowerBound(
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux()
// in terms of the whole calculation we haven't found the bound
LowerBoundBinarySearchData(it.left, it.right, it.current)
}
}

private fun initialRange(): Mono<LowerBoundBinarySearchData> {
return Mono.just(upstream.getHead())
.flatMap {
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else if (!lowerBounds.contains(type)) {
Mono.just(LowerBoundBinarySearchData(0, currentHeight))
} else {
// next calculations will be carried out only within the last range
Mono.just(LowerBoundBinarySearchData(lowerBounds[type]!!.lowerBound, currentHeight))
}
}
}

private fun retrySpec(nonRetryableErrors: Set<String>): RetryBackoffSpec {
private fun retrySpec(block: Long, nonRetryableErrors: Set<String>): RetryBackoffSpec {
return Retry.backoff(
Long.MAX_VALUE,
Duration.ofSeconds(1),
Expand All @@ -80,18 +170,20 @@ class RecursiveLowerBound(
}
.doAfterRetry {
log.debug(
"Error in calculation of lower block of upstream {}, retry attempt - {}, message - {}",
"Error in calculation of lower block {} of upstream {}, type - {}, retry attempt - {}, message - {}",
block,
upstream.getId(),
type,
it.totalRetries(),
it.failure().message,
)
}
}

private fun middleBlock(lowerBoundBinarySearch: LowerBoundBinarySearch): Long =
lowerBoundBinarySearch.left + (lowerBoundBinarySearch.right - lowerBoundBinarySearch.left) / 2
private fun middleBlock(lowerBoundBinarySearchData: LowerBoundBinarySearchData): Long =
lowerBoundBinarySearchData.left + (lowerBoundBinarySearchData.right - lowerBoundBinarySearchData.left) / 2

private data class LowerBoundBinarySearch(
private data class LowerBoundBinarySearchData(
val left: Long,
val right: Long,
val current: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService
import io.emeraldpay.dshackle.upstream.ethereum.MAX_OFFSET
import io.emeraldpay.dshackle.upstream.ethereum.ZERO_ADDRESS
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService
Expand Down Expand Up @@ -45,13 +46,21 @@ class RecursiveLowerBoundServiceTest {
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.error(RuntimeException("No block data"))
for (block in it downTo it - MAX_OFFSET - 1) {
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex())))
} doReturn Mono.error(RuntimeException("No tx data"))
}
}
}
}
Expand All @@ -67,6 +76,7 @@ class RecursiveLowerBoundServiceTest {
.expectNoEvent(Duration.ofSeconds(15))
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX }
.thenCancel()
.verify(Duration.ofSeconds(3))

Expand All @@ -76,6 +86,7 @@ class RecursiveLowerBoundServiceTest {
listOf(
LowerBoundData(17964844L, LowerBoundType.STATE),
LowerBoundData(17964844L, LowerBoundType.BLOCK),
LowerBoundData(17964844L, LowerBoundType.TX),
),
)
}
Expand Down

0 comments on commit 160dd34

Please sign in to comment.